diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs b/plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs index e805bda26..b957fc50f 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs @@ -18,18 +18,20 @@ use std::{ }; use serde::{Deserialize, Serialize}; +use tokio::task::JoinHandle; use zenoh::{ bytes::ZBytes, internal::Value, - key_expr::OwnedKeyExpr, + key_expr::{format::keformat, OwnedKeyExpr}, query::{ConsolidationMode, Query, Selector}, sample::{Sample, SampleKind}, + session::ZenohId, }; use zenoh_backend_traits::StorageInsertionResult; use super::{ classification::{IntervalIdx, SubIntervalIdx}, - core::Replication, + core::{aligner_key_expr_formatter, Replication}, digest::{DigestDiff, Fingerprint}, log::EventMetadata, }; @@ -51,6 +53,8 @@ use super::{ /// hence directly skipping to the `SubIntervals` variant. #[derive(Debug, Deserialize, Serialize, PartialEq, Eq)] pub(crate) enum AlignmentQuery { + Discovery, + All, Diff(DigestDiff), Intervals(HashSet), SubIntervals(HashMap>), @@ -67,6 +71,7 @@ pub(crate) enum AlignmentQuery { /// Not all replies are made, it depends on the Era when a misalignment was detected. #[derive(Debug, Deserialize, Serialize, PartialEq, Eq)] pub(crate) enum AlignmentReply { + Discovery(ZenohId), Intervals(HashMap), SubIntervals(HashMap>), Events(Vec), @@ -101,6 +106,47 @@ impl Replication { }; match alignment_query { + AlignmentQuery::Discovery => { + tracing::trace!("Processing `AlignmentQuery::Discovery`"); + reply_to_query( + &query, + AlignmentReply::Discovery(self.zenoh_session.zid()), + None, + ) + .await; + } + AlignmentQuery::All => { + tracing::trace!("Processing `AlignmentQuery::All`"); + + let idx_intervals = self + .replication_log + .read() + .await + .intervals + .keys() + .copied() + .collect::>(); + + for interval_idx in idx_intervals { + let mut events_to_send = Vec::default(); + if let Some(interval) = self + .replication_log + .read() + .await + .intervals + .get(&interval_idx) + { + interval.sub_intervals.values().for_each(|sub_interval| { + events_to_send.extend(sub_interval.events.values().map(Into::into)); + }); + } + + // NOTE: As we took the lock in the `if let` block, it is released here, + // diminishing contention. + + self.reply_events(&query, events_to_send).await; + } + } AlignmentQuery::Diff(digest_diff) => { tracing::trace!("Processing `AlignmentQuery::Diff`"); if digest_diff.cold_eras_differ { @@ -262,6 +308,11 @@ impl Replication { /// is the reason why we need the consolidation to set to be `None` (⚠️). pub(crate) async fn reply_events(&self, query: &Query, events_to_retrieve: Vec) { for event_metadata in events_to_retrieve { + if event_metadata.action == SampleKind::Delete { + reply_to_query(query, AlignmentReply::Retrieval(event_metadata), None).await; + continue; + } + let stored_data = { let mut storage = self.storage.lock().await; match storage.get(event_metadata.stripped_key.clone(), "").await { @@ -323,7 +374,7 @@ impl Replication { &self, replica_aligner_ke: OwnedKeyExpr, alignment_query: AlignmentQuery, - ) { + ) -> JoinHandle<()> { let replication = self.clone(); tokio::task::spawn(async move { let attachment = match bincode::serialize(&alignment_query) { @@ -334,17 +385,29 @@ impl Replication { } }; + // NOTE: We need to put the Consolidation to `None` as otherwise if multiple replies are + // sent, they will be "consolidated" and only one of them will make it through. + // + // When we retrieve Samples from a Replica, each Sample is sent in a separate + // reply. Hence the need to have no consolidation. + let mut consolidation = ConsolidationMode::None; + + if matches!(alignment_query, AlignmentQuery::Discovery) { + // NOTE: `Monotonic` means that Zenoh will forward the first answer it receives (and + // ensure that later answers are with a higher timestamp — we do not care + // about that last aspect). + // + // By setting the consolidation to this value when performing the initial + // alignment, we select the most reactive Replica (hopefully the closest as + // well). + consolidation = ConsolidationMode::Monotonic; + } + match replication .zenoh_session .get(Into::::into(replica_aligner_ke.clone())) .attachment(attachment) - // NOTE: We need to put the Consolidation to `None` as otherwise if multiple replies - // are sent, they will be "consolidated" and only one of them will make it - // through. - // - // When we retrieve Samples from a Replica, each Sample is sent in a separate - // reply. Hence the need to have no consolidation. - .consolidation(ConsolidationMode::None) + .consolidation(consolidation) .await { Err(e) => { @@ -387,10 +450,17 @@ impl Replication { sample, ) .await; + + // The consolidation mode `Monotonic`, used for sending out an + // `AlignmentQuery::Discovery`, will keep on sending replies. We only want + // to discover / align with a single Replica so we break here. + if matches!(alignment_query, AlignmentQuery::Discovery) { + return; + } } } } - }); + }) } /// Processes the [AlignmentReply] sent by the Replica that has potentially data this Storage is @@ -438,6 +508,39 @@ impl Replication { sample: Sample, ) { match alignment_reply { + AlignmentReply::Discovery(replica_zid) => { + let parsed_ke = match aligner_key_expr_formatter::parse(&replica_aligner_ke) { + Ok(ke) => ke, + Err(e) => { + tracing::error!( + "Failed to parse < {replica_aligner_ke} > as a valid Aligner key \ + expression: {e:?}" + ); + return; + } + }; + + let replica_aligner_ke = match keformat!( + aligner_key_expr_formatter::formatter(), + hash_configuration = parsed_ke.hash_configuration(), + zid = replica_zid, + ) { + Ok(ke) => ke, + Err(e) => { + tracing::error!("Failed to generate a valid Aligner key expression: {e:?}"); + return; + } + }; + + tracing::debug!("Performing initial alignment with Replica < {replica_zid} >"); + + if let Err(e) = self + .spawn_query_replica_aligner(replica_aligner_ke, AlignmentQuery::All) + .await + { + tracing::error!("Error returned while performing the initial alignment: {e:?}"); + } + } AlignmentReply::Intervals(replica_intervals) => { tracing::trace!("Processing `AlignmentReply::Intervals`"); let intervals_diff = { @@ -616,18 +719,26 @@ impl Replication { } } - if matches!( - self.storage - .lock() - .await - .put( - replica_event.stripped_key.clone(), - sample.into(), - replica_event.timestamp, - ) - .await, - Ok(StorageInsertionResult::Outdated) | Err(_) - ) { + // NOTE: This code can only be called with `action` set to `delete` on an initial + // alignment, in which case the Storage of the receiving Replica is empty => there + // is no need to actually call `storage.delete`. + // + // Outside of an initial alignment, the `delete` action will be performed at the + // step above, in `AlignmentReply::Events`. + if replica_event.action == SampleKind::Put + && matches!( + self.storage + .lock() + .await + .put( + replica_event.stripped_key.clone(), + sample.into(), + replica_event.timestamp, + ) + .await, + Ok(StorageInsertionResult::Outdated) | Err(_) + ) + { return; } diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/core.rs b/plugins/zenoh-plugin-storage-manager/src/replication/core.rs index bd96912a4..0db1459ec 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replication/core.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replication/core.rs @@ -53,6 +53,63 @@ pub(crate) struct Replication { } impl Replication { + /// Performs an initial alignment, skipping the comparison of Digest, asking directly the first + /// discovered Replica for all its entries. + /// + /// # ⚠️ Assumption: empty Storage + /// + /// We assume that this method will only be called if the underlying Storage is empty. This has + /// at least one consequence: if the Aligner receives a `delete` event from the Replica, it will + /// not attempt to delete anything from the Storage. + /// + /// # Replica discovery + /// + /// To discover a Replica, this method will create a Digest subscriber, wait to receive a + /// *valid* Digest and, upon reception, ask that Replica for all its entries. + /// + /// To avoid waiting indefinitely (in case there are no other Replica on the network), the + /// subscriber will wait for, at most, the duration of two Intervals. + pub(crate) async fn initial_alignment(&self) { + let ke_all_replicas = match keformat!( + aligner_key_expr_formatter::formatter(), + hash_configuration = *self + .replication_log + .read() + .await + .configuration + .fingerprint(), + zid = "*", + ) { + Ok(ke) => ke, + Err(e) => { + tracing::error!( + "Failed to generate key expression to query all Replicas: {e:?}. Skipping \ + initial alignment." + ); + return; + } + }; + + // NOTE: As discussed with @OlivierHecart, the plugins do not wait for the duration of the + // "scouting delay" before performing any Zenoh operation. Hence, we manually enforce this + // delay when performing the initial alignment. + let delay = self + .zenoh_session + .config() + .lock() + .scouting + .delay() + .unwrap_or(500); + tokio::time::sleep(Duration::from_millis(delay)).await; + + if let Err(e) = self + .spawn_query_replica_aligner(ke_all_replicas, AlignmentQuery::Discovery) + .await + { + tracing::error!("Initial alignment failed with: {e:?}"); + } + } + /// Spawns a task that periodically publishes the [Digest] of the Replication [Log]. /// /// This task will perform the following steps: diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/service.rs b/plugins/zenoh-plugin-storage-manager/src/replication/service.rs index f3b87c6c3..e48fb4fec 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replication/service.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replication/service.rs @@ -12,13 +12,13 @@ // ZettaScale Zenoh Team, // -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; use tokio::{ sync::{broadcast::Receiver, RwLock}, task::JoinHandle, }; -use zenoh::{key_expr::OwnedKeyExpr, query::QueryTarget, sample::Locality, session::Session}; +use zenoh::{key_expr::OwnedKeyExpr, session::Session}; use super::{core::Replication, LogLatest}; use crate::storages_mgt::{LatestUpdates, StorageMessage, StorageService}; @@ -29,17 +29,22 @@ pub(crate) struct ReplicationService { aligner_queryable_handle: JoinHandle<()>, } -pub(crate) const MAX_RETRY: usize = 2; -pub(crate) const WAIT_PERIOD_SECS: u64 = 4; - impl ReplicationService { /// Starts the `ReplicationService`, spawning multiple tasks. /// + /// # Initial alignment + /// + /// To optimise network resources, if the Storage is empty an "initial alignment" will be + /// performed: if a Replica is detected, a query will be made to retrieve the entire content of + /// its Storage. + /// /// # Tasks spawned /// - /// This function will spawn two tasks: + /// This function will spawn four long-lived tasks: /// 1. One to publish the [Digest]. - /// 2. One to wait on the provided [Receiver] in order to stop the Replication Service, + /// 2. One to receive the [Digest] of other Replica. + /// 3. One to receive alignment queries of other Replica. + /// 4. One to wait on the provided [Receiver] in order to stop the Replication Service, /// attempting to abort all the tasks that were spawned, once a Stop message has been /// received. pub async fn spawn_start( @@ -50,65 +55,27 @@ impl ReplicationService { latest_updates: Arc>, mut rx: Receiver, ) { - // We perform a "wait-try" policy because Zenoh needs some time to propagate the routing - // information and, here, we need to have the queryables propagated. - // - // 4 seconds is an arbitrary value. - let mut attempt = 0; - let mut received_reply = false; - - while attempt < MAX_RETRY { - attempt += 1; - tokio::time::sleep(Duration::from_secs(WAIT_PERIOD_SECS)).await; - - match zenoh_session - .get(&storage_key_expr) - // `BestMatching`, the default option for `target`, will try to minimise the storage - // that are queried and their distance while trying to maximise the key space - // covered. - // - // In other words, if there is a close and complete storage, it will only query this - // one. - .target(QueryTarget::BestMatching) - // The value `Remote` is self-explanatory but why it is needed deserves an - // explanation: we do not want to query the local database as the purpose is to get - // the data from other replicas (if there is one). - .allowed_destination(Locality::Remote) - .await - { - Ok(replies) => { - while let Ok(reply) = replies.recv_async().await { - received_reply = true; - if let Ok(sample) = reply.into_result() { - if let Err(e) = storage_service.process_sample(sample).await { - tracing::error!("{e:?}"); - } - } - } - } - Err(e) => tracing::error!("Initial alignment Query failed with: {e:?}"), - } + let storage = storage_service.storage.clone(); - if received_reply { - break; - } + let replication = Replication { + zenoh_session, + replication_log, + storage_key_expr, + latest_updates, + storage, + }; - tracing::debug!( - "Found no Queryable matching '{storage_key_expr}'. Attempt {attempt}/{MAX_RETRY}." - ); + if replication + .replication_log + .read() + .await + .intervals + .is_empty() + { + replication.initial_alignment().await; } - let storage = storage_service.storage.clone(); - tokio::task::spawn(async move { - let replication = Replication { - zenoh_session, - replication_log, - storage_key_expr, - latest_updates, - storage, - }; - let replication_service = Self { digest_publisher_handle: replication.spawn_digest_publisher(), digest_subscriber_handle: replication.spawn_digest_subscriber(), @@ -124,7 +91,7 @@ impl ReplicationService { }); } - /// Stops all the tasks spawned by the `ReplicationService`. + /// Stops all the long-lived tasks spawned by the `ReplicationService`. pub fn stop(self) { self.digest_publisher_handle.abort(); self.digest_subscriber_handle.abort();