From 0caf68d5f402c404f525e24633602d52b3dab7c1 Mon Sep 17 00:00:00 2001 From: Nikita Masych Date: Wed, 21 Aug 2024 20:53:51 +0300 Subject: [PATCH] fix: resolved events sending --- src/party.rs | 86 ++++++++++++++++++++++++++++++---------------------- 1 file changed, 50 insertions(+), 36 deletions(-) diff --git a/src/party.rs b/src/party.rs index ba5b16a..057c5d1 100644 --- a/src/party.rs +++ b/src/party.rs @@ -269,21 +269,17 @@ impl> Party { self.value_selector.select(&self.parties_voted_before) } - /// Start the next ballot. It's expected from the external system to re-run ballot protocol in - /// case of failed ballot. pub async fn launch_ballot(&mut self) -> Result, BallotError> { self.prepare_next_ballot(); - self.status = PartyStatus::Launched; let launch1a_timer = time::sleep(self.cfg.launch1a_timeout); let launch1b_timer = time::sleep(self.cfg.launch1b_timeout); let launch2a_timer = time::sleep(self.cfg.launch2a_timeout); let launch2av_timer = time::sleep(self.cfg.launch2av_timeout); - let launch2b_timer = time::sleep(self.cfg.launch2a_timeout); + let launch2b_timer = time::sleep(self.cfg.launch2b_timeout); let finalize_timer = time::sleep(self.cfg.finalize_timeout); - // Prevent the timers from firing immediately tokio::pin!( launch1a_timer, launch1b_timer, @@ -293,43 +289,56 @@ impl> Party { finalize_timer ); + let mut launch1a_fired = false; + let mut launch1b_fired = false; + let mut launch2a_fired = false; + let mut launch2av_fired = false; + let mut launch2b_fired = false; + let mut finalize_fired = false; + while self.is_launched() { tokio::select! { - _ = &mut launch1a_timer => { + _ = &mut launch1a_timer, if !launch1a_fired => { self.event_sender.send(PartyEvent::Launch1a).map_err(|_| { self.status = PartyStatus::Failed; BallotError::Communication("Failed to send Launch1a event".into()) })?; + launch1a_fired = true; }, - _ = &mut launch1b_timer => { + _ = &mut launch1b_timer, if !launch1b_fired => { self.event_sender.send(PartyEvent::Launch1b).map_err(|_| { self.status = PartyStatus::Failed; BallotError::Communication("Failed to send Launch1b event".into()) })?; + launch1b_fired = true; }, - _ = &mut launch2a_timer => { + _ = &mut launch2a_timer, if !launch2a_fired => { self.event_sender.send(PartyEvent::Launch2a).map_err(|_| { self.status = PartyStatus::Failed; BallotError::Communication("Failed to send Launch2a event".into()) })?; + launch2a_fired = true; }, - _ = &mut launch2av_timer => { + _ = &mut launch2av_timer, if !launch2av_fired => { self.event_sender.send(PartyEvent::Launch2av).map_err(|_| { self.status = PartyStatus::Failed; BallotError::Communication("Failed to send Launch2av event".into()) })?; + launch2av_fired = true; }, - _ = &mut launch2b_timer => { + _ = &mut launch2b_timer, if !launch2b_fired => { self.event_sender.send(PartyEvent::Launch2b).map_err(|_| { self.status = PartyStatus::Failed; BallotError::Communication("Failed to send Launch2b event".into()) })?; + launch2b_fired = true; }, - _ = &mut finalize_timer => { + _ = &mut finalize_timer, if !finalize_fired => { self.event_sender.send(PartyEvent::Finalize).map_err(|_| { self.status = PartyStatus::Failed; BallotError::Communication("Failed to send Finalize event".into()) })?; + finalize_fired = true; }, msg_wire = self.msg_in_receiver.recv() => { if let Some(msg_wire) = msg_wire { @@ -337,10 +346,9 @@ impl> Party { self.status = PartyStatus::Failed; return Err(err); } - } else { - // Handle the case where the channel has been closed - self.status = PartyStatus::Failed; - return Err(BallotError::Communication("Message channel closed".into())); + }else if self.msg_in_receiver.is_closed(){ + self.status = PartyStatus::Failed; + return Err(BallotError::Communication("msg-in channel closed".into())); } }, event = self.event_receiver.recv() => { @@ -349,10 +357,9 @@ impl> Party { self.status = PartyStatus::Failed; return Err(err); } - } else { - // Handle the case where the channel has been closed + }else if self.event_receiver.is_closed(){ self.status = PartyStatus::Failed; - return Err(BallotError::Communication("Event channel closed".into())); + return Err(BallotError::Communication("event receiver channel closed".into())); } }, } @@ -1066,48 +1073,55 @@ mod tests { } #[tokio::test] - #[ignore] // This test is unfinished, turning off temporarily. - async fn test_launch_ballot_timeouts() { + async fn test_launch_ballot_events() { // Pause the Tokio time so we can manipulate it time::pause(); // Set up the Party with necessary configuration - let cfg = default_config(); - - let mut party = - Party::::new(0, cfg.clone(), MockValueSelector).0; + let cfg = BPConConfig { + party_weights: vec![1, 2, 3], + threshold: 4, + launch1a_timeout: Duration::from_secs(10), + launch1b_timeout: Duration::from_secs(20), + launch2a_timeout: Duration::from_secs(30), + launch2av_timeout: Duration::from_secs(40), + launch2b_timeout: Duration::from_secs(50), + finalize_timeout: Duration::from_secs(60), + }; - // Create the event and message channels to substitute for testing. let (event_sender, mut event_receiver) = unbounded_channel(); + // Need to return all 3 values, so that they don't get dropped + // and associated channels don't get closed. + let (mut party, _msg_out_receiver, _msg_in_sender) = + Party::::new(0, cfg.clone(), MockValueSelector); + + // Same here, we would like to not lose party's event_receiver, so that test doesn't fail. + let _event_sender = party.event_sender; party.event_sender = event_sender; - // Note that we don't change party.event_receiver // Spawn the launch_ballot function in a separate task - let ballot_task = tokio::spawn(async move { + let _ballot_task = tokio::spawn(async move { party.launch_ballot().await.unwrap(); }); - // Fast-forward time and check that the correct event is sent after each interval + // Sequential time advance and event check time::advance(cfg.launch1a_timeout).await; assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch1a); - time::advance(cfg.launch1b_timeout).await; + time::advance(cfg.launch1b_timeout - cfg.launch1a_timeout).await; assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch1b); - time::advance(cfg.launch2a_timeout).await; + time::advance(cfg.launch2a_timeout - cfg.launch1b_timeout).await; assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch2a); - time::advance(cfg.launch2av_timeout).await; + time::advance(cfg.launch2av_timeout - cfg.launch2a_timeout).await; assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch2av); - time::advance(cfg.launch2b_timeout).await; + time::advance(cfg.launch2b_timeout - cfg.launch2av_timeout).await; assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch2b); - time::advance(cfg.finalize_timeout).await; + time::advance(cfg.finalize_timeout - cfg.launch2b_timeout).await; assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Finalize); - - // Ensure that the task completes successfully - ballot_task.await.unwrap(); } }