Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proper caching support #2

Merged
merged 2 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "async_http_range_reader"
authors = ["Bas Zalmstra <[email protected]>"]
version = "0.3.0"
version = "0.4.0"
edition = "2021"
description = "A library for streaming reading of files over HTTP using range requests"
license = "MIT"
Expand All @@ -28,6 +28,7 @@ tower-http = { version = "0.4.4", default-features = false, features = ["fs"] }
async_zip = { version = "0.0.15", default-features = false, features = ["tokio"] }
assert_matches = "1.5.0"
rstest = { version = "0.18.2" }
url = { version = "2.4.1" }

# The profile that 'cargo dist' will build with
[profile.dist]
Expand Down
119 changes: 95 additions & 24 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,36 @@ pub use error::AsyncHttpRangeReaderError;
/// An `AsyncRangeReader` enables reading from a file over HTTP using range requests.
///
/// See the [`crate`] level documentation for more information.
///
/// The general entrypoint is [`AsyncHttpRangeReader::new`]. Depending on the
/// [`CheckSupportMethod`], this will either call [`AsyncHttpRangeReader::initial_tail_request`] or
/// [`AsyncHttpRangeReader::initial_head_request`] to send the initial request and then
/// [`AsyncHttpRangeReader::from_tail_response`] or [`AsyncHttpRangeReader::from_head_response`] to
/// initialize the async reader. If you want to apply a caching layer, you can send the initial head
/// (or tail) request yourself with your cache headers (e.g. through the
/// [http-cache-semantics](https://docs.rs/http-cache-semantics) crate):
///
/// ```rust
/// # use url::Url;
/// # use async_http_range_reader::{AsyncHttpRangeReader, AsyncHttpRangeReaderError};
/// async fn get_reader_cached(
/// url: Url,
/// ) -> Result<Option<AsyncHttpRangeReader>, AsyncHttpRangeReaderError> {
/// let etag = "63c550e8-5ae";
/// let client = reqwest::Client::new();
/// let response = client
/// .head(url.clone())
/// .header(reqwest::header::IF_NONE_MATCH, etag)
/// .send()
/// .await?;
/// if response.status() == reqwest::StatusCode::NOT_MODIFIED {
/// Ok(None)
/// } else {
/// let reader = AsyncHttpRangeReader::from_head_response(client, response).await?;
/// Ok(Some(reader))
/// }
/// }
/// ```
#[derive(Debug)]
pub struct AsyncHttpRangeReader {
inner: Mutex<Inner>,
Expand Down Expand Up @@ -89,14 +119,17 @@ struct Inner {
poll_request_tx: Option<PollSender<Range<u64>>>,
}

/// For the initial request, we support either directly requesting N bytes from the end for file
/// or, if you the server doesn't support negative byte offsets, starting with a HEAD request
/// instead
pub enum CheckSupportMethod {
// Perform a range request with a negative byte range. This will return the N bytes from the
// *end* of the file as well as the file-size. This is especially useful to also immediately
// get some bytes from the end of the file.
/// Perform a range request with a negative byte range. This will return the N bytes from the
/// *end* of the file as well as the file-size. This is especially useful to also immediately
/// get some bytes from the end of the file.
NegativeRangeRequest(u64),

// Perform a head request to get the length of the file and check if the server supports range
// requests.
/// Perform a head request to get the length of the file and check if the server supports range
/// requests.
Head,
}

Expand All @@ -109,35 +142,58 @@ impl AsyncHttpRangeReader {
) -> Result<(Self, HeaderMap), AsyncHttpRangeReaderError> {
match check_method {
CheckSupportMethod::NegativeRangeRequest(initial_chunk_size) => {
Self::new_tail_request(client, url, initial_chunk_size).await
let response = Self::initial_tail_request(
client.clone(),
url.clone(),
initial_chunk_size,
HeaderMap::default(),
)
.await?;
let response_headers = response.headers().clone();
let self_ = Self::from_tail_response(client, response).await?;
Ok((self_, response_headers))
}
CheckSupportMethod::Head => {
let response =
Self::initial_head_request(client.clone(), url.clone(), HeaderMap::default())
.await?;
let response_headers = response.headers().clone();
let self_ = Self::from_head_response(client, response).await?;
Ok((self_, response_headers))
}
CheckSupportMethod::Head => Self::new_head(client, url).await,
}
}

/// An initial range request is performed to the server to determine if the remote accepts range
/// Send an initial range request to determine if the remote accepts range
/// requests. This will return a number of bytes from the end of the stream. Use the
/// `initial_chunk_size` paramter to define how many bytes should be requested from the end.
pub async fn new_tail_request(
/// `initial_chunk_size` parameter to define how many bytes should be requested from the end.
pub async fn initial_tail_request(
client: reqwest::Client,
url: reqwest::Url,
initial_chunk_size: u64,
) -> Result<(Self, HeaderMap), AsyncHttpRangeReaderError> {
// Perform an initial range request to get the size of the file
let tail_request_response = client
.get(url.clone())
extra_headers: HeaderMap,
) -> Result<Response, AsyncHttpRangeReaderError> {
let tail_response = client
.get(url)
.header(
reqwest::header::RANGE,
format!("bytes=-{initial_chunk_size}"),
)
.header(reqwest::header::CACHE_CONTROL, "no-cache")
.headers(extra_headers)
.send()
.await
.and_then(Response::error_for_status)
.map_err(Arc::new)
.map_err(AsyncHttpRangeReaderError::HttpError)?;
let response_header = tail_request_response.headers().clone();
Ok(tail_response)
}

/// Initialize the reader from [`AsyncHttpRangeReader::initial_tail_request`] (or a user
/// provided response that also has a range of bytes from the end as body)
pub async fn from_tail_response(
client: reqwest::Client,
tail_request_response: Response,
) -> Result<Self, AsyncHttpRangeReaderError> {
// Get the size of the file from this initial request
let content_range = ContentRange::parse(
tail_request_response
Expand Down Expand Up @@ -179,7 +235,7 @@ impl AsyncHttpRangeReader {
let (state_tx, state_rx) = watch::channel(StreamerState::default());
tokio::spawn(run_streamer(
client,
url,
tail_request_response.url().clone(),
Some((tail_request_response, start)),
memory_map,
state_tx,
Expand All @@ -204,23 +260,34 @@ impl AsyncHttpRangeReader {
poll_request_tx: None,
}),
};
Ok((reader, response_header))
Ok(reader)
}

async fn new_head(
/// Send an initial range request to determine if the remote accepts range
/// requests and get the content length
pub async fn initial_head_request(
client: reqwest::Client,
url: reqwest::Url,
) -> Result<(Self, HeaderMap), AsyncHttpRangeReaderError> {
extra_headers: HeaderMap,
) -> Result<Response, AsyncHttpRangeReaderError> {
// Perform a HEAD request to get the content-length.
let head_response = client
.head(url.clone())
.header(reqwest::header::CACHE_CONTROL, "no-cache")
.headers(extra_headers)
.send()
.await
.and_then(Response::error_for_status)
.map_err(Arc::new)
.map_err(AsyncHttpRangeReaderError::HttpError)?;
Ok(head_response)
}

/// Initialize the reader from [`AsyncHttpRangeReader::initial_head_request`] (or a user
/// provided response the)
pub async fn from_head_response(
client: reqwest::Client,
head_response: Response,
) -> Result<Self, AsyncHttpRangeReaderError> {
// Are range requests supported?
if head_response
.headers()
Expand Down Expand Up @@ -261,7 +328,12 @@ impl AsyncHttpRangeReader {
let (request_tx, request_rx) = tokio::sync::mpsc::channel(10);
let (state_tx, state_rx) = watch::channel(StreamerState::default());
tokio::spawn(run_streamer(
client, url, None, memory_map, state_tx, request_rx,
client,
head_response.url().clone(),
None,
memory_map,
state_tx,
request_rx,
));

// Configure the initial state of the streamer.
Expand All @@ -279,7 +351,7 @@ impl AsyncHttpRangeReader {
poll_request_tx: None,
}),
};
Ok((reader, head_response.headers().clone()))
Ok(reader)
}

/// Returns the ranges that this instance actually performed HTTP requests for.
Expand Down Expand Up @@ -378,7 +450,6 @@ async fn run_streamer(
let response = match client
.get(url.clone())
.header(reqwest::header::RANGE, range_string)
.header(reqwest::header::CACHE_CONTROL, "no-cache")
.send()
.instrument(span)
.await
Expand Down