Skip to content

Commit

Permalink
Issue #182: Fixing high cpu usage while waiting for next payment
Browse files Browse the repository at this point in the history
  • Loading branch information
bjohnson5 committed Apr 26, 2024
1 parent a574b1e commit 4b6873f
Showing 1 changed file with 42 additions and 16 deletions.
58 changes: 42 additions & 16 deletions sim-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,21 @@ impl Simulation {
self.run_data_collection(event_receiver, &mut tasks);

// Get an execution kit per activity that we need to generate and spin up consumers for each source node.
let activities = self.activity_executors().await?;
let activities = match self.activity_executors().await {
Ok(a) => a,
Err(e) => {
// If we encounter an error while setting up the activity_executors,
// we need to shutdown and wait for tasks to finish. We have started background tasks in the
// run_data_collection function, so we should shut those down before returning.
self.shutdown();
while let Some(res) = tasks.join_next().await {
if let Err(e) = res {
log::error!("Task exited with error: {e}.");
}
}
return Err(e);
},
};
let consumer_channels = self.dispatch_consumers(
activities
.iter()
Expand All @@ -572,8 +586,24 @@ impl Simulation {
// Next, we'll spin up our actual producers that will be responsible for triggering the configured activity.
// The producers will use their own JoinSet so that the simulation can be shutdown if they all finish.
let mut producer_tasks = JoinSet::new();
self.dispatch_producers(activities, consumer_channels, &mut producer_tasks)
.await?;
match self
.dispatch_producers(activities, consumer_channels, &mut producer_tasks)
.await
{
Ok(_) => {},
Err(e) => {
// If we encounter an error in dispatch_producers, we need to shutdown and wait for tasks to finish.
// We have started background tasks in the run_data_collection function,
// so we should shut those down before returning.
self.shutdown();
while let Some(res) = tasks.join_next().await {
if let Err(e) = res {
log::error!("Task exited with error: {e}.");
}
}
return Err(e);
},
}

// Start a task that waits for the producers to finish.
// If all producers finish, then there is nothing left to do and the simulation can be shutdown.
Expand Down Expand Up @@ -1158,7 +1188,7 @@ async fn produce_simulation_results(
tokio::select! {
biased;
_ = listener.clone() => {
return Ok(())
break;
},
output = output_receiver.recv() => {
match output {
Expand All @@ -1184,21 +1214,17 @@ async fn produce_simulation_results(
},
None => return Ok(())
}
},
track_payment = set.join_next() => {
if let Some(res) = track_payment {
match res {
Ok(track_payment_res) => {
track_payment_res?
},
Err(_) => {
return Err(SimulationError::TaskError);
},
}
}
}
}
}

log::debug!("Simulation results producer exiting.");
while let Some(res) = set.join_next().await {
if let Err(e) = res {
log::error!("Simulation results producer task exited with error: {e}.");
}
}
Ok(())
}

async fn track_payment_result(
Expand Down

0 comments on commit 4b6873f

Please sign in to comment.