Skip to content

Commit

Permalink
Merge pull request #183 from bjohnson5/182-high-cpu-fix
Browse files Browse the repository at this point in the history
Issue #182 Fixes a problematic loop in produce_simulation_results
  • Loading branch information
carlaKC authored Apr 29, 2024
2 parents a574b1e + 489e497 commit 5838d4a
Showing 1 changed file with 47 additions and 20 deletions.
67 changes: 47 additions & 20 deletions sim-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,21 @@ impl Simulation {
self.run_data_collection(event_receiver, &mut tasks);

// Get an execution kit per activity that we need to generate and spin up consumers for each source node.
let activities = self.activity_executors().await?;
let activities = match self.activity_executors().await {
Ok(a) => a,
Err(e) => {
// If we encounter an error while setting up the activity_executors,
// we need to shutdown and wait for tasks to finish. We have started background tasks in the
// run_data_collection function, so we should shut those down before returning.
self.shutdown();
while let Some(res) = tasks.join_next().await {
if let Err(e) = res {
log::error!("Task exited with error: {e}.");
}
}
return Err(e);
},
};
let consumer_channels = self.dispatch_consumers(
activities
.iter()
Expand All @@ -572,8 +586,24 @@ impl Simulation {
// Next, we'll spin up our actual producers that will be responsible for triggering the configured activity.
// The producers will use their own JoinSet so that the simulation can be shutdown if they all finish.
let mut producer_tasks = JoinSet::new();
self.dispatch_producers(activities, consumer_channels, &mut producer_tasks)
.await?;
match self
.dispatch_producers(activities, consumer_channels, &mut producer_tasks)
.await
{
Ok(_) => {},
Err(e) => {
// If we encounter an error in dispatch_producers, we need to shutdown and wait for tasks to finish.
// We have started background tasks in the run_data_collection function,
// so we should shut those down before returning.
self.shutdown();
while let Some(res) = tasks.join_next().await {
if let Err(e) = res {
log::error!("Task exited with error: {e}.");
}
}
return Err(e);
},
}

// Start a task that waits for the producers to finish.
// If all producers finish, then there is nothing left to do and the simulation can be shutdown.
Expand Down Expand Up @@ -1154,11 +1184,11 @@ async fn produce_simulation_results(
) -> Result<(), SimulationError> {
let mut set = tokio::task::JoinSet::new();

loop {
let result = loop {
tokio::select! {
biased;
_ = listener.clone() => {
return Ok(())
break Ok(())
},
output = output_receiver.recv() => {
match output {
Expand All @@ -1170,35 +1200,32 @@ async fn produce_simulation_results(
source_node.clone(), results.clone(), payment, listener.clone()
));
} else {
return Err(SimulationError::MissingNodeError(format!("Source node with public key: {} unavailable.", payment.source)));
break Err(SimulationError::MissingNodeError(format!("Source node with public key: {} unavailable.", payment.source)));
}
},
SimulationOutput::SendPaymentFailure(payment, result) => {
if results.send((payment, result.clone())).await.is_err() {
return Err(SimulationError::MpscChannelError(
break Err(SimulationError::MpscChannelError(
format!("Failed to send payment result: {result} for payment {:?} dispatched at {:?}.", payment.hash, payment.dispatch_time),
));
}
}
};
},
None => return Ok(())
}
},
track_payment = set.join_next() => {
if let Some(res) = track_payment {
match res {
Ok(track_payment_res) => {
track_payment_res?
},
Err(_) => {
return Err(SimulationError::TaskError);
},
}
None => break Ok(())
}
}
}
};

log::debug!("Simulation results producer exiting.");
while let Some(res) = set.join_next().await {
if let Err(e) = res {
log::error!("Simulation results producer task exited with error: {e}.");
}
}

result
}

async fn track_payment_result(
Expand Down

0 comments on commit 5838d4a

Please sign in to comment.