From 6955475a23ed03d62e8aeaf546a6b42706e39db1 Mon Sep 17 00:00:00 2001 From: Vaibhav Rabber Date: Fri, 8 Nov 2024 14:37:59 +0530 Subject: [PATCH 1/6] feat: Implement `linger_time` for `AppendRecordStream` Resolves: #37 Signed-off-by: Vaibhav Rabber --- Cargo.toml | 3 ++- src/streams.rs | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 8762e37..88e395f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,13 +19,14 @@ secrecy = "0.8.0" serde = { version = "1.0.214", optional = true, features = ["derive"] } sync_docs = { path = "sync_docs" } thiserror = "1.0.67" +tokio = { version = "1.41.1", features = ["time"] } tonic = { version = "0.12.3", features = ["tls", "tls-webpki-roots"] } [build-dependencies] tonic-build = { version = "0.12.3", features = ["prost"] } [dev-dependencies] -tokio = { version = "*", features = ["full"] } +tokio = { version = "1.41.1", features = ["full"] } [features] serde = ["dep:serde"] diff --git a/src/streams.rs b/src/streams.rs index 2656491..ffde7d2 100644 --- a/src/streams.rs +++ b/src/streams.rs @@ -1,10 +1,12 @@ use std::{ pin::Pin, task::{Context, Poll}, + time::Duration, }; use bytesize::ByteSize; use futures::{Stream, StreamExt}; +use tokio::time::{Interval, MissedTickBehavior}; use crate::types::{self, MeteredSize as _}; @@ -21,6 +23,8 @@ pub struct AppendRecordStreamOpts { pub match_seq_num: Option, /// Enforce a fencing token. pub fencing_token: Option>, + /// Linger time for ready records to send together as a batch. + pub linger_time: Option, } impl Default for AppendRecordStreamOpts { @@ -30,6 +34,7 @@ impl Default for AppendRecordStreamOpts { max_batch_size: ByteSize::mib(1), match_seq_num: None, fencing_token: None, + linger_time: None, } } } @@ -71,6 +76,14 @@ impl AppendRecordStreamOpts { ..self } } + + /// Construct from existing options with the linger time. + pub fn with_linger_time(self, linger_time: impl Into) -> Self { + Self { + linger_time: Some(linger_time.into()), + ..self + } + } } #[derive(Debug, thiserror::Error)] @@ -89,6 +102,7 @@ where stream: S, peeked_record: Option, terminated: bool, + linger_interval: Option, opts: AppendRecordStreamOpts, } @@ -103,10 +117,17 @@ where return Err(AppendRecordStreamError::BatchSizeTooLarge); } + let linger_interval = opts.linger_time.map(|duration| { + let mut int = tokio::time::interval(duration); + int.set_missed_tick_behavior(MissedTickBehavior::Delay); + int + }); + Ok(Self { stream, peeked_record: None, terminated: false, + linger_interval, opts, }) } @@ -140,6 +161,14 @@ where return Poll::Ready(None); } + if self + .linger_interval + .as_mut() + .is_some_and(|int| int.poll_tick(cx).is_pending()) + { + return Poll::Pending; + } + let mut batch = Vec::with_capacity(self.opts.max_batch_records); let mut batch_size = ByteSize::b(0); @@ -169,6 +198,9 @@ where if self.terminated { Poll::Ready(None) } else { + // Since we don't have any batches to send, reset the linger + // interval so it ticks immediately the next time. + self.linger_interval.as_mut().map(Interval::reset); Poll::Pending } } else { From 1ac3dfaa5b89f90d716262a20080bc76700d9dd7 Mon Sep 17 00:00:00 2001 From: Vaibhav Rabber Date: Fri, 8 Nov 2024 15:10:11 +0530 Subject: [PATCH 2/6] ignore linger Signed-off-by: Vaibhav Rabber --- src/streams.rs | 40 ++++++++++++++++++++++++---------------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/src/streams.rs b/src/streams.rs index ffde7d2..057ebaf 100644 --- a/src/streams.rs +++ b/src/streams.rs @@ -84,6 +84,14 @@ impl AppendRecordStreamOpts { ..self } } + + fn new_linger_interval(&self) -> Option { + self.linger_time.map(|duration| { + let mut int = tokio::time::interval(duration); + int.set_missed_tick_behavior(MissedTickBehavior::Delay); + int + }) + } } #[derive(Debug, thiserror::Error)] @@ -103,6 +111,7 @@ where peeked_record: Option, terminated: bool, linger_interval: Option, + ignore_linger: bool, opts: AppendRecordStreamOpts, } @@ -117,17 +126,12 @@ where return Err(AppendRecordStreamError::BatchSizeTooLarge); } - let linger_interval = opts.linger_time.map(|duration| { - let mut int = tokio::time::interval(duration); - int.set_missed_tick_behavior(MissedTickBehavior::Delay); - int - }); - Ok(Self { stream, peeked_record: None, terminated: false, - linger_interval, + linger_interval: opts.new_linger_interval(), + ignore_linger: false, opts, }) } @@ -161,12 +165,16 @@ where return Poll::Ready(None); } - if self - .linger_interval - .as_mut() - .is_some_and(|int| int.poll_tick(cx).is_pending()) - { - return Poll::Pending; + if self.ignore_linger { + self.ignore_linger = false; + } else { + if self + .linger_interval + .as_mut() + .is_some_and(|int| int.poll_tick(cx).is_pending()) + { + return Poll::Pending; + } } let mut batch = Vec::with_capacity(self.opts.max_batch_records); @@ -198,9 +206,9 @@ where if self.terminated { Poll::Ready(None) } else { - // Since we don't have any batches to send, reset the linger - // interval so it ticks immediately the next time. - self.linger_interval.as_mut().map(Interval::reset); + // Since we don't have any batches to send, we want to ignore the linger + // interval for the next poll. + self.ignore_linger = true; Poll::Pending } } else { From eec927f00a7e33d25b4efb0e70b895ee954033a3 Mon Sep 17 00:00:00 2001 From: Vaibhav Rabber Date: Fri, 8 Nov 2024 15:15:58 +0530 Subject: [PATCH 3/6] clippy Signed-off-by: Vaibhav Rabber --- src/streams.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/streams.rs b/src/streams.rs index 057ebaf..562f5c1 100644 --- a/src/streams.rs +++ b/src/streams.rs @@ -165,16 +165,19 @@ where return Poll::Ready(None); } + // Only poll the interval if it shouldn't be ignored, else reset the + // flag and we can decide if we want to ignore later. This works since + // the linger interval has `MissedTickBehaviour::Delay`. So the moment + // we poll and it's ready, we'll get the next tick after the specified + // duration period as `linger_time`. if self.ignore_linger { self.ignore_linger = false; - } else { - if self - .linger_interval - .as_mut() - .is_some_and(|int| int.poll_tick(cx).is_pending()) - { - return Poll::Pending; - } + } else if self + .linger_interval + .as_mut() + .is_some_and(|int| int.poll_tick(cx).is_pending()) + { + return Poll::Pending; } let mut batch = Vec::with_capacity(self.opts.max_batch_records); From 982b3c9224b260f2a774cfd17f151795a9e0c6fe Mon Sep 17 00:00:00 2001 From: Vaibhav Rabber Date: Tue, 12 Nov 2024 16:28:23 +0530 Subject: [PATCH 4/6] .. --- src/streams.rs | 47 +++++++++++++++++++---------------------------- 1 file changed, 19 insertions(+), 28 deletions(-) diff --git a/src/streams.rs b/src/streams.rs index 562f5c1..ceb827e 100644 --- a/src/streams.rs +++ b/src/streams.rs @@ -5,8 +5,8 @@ use std::{ }; use bytesize::ByteSize; -use futures::{Stream, StreamExt}; -use tokio::time::{Interval, MissedTickBehavior}; +use futures::{FutureExt, Stream, StreamExt}; +use tokio::time::Sleep; use crate::types::{self, MeteredSize as _}; @@ -23,8 +23,8 @@ pub struct AppendRecordStreamOpts { pub match_seq_num: Option, /// Enforce a fencing token. pub fencing_token: Option>, - /// Linger time for ready records to send together as a batch. - pub linger_time: Option, + /// Linger duration for ready records to send together as a batch. + pub linger: Option, } impl Default for AppendRecordStreamOpts { @@ -34,7 +34,7 @@ impl Default for AppendRecordStreamOpts { max_batch_size: ByteSize::mib(1), match_seq_num: None, fencing_token: None, - linger_time: None, + linger: None, } } } @@ -78,19 +78,16 @@ impl AppendRecordStreamOpts { } /// Construct from existing options with the linger time. - pub fn with_linger_time(self, linger_time: impl Into) -> Self { + pub fn with_linger(self, linger_duration: impl Into) -> Self { Self { - linger_time: Some(linger_time.into()), + linger: Some(linger_duration.into()), ..self } } - fn new_linger_interval(&self) -> Option { - self.linger_time.map(|duration| { - let mut int = tokio::time::interval(duration); - int.set_missed_tick_behavior(MissedTickBehavior::Delay); - int - }) + fn linger_sleep_fut(&self) -> Option>> { + self.linger + .map(|duration| Box::pin(tokio::time::sleep(duration))) } } @@ -110,8 +107,7 @@ where stream: S, peeked_record: Option, terminated: bool, - linger_interval: Option, - ignore_linger: bool, + linger_sleep: Option>>, opts: AppendRecordStreamOpts, } @@ -130,8 +126,7 @@ where stream, peeked_record: None, terminated: false, - linger_interval: opts.new_linger_interval(), - ignore_linger: false, + linger_sleep: opts.linger_sleep_fut(), opts, }) } @@ -165,17 +160,10 @@ where return Poll::Ready(None); } - // Only poll the interval if it shouldn't be ignored, else reset the - // flag and we can decide if we want to ignore later. This works since - // the linger interval has `MissedTickBehaviour::Delay`. So the moment - // we poll and it's ready, we'll get the next tick after the specified - // duration period as `linger_time`. - if self.ignore_linger { - self.ignore_linger = false; - } else if self - .linger_interval + if self + .linger_sleep .as_mut() - .is_some_and(|int| int.poll_tick(cx).is_pending()) + .is_some_and(|fut| fut.poll_unpin(cx).is_pending()) { return Poll::Pending; } @@ -211,7 +199,7 @@ where } else { // Since we don't have any batches to send, we want to ignore the linger // interval for the next poll. - self.ignore_linger = true; + self.linger_sleep = None; Poll::Pending } } else { @@ -225,6 +213,9 @@ where *m += batch.len() as u64 } + // Reset the linger sleep future since the old one is polled ready. + self.linger_sleep = self.opts.linger_sleep_fut(); + Poll::Ready(Some(types::AppendInput { records: batch, match_seq_num, From b12765a4d1661cdef8900ad8afefcc745d63e9fb Mon Sep 17 00:00:00 2001 From: Vaibhav Rabber Date: Tue, 12 Nov 2024 18:49:54 +0530 Subject: [PATCH 5/6] test Signed-off-by: Vaibhav Rabber --- Cargo.toml | 4 +- src/streams.rs | 119 ++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 121 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 88e395f..1abd709 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,9 @@ tonic = { version = "0.12.3", features = ["tls", "tls-webpki-roots"] } tonic-build = { version = "0.12.3", features = ["prost"] } [dev-dependencies] -tokio = { version = "1.41.1", features = ["full"] } +rstest = "0.23.0" +tokio = { version = "1.41.1", features = ["full", "test-util"] } +tokio-stream = "0.1.16" [features] serde = ["dep:serde"] diff --git a/src/streams.rs b/src/streams.rs index ceb827e..bd6d347 100644 --- a/src/streams.rs +++ b/src/streams.rs @@ -46,7 +46,7 @@ impl AppendRecordStreamOpts { } /// Construct from existing options with the new maximum batch records. - pub fn with_max_batch_records(self, max_batch_records: impl Into) -> Self { + pub fn with_max_batch_records(self, max_batch_records: usize) -> Self { Self { max_batch_records: max_batch_records.into(), ..self @@ -224,3 +224,120 @@ where } } } + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use bytesize::ByteSize; + use futures::StreamExt as _; + use rstest::rstest; + use tokio::sync::mpsc; + use tokio_stream::wrappers::UnboundedReceiverStream; + + use crate::{ + streams::{AppendRecordStream, AppendRecordStreamOpts}, + types, + }; + + #[rstest] + #[case(Some(2), None)] + #[case(None, Some(ByteSize::b(30)))] + #[case(Some(2), Some(ByteSize::b(100)))] + #[case(Some(10), Some(ByteSize::b(30)))] + #[tokio::test] + async fn test_append_record_stream_batching( + #[case] max_batch_records: Option, + #[case] max_batch_size: Option, + ) { + let stream_iter = (0..100).map(|i| types::AppendRecord::new(format!("r_{i}"))); + let stream = futures::stream::iter(stream_iter); + + let mut opts = AppendRecordStreamOpts::new(); + if let Some(max_batch_records) = max_batch_records { + opts = opts.with_max_batch_records(max_batch_records); + } + if let Some(max_batch_size) = max_batch_size { + opts = opts.with_max_batch_size(max_batch_size); + } + + let batch_stream = AppendRecordStream::new(stream, opts).unwrap(); + + let batches = batch_stream + .map(|batch| batch.records) + .collect::>() + .await; + + let mut i = 0; + for batch in batches { + assert_eq!(batch.len(), 2); + for record in batch { + assert_eq!(record.body, format!("r_{i}").into_bytes()); + i += 1; + } + } + } + + #[tokio::test(start_paused = true)] + async fn test_append_record_stream_linger() { + let (stream_tx, stream_rx) = mpsc::unbounded_channel::(); + let mut i = 0; + + let collect_batches_handle = tokio::spawn(async move { + let batch_stream = AppendRecordStream::new( + UnboundedReceiverStream::new(stream_rx), + AppendRecordStreamOpts::new().with_linger(Duration::from_secs(2)), + ) + .unwrap(); + + batch_stream + .map(|batch| { + batch + .records + .into_iter() + .map(|rec| rec.body) + .collect::>() + }) + .collect::>() + .await + }); + + let mut send_next = || { + stream_tx + .send(types::AppendRecord::new(format!("r_{i}"))) + .unwrap(); + i += 1; + }; + + async fn sleep_secs(secs: u64) { + let dur = Duration::from_secs(secs) + Duration::from_millis(10); + tokio::time::sleep(dur).await; + } + + send_next(); + send_next(); + + sleep_secs(2).await; + + send_next(); + + sleep_secs(1).await; + + send_next(); + + sleep_secs(1).await; + + send_next(); + std::mem::drop(stream_tx); // Should close the stream + + let batches = collect_batches_handle.await.unwrap(); + + let expected_batches = vec![ + vec![b"r_0".to_owned(), b"r_1".to_owned()], + vec![b"r_2".to_owned(), b"r_3".to_owned()], + vec![b"r_4".to_owned()], + ]; + + assert_eq!(batches, expected_batches); + } +} From 2c25cd0bddbc457f677cb5f0324ff867d90d889c Mon Sep 17 00:00:00 2001 From: Vaibhav Rabber Date: Tue, 12 Nov 2024 18:50:55 +0530 Subject: [PATCH 6/6] clippy Signed-off-by: Vaibhav Rabber --- src/streams.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/streams.rs b/src/streams.rs index bd6d347..0358e80 100644 --- a/src/streams.rs +++ b/src/streams.rs @@ -48,7 +48,7 @@ impl AppendRecordStreamOpts { /// Construct from existing options with the new maximum batch records. pub fn with_max_batch_records(self, max_batch_records: usize) -> Self { Self { - max_batch_records: max_batch_records.into(), + max_batch_records, ..self } }