Skip to content

Commit

Permalink
Expose live nodes watch::Receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Feb 23, 2024
1 parent fcdd0d0 commit a99d609
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 9 deletions.
18 changes: 11 additions & 7 deletions chitchat/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ impl Chitchat {

/// Trigggers the catchup callback if the self node is included in the nodes to reset.
fn maybe_trigger_catchup_callback(&self, delta: &Delta) {
if let Some(catchup_callback) = &self.config.catchup_callback {
if delta.nodes_to_reset.contains(self.self_chitchat_id()) {
if !delta.nodes_to_reset.is_empty() {
if let Some(catchup_callback) = &self.config.catchup_callback {
info!("triggering catchup callback");
catchup_callback();
}
Expand Down Expand Up @@ -257,10 +257,14 @@ impl Chitchat {
/// - updates its max version
///
/// Heartbeats are not notified.
pub fn live_nodes_watcher(&self) -> WatchStream<BTreeMap<ChitchatId, NodeState>> {
pub fn live_nodes_watch_stream(&self) -> WatchStream<BTreeMap<ChitchatId, NodeState>> {
WatchStream::new(self.live_nodes_watcher_rx.clone())
}

pub fn live_nodes_watcher(&self) -> watch::Receiver<BTreeMap<ChitchatId, NodeState>> {
self.live_nodes_watcher_rx.clone()
}

/// Returns the set of nodes considered dead by the failure detector.
pub fn dead_nodes(&self) -> impl Iterator<Item = &ChitchatId> {
self.failure_detector.dead_nodes()
Expand Down Expand Up @@ -475,7 +479,7 @@ mod tests {
chitchat
.lock()
.await
.live_nodes_watcher()
.live_nodes_watch_stream()
.skip_while(|live_nodes| {
if live_nodes.len() != expected_nodes.len() {
return true;
Expand Down Expand Up @@ -530,7 +534,7 @@ mod tests {
let transport = ChannelTransport::with_mtu(MAX_UDP_DATAGRAM_PAYLOAD_SIZE);
let nodes = setup_nodes(20001..=20005, &transport).await;
let node2 = nodes.get(1).unwrap();
let mut live_rx = node2.chitchat().lock().await.live_nodes_watcher();
let mut live_rx = node2.chitchat().lock().await.live_nodes_watch_stream();
let live_members = loop {
let live_members = live_rx.next().await.unwrap();
if live_members.len() == 5 {
Expand Down Expand Up @@ -574,7 +578,7 @@ mod tests {

let mut num_nodes = 0;
assert!(tokio::time::timeout(Duration::from_secs(1), async {
let mut live_rx = nodes[2].chitchat().lock().await.live_nodes_watcher();
let mut live_rx = nodes[2].chitchat().lock().await.live_nodes_watch_stream();
loop {
let live_members = live_rx.next().await.unwrap();
num_nodes = live_members.len();
Expand Down Expand Up @@ -606,7 +610,7 @@ mod tests {
.self_node_state()
.set("READY", "true");

let mut live_rx = nodes[2].chitchat().lock().await.live_nodes_watcher();
let mut live_rx = nodes[2].chitchat().lock().await.live_nodes_watch_stream();
let live_members = loop {
let live_members = live_rx.next().await.unwrap();
if live_members.len() == 3 {
Expand Down
2 changes: 1 addition & 1 deletion chitchat/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ mod tests {
.chitchat()
.lock()
.await
.live_nodes_watcher()
.live_nodes_watch_stream()
.skip_while(|live_nodes| live_nodes.is_empty());

{
Expand Down
2 changes: 1 addition & 1 deletion chitchat/tests/perf_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async fn wait_until<P: Fn(&BTreeMap<ChitchatId, NodeState>) -> bool>(
predicate: P,
) -> Duration {
let start = Instant::now();
let mut node_watcher = handle.chitchat().lock().await.live_nodes_watcher();
let mut node_watcher = handle.chitchat().lock().await.live_nodes_watch_stream();
while let Some(nodes) = node_watcher.next().await {
if predicate(&nodes) {
break;
Expand Down

0 comments on commit a99d609

Please sign in to comment.