feat: Implement linger for AppendRecordStream #48

Closed
wants to merge 6 commits into from
5 changes: 4 additions & 1 deletion Cargo.toml
@@ -19,13 +19,16 @@ secrecy = "0.8.0"
serde = { version = "1.0.214", optional = true, features = ["derive"] }
sync_docs = { path = "sync_docs" }
thiserror = "1.0.67"
tokio = { version = "1.41.1", features = ["time"] }
tonic = { version = "0.12.3", features = ["tls", "tls-webpki-roots"] }

[build-dependencies]
tonic-build = { version = "0.12.3", features = ["prost"] }

[dev-dependencies]
tokio = { version = "*", features = ["full"] }
rstest = "0.23.0"
tokio = { version = "1.41.1", features = ["full", "test-util"] }
tokio-stream = "0.1.16"

[features]
serde = ["dep:serde"]
157 changes: 154 additions & 3 deletions src/streams.rs
@@ -1,10 +1,12 @@
use std::{
pin::Pin,
task::{Context, Poll},
time::Duration,
};

use bytesize::ByteSize;
use futures::{Stream, StreamExt};
use futures::{FutureExt, Stream, StreamExt};
use tokio::time::Sleep;

use crate::types::{self, MeteredSize as _};

@@ -21,6 +23,8 @@ pub struct AppendRecordStreamOpts {
pub match_seq_num: Option<u64>,
/// Enforce a fencing token.
pub fencing_token: Option<Vec<u8>>,
/// Linger duration for ready records to send together as a batch.
pub linger: Option<Duration>,
}

impl Default for AppendRecordStreamOpts {
@@ -30,6 +34,7 @@ impl Default for AppendRecordStreamOpts {
max_batch_size: ByteSize::mib(1),
match_seq_num: None,
fencing_token: None,
linger: None,
Member:
Let's default to 5ms (batching is generally helpful), and let it be disabled by making with_linger take an Option<Duration>.
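A minimal, hypothetical sketch of that suggestion (the real AppendRecordStreamOpts has more fields; only the linger-related pieces are shown, and the struct name here is made up):

use std::time::Duration;

// Linger defaults to 5 ms; passing None to with_linger disables it.
struct LingerOpts {
    linger: Option<Duration>,
}

impl Default for LingerOpts {
    fn default() -> Self {
        Self {
            // Batching is generally helpful, so linger is on by default.
            linger: Some(Duration::from_millis(5)),
        }
    }
}

impl LingerOpts {
    fn with_linger(self, linger: Option<Duration>) -> Self {
        Self { linger }
    }
}

fn main() {
    assert_eq!(LingerOpts::default().linger, Some(Duration::from_millis(5)));
    assert_eq!(LingerOpts::default().with_linger(None).linger, None);
    assert_eq!(
        LingerOpts::default()
            .with_linger(Some(Duration::from_millis(50)))
            .linger,
        Some(Duration::from_millis(50))
    );
}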

}
}
}
@@ -41,9 +46,9 @@ impl AppendRecordStreamOpts {
}

/// Construct from existing options with the new maximum batch records.
pub fn with_max_batch_records(self, max_batch_records: impl Into<usize>) -> Self {
pub fn with_max_batch_records(self, max_batch_records: usize) -> Self {
Self {
max_batch_records: max_batch_records.into(),
max_batch_records,
..self
}
}
@@ -71,6 +76,19 @@ impl AppendRecordStreamOpts {
..self
}
}

/// Construct from existing options with the linger time.
pub fn with_linger(self, linger_duration: impl Into<Duration>) -> Self {
Self {
linger: Some(linger_duration.into()),
..self
}
}

fn linger_sleep_fut(&self) -> Option<Pin<Box<Sleep>>> {
self.linger
.map(|duration| Box::pin(tokio::time::sleep(duration)))
}
}

#[derive(Debug, thiserror::Error)]
@@ -89,6 +107,7 @@ where
stream: S,
peeked_record: Option<types::AppendRecord>,
terminated: bool,
linger_sleep: Option<Pin<Box<Sleep>>>,
opts: AppendRecordStreamOpts,
}

@@ -107,6 +126,7 @@ where
stream,
peeked_record: None,
terminated: false,
linger_sleep: opts.linger_sleep_fut(),
opts,
})
}
@@ -140,6 +160,14 @@ where
return Poll::Ready(None);
}

if self
.linger_sleep
.as_mut()
.is_some_and(|fut| fut.poll_unpin(cx).is_pending())
{
return Poll::Pending;
}
Comment on lines +163 to +169
@shikhar (Member) on Nov 13, 2024:
The idea is that we want to trigger an append if any of these conditions can be satisfied:

  • max_batch_records accumulated from underlying stream
  • max_batch_size accumulated from underlying stream
  • linger sleep expired - and there is at least 1 accumulated record

So waiting upfront for the linger time to expire, when the underlying record stream could potentially satisfy the first two conditions, does not seem right to me...

BTW, I have a feeling that, given this combination of triggers, this adaptor from AppendRecord to batched AppendInput may be clearest to implement using async_stream::stream!, letting that generate the state machine. Hand-written futures and streams tend to be hard to write correctly (and to review)...

Member Author:
This bugged me at first since I thought linger would act as a rate limit in many cases, but it makes sense if it only kicks in alongside the other conditions.

Will use async_stream::stream! in that case; it would make this much clearer.
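A rough sketch of how that adaptor could look with async_stream (hypothetical: it assumes an async_stream dependency is added, uses plain String records, and only enforces the record-count limit; max_batch_size, match_seq_num, and fencing-token handling are omitted):

use std::time::Duration;

use async_stream::stream;
use futures::{Stream, StreamExt};
use tokio::time::sleep;

fn batched<S>(
    mut records: S,
    max_batch_records: usize,
    linger: Duration,
) -> impl Stream<Item = Vec<String>>
where
    S: Stream<Item = String> + Unpin,
{
    stream! {
        let mut done = false;
        while !done {
            let mut batch = Vec::with_capacity(max_batch_records);
            // Note: this timer runs from the start of each batch cycle; starting it
            // only once the first record arrives would be a further refinement.
            let linger_sleep = sleep(linger);
            tokio::pin!(linger_sleep);
            loop {
                tokio::select! {
                    // Flush when the linger window expires, but only if at least
                    // one record has accumulated.
                    _ = &mut linger_sleep, if !batch.is_empty() => break,
                    next = records.next() => match next {
                        Some(record) => {
                            batch.push(record);
                            // Flush early once the record-count limit is hit.
                            if batch.len() >= max_batch_records {
                                break;
                            }
                        }
                        None => {
                            // Underlying stream ended: flush whatever is left and stop.
                            done = true;
                            break;
                        }
                    }
                }
            }
            if !batch.is_empty() {
                yield batch;
            }
        }
    }
}

Each flush trigger from the list above maps onto a select! arm or the length check, and the macro generates the state machine instead of a hand-written poll_next.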


let mut batch = Vec::with_capacity(self.opts.max_batch_records);
let mut batch_size = ByteSize::b(0);

@@ -169,6 +197,9 @@ where
if self.terminated {
Poll::Ready(None)
} else {
// Since we don't have any batches to send, we want to ignore the linger
// interval for the next poll.
self.linger_sleep = None;
Poll::Pending
}
} else {
@@ -182,6 +213,9 @@ where
*m += batch.len() as u64
}

// Reset the linger sleep future since the old one is polled ready.
self.linger_sleep = self.opts.linger_sleep_fut();
Poll::Ready(Some(types::AppendInput {
records: batch,
match_seq_num,
@@ -190,3 +224,120 @@ where
}
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use bytesize::ByteSize;
use futures::StreamExt as _;
use rstest::rstest;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;

use crate::{
streams::{AppendRecordStream, AppendRecordStreamOpts},
types,
};

#[rstest]
Member:
nice! can you run these in the CI?

#[case(Some(2), None)]
#[case(None, Some(ByteSize::b(30)))]
#[case(Some(2), Some(ByteSize::b(100)))]
#[case(Some(10), Some(ByteSize::b(30)))]
#[tokio::test]
async fn test_append_record_stream_batching(
#[case] max_batch_records: Option<usize>,
#[case] max_batch_size: Option<ByteSize>,
) {
let stream_iter = (0..100).map(|i| types::AppendRecord::new(format!("r_{i}")));
let stream = futures::stream::iter(stream_iter);

let mut opts = AppendRecordStreamOpts::new();
if let Some(max_batch_records) = max_batch_records {
opts = opts.with_max_batch_records(max_batch_records);
}
if let Some(max_batch_size) = max_batch_size {
opts = opts.with_max_batch_size(max_batch_size);
}

let batch_stream = AppendRecordStream::new(stream, opts).unwrap();

let batches = batch_stream
.map(|batch| batch.records)
.collect::<Vec<_>>()
.await;

let mut i = 0;
for batch in batches {
assert_eq!(batch.len(), 2);
for record in batch {
assert_eq!(record.body, format!("r_{i}").into_bytes());
i += 1;
}
}
}

#[tokio::test(start_paused = true)]
async fn test_append_record_stream_linger() {
let (stream_tx, stream_rx) = mpsc::unbounded_channel::<types::AppendRecord>();
let mut i = 0;

let collect_batches_handle = tokio::spawn(async move {
let batch_stream = AppendRecordStream::new(
UnboundedReceiverStream::new(stream_rx),
AppendRecordStreamOpts::new().with_linger(Duration::from_secs(2)),
)
.unwrap();

batch_stream
.map(|batch| {
batch
.records
.into_iter()
.map(|rec| rec.body)
.collect::<Vec<_>>()
})
.collect::<Vec<_>>()
.await
});

let mut send_next = || {
stream_tx
.send(types::AppendRecord::new(format!("r_{i}")))
.unwrap();
i += 1;
};

async fn sleep_secs(secs: u64) {
let dur = Duration::from_secs(secs) + Duration::from_millis(10);
tokio::time::sleep(dur).await;
}

send_next();
send_next();

sleep_secs(2).await;

send_next();

sleep_secs(1).await;

send_next();

sleep_secs(1).await;

send_next();
std::mem::drop(stream_tx); // Should close the stream

let batches = collect_batches_handle.await.unwrap();

let expected_batches = vec![
vec![b"r_0".to_owned(), b"r_1".to_owned()],
vec![b"r_2".to_owned(), b"r_3".to_owned()],
vec![b"r_4".to_owned()],
];

assert_eq!(batches, expected_batches);
}
}