From fa7cd6d96cf54ddc7ed74bb43bedcf48f3f3295b Mon Sep 17 00:00:00 2001 From: konstin Date: Wed, 8 Nov 2023 21:21:35 +0100 Subject: [PATCH 1/2] Proper caching support --- Cargo.toml | 2 +- src/lib.rs | 61 ++++++++++++++++++++++++++++++++++++++++-------------- 2 files changed, 46 insertions(+), 17 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6f05790..8adcc14 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "async_http_range_reader" authors = ["Bas Zalmstra "] -version = "0.3.0" +version = "0.4.0" edition = "2021" description = "A library for streaming reading of files over HTTP using range requests" license = "MIT" diff --git a/src/lib.rs b/src/lib.rs index 67b7b0a..d070043 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -109,35 +109,57 @@ impl AsyncHttpRangeReader { ) -> Result<(Self, HeaderMap), AsyncHttpRangeReaderError> { match check_method { CheckSupportMethod::NegativeRangeRequest(initial_chunk_size) => { - Self::new_tail_request(client, url, initial_chunk_size).await + let response = Self::initial_tail_request( + client.clone(), + url.clone(), + initial_chunk_size, + HeaderMap::default(), + ) + .await?; + let response_headers = response.headers().clone(); + let self_ = Self::new_tail_response(client, url, response).await?; + Ok((self_, response_headers)) + } + CheckSupportMethod::Head => { + let response = + Self::new_head_request(client.clone(), url.clone(), HeaderMap::default()) + .await?; + let response_headers = response.headers().clone(); + let self_ = Self::new_head_response(client, url, response).await?; + Ok((self_, response_headers)) } - CheckSupportMethod::Head => Self::new_head(client, url).await, } } /// An initial range request is performed to the server to determine if the remote accepts range /// requests. This will return a number of bytes from the end of the stream. Use the - /// `initial_chunk_size` paramter to define how many bytes should be requested from the end. - pub async fn new_tail_request( + /// `initial_chunk_size` parameter to define how many bytes should be requested from the end. + pub async fn initial_tail_request( client: reqwest::Client, url: reqwest::Url, initial_chunk_size: u64, - ) -> Result<(Self, HeaderMap), AsyncHttpRangeReaderError> { - // Perform an initial range request to get the size of the file - let tail_request_response = client - .get(url.clone()) + extra_headers: HeaderMap, + ) -> Result { + let tail_response = client + .get(url) .header( reqwest::header::RANGE, format!("bytes=-{initial_chunk_size}"), ) - .header(reqwest::header::CACHE_CONTROL, "no-cache") + .headers(extra_headers) .send() .await .and_then(Response::error_for_status) .map_err(Arc::new) .map_err(AsyncHttpRangeReaderError::HttpError)?; - let response_header = tail_request_response.headers().clone(); + Ok(tail_response) + } + pub async fn new_tail_response( + client: reqwest::Client, + url: reqwest::Url, + tail_request_response: Response, + ) -> Result { // Get the size of the file from this initial request let content_range = ContentRange::parse( tail_request_response @@ -204,23 +226,31 @@ impl AsyncHttpRangeReader { poll_request_tx: None, }), }; - Ok((reader, response_header)) + Ok(reader) } - async fn new_head( + pub async fn new_head_request( client: reqwest::Client, url: reqwest::Url, - ) -> Result<(Self, HeaderMap), AsyncHttpRangeReaderError> { + extra_headers: HeaderMap, + ) -> Result { // Perform a HEAD request to get the content-length. let head_response = client .head(url.clone()) - .header(reqwest::header::CACHE_CONTROL, "no-cache") + .headers(extra_headers) .send() .await .and_then(Response::error_for_status) .map_err(Arc::new) .map_err(AsyncHttpRangeReaderError::HttpError)?; + Ok(head_response) + } + pub async fn new_head_response( + client: reqwest::Client, + url: reqwest::Url, + head_response: Response, + ) -> Result { // Are range requests supported? if head_response .headers() @@ -279,7 +309,7 @@ impl AsyncHttpRangeReader { poll_request_tx: None, }), }; - Ok((reader, head_response.headers().clone())) + Ok(reader) } /// Returns the ranges that this instance actually performed HTTP requests for. @@ -378,7 +408,6 @@ async fn run_streamer( let response = match client .get(url.clone()) .header(reqwest::header::RANGE, range_string) - .header(reqwest::header::CACHE_CONTROL, "no-cache") .send() .instrument(span) .await From 2db4792b182b17c97def08ef6f77c9b029dabad6 Mon Sep 17 00:00:00 2001 From: konstin Date: Fri, 10 Nov 2023 11:47:45 +0100 Subject: [PATCH 2/2] Add docs and remove url parameter --- Cargo.toml | 1 + src/lib.rs | 74 ++++++++++++++++++++++++++++++++++++++++++------------ 2 files changed, 59 insertions(+), 16 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8adcc14..039e1eb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ tower-http = { version = "0.4.4", default-features = false, features = ["fs"] } async_zip = { version = "0.0.15", default-features = false, features = ["tokio"] } assert_matches = "1.5.0" rstest = { version = "0.18.2" } +url = { version = "2.4.1" } # The profile that 'cargo dist' will build with [profile.dist] diff --git a/src/lib.rs b/src/lib.rs index d070043..6b33bbe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -49,6 +49,36 @@ pub use error::AsyncHttpRangeReaderError; /// An `AsyncRangeReader` enables reading from a file over HTTP using range requests. /// /// See the [`crate`] level documentation for more information. +/// +/// The general entrypoint is [`AsyncHttpRangeReader::new`]. Depending on the +/// [`CheckSupportMethod`], this will either call [`AsyncHttpRangeReader::initial_tail_request`] or +/// [`AsyncHttpRangeReader::initial_head_request`] to send the initial request and then +/// [`AsyncHttpRangeReader::from_tail_response`] or [`AsyncHttpRangeReader::from_head_response`] to +/// initialize the async reader. If you want to apply a caching layer, you can send the initial head +/// (or tail) request yourself with your cache headers (e.g. through the +/// [http-cache-semantics](https://docs.rs/http-cache-semantics) crate): +/// +/// ```rust +/// # use url::Url; +/// # use async_http_range_reader::{AsyncHttpRangeReader, AsyncHttpRangeReaderError}; +/// async fn get_reader_cached( +/// url: Url, +/// ) -> Result, AsyncHttpRangeReaderError> { +/// let etag = "63c550e8-5ae"; +/// let client = reqwest::Client::new(); +/// let response = client +/// .head(url.clone()) +/// .header(reqwest::header::IF_NONE_MATCH, etag) +/// .send() +/// .await?; +/// if response.status() == reqwest::StatusCode::NOT_MODIFIED { +/// Ok(None) +/// } else { +/// let reader = AsyncHttpRangeReader::from_head_response(client, response).await?; +/// Ok(Some(reader)) +/// } +/// } +/// ``` #[derive(Debug)] pub struct AsyncHttpRangeReader { inner: Mutex, @@ -89,14 +119,17 @@ struct Inner { poll_request_tx: Option>>, } +/// For the initial request, we support either directly requesting N bytes from the end for file +/// or, if you the server doesn't support negative byte offsets, starting with a HEAD request +/// instead pub enum CheckSupportMethod { - // Perform a range request with a negative byte range. This will return the N bytes from the - // *end* of the file as well as the file-size. This is especially useful to also immediately - // get some bytes from the end of the file. + /// Perform a range request with a negative byte range. This will return the N bytes from the + /// *end* of the file as well as the file-size. This is especially useful to also immediately + /// get some bytes from the end of the file. NegativeRangeRequest(u64), - // Perform a head request to get the length of the file and check if the server supports range - // requests. + /// Perform a head request to get the length of the file and check if the server supports range + /// requests. Head, } @@ -117,21 +150,21 @@ impl AsyncHttpRangeReader { ) .await?; let response_headers = response.headers().clone(); - let self_ = Self::new_tail_response(client, url, response).await?; + let self_ = Self::from_tail_response(client, response).await?; Ok((self_, response_headers)) } CheckSupportMethod::Head => { let response = - Self::new_head_request(client.clone(), url.clone(), HeaderMap::default()) + Self::initial_head_request(client.clone(), url.clone(), HeaderMap::default()) .await?; let response_headers = response.headers().clone(); - let self_ = Self::new_head_response(client, url, response).await?; + let self_ = Self::from_head_response(client, response).await?; Ok((self_, response_headers)) } } } - /// An initial range request is performed to the server to determine if the remote accepts range + /// Send an initial range request to determine if the remote accepts range /// requests. This will return a number of bytes from the end of the stream. Use the /// `initial_chunk_size` parameter to define how many bytes should be requested from the end. pub async fn initial_tail_request( @@ -155,9 +188,10 @@ impl AsyncHttpRangeReader { Ok(tail_response) } - pub async fn new_tail_response( + /// Initialize the reader from [`AsyncHttpRangeReader::initial_tail_request`] (or a user + /// provided response that also has a range of bytes from the end as body) + pub async fn from_tail_response( client: reqwest::Client, - url: reqwest::Url, tail_request_response: Response, ) -> Result { // Get the size of the file from this initial request @@ -201,7 +235,7 @@ impl AsyncHttpRangeReader { let (state_tx, state_rx) = watch::channel(StreamerState::default()); tokio::spawn(run_streamer( client, - url, + tail_request_response.url().clone(), Some((tail_request_response, start)), memory_map, state_tx, @@ -229,7 +263,9 @@ impl AsyncHttpRangeReader { Ok(reader) } - pub async fn new_head_request( + /// Send an initial range request to determine if the remote accepts range + /// requests and get the content length + pub async fn initial_head_request( client: reqwest::Client, url: reqwest::Url, extra_headers: HeaderMap, @@ -246,9 +282,10 @@ impl AsyncHttpRangeReader { Ok(head_response) } - pub async fn new_head_response( + /// Initialize the reader from [`AsyncHttpRangeReader::initial_head_request`] (or a user + /// provided response the) + pub async fn from_head_response( client: reqwest::Client, - url: reqwest::Url, head_response: Response, ) -> Result { // Are range requests supported? @@ -291,7 +328,12 @@ impl AsyncHttpRangeReader { let (request_tx, request_rx) = tokio::sync::mpsc::channel(10); let (state_tx, state_rx) = watch::channel(StreamerState::default()); tokio::spawn(run_streamer( - client, url, None, memory_map, state_tx, request_rx, + client, + head_response.url().clone(), + None, + memory_map, + state_tx, + request_rx, )); // Configure the initial state of the streamer.