Skip to content

Commit

Permalink
refactor(storage-manager): remove unnecessary wait/retry policy
Browse files Browse the repository at this point in the history
It does not bring anything to wait and retry on error when attempting to
declare a Queryable or a Subscriber: either the Session is established
and these operations will succeed or the Session is no longer existing
in which case we should terminate.

* plugins/zenoh-plugin-storage-manager/src/replication/core.rs:
  - The `MAX_RETRY` and `WAIT_PERIODS_SECS` constants are no longer
    needed.
  - Removed the wait/retry loops when creating the Digest Subscriber and
    Aligner Queryable.

Signed-off-by: Julien Loudet <[email protected]>
  • Loading branch information
J-Loudet committed Sep 26, 2024
1 parent f1b956d commit 049fdd8
Showing 1 changed file with 23 additions and 51 deletions.
74 changes: 23 additions & 51 deletions plugins/zenoh-plugin-storage-manager/src/replication/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,7 @@ use zenoh::{
};
use zenoh_backend_traits::Storage;

use super::{
digest::Digest,
log::LogLatest,
service::{MAX_RETRY, WAIT_PERIOD_SECS},
};
use super::{digest::Digest, log::LogLatest};
use crate::{replication::aligner::AlignmentQuery, storages_mgt::LatestUpdates};

kedefine!(
Expand Down Expand Up @@ -253,34 +249,22 @@ impl Replication {
}
};

let mut retry = 0;
let subscriber = loop {
match replication
let subscriber = match replication
.zenoh_session
.declare_subscriber(&digest_key_sub)
// NOTE: We need to explicitly set the locality to `Remote` as otherwise the
// Digest subscriber will also receive the Digest published by its own
// Digest publisher.
.allowed_origin(Locality::Remote)
.await
{
Ok(subscriber) => break subscriber,
Err(e) => {
if retry < MAX_RETRY {
retry += 1;
tracing::warn!(
"Failed to declare Digest subscriber: {e:?}. Attempt \
{retry}/{MAX_RETRY}."
);
tokio::time::sleep(Duration::from_secs(WAIT_PERIOD_SECS)).await;
} else {
tracing::error!(
"Could not declare Digest subscriber. The storage will not \
receive the Replication Digest of other replicas."
);
return;
}
}
{
Ok(subscriber) => subscriber,
Err(e) => {
tracing::error!(
"Could not declare Digest subscriber: {e:?}. The storage will not receive \
the Replication Digest of other replicas."
);
return;
}
};

Expand Down Expand Up @@ -397,31 +381,19 @@ impl Replication {
}
};

let mut retry = 0;
let queryable = loop {
match replication
.zenoh_session
.declare_queryable(&aligner_ke)
.allowed_origin(Locality::Remote)
.await
{
Ok(queryable) => break queryable,
Err(e) => {
if retry < MAX_RETRY {
retry += 1;
tracing::warn!(
"Failed to declare the Aligner queryable: {e:?}. Attempt \
{retry}/{MAX_RETRY}."
);
tokio::time::sleep(Duration::from_secs(WAIT_PERIOD_SECS)).await;
} else {
tracing::error!(
"Could not declare the Aligner queryable. This storage will NOT \
align with other replicas."
);
return;
}
}
let queryable = match replication
.zenoh_session
.declare_queryable(&aligner_ke)
.allowed_origin(Locality::Remote)
.await
{
Ok(queryable) => queryable,
Err(e) => {
tracing::error!(
"Could not declare the Aligner queryable: {e:?}. This storage will NOT \
align with other replicas."
);
return;
}
};

Expand Down

0 comments on commit 049fdd8

Please sign in to comment.