From 53017dae62a09ecd6e251d2d60c77459237c476a Mon Sep 17 00:00:00 2001 From: Julien Enoch Date: Thu, 11 Jan 2024 11:27:19 +0100 Subject: [PATCH] Fix storages replication (a query was using wrong ConsolidationMode) --- .../src/replica/aligner.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs b/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs index 45352a7334..041567ae27 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs @@ -20,7 +20,6 @@ use std::collections::{HashMap, HashSet}; use std::str; use zenoh::key_expr::{KeyExpr, OwnedKeyExpr}; use zenoh::prelude::r#async::*; -use zenoh::query::QueryConsolidation; use zenoh::time::Timestamp; use zenoh::Session; @@ -88,7 +87,10 @@ impl Aligner { let checksum = other.checksum; let timestamp = other.timestamp; let (missing_content, no_content_err) = self.get_missing_content(&other, from).await; - log::trace!("[ALIGNER] Missing content is {:?}", missing_content); + log::debug!( + "[ALIGNER] Missing {} entries; query corresponding samples", + missing_content.len() + ); // If missing content is not identified, it showcases some problem // The problem will be addressed in the future rounds, hence will not count as processed @@ -98,11 +100,12 @@ impl Aligner { .await; // Missing data might be empty since some samples in digest might be outdated - log::trace!("[ALIGNER] Missing data is {:?}", missing_data); + log::debug!("[ALIGNER] Received {} queried samples", missing_data.len()); + log::trace!("[ALIGNER] Received queried samples: {missing_data:?}"); for (key, (ts, value)) in missing_data { let sample = Sample::new(key, value).with_timestamp(ts); - log::debug!("[ALIGNER] Adding sample {:?} to storage", sample); + log::debug!("[ALIGNER] Adding {:?} to storage", sample); self.tx_sample.send_async(sample).await.unwrap_or_else(|e| { log::error!("[ALIGNER] Error adding sample to storage: {}", e) }); @@ -140,6 +143,7 @@ impl Aligner { } async fn get_missing_content(&self, other: &Digest, from: &str) -> (Vec, bool) { + log::debug!("[ALIGNER] Get missing content from {from} ..."); // get my digest let this = &self.snapshotter.get_digest().await; @@ -152,6 +156,9 @@ impl Aligner { let ((cold_data, no_cold_err), (warm_data, no_warm_err), (hot_data, no_hot_err)) = futures::join!(cold_alignment, warm_alignment, hot_alignment); + log::debug!("[ALIGNER] Missing content from {from} in Cold era: {cold_data:?}"); + log::debug!("[ALIGNER] Missing content from {from} in Warm era: {warm_data:?}"); + log::debug!("[ALIGNER] Missing content from {from} in Hot era: {hot_data:?}"); ( [cold_data, warm_data, hot_data].concat(), no_cold_err && no_warm_err && no_hot_err, @@ -313,7 +320,7 @@ impl Aligner { match self .session .get(&selector) - .consolidation(QueryConsolidation::AUTO) + .consolidation(zenoh::query::ConsolidationMode::None) .accept_replies(zenoh::query::ReplyKeyExpr::Any) .res() .await @@ -345,6 +352,7 @@ impl Aligner { no_err = false; } }; + log::trace!("[ALIGNER] On Query '{selector}' received: {return_val:?} (no_err:{no_err})"); (return_val, no_err) } }