From 845517fdeabd9b6d4b7e38b8406c045ddb4f511b Mon Sep 17 00:00:00 2001 From: Tor Arvid Lund Date: Mon, 22 Apr 2024 20:22:47 +0200 Subject: [PATCH 1/7] Remove async from functions that don't await The functions from_tail_response and from_head_response were marked async, but did no calls to .await. --- src/lib.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 786f454..960a429 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -72,7 +72,7 @@ pub use error::AsyncHttpRangeReaderError; /// if response.status() == reqwest::StatusCode::NOT_MODIFIED { /// Ok(None) /// } else { -/// let reader = AsyncHttpRangeReader::from_head_response(client, response, HeaderMap::default()).await?; +/// let reader = AsyncHttpRangeReader::from_head_response(client, response, HeaderMap::default())?; /// Ok(Some(reader)) /// } /// } @@ -156,7 +156,7 @@ impl AsyncHttpRangeReader { ) .await?; let response_headers = response.headers().clone(); - let self_ = Self::from_tail_response(client, response, extra_headers).await?; + let self_ = Self::from_tail_response(client, response, extra_headers)?; Ok((self_, response_headers)) } CheckSupportMethod::Head => { @@ -164,7 +164,7 @@ impl AsyncHttpRangeReader { Self::initial_head_request(client.clone(), url.clone(), HeaderMap::default()) .await?; let response_headers = response.headers().clone(); - let self_ = Self::from_head_response(client, response, extra_headers).await?; + let self_ = Self::from_head_response(client, response, extra_headers)?; Ok((self_, response_headers)) } } @@ -197,7 +197,7 @@ impl AsyncHttpRangeReader { /// Initialize the reader from [`AsyncHttpRangeReader::initial_tail_request`] (or a user /// provided response that also has a range of bytes from the end as body) - pub async fn from_tail_response( + pub fn from_tail_response( client: impl Into, tail_request_response: Response, extra_headers: HeaderMap, @@ -297,7 +297,7 @@ impl AsyncHttpRangeReader { /// Initialize the reader from [`AsyncHttpRangeReader::initial_head_request`] (or a user /// provided response the) - pub async fn from_head_response( + pub fn from_head_response( client: impl Into, head_response: Response, extra_headers: HeaderMap, From c2c960f5de16d56f410985c2b04f9c3ad2e43bc4 Mon Sep 17 00:00:00 2001 From: Tor Arvid Lund Date: Mon, 22 Apr 2024 22:41:51 +0200 Subject: [PATCH 2/7] Make a builder --- src/error.rs | 16 +++++++ src/lib.rs | 119 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 135 insertions(+) diff --git a/src/error.rs b/src/error.rs index 0609817..8224785 100644 --- a/src/error.rs +++ b/src/error.rs @@ -49,3 +49,19 @@ impl From for AsyncHttpRangeReaderError { AsyncHttpRangeReaderError::TransportError(Arc::new(err.into())) } } + +/// Error type used for [`crate::AsyncHttpRangeReaderBuilder`] +#[derive(Clone, Debug, thiserror::Error)] +pub enum AsyncHttpRangeReaderBuilderError { + /// Required field 'client' is missing + #[error("required field 'client' is missing")] + MissingClient, + + /// Required field 'url' is missing + #[error("required field 'url' is missing")] + MissingUrl, + + /// Memory mapping the file failed + #[error("memory mapping the file failed")] + MemoryMapError(#[source] Arc), +} diff --git a/src/lib.rs b/src/lib.rs index 960a429..087e375 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,6 +19,7 @@ mod error; mod sparse_range; +use error::AsyncHttpRangeReaderBuilderError; use futures::{FutureExt, Stream, StreamExt}; use http_content_range::{ContentRange, ContentRangeBytes}; use memmap2::MmapMut; @@ -170,6 +171,11 @@ impl AsyncHttpRangeReader { } } + // Make a builder for AsyncHttpRangeReader + pub fn builder() -> AsyncHttpRangeReaderBuilder { + AsyncHttpRangeReaderBuilder::default() + } + /// Send an initial range request to determine if the remote accepts range /// requests. This will return a number of bytes from the end of the stream. Use the /// `initial_chunk_size` parameter to define how many bytes should be requested from the end. @@ -406,6 +412,105 @@ impl AsyncHttpRangeReader { } } +#[derive(Default)] +pub struct AsyncHttpRangeReaderBuilder { + client: Option, + url: Option, + extra_headers: HeaderMap, + requested_range: SparseRange, + streamer_state: StreamerState, + initial_tail_response: Option<(Response, u64)>, + content_length: usize, +} + +impl AsyncHttpRangeReaderBuilder { + pub fn client(mut self, client: reqwest_middleware::ClientWithMiddleware) -> Self { + self.client = Some(client); + self + } + + pub fn url(mut self, url: Url) -> Self { + self.url = Some(url); + self + } + + pub fn extra_headers(mut self, extra_headers: HeaderMap) -> Self { + self.extra_headers = extra_headers; + self + } + + pub fn requested_range(mut self, requested_range: SparseRange) -> Self { + self.requested_range = requested_range; + self + } + + fn streamer_state(mut self, streamer_state: StreamerState) -> Self { + self.streamer_state = streamer_state; + self + } + + pub fn initial_tail_response(mut self, initial_tail_response: Option<(Response, u64)>) -> Self { + self.initial_tail_response = initial_tail_response; + self + } + + pub fn content_length(mut self, content_length: usize) -> Self { + self.content_length = content_length; + self + } + + pub fn build(self) -> Result { + let Some(client) = self.client else { + return Err(AsyncHttpRangeReaderBuilderError::MissingClient); + }; + let Some(url) = self.url else { + return Err(AsyncHttpRangeReaderBuilderError::MissingUrl); + }; + + let memory_map = memmap2::MmapOptions::new() + .len(self.content_length) + .map_anon() + .map_err(Arc::new) + .map_err(AsyncHttpRangeReaderBuilderError::MemoryMapError)?; + + // SAFETY: Get a read-only slice to the memory. This is safe because the memory map is never + // reallocated and we keep track of the initialized part. + let memory_map_slice = + unsafe { std::slice::from_raw_parts(memory_map.as_ptr(), memory_map.len()) }; + + // adding more than 2 entries to the channel would block the sender. I assumed two would + // suffice because I would want to 1) prefetch a certain range and 2) read stuff via the + // AsyncRead implementation. Any extra would simply have to wait for one of these to + // succeed. I eventually used 10 because who cares. + let (request_tx, request_rx) = tokio::sync::mpsc::channel(10); + let (state_tx, state_rx) = watch::channel(StreamerState::default()); + + tokio::spawn(run_streamer( + client, + url.clone(), + self.extra_headers, + self.initial_tail_response, + memory_map, + state_tx, + request_rx, + )); + + let reader = AsyncHttpRangeReader { + len: memory_map_slice.len() as u64, + inner: Mutex::new(Inner { + data: memory_map_slice, + pos: 0, + requested_range: self.requested_range, + streamer_state: self.streamer_state, + streamer_state_rx: WatchStream::new(state_rx), + request_tx, + poll_request_tx: None, + }), + }; + Ok(reader) + } +} + /// A task that will download parts from the remote archive and "send" them to the frontend as they /// become available. #[tracing::instrument(name = "fetch_ranges", skip_all, fields(url))] @@ -840,4 +945,18 @@ mod test { err, AsyncHttpRangeReaderError::HttpError(err) if err.status() == Some(StatusCode::NOT_FOUND) ); } + + #[tokio::test] + async fn test_builder_happy_path() { + let path = Path::new(&std::env::var("CARGO_MANIFEST_DIR").unwrap()).join("test-data"); + let server = StaticDirectoryServer::new(&path) + .await + .expect("could not initialize server"); + + AsyncHttpRangeReader::builder() + .client(Client::new().into()) + .url(server.url().join("andes-1.8.3-pyhd8ed1ab_0.conda").unwrap()) + .build() + .expect("could not build reader"); + } } From eebfe56efe38218d1e5627949c5926ff36b9aad4 Mon Sep 17 00:00:00 2001 From: Tor Arvid Lund Date: Mon, 22 Apr 2024 23:01:55 +0200 Subject: [PATCH 3/7] Use the builder in from_[tail|head]_response --- src/error.rs | 10 +++++ src/lib.rs | 106 +++++++++------------------------------------------ 2 files changed, 27 insertions(+), 89 deletions(-) diff --git a/src/error.rs b/src/error.rs index 8224785..e80849b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -30,6 +30,10 @@ pub enum AsyncHttpRangeReaderError { /// Memory mapping the file failed #[error("memory mapping the file failed")] MemoryMapError(#[source] Arc), + + /// Error building the reader + #[error("error building the reader: {0}")] + BuilderError(#[source] Arc), } impl From for AsyncHttpRangeReaderError { @@ -50,6 +54,12 @@ impl From for AsyncHttpRangeReaderError { } } +impl From for AsyncHttpRangeReaderError { + fn from(err: AsyncHttpRangeReaderBuilderError) -> Self { + AsyncHttpRangeReaderError::BuilderError(Arc::new(err)) + } +} + /// Error type used for [`crate::AsyncHttpRangeReaderBuilder`] #[derive(Clone, Debug, thiserror::Error)] pub enum AsyncHttpRangeReaderBuilderError { diff --git a/src/lib.rs b/src/lib.rs index 087e375..5f5eedd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -208,8 +208,6 @@ impl AsyncHttpRangeReader { tail_request_response: Response, extra_headers: HeaderMap, ) -> Result { - let client = client.into(); - // Get the size of the file from this initial request let content_range = ContentRange::parse( tail_request_response @@ -228,56 +226,25 @@ impl AsyncHttpRangeReader { _ => return Err(AsyncHttpRangeReaderError::HttpRangeRequestUnsupported), }; - // Allocate a memory map to hold the data - let memory_map = memmap2::MmapOptions::new() - .len(complete_length as usize) - .map_anon() - .map_err(Arc::new) - .map_err(AsyncHttpRangeReaderError::MemoryMapError)?; - - // SAFETY: Get a read-only slice to the memory. This is safe because the memory map is never - // reallocated and we keep track of the initialized part. - let memory_map_slice = - unsafe { std::slice::from_raw_parts(memory_map.as_ptr(), memory_map.len()) }; - let requested_range = SparseRange::from_range(complete_length - (finish - start)..complete_length); - // adding more than 2 entries to the channel would block the sender. I assumed two would - // suffice because I would want to 1) prefetch a certain range and 2) read stuff via the - // AsyncRead implementation. Any extra would simply have to wait for one of these to - // succeed. I eventually used 10 because who cares. - let (request_tx, request_rx) = tokio::sync::mpsc::channel(10); - let (state_tx, state_rx) = watch::channel(StreamerState::default()); - tokio::spawn(run_streamer( - client, - tail_request_response.url().clone(), - extra_headers, - Some((tail_request_response, start)), - memory_map, - state_tx, - request_rx, - )); - // Configure the initial state of the streamer. let mut streamer_state = StreamerState::default(); streamer_state .requested_ranges .push(complete_length - (finish - start)..complete_length); - let reader = Self { - len: memory_map_slice.len() as u64, - inner: Mutex::new(Inner { - data: memory_map_slice, - pos: 0, - requested_range, - streamer_state, - streamer_state_rx: WatchStream::new(state_rx), - request_tx, - poll_request_tx: None, - }), - }; - Ok(reader) + Self::builder() + .client(client.into()) + .url(tail_request_response.url().clone()) + .extra_headers(extra_headers) + .initial_tail_response(Some((tail_request_response, start))) + .content_length(complete_length as usize) + .requested_range(requested_range) + .streamer_state(streamer_state) + .build() + .map_err(AsyncHttpRangeReaderError::from) } /// Send an initial range request to determine if the remote accepts range @@ -329,52 +296,13 @@ impl AsyncHttpRangeReader { .parse() .map_err(|_err| AsyncHttpRangeReaderError::ContentLengthMissing)?; - // Allocate a memory map to hold the data - let memory_map = memmap2::MmapOptions::new() - .len(content_length as _) - .map_anon() - .map_err(Arc::new) - .map_err(AsyncHttpRangeReaderError::MemoryMapError)?; - - // SAFETY: Get a read-only slice to the memory. This is safe because the memory map is never - // reallocated and we keep track of the initialized part. - let memory_map_slice = - unsafe { std::slice::from_raw_parts(memory_map.as_ptr(), memory_map.len()) }; - - let requested_range = SparseRange::default(); - - // adding more than 2 entries to the channel would block the sender. I assumed two would - // suffice because I would want to 1) prefetch a certain range and 2) read stuff via the - // AsyncRead implementation. Any extra would simply have to wait for one of these to - // succeed. I eventually used 10 because who cares. - let (request_tx, request_rx) = tokio::sync::mpsc::channel(10); - let (state_tx, state_rx) = watch::channel(StreamerState::default()); - tokio::spawn(run_streamer( - client, - head_response.url().clone(), - extra_headers, - None, - memory_map, - state_tx, - request_rx, - )); - - // Configure the initial state of the streamer. - let streamer_state = StreamerState::default(); - - let reader = Self { - len: memory_map_slice.len() as u64, - inner: Mutex::new(Inner { - data: memory_map_slice, - pos: 0, - requested_range, - streamer_state, - streamer_state_rx: WatchStream::new(state_rx), - request_tx, - poll_request_tx: None, - }), - }; - Ok(reader) + Self::builder() + .client(client) + .url(head_response.url().clone()) + .extra_headers(extra_headers) + .content_length(content_length as usize) + .build() + .map_err(AsyncHttpRangeReaderError::from) } /// Returns the ranges that this instance actually performed HTTP requests for. From 8b6ca0849c9caf28845a814669441fcdbdf9c45a Mon Sep 17 00:00:00 2001 From: Tor Arvid Lund Date: Mon, 22 Apr 2024 23:29:37 +0200 Subject: [PATCH 4/7] Simplify from_[head|tail]_response functions more --- src/lib.rs | 123 ++++++++++++++++++++++++++++------------------------- 1 file changed, 65 insertions(+), 58 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 5f5eedd..2ff489f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -208,41 +208,9 @@ impl AsyncHttpRangeReader { tail_request_response: Response, extra_headers: HeaderMap, ) -> Result { - // Get the size of the file from this initial request - let content_range = ContentRange::parse( - tail_request_response - .headers() - .get(reqwest::header::CONTENT_RANGE) - .ok_or(AsyncHttpRangeReaderError::ContentRangeMissing)? - .to_str() - .map_err(|_err| AsyncHttpRangeReaderError::ContentRangeMissing)?, - ); - let (start, finish, complete_length) = match content_range { - ContentRange::Bytes(ContentRangeBytes { - first_byte, - last_byte, - complete_length, - }) => (first_byte, last_byte, complete_length), - _ => return Err(AsyncHttpRangeReaderError::HttpRangeRequestUnsupported), - }; - - let requested_range = - SparseRange::from_range(complete_length - (finish - start)..complete_length); - - // Configure the initial state of the streamer. - let mut streamer_state = StreamerState::default(); - streamer_state - .requested_ranges - .push(complete_length - (finish - start)..complete_length); - - Self::builder() + AsyncHttpRangeReaderBuilder::from_tail_response(tail_request_response)? .client(client.into()) - .url(tail_request_response.url().clone()) .extra_headers(extra_headers) - .initial_tail_response(Some((tail_request_response, start))) - .content_length(complete_length as usize) - .requested_range(requested_range) - .streamer_state(streamer_state) .build() .map_err(AsyncHttpRangeReaderError::from) } @@ -275,32 +243,9 @@ impl AsyncHttpRangeReader { head_response: Response, extra_headers: HeaderMap, ) -> Result { - let client = client.into(); - - // Are range requests supported? - if head_response - .headers() - .get(reqwest::header::ACCEPT_RANGES) - .and_then(|h| h.to_str().ok()) - != Some("bytes") - { - return Err(AsyncHttpRangeReaderError::HttpRangeRequestUnsupported); - } - - let content_length: u64 = head_response - .headers() - .get(reqwest::header::CONTENT_LENGTH) - .ok_or(AsyncHttpRangeReaderError::ContentLengthMissing)? - .to_str() - .map_err(|_err| AsyncHttpRangeReaderError::ContentLengthMissing)? - .parse() - .map_err(|_err| AsyncHttpRangeReaderError::ContentLengthMissing)?; - - Self::builder() - .client(client) - .url(head_response.url().clone()) + AsyncHttpRangeReaderBuilder::from_head_response(head_response)? + .client(client.into()) .extra_headers(extra_headers) - .content_length(content_length as usize) .build() .map_err(AsyncHttpRangeReaderError::from) } @@ -387,6 +332,68 @@ impl AsyncHttpRangeReaderBuilder { self } + pub fn from_head_response(head_response: Response) -> Result { + // Are range requests supported? + if head_response + .headers() + .get(reqwest::header::ACCEPT_RANGES) + .and_then(|h| h.to_str().ok()) + != Some("bytes") + { + return Err(AsyncHttpRangeReaderError::HttpRangeRequestUnsupported); + } + + let content_length: u64 = head_response + .headers() + .get(reqwest::header::CONTENT_LENGTH) + .ok_or(AsyncHttpRangeReaderError::ContentLengthMissing)? + .to_str() + .map_err(|_err| AsyncHttpRangeReaderError::ContentLengthMissing)? + .parse() + .map_err(|_err| AsyncHttpRangeReaderError::ContentLengthMissing)?; + + let builder = Self::default() + .url(head_response.url().clone()) + .content_length(content_length as usize); + + Ok(builder) + } + + pub fn from_tail_response(tail_response: Response) -> Result { + let content_range = ContentRange::parse( + tail_response + .headers() + .get(reqwest::header::CONTENT_RANGE) + .ok_or(AsyncHttpRangeReaderError::ContentRangeMissing)? + .to_str() + .map_err(|_err| AsyncHttpRangeReaderError::ContentRangeMissing)?, + ); + let (start, finish, complete_length) = match content_range { + ContentRange::Bytes(ContentRangeBytes { + first_byte, + last_byte, + complete_length, + }) => (first_byte, last_byte, complete_length), + _ => return Err(AsyncHttpRangeReaderError::HttpRangeRequestUnsupported), + }; + + let requested_range = + SparseRange::from_range(complete_length - (finish - start)..complete_length); + + // Configure the initial state of the streamer. + let mut streamer_state = StreamerState::default(); + streamer_state + .requested_ranges + .push(complete_length - (finish - start)..complete_length); + let builder = Self::default() + .url(tail_response.url().clone()) + .initial_tail_response(Some((tail_response, start))) + .requested_range(requested_range) + .content_length(complete_length as usize) + .streamer_state(streamer_state); + Ok(builder) + } + pub fn build(self) -> Result { let Some(client) = self.client else { return Err(AsyncHttpRangeReaderBuilderError::MissingClient); From 996b83c70922b21fb48b334ae8b29beb8cb4f578 Mon Sep 17 00:00:00 2001 From: Tor Arvid Lund Date: Sat, 27 Apr 2024 11:38:53 +0200 Subject: [PATCH 5/7] Attempt to address code review comments This commit: * Removes derive(Default) from the builder * Adds builder::new that requires a client * Changes a few builder methods from pub to private * Makes builder from_head_response and from_tail_response methods instead of static functions * Adds a check that content_length is non-zero in the build() method --- src/error.rs | 6 ++--- src/lib.rs | 72 ++++++++++++++++++++++++++++++++++------------------ 2 files changed, 51 insertions(+), 27 deletions(-) diff --git a/src/error.rs b/src/error.rs index e80849b..14b60af 100644 --- a/src/error.rs +++ b/src/error.rs @@ -63,9 +63,9 @@ impl From for AsyncHttpRangeReaderError { /// Error type used for [`crate::AsyncHttpRangeReaderBuilder`] #[derive(Clone, Debug, thiserror::Error)] pub enum AsyncHttpRangeReaderBuilderError { - /// Required field 'client' is missing - #[error("required field 'client' is missing")] - MissingClient, + /// Required field 'content_length' is zero + #[error("required field 'content_length' is zero")] + InvalidContentLength, /// Required field 'url' is missing #[error("required field 'url' is missing")] diff --git a/src/lib.rs b/src/lib.rs index 2ff489f..60c4c8f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -172,8 +172,10 @@ impl AsyncHttpRangeReader { } // Make a builder for AsyncHttpRangeReader - pub fn builder() -> AsyncHttpRangeReaderBuilder { - AsyncHttpRangeReaderBuilder::default() + pub fn builder( + client: reqwest_middleware::ClientWithMiddleware, + ) -> AsyncHttpRangeReaderBuilder { + AsyncHttpRangeReaderBuilder::new(client) } /// Send an initial range request to determine if the remote accepts range @@ -208,8 +210,8 @@ impl AsyncHttpRangeReader { tail_request_response: Response, extra_headers: HeaderMap, ) -> Result { - AsyncHttpRangeReaderBuilder::from_tail_response(tail_request_response)? - .client(client.into()) + AsyncHttpRangeReaderBuilder::new(client.into()) + .from_tail_response(tail_request_response)? .extra_headers(extra_headers) .build() .map_err(AsyncHttpRangeReaderError::from) @@ -243,8 +245,8 @@ impl AsyncHttpRangeReader { head_response: Response, extra_headers: HeaderMap, ) -> Result { - AsyncHttpRangeReaderBuilder::from_head_response(head_response)? - .client(client.into()) + AsyncHttpRangeReaderBuilder::new(client.into()) + .from_head_response(head_response)? .extra_headers(extra_headers) .build() .map_err(AsyncHttpRangeReaderError::from) @@ -285,9 +287,8 @@ impl AsyncHttpRangeReader { } } -#[derive(Default)] pub struct AsyncHttpRangeReaderBuilder { - client: Option, + client: reqwest_middleware::ClientWithMiddleware, url: Option, extra_headers: HeaderMap, requested_range: SparseRange, @@ -297,9 +298,16 @@ pub struct AsyncHttpRangeReaderBuilder { } impl AsyncHttpRangeReaderBuilder { - pub fn client(mut self, client: reqwest_middleware::ClientWithMiddleware) -> Self { - self.client = Some(client); - self + pub fn new(client: reqwest_middleware::ClientWithMiddleware) -> Self { + Self { + client, + url: None, + extra_headers: HeaderMap::default(), + requested_range: SparseRange::default(), + streamer_state: StreamerState::default(), + initial_tail_response: None, + content_length: 0, + } } pub fn url(mut self, url: Url) -> Self { @@ -312,7 +320,7 @@ impl AsyncHttpRangeReaderBuilder { self } - pub fn requested_range(mut self, requested_range: SparseRange) -> Self { + fn requested_range(mut self, requested_range: SparseRange) -> Self { self.requested_range = requested_range; self } @@ -322,7 +330,7 @@ impl AsyncHttpRangeReaderBuilder { self } - pub fn initial_tail_response(mut self, initial_tail_response: Option<(Response, u64)>) -> Self { + fn initial_tail_response(mut self, initial_tail_response: Option<(Response, u64)>) -> Self { self.initial_tail_response = initial_tail_response; self } @@ -332,7 +340,10 @@ impl AsyncHttpRangeReaderBuilder { self } - pub fn from_head_response(head_response: Response) -> Result { + pub fn from_head_response( + self, + head_response: Response, + ) -> Result { // Are range requests supported? if head_response .headers() @@ -352,14 +363,17 @@ impl AsyncHttpRangeReaderBuilder { .parse() .map_err(|_err| AsyncHttpRangeReaderError::ContentLengthMissing)?; - let builder = Self::default() + let builder = self .url(head_response.url().clone()) .content_length(content_length as usize); Ok(builder) } - pub fn from_tail_response(tail_response: Response) -> Result { + pub fn from_tail_response( + self, + tail_response: Response, + ) -> Result { let content_range = ContentRange::parse( tail_response .headers() @@ -385,7 +399,7 @@ impl AsyncHttpRangeReaderBuilder { streamer_state .requested_ranges .push(complete_length - (finish - start)..complete_length); - let builder = Self::default() + let builder = self .url(tail_response.url().clone()) .initial_tail_response(Some((tail_response, start))) .requested_range(requested_range) @@ -395,13 +409,14 @@ impl AsyncHttpRangeReaderBuilder { } pub fn build(self) -> Result { - let Some(client) = self.client else { - return Err(AsyncHttpRangeReaderBuilderError::MissingClient); - }; let Some(url) = self.url else { return Err(AsyncHttpRangeReaderBuilderError::MissingUrl); }; + if self.content_length == 0 { + return Err(AsyncHttpRangeReaderBuilderError::InvalidContentLength); + } + let memory_map = memmap2::MmapOptions::new() .len(self.content_length) .map_anon() @@ -421,7 +436,7 @@ impl AsyncHttpRangeReaderBuilder { let (state_tx, state_rx) = watch::channel(StreamerState::default()); tokio::spawn(run_streamer( - client, + self.client, url.clone(), self.extra_headers, self.initial_tail_response, @@ -888,9 +903,18 @@ mod test { .await .expect("could not initialize server"); - AsyncHttpRangeReader::builder() - .client(Client::new().into()) - .url(server.url().join("andes-1.8.3-pyhd8ed1ab_0.conda").unwrap()) + let url = server.url().join("andes-1.8.3-pyhd8ed1ab_0.conda").unwrap(); + let head_response = AsyncHttpRangeReader::initial_head_request( + Client::new(), + url.clone(), + HeaderMap::default(), + ) + .await + .expect("could not perform head request"); + + AsyncHttpRangeReader::builder(Client::new().into()) + .from_head_response(head_response) + .expect("could not build reader") .build() .expect("could not build reader"); } From b584600085b9a8b1bf78bec33bc7a15b9e98eb5d Mon Sep 17 00:00:00 2001 From: Tor Arvid Lund Date: Sun, 28 Apr 2024 13:13:11 +0200 Subject: [PATCH 6/7] Add additional tests --- src/lib.rs | 57 +++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 52 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 60c4c8f..1a2abdd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,6 +22,7 @@ mod sparse_range; use error::AsyncHttpRangeReaderBuilderError; use futures::{FutureExt, Stream, StreamExt}; use http_content_range::{ContentRange, ContentRangeBytes}; +use itertools::Itertools; use memmap2::MmapMut; use reqwest::header::HeaderMap; use reqwest::{Response, Url}; @@ -714,7 +715,7 @@ mod test { use reqwest::{Client, StatusCode}; use rstest::*; use std::path::Path; - use tokio::io::AsyncReadExt as _; + use tokio::{fs::File, io::AsyncReadExt as _}; use tokio_util::compat::TokioAsyncReadCompatExt; #[rstest] @@ -838,7 +839,7 @@ mod test { .expect("could not initialize server"); // Construct an AsyncRangeReader - let (mut range, _) = AsyncHttpRangeReader::new( + let (range, _) = AsyncHttpRangeReader::new( Client::new(), server.url().join("andes-1.8.3-pyhd8ed1ab_0.conda").unwrap(), check_method, @@ -848,10 +849,14 @@ mod test { .expect("bla"); // Also open a simple file reader - let mut file = tokio::fs::File::open(path.join("andes-1.8.3-pyhd8ed1ab_0.conda")) + let file = tokio::fs::File::open(path.join("andes-1.8.3-pyhd8ed1ab_0.conda")) .await .unwrap(); + assert_range_and_file_contents_match(range, file).await; + } + + async fn assert_range_and_file_contents_match(mut range: AsyncHttpRangeReader, mut file: File) { // Read until the end and make sure that the contents matches let mut range_read = vec![0; 64 * 1024]; let mut file_read = vec![0; 64 * 1024]; @@ -904,6 +909,8 @@ mod test { .expect("could not initialize server"); let url = server.url().join("andes-1.8.3-pyhd8ed1ab_0.conda").unwrap(); + + // First, let's try to build the reader from a head response let head_response = AsyncHttpRangeReader::initial_head_request( Client::new(), url.clone(), @@ -912,10 +919,50 @@ mod test { .await .expect("could not perform head request"); - AsyncHttpRangeReader::builder(Client::new().into()) + let builder = AsyncHttpRangeReader::builder(Client::new().into()) .from_head_response(head_response) - .expect("could not build reader") + .expect("could not build reader from head response") + .build() + .expect("could not build reader"); + let file = tokio::fs::File::open(path.join("andes-1.8.3-pyhd8ed1ab_0.conda")) + .await + .unwrap(); + + // And let's make sure that the contents match the file + assert_range_and_file_contents_match(builder, file).await; + + // Now let's try to build the reader from a tail response + let tail_response = AsyncHttpRangeReader::initial_tail_request( + Client::new(), + url.clone(), + 8192, + HeaderMap::default(), + ) + .await + .expect("could not perform tail request"); + + let builder = AsyncHttpRangeReader::builder(Client::new().into()) + .from_tail_response(tail_response) + .expect("could not build reader from tail response") .build() .expect("could not build reader"); + let file = tokio::fs::File::open(path.join("andes-1.8.3-pyhd8ed1ab_0.conda")) + .await + .unwrap(); + + // And again let's make sure that the contents match the file + assert_range_and_file_contents_match(builder, file).await; + } + + #[test] + fn test_builder_fails_on_missing_content_length() { + let url = Url::parse("http://localhost").unwrap(); + let result = AsyncHttpRangeReader::builder(Client::new().into()) + .url(url) + .build(); + assert_matches!( + result, + Err(AsyncHttpRangeReaderBuilderError::InvalidContentLength) + ); } } From ff8c6210fde83cfb887c81d95df8b4f8258945d7 Mon Sep 17 00:00:00 2001 From: Tor Arvid Lund Date: Sun, 28 Apr 2024 20:32:48 +0200 Subject: [PATCH 7/7] Remove unused import --- src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 1a2abdd..bfc94bb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,7 +22,6 @@ mod sparse_range; use error::AsyncHttpRangeReaderBuilderError; use futures::{FutureExt, Stream, StreamExt}; use http_content_range::{ContentRange, ContentRangeBytes}; -use itertools::Itertools; use memmap2::MmapMut; use reqwest::header::HeaderMap; use reqwest::{Response, Url};