Skip to content

Commit

Permalink
Added unit test for batching
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed May 1, 2024
1 parent af3b099 commit 54998fd
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 15 deletions.
20 changes: 15 additions & 5 deletions chitchat/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,14 +386,24 @@ impl Chitchat {
key_prefix: impl ToString,
callback: impl Fn(KeyChangeEventRef) + 'static + Send + Sync,
) -> ListenerHandle {
self.cluster_state()
.listeners
.subscribe_events(key_prefix, move |events| {
self.subscribe_event_batch(key_prefix, move |events| {
for event in events {
callback(*event)
}
// callback
})
})
}

/// Same a `subscribe_event` but receives events in batch upon
/// a `reset_node` or the reception of a chitchat update.
#[must_use]
pub fn subscribe_event_batch(
&self,
key_prefix: impl ToString,
callback: impl Fn(&[KeyChangeEventRef]) + 'static + Send + Sync,
) -> ListenerHandle {
self.cluster_state()
.listeners
.subscribe(key_prefix, callback)
}
}

Expand Down
18 changes: 9 additions & 9 deletions chitchat/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,17 @@ pub(crate) struct Listeners {

impl Listeners {
#[must_use]
pub(crate) fn subscribe_events(
pub(crate) fn subscribe(
&self,
key_prefix: impl ToString,
callback: impl Fn(&[KeyChangeEventRef]) + 'static + Send + Sync,
) -> ListenerHandle {
let key_prefix = key_prefix.to_string();
let boxed_listener = Box::new(callback);
self.subscribe_event_for_ligher_monomorphization(key_prefix, boxed_listener)
self.subscribe_events_for_ligher_monomorphization(key_prefix, boxed_listener)
}

fn subscribe_event_for_ligher_monomorphization(
fn subscribe_events_for_ligher_monomorphization(
&self,
key_prefix: String,
boxed_listener: BoxedListener,
Expand All @@ -67,7 +67,7 @@ impl Listeners {
inner_listener_guard
.callbacks
.insert(new_idx, callback_entry);
inner_listener_guard.subscribe_event(&key_prefix, new_idx);
inner_listener_guard.subscribe_events(&key_prefix, new_idx);
ListenerHandle {
listener_id: new_idx,
listeners: weak_listeners,
Expand Down Expand Up @@ -107,7 +107,7 @@ struct InnerListeners {

impl InnerListeners {
// We don't inline this to make sure monomorphization generates as little code as possible.
fn subscribe_event(&mut self, key_prefix: &str, idx: CallbackId) {
fn subscribe_events(&mut self, key_prefix: &str, idx: CallbackId) {
self.listeners
.entry(key_prefix.to_string())
.or_default()
Expand Down Expand Up @@ -239,7 +239,7 @@ mod tests {
let listeners = Listeners::default();
let counter: Arc<AtomicUsize> = Default::default();
let counter_clone = counter.clone();
let handle = listeners.subscribe_events("prefix:", move |events| {
let handle = listeners.subscribe("prefix:", move |events| {
assert_eq!(events.len(), 1);
let key_change_event = events[0];
assert_eq!(key_change_event.key, "strippedprefix");
Expand Down Expand Up @@ -270,7 +270,7 @@ mod tests {
let counter: Arc<AtomicUsize> = Default::default();
let counter_clone = counter.clone();
listeners
.subscribe_events("", move |events| {
.subscribe("", move |events| {
assert_eq!(events.len(), 1);
let key_change_event = &events[0];
assert_eq!(key_change_event.key, "prefix:strippedprefix");
Expand All @@ -294,7 +294,7 @@ mod tests {
let listeners = Listeners::default();
let counter: Arc<AtomicUsize> = Default::default();
let counter_clone = counter.clone();
let handle = listeners.subscribe_events("prefix:", move |evts| {
let handle = listeners.subscribe("prefix:", move |evts| {
assert_eq!(evts.len(), 1);
let evt = evts[0];
assert_eq!(evt.key, "strippedprefix");
Expand Down Expand Up @@ -326,7 +326,7 @@ mod tests {
let counter: Arc<AtomicUsize> = Default::default();
let counter_clone = counter.clone();
listeners
.subscribe_events(prefix, move |events| {
.subscribe(prefix, move |events| {
assert_eq!(events.len(), 1);
counter_clone.fetch_add(events.len(), Ordering::Relaxed);
})
Expand Down
56 changes: 55 additions & 1 deletion chitchat/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1852,6 +1852,60 @@ mod tests {
assert_eq!(versioned_b.value, "val_b2");
}

#[tokio::test]
async fn test_node_apply_delta_batches_events() {
use std::fmt::Write as _;
let mut cluster_state = ClusterState::default();
let (events_tx, mut events_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
let _listener = cluster_state.listeners.subscribe("key", move |events| {
let mut events_str: String = String::new();
for evt in events {
write!(&mut events_str, "{}={},", evt.key, evt.value).unwrap();
}
events_tx.send(events_str).unwrap();
});
let node1 = ChitchatId::for_local_test(10_001);
let node2 = ChitchatId::for_local_test(10_002);
cluster_state.node_state_mut(&node1);

let mut delta_serializer = DeltaSerializer::with_mtu(100_000);
delta_serializer.try_add_node(node1.clone(), 0, 0);
delta_serializer.try_add_kv("key", VersionedValue {
value: "value".to_string(),
version: 1,
status: DeletionStatus::Set,
});
delta_serializer.try_add_kv("key1", VersionedValue {
value: "value1".to_string(),
version: 2,
status: DeletionStatus::Set,
});
delta_serializer.try_add_kv("key2", VersionedValue {
value: "deleted".to_string(),
version: 3,
status: DeletionStatus::Deleted(Instant::now()),
});
delta_serializer.try_add_kv("key3", VersionedValue {
value: "value3".to_string(),
version: 4,
status: DeletionStatus::DeleteAfterTtl(Instant::now()),
});
// we add another node to make sure we are batching events across nodes.
delta_serializer.try_add_node(node2.clone(), 0, 0);
delta_serializer.try_add_kv("key3", VersionedValue {
value: "value3".to_string(),
version: 1,
status: DeletionStatus::DeleteAfterTtl(Instant::now()),
});
let delta: Delta = delta_serializer.finish();
cluster_state.apply_delta(delta);

let event = events_rx.recv().await.unwrap();
assert!(events_rx.try_recv().is_err());

assert_eq!(&event, "=value,1=value1,3=value3,3=value3,");
}

#[test]
fn test_node_set_delete() {
let mut cluster_state = ClusterState::default();
Expand Down Expand Up @@ -1894,7 +1948,7 @@ mod tests {
async fn test_listener_batch() {
let mut cluster_state = ClusterState::default();
let (key_change_tx, mut key_change_rx) = tokio::sync::mpsc::unbounded_channel();
let listen_handle = cluster_state.listeners.subscribe_events(
let listen_handle = cluster_state.listeners.subscribe(
"prefix",
move |key_changes: &[KeyChangeEventRef]| {
for key_change in key_changes {
Expand Down

0 comments on commit 54998fd

Please sign in to comment.