Leverage async stream (#977)
* Leverage async stream

When reading data from a `GetObject` request in `ObjectPartStream`,
return the body parts as a `Stream` instead of writing them into a
channel. This makes the flow easier to follow.
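
For illustration only, a minimal self-contained sketch of the two approaches (not code from this commit; `channel_version` and `numbers` are hypothetical, using only the `async-channel`, `async-stream`, and `futures` crates):

```rust
use async_channel::unbounded;
use async_stream::stream;
use futures::{executor::block_on, join, pin_mut, Stream, StreamExt};

// Channel style: the producer is a separate future that must be driven
// alongside the consumer, so the two sides are joined explicitly.
async fn channel_version() {
    let (tx, rx) = unbounded();
    let producer = async move {
        for i in 0..3u64 {
            let _ = tx.send(i).await;
        }
        // dropping `tx` here closes the channel and ends the consumer loop
    };
    let consumer = async {
        while let Ok(i) = rx.recv().await {
            println!("channel got {i}");
        }
    };
    join!(producer, consumer);
}

// Stream style: the producer body *is* the stream; polling it drives the
// work, so no channel and no second future are needed.
fn numbers() -> impl Stream<Item = u64> {
    stream! {
        for i in 0..3u64 {
            yield i;
        }
    }
}

fn main() {
    block_on(async {
        channel_version().await;

        let s = numbers();
        pin_mut!(s); // streams must be pinned before polling
        while let Some(i) = s.next().await {
            println!("stream got {i}");
        }
    });
}
```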

Signed-off-by: Monthon Klongklaew <[email protected]>

* PR comments

Signed-off-by: Monthon Klongklaew <[email protected]>

---------

Signed-off-by: Monthon Klongklaew <[email protected]>
monthonk authored Aug 9, 2024
1 parent 8869934 commit 264d28e
Showing 4 changed files with 65 additions and 66 deletions.
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions mountpoint-s3/Cargo.toml
@@ -43,6 +43,7 @@ time = { version = "0.3.17", features = ["macros", "formatting"] }
 tracing = { version = "0.1.35", features = ["log"] }
 tracing-log = "0.2.0"
 tracing-subscriber = { version = "0.3.14", features = ["env-filter"] }
+async-stream = "0.3.5"

 [target.'cfg(target_os = "linux")'.dependencies]
 procfs = { version = "0.16.0", default-features = false }
22 changes: 9 additions & 13 deletions mountpoint-s3/src/prefetch/caching_stream.rs
@@ -1,10 +1,9 @@
 use std::time::Instant;
 use std::{ops::Range, sync::Arc};

-use async_channel::{unbounded, Receiver};
 use bytes::Bytes;
 use futures::task::{Spawn, SpawnExt};
-use futures::{join, pin_mut, StreamExt};
+use futures::{pin_mut, Stream, StreamExt};
 use mountpoint_s3_client::ObjectClient;
 use tracing::{debug_span, trace, warn, Instrument};

@@ -14,7 +13,7 @@ use crate::object::ObjectId;
 use crate::prefetch::part::Part;
 use crate::prefetch::part_queue::{unbounded_part_queue, PartQueueProducer};
 use crate::prefetch::part_stream::{
-    try_read_from_request, ObjectPartStream, RequestRange, RequestReaderOutput, RequestTaskConfig,
+    read_from_request, ObjectPartStream, RequestRange, RequestReaderOutput, RequestTaskConfig,
 };
 use crate::prefetch::task::RequestTask;
 use crate::prefetch::PrefetchReadError;
@@ -161,16 +160,13 @@
             cache: self.cache.clone(),
         };

-        let (body_sender, body_receiver) = unbounded();
-        let request_reader_future = try_read_from_request(
-            body_sender,
+        let request_stream = read_from_request(
             self.client.clone(),
             bucket.clone(),
             cache_key.clone(),
             block_aligned_byte_range,
         );
-        let part_composer_future = part_composer.try_compose_parts(body_receiver);
-        join!(request_reader_future, part_composer_future);
+        part_composer.try_compose_parts(request_stream).await;
     }

     fn block_indices_for_byte_range(&self, range: &RequestRange) -> Range<BlockIndex> {
@@ -200,8 +196,8 @@
     E: std::error::Error + Send + Sync,
     Cache: DataCache + Send + Sync,
 {
-    async fn try_compose_parts(&mut self, body_receiver: Receiver<RequestReaderOutput<E>>) {
-        if let Err(e) = self.compose_parts(body_receiver).await {
+    async fn try_compose_parts(&mut self, request_stream: impl Stream<Item = RequestReaderOutput<E>>) {
+        if let Err(e) = self.compose_parts(request_stream).await {
             trace!(error=?e, "part stream task failed");
             self.part_queue_producer.push(Err(e));
         }
@@ -210,13 +206,13 @@

     async fn compose_parts(
         &mut self,
-        body_receiver: Receiver<RequestReaderOutput<E>>,
+        request_stream: impl Stream<Item = RequestReaderOutput<E>>,
     ) -> Result<(), PrefetchReadError<E>> {
         let key = self.cache_key.key();
         let block_size = self.cache.block_size();

-        pin_mut!(body_receiver);
-        while let Some(next) = body_receiver.next().await {
+        pin_mut!(request_stream);
+        while let Some(next) = request_stream.next().await {
             assert!(
                 self.buffer.len() < block_size as usize,
                 "buffer should be flushed when we get a full block"
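
For context on the `compose_parts` signature change above, here is a minimal consumer in the same shape (a sketch only; `drain_parts` and its item type are hypothetical): it takes any `impl Stream` of fallible items, pins it locally, and stops at the first error.

```rust
use futures::{executor::block_on, pin_mut, stream, Stream, StreamExt};

// Same shape as the new `compose_parts`: accept any stream of fallible
// (offset, body) items, pin it locally, and bail out on the first error.
async fn drain_parts<E: std::fmt::Debug>(
    parts: impl Stream<Item = Result<(u64, Vec<u8>), E>>,
) -> Result<(), E> {
    // `impl Stream` arguments arrive unpinned; pin before calling `.next()`
    pin_mut!(parts);
    while let Some(next) = parts.next().await {
        let (offset, body) = next?;
        println!("part at offset {offset}: {} bytes", body.len());
    }
    Ok(())
}

fn main() {
    let items: Vec<Result<(u64, Vec<u8>), String>> =
        vec![Ok((0, vec![0u8; 4])), Ok((4, vec![1u8; 4]))];
    block_on(drain_parts(stream::iter(items))).unwrap();
}
```
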
85 changes: 32 additions & 53 deletions mountpoint-s3/src/prefetch/part_stream.rs
@@ -1,7 +1,7 @@
-use async_channel::{unbounded, Receiver, Sender};
+use async_stream::try_stream;
 use bytes::Bytes;
-use futures::task::SpawnExt;
-use futures::{join, pin_mut, task::Spawn, StreamExt};
+use futures::task::{Spawn, SpawnExt};
+use futures::{pin_mut, Stream, StreamExt};
 use mountpoint_s3_client::ObjectClient;
 use std::marker::{Send, Sync};
 use std::{fmt::Debug, ops::Range};
@@ -196,17 +196,14 @@
             .runtime
             .spawn_with_handle(
                 async move {
-                    let (body_sender, body_receiver) = unbounded();
                     let part_composer = ClientPartComposer {
                         part_queue_producer,
                         object_id: config.object_id.clone(),
                         preferred_part_size: config.preferred_part_size,
                     };

-                    let request_reader_future =
-                        try_read_from_request(body_sender, client, bucket, config.object_id, request_range.into());
-                    let part_composer_future = part_composer.try_compose_parts(body_receiver);
-                    join!(request_reader_future, part_composer_future);
+                    let request_stream = read_from_request(client, bucket, config.object_id, request_range.into());
+                    part_composer.try_compose_parts(request_stream).await;
                 }
                 .instrument(span),
             )
@@ -223,17 +220,20 @@ struct ClientPartComposer<E: std::error::Error> {
 }

 impl<E: std::error::Error + Send + Sync> ClientPartComposer<E> {
-    async fn try_compose_parts(&self, body_receiver: Receiver<RequestReaderOutput<E>>) {
-        if let Err(e) = self.compose_parts(body_receiver).await {
+    async fn try_compose_parts(&self, request_stream: impl Stream<Item = RequestReaderOutput<E>>) {
+        if let Err(e) = self.compose_parts(request_stream).await {
             trace!(error=?e, "part stream task failed");
             self.part_queue_producer.push(Err(e));
         }
         trace!("part composer finished");
     }

-    async fn compose_parts(&self, body_receiver: Receiver<RequestReaderOutput<E>>) -> Result<(), PrefetchReadError<E>> {
-        pin_mut!(body_receiver);
-        while let Some(next) = body_receiver.next().await {
+    async fn compose_parts(
+        &self,
+        request_stream: impl Stream<Item = RequestReaderOutput<E>>,
+    ) -> Result<(), PrefetchReadError<E>> {
+        pin_mut!(request_stream);
+        while let Some(next) = request_stream.next().await {
             let (offset, body) = next?;
             // pre-split the body into multiple parts as suggested by preferred part size
             // in order to avoid validating checksum on large parts at read.
@@ -254,54 +254,33 @@ impl<E: std::error::Error + Send + Sync> ClientPartComposer<E> {
     }
 }

-/// Creates a `GetObject` request with the specified range and sends received body parts to the channel
-/// pointed by `body_sender`. Once the request was finished (`None` returned), this future closes the
-/// sending part of the channel and returns. After this the receiving part of the channel will still
-/// be able to receive pending chunks. If the receiving part of the channel is closed before the request
-/// was finished, future completes itself early canceling the request.
-pub async fn try_read_from_request<Client: ObjectClient>(
-    body_sender: Sender<RequestReaderOutput<Client::ClientError>>,
-    client: Client,
-    bucket: String,
-    id: ObjectId,
-    request_range: Range<u64>,
-) {
-    if let Err(e) = read_from_request(body_sender.clone(), client, bucket, id, request_range).await {
-        trace!(error=?e, "part stream request failed");
-        if body_sender.send(Err(e)).await.is_err() {
-            trace!("body channel closed");
-        }
-    }
-    trace!("request finished");
-}
-
-async fn read_from_request<Client: ObjectClient>(
-    body_sender: Sender<RequestReaderOutput<Client::ClientError>>,
+/// Creates a `GetObject` request with the specified range and returns the received body parts as a stream.
+/// A [PrefetchReadError] is returned when the request cannot be completed.
+pub fn read_from_request<Client: ObjectClient>(
     client: Client,
     bucket: String,
     id: ObjectId,
    request_range: Range<u64>,
-) -> Result<(), PrefetchReadError<Client::ClientError>> {
-    let request = client
-        .get_object(&bucket, id.key(), Some(request_range), Some(id.etag().clone()))
-        .await
-        .inspect_err(|e| error!(key=id.key(), error=?e, "GetObject request failed"))
-        .map_err(PrefetchReadError::GetRequestFailed)?;
-
-    pin_mut!(request);
-    while let Some(next) = request.next().await {
-        let (offset, body) = next
-            .inspect_err(|e| error!(key=id.key(), error=?e, "GetObject body part failed"))
-            .map_err(PrefetchReadError::GetRequestFailed)?;
-
-        trace!(offset, length = body.len(), "received GetObject part");
-        metrics::counter!("s3.client.total_bytes", "type" => "read").increment(body.len() as u64);
-        if body_sender.send(Ok((offset, body))).await.is_err() {
-            trace!("body channel closed");
-            break;
-        }
-    }
-    Ok(())
+) -> impl Stream<Item = RequestReaderOutput<Client::ClientError>> {
+    try_stream! {
+        let request = client
+            .get_object(&bucket, id.key(), Some(request_range), Some(id.etag().clone()))
+            .await
+            .inspect_err(|e| error!(key=id.key(), error=?e, "GetObject request failed"))
+            .map_err(PrefetchReadError::GetRequestFailed)?;

+        pin_mut!(request);
+        while let Some(next) = request.next().await {
+            let (offset, body) = next
+                .inspect_err(|e| error!(key=id.key(), error=?e, "GetObject body part failed"))
+                .map_err(PrefetchReadError::GetRequestFailed)?;

+            trace!(offset, length = body.len(), "received GetObject part");
+            metrics::counter!("s3.client.total_bytes", "type" => "read").increment(body.len() as u64);
+            yield (offset, body);
+        }
+        trace!("request finished");
+    }
 }

 #[cfg(test)]
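
`try_stream!` is what lets the rewritten `read_from_request` keep using `?`. A minimal sketch of its error semantics (`fetch` and `parts` are hypothetical, not this crate's API): `yield v` emits `Ok(v)`, while an error hit by `?` ends the stream with `Err(..)` as its final item, which is how `PrefetchReadError` reaches the part composer.

```rust
use async_stream::try_stream;
use futures::{executor::block_on, pin_mut, Stream, StreamExt};

// Hypothetical fallible step standing in for one GetObject body read.
fn fetch(i: u32) -> Result<u32, String> {
    if i < 2 {
        Ok(i * 10)
    } else {
        Err("request failed".into())
    }
}

// Inside `try_stream!`, `yield v` emits `Ok(v)`; an error propagated by `?`
// terminates the stream, arriving as its final `Err(..)` item.
fn parts() -> impl Stream<Item = Result<u32, String>> {
    try_stream! {
        for i in 0.. {
            let part = fetch(i)?;
            yield part;
        }
    }
}

fn main() {
    block_on(async {
        let s = parts();
        pin_mut!(s);
        while let Some(item) = s.next().await {
            println!("{item:?}"); // Ok(0), Ok(10), Err("request failed")
        }
    });
}
```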

1 comment on commit 264d28e

@github-actions

⚠️ Performance Alert ⚠️

A possible performance regression was detected for this benchmark: the result on this commit is worse than the previous result, exceeding the alert threshold of 2.

| Benchmark suite | Current: 264d28e | Previous: 8869934 | Ratio |
|---|---|---|---|
| `sequential_read_direct_io_small_file` | 768.5240234375 MiB/s | 1588.81240234375 MiB/s | 2.07 |

This comment was automatically generated by a workflow using github-action-benchmark.
