Skip to content

Commit

Permalink
Proper caching support (#2)
Browse files Browse the repository at this point in the history
* Proper caching support

* Add docs and remove url parameter
  • Loading branch information
konstin authored Nov 10, 2023
1 parent 4cafe5a commit 8dab2c0
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 25 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "async_http_range_reader"
authors = ["Bas Zalmstra <[email protected]>"]
version = "0.3.0"
version = "0.4.0"
edition = "2021"
description = "A library for streaming reading of files over HTTP using range requests"
license = "MIT"
Expand All @@ -28,6 +28,7 @@ tower-http = { version = "0.4.4", default-features = false, features = ["fs"] }
async_zip = { version = "0.0.15", default-features = false, features = ["tokio"] }
assert_matches = "1.5.0"
rstest = { version = "0.18.2" }
url = { version = "2.4.1" }

# The profile that 'cargo dist' will build with
[profile.dist]
Expand Down
119 changes: 95 additions & 24 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,36 @@ pub use error::AsyncHttpRangeReaderError;
/// An `AsyncRangeReader` enables reading from a file over HTTP using range requests.
///
/// See the [`crate`] level documentation for more information.
///
/// The general entrypoint is [`AsyncHttpRangeReader::new`]. Depending on the
/// [`CheckSupportMethod`], this will either call [`AsyncHttpRangeReader::initial_tail_request`] or
/// [`AsyncHttpRangeReader::initial_head_request`] to send the initial request and then
/// [`AsyncHttpRangeReader::from_tail_response`] or [`AsyncHttpRangeReader::from_head_response`] to
/// initialize the async reader. If you want to apply a caching layer, you can send the initial head
/// (or tail) request yourself with your cache headers (e.g. through the
/// [http-cache-semantics](https://docs.rs/http-cache-semantics) crate):
///
/// ```rust
/// # use url::Url;
/// # use async_http_range_reader::{AsyncHttpRangeReader, AsyncHttpRangeReaderError};
/// async fn get_reader_cached(
/// url: Url,
/// ) -> Result<Option<AsyncHttpRangeReader>, AsyncHttpRangeReaderError> {
/// let etag = "63c550e8-5ae";
/// let client = reqwest::Client::new();
/// let response = client
/// .head(url.clone())
/// .header(reqwest::header::IF_NONE_MATCH, etag)
/// .send()
/// .await?;
/// if response.status() == reqwest::StatusCode::NOT_MODIFIED {
/// Ok(None)
/// } else {
/// let reader = AsyncHttpRangeReader::from_head_response(client, response).await?;
/// Ok(Some(reader))
/// }
/// }
/// ```
#[derive(Debug)]
pub struct AsyncHttpRangeReader {
inner: Mutex<Inner>,
Expand Down Expand Up @@ -89,14 +119,17 @@ struct Inner {
poll_request_tx: Option<PollSender<Range<u64>>>,
}

/// For the initial request, we support either directly requesting N bytes from the end for file
/// or, if you the server doesn't support negative byte offsets, starting with a HEAD request
/// instead
pub enum CheckSupportMethod {
// Perform a range request with a negative byte range. This will return the N bytes from the
// *end* of the file as well as the file-size. This is especially useful to also immediately
// get some bytes from the end of the file.
/// Perform a range request with a negative byte range. This will return the N bytes from the
/// *end* of the file as well as the file-size. This is especially useful to also immediately
/// get some bytes from the end of the file.
NegativeRangeRequest(u64),

// Perform a head request to get the length of the file and check if the server supports range
// requests.
/// Perform a head request to get the length of the file and check if the server supports range
/// requests.
Head,
}

Expand All @@ -109,35 +142,58 @@ impl AsyncHttpRangeReader {
) -> Result<(Self, HeaderMap), AsyncHttpRangeReaderError> {
match check_method {
CheckSupportMethod::NegativeRangeRequest(initial_chunk_size) => {
Self::new_tail_request(client, url, initial_chunk_size).await
let response = Self::initial_tail_request(
client.clone(),
url.clone(),
initial_chunk_size,
HeaderMap::default(),
)
.await?;
let response_headers = response.headers().clone();
let self_ = Self::from_tail_response(client, response).await?;
Ok((self_, response_headers))
}
CheckSupportMethod::Head => {
let response =
Self::initial_head_request(client.clone(), url.clone(), HeaderMap::default())
.await?;
let response_headers = response.headers().clone();
let self_ = Self::from_head_response(client, response).await?;
Ok((self_, response_headers))
}
CheckSupportMethod::Head => Self::new_head(client, url).await,
}
}

/// An initial range request is performed to the server to determine if the remote accepts range
/// Send an initial range request to determine if the remote accepts range
/// requests. This will return a number of bytes from the end of the stream. Use the
/// `initial_chunk_size` paramter to define how many bytes should be requested from the end.
pub async fn new_tail_request(
/// `initial_chunk_size` parameter to define how many bytes should be requested from the end.
pub async fn initial_tail_request(
client: reqwest::Client,
url: reqwest::Url,
initial_chunk_size: u64,
) -> Result<(Self, HeaderMap), AsyncHttpRangeReaderError> {
// Perform an initial range request to get the size of the file
let tail_request_response = client
.get(url.clone())
extra_headers: HeaderMap,
) -> Result<Response, AsyncHttpRangeReaderError> {
let tail_response = client
.get(url)
.header(
reqwest::header::RANGE,
format!("bytes=-{initial_chunk_size}"),
)
.header(reqwest::header::CACHE_CONTROL, "no-cache")
.headers(extra_headers)
.send()
.await
.and_then(Response::error_for_status)
.map_err(Arc::new)
.map_err(AsyncHttpRangeReaderError::HttpError)?;
let response_header = tail_request_response.headers().clone();
Ok(tail_response)
}

/// Initialize the reader from [`AsyncHttpRangeReader::initial_tail_request`] (or a user
/// provided response that also has a range of bytes from the end as body)
pub async fn from_tail_response(
client: reqwest::Client,
tail_request_response: Response,
) -> Result<Self, AsyncHttpRangeReaderError> {
// Get the size of the file from this initial request
let content_range = ContentRange::parse(
tail_request_response
Expand Down Expand Up @@ -179,7 +235,7 @@ impl AsyncHttpRangeReader {
let (state_tx, state_rx) = watch::channel(StreamerState::default());
tokio::spawn(run_streamer(
client,
url,
tail_request_response.url().clone(),
Some((tail_request_response, start)),
memory_map,
state_tx,
Expand All @@ -204,23 +260,34 @@ impl AsyncHttpRangeReader {
poll_request_tx: None,
}),
};
Ok((reader, response_header))
Ok(reader)
}

async fn new_head(
/// Send an initial range request to determine if the remote accepts range
/// requests and get the content length
pub async fn initial_head_request(
client: reqwest::Client,
url: reqwest::Url,
) -> Result<(Self, HeaderMap), AsyncHttpRangeReaderError> {
extra_headers: HeaderMap,
) -> Result<Response, AsyncHttpRangeReaderError> {
// Perform a HEAD request to get the content-length.
let head_response = client
.head(url.clone())
.header(reqwest::header::CACHE_CONTROL, "no-cache")
.headers(extra_headers)
.send()
.await
.and_then(Response::error_for_status)
.map_err(Arc::new)
.map_err(AsyncHttpRangeReaderError::HttpError)?;
Ok(head_response)
}

/// Initialize the reader from [`AsyncHttpRangeReader::initial_head_request`] (or a user
/// provided response the)
pub async fn from_head_response(
client: reqwest::Client,
head_response: Response,
) -> Result<Self, AsyncHttpRangeReaderError> {
// Are range requests supported?
if head_response
.headers()
Expand Down Expand Up @@ -261,7 +328,12 @@ impl AsyncHttpRangeReader {
let (request_tx, request_rx) = tokio::sync::mpsc::channel(10);
let (state_tx, state_rx) = watch::channel(StreamerState::default());
tokio::spawn(run_streamer(
client, url, None, memory_map, state_tx, request_rx,
client,
head_response.url().clone(),
None,
memory_map,
state_tx,
request_rx,
));

// Configure the initial state of the streamer.
Expand All @@ -279,7 +351,7 @@ impl AsyncHttpRangeReader {
poll_request_tx: None,
}),
};
Ok((reader, head_response.headers().clone()))
Ok(reader)
}

/// Returns the ranges that this instance actually performed HTTP requests for.
Expand Down Expand Up @@ -378,7 +450,6 @@ async fn run_streamer(
let response = match client
.get(url.clone())
.header(reqwest::header::RANGE, range_string)
.header(reqwest::header::CACHE_CONTROL, "no-cache")
.send()
.instrument(span)
.await
Expand Down

0 comments on commit 8dab2c0

Please sign in to comment.