Skip to content

Commit

Permalink
Await termination of partition processors when shutting PPM down
Browse files Browse the repository at this point in the history
This commit makes the PPM await the termination of all currently running
partition processors when shutting down.
  • Loading branch information
tillrohrmann committed Nov 27, 2024
1 parent 926c104 commit 3fb9019
Showing 1 changed file with 35 additions and 7 deletions.
42 changes: 35 additions & 7 deletions crates/worker/src/partition_processor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,7 @@ impl PartitionProcessorManager {
}

pub async fn run(mut self) -> anyhow::Result<()> {
let shutdown = cancellation_watcher();
tokio::pin!(shutdown);
let mut shutdown = std::pin::pin!(cancellation_watcher());

let (persisted_lsns_tx, persisted_lsns_rx) = watch::channel(BTreeMap::default());
self.persisted_lsns_rx = Some(persisted_lsns_rx);
Expand Down Expand Up @@ -275,14 +274,43 @@ impl PartitionProcessorManager {
}
}
_ = &mut shutdown => {
self.health_status.update(WorkerStatus::Unknown);
for task in self.snapshot_export_tasks.iter() {
task.cancel();
}
return Ok(());
break
}
}
}

self.shutdown().await;
Ok(())
}

async fn shutdown(&mut self) {
debug!("Shutting down partition processor manager.");

self.health_status.update(WorkerStatus::Unknown);

for task in self.snapshot_export_tasks.iter() {
task.cancel();
}

// stop all running processors
for processor_state in self.processor_states.values_mut() {
processor_state.stop();
}

// await that all running processors terminate
self.await_processors_termination().await;
}

async fn await_processors_termination(&mut self) {
while let Some(event) = self.asynchronous_operations.join_next().await {
let event = event.expect("asynchronous operations must not panic");
self.on_asynchronous_event(event);

if self.processor_states.is_empty() {
// all processors have terminated :-)
break;
}
}
}

fn on_partition_processor_rpc(
Expand Down

0 comments on commit 3fb9019

Please sign in to comment.