Skip to content

Commit

Permalink
refactor: on_abort hook returns no error (#205)
Browse files Browse the repository at this point in the history
The purpose of the `abort` method is to forcefully stop the execution of a
node. We cannot recover from an error there, hence there is no use in allowing
returning one.

Signed-off-by: Julien Loudet <[email protected]>
  • Loading branch information
J-Loudet authored Mar 6, 2024
1 parent b59d29d commit dafec0e
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 18 deletions.
4 changes: 1 addition & 3 deletions zenoh-flow-nodes/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,7 @@ pub trait Node: Send + Sync {
Ok(())
}

async fn on_abort(&self) -> Result<()> {
Ok(())
}
async fn on_abort(&self) {}
}

/// The `Source` trait represents a Source of data in Zenoh Flow. Sources only possess `Outputs` and
Expand Down
5 changes: 2 additions & 3 deletions zenoh-flow-runtime/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,13 @@ impl DataFlowInstance {
Ok(())
}

pub async fn abort(&mut self) -> Result<()> {
pub async fn abort(&mut self) {
for (node_id, runner) in self.runners.iter_mut() {
runner.abort().await?;
runner.abort().await;
tracing::trace!("Aborted node < {} >", node_id);
}

self.state = InstanceState::Aborted;
Ok(())
}

pub fn state(&self) -> &InstanceState {
Expand Down
4 changes: 1 addition & 3 deletions zenoh-flow-runtime/src/runners/builtin/zenoh/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,9 @@ Caused by:
// This action is motivated by two factors:
// 1. It prevents receiving publications that happened while the Zenoh Source was not active.
// 2. It prevents impacting the other subscribers / publishers on the same resource.
async fn on_abort(&self) -> Result<()> {
async fn on_abort(&self) {
let mut subscribers = self.subscribers.lock().await;
subscribers.clear();

Ok(())
}

// The iteration of a Zenoh Source polls, concurrently, the subscribers and forwards the first publication received
Expand Down
9 changes: 2 additions & 7 deletions zenoh-flow-runtime/src/runners/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,10 @@ impl Runner {
/// What this also means is that there is a possibility to leave the node in an **inconsistent state**. For
/// instance, modified values that are not saved between several `.await` points would be lost if the node is
/// aborted.
pub(crate) async fn abort(&mut self) -> Result<()> {
pub(crate) async fn abort(&mut self) {
if let Some(handle) = self.handle.take() {
handle.cancel().await;
self.node
.on_abort()
.await
.with_context(|| format!("{}: call to `on_abort` failed", self.id))?;
self.node.on_abort().await;
}

Ok(())
}
}
4 changes: 2 additions & 2 deletions zenoh-flow-runtime/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ impl Runtime {

let mut instance_guard = instance.write().await;

instance_guard.abort().await?;
instance_guard.abort().await;

tracing::info!("aborted");

Expand Down Expand Up @@ -226,7 +226,7 @@ impl Runtime {
}
};

instance.abort().await?;
instance.abort().await;

drop(instance); // Forcefully drop the instance so we can check if we can free up some Libraries.
self.loader.lock().await.remove_unused_libraries();
Expand Down

0 comments on commit dafec0e

Please sign in to comment.