From b80fd0aa30842e607ae661547368df7f818f3a29 Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Sun, 24 Mar 2024 18:58:55 +0100 Subject: [PATCH] interceptors removed from plugin storage API --- plugins/zenoh-backend-example/src/lib.rs | 13 +--- plugins/zenoh-backend-traits/src/lib.rs | 68 +------------------ .../src/backends_mgt.rs | 7 -- .../zenoh-plugin-storage-manager/src/lib.rs | 4 -- .../src/memory_backend/mod.rs | 20 ------ .../src/replica/storage.rs | 23 ------- 6 files changed, 3 insertions(+), 132 deletions(-) diff --git a/plugins/zenoh-backend-example/src/lib.rs b/plugins/zenoh-backend-example/src/lib.rs index 602d29f375..f81231a498 100644 --- a/plugins/zenoh-backend-example/src/lib.rs +++ b/plugins/zenoh-backend-example/src/lib.rs @@ -13,11 +13,8 @@ // use async_std::sync::RwLock; use async_trait::async_trait; -use std::{ - collections::{hash_map::Entry, HashMap}, - sync::Arc, -}; -use zenoh::{prelude::OwnedKeyExpr, sample::Sample, time::Timestamp, value::Value}; +use std::collections::{hash_map::Entry, HashMap}; +use zenoh::{prelude::OwnedKeyExpr, time::Timestamp, value::Value}; use zenoh_backend_traits::{ config::{StorageConfig, VolumeConfig}, Capability, History, Persistence, Storage, StorageInsertionResult, StoredData, Volume, @@ -71,12 +68,6 @@ impl Volume for ExampleBackend { async fn create_storage(&self, _props: StorageConfig) -> ZResult> { Ok(Box::::default()) } - fn incoming_data_interceptor(&self) -> Option Sample + Send + Sync>> { - None - } - fn outgoing_data_interceptor(&self) -> Option Sample + Send + Sync>> { - None - } } #[async_trait] diff --git a/plugins/zenoh-backend-traits/src/lib.rs b/plugins/zenoh-backend-traits/src/lib.rs index d17e6dfd77..40d022f1ec 100644 --- a/plugins/zenoh-backend-traits/src/lib.rs +++ b/plugins/zenoh-backend-traits/src/lib.rs @@ -68,16 +68,6 @@ //! // The properties are the ones passed via a PUT in the admin space for Storage creation. //! Ok(Box::new(MyStorage::new(properties).await?)) //! } -//! -//! fn incoming_data_interceptor(&self) -> Option Sample + Send + Sync>> { -//! // No interception point for incoming data (on PUT operations) -//! None -//! } -//! -//! fn outgoing_data_interceptor(&self) -> Option Sample + Send + Sync>> { -//! // No interception point for outgoing data (on GET operations) -//! None -//! } //! } //! //! // Your Storage implementation @@ -135,9 +125,7 @@ use async_trait::async_trait; use const_format::concatcp; -use std::sync::Arc; -use zenoh::prelude::{KeyExpr, OwnedKeyExpr, Sample, Selector}; -use zenoh::queryable::ReplyBuilder; +use zenoh::prelude::OwnedKeyExpr; use zenoh::time::Timestamp; use zenoh::value::Value; pub use zenoh::Result as ZResult; @@ -210,14 +198,6 @@ pub trait Volume: Send + Sync { /// Creates a storage configured with some properties. async fn create_storage(&self, props: StorageConfig) -> ZResult>; - - /// Returns an interceptor that will be called before pushing any data - /// into a storage created by this backend. `None` can be returned for no interception point. - fn incoming_data_interceptor(&self) -> Option Sample + Send + Sync>>; - - /// Returns an interceptor that will be called before sending any reply - /// to a query from a storage created by this backend. `None` can be returned for no interception point. - fn outgoing_data_interceptor(&self) -> Option Sample + Send + Sync>>; } pub type VolumeInstance = Box; @@ -282,49 +262,3 @@ pub trait Storage: Send + Sync { /// Remember to fetch the entry corresponding to the `None` key async fn get_all_entries(&self) -> ZResult, Timestamp)>>; } - -/// A wrapper around the [`zenoh::queryable::Query`] allowing to call the -/// OutgoingDataInterceptor (if any) before to send the reply -pub struct Query { - q: zenoh::queryable::Query, - interceptor: Option Sample + Send + Sync>>, -} - -impl Query { - pub fn new( - q: zenoh::queryable::Query, - interceptor: Option Sample + Send + Sync>>, - ) -> Query { - Query { q, interceptor } - } - - /// The full [`Selector`] of this Query. - #[inline(always)] - pub fn selector(&self) -> Selector<'_> { - self.q.selector() - } - - /// The key selector part of this Query. - #[inline(always)] - pub fn key_expr(&self) -> &KeyExpr<'static> { - self.q.key_expr() - } - - /// This Query's selector parameters. - #[inline(always)] - pub fn parameters(&self) -> &str { - self.q.parameters() - } - - /// Sends a Sample as a reply to this Query - pub fn reply(&self, sample: Sample) -> ReplyBuilder<'_> { - // Call outgoing intercerceptor - let sample = if let Some(ref interceptor) = self.interceptor { - interceptor(sample) - } else { - sample - }; - // Send reply - self.q.reply_sample(sample) - } -} diff --git a/plugins/zenoh-plugin-storage-manager/src/backends_mgt.rs b/plugins/zenoh-plugin-storage-manager/src/backends_mgt.rs index aa7260e868..90a6ae6250 100644 --- a/plugins/zenoh-plugin-storage-manager/src/backends_mgt.rs +++ b/plugins/zenoh-plugin-storage-manager/src/backends_mgt.rs @@ -14,7 +14,6 @@ use super::storages_mgt::*; use flume::Sender; use std::sync::Arc; -use zenoh::prelude::r#async::*; use zenoh::Session; use zenoh_backend_traits::config::StorageConfig; use zenoh_backend_traits::{Capability, VolumeInstance}; @@ -23,16 +22,12 @@ use zenoh_result::ZResult; pub struct StoreIntercept { pub storage: Box, pub capability: Capability, - pub in_interceptor: Option Sample + Send + Sync>>, - pub out_interceptor: Option Sample + Send + Sync>>, } pub(crate) async fn create_and_start_storage( admin_key: String, config: StorageConfig, backend: &VolumeInstance, - in_interceptor: Option Sample + Send + Sync>>, - out_interceptor: Option Sample + Send + Sync>>, zenoh: Arc, ) -> ZResult> { log::trace!("Create storage '{}'", &admin_key); @@ -41,8 +36,6 @@ pub(crate) async fn create_and_start_storage( let store_intercept = StoreIntercept { storage, capability, - in_interceptor, - out_interceptor, }; start_storage(store_intercept, config, admin_key, zenoh).await diff --git a/plugins/zenoh-plugin-storage-manager/src/lib.rs b/plugins/zenoh-plugin-storage-manager/src/lib.rs index 0db30bbd6a..91df2f108d 100644 --- a/plugins/zenoh-plugin-storage-manager/src/lib.rs +++ b/plugins/zenoh-plugin-storage-manager/src/lib.rs @@ -239,14 +239,10 @@ impl StorageRuntimeInner { volume_id, backend.name() ); - let in_interceptor = backend.instance().incoming_data_interceptor(); - let out_interceptor = backend.instance().outgoing_data_interceptor(); let stopper = async_std::task::block_on(create_and_start_storage( admin_key, storage.clone(), backend.instance(), - in_interceptor, - out_interceptor, self.session.clone(), ))?; self.storages diff --git a/plugins/zenoh-plugin-storage-manager/src/memory_backend/mod.rs b/plugins/zenoh-plugin-storage-manager/src/memory_backend/mod.rs index ebb4922c9d..4e333b8592 100644 --- a/plugins/zenoh-plugin-storage-manager/src/memory_backend/mod.rs +++ b/plugins/zenoh-plugin-storage-manager/src/memory_backend/mod.rs @@ -61,26 +61,6 @@ impl Volume for MemoryBackend { log::debug!("Create Memory Storage with configuration: {:?}", properties); Ok(Box::new(MemoryStorage::new(properties).await?)) } - - fn incoming_data_interceptor(&self) -> Option Sample + Send + Sync>> { - // By default: no interception point - None - // To test interceptors, uncomment this line: - // Some(Arc::new(|sample| { - // trace!(">>>> IN INTERCEPTOR FOR {:?}", sample); - // sample - // })) - } - - fn outgoing_data_interceptor(&self) -> Option Sample + Send + Sync>> { - // By default: no interception point - None - // To test interceptors, uncomment this line: - // Some(Arc::new(|sample| { - // trace!("<<<< OUT INTERCEPTOR FOR {:?}", sample); - // sample - // })) - } } impl Drop for MemoryBackend { diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs index 0708dcabd9..35134dfe43 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs @@ -60,8 +60,6 @@ pub struct StorageService { capability: Capability, tombstones: Arc>>, wildcard_updates: Arc>>, - in_interceptor: Option Sample + Send + Sync>>, - out_interceptor: Option Sample + Send + Sync>>, replication: Option, } @@ -85,8 +83,6 @@ impl StorageService { capability: store_intercept.capability, tombstones: Arc::new(RwLock::new(KeBoxTree::default())), wildcard_updates: Arc::new(RwLock::new(KeBoxTree::default())), - in_interceptor: store_intercept.in_interceptor, - out_interceptor: store_intercept.out_interceptor, replication, }; if storage_service @@ -263,13 +259,6 @@ impl StorageService { // the trimming during PUT and GET should be handled by the plugin async fn process_sample(&self, sample: Sample) { log::trace!("[STORAGE] Processing sample: {:?}", sample); - // Call incoming data interceptor (if any) - let sample = if let Some(ref interceptor) = self.in_interceptor { - interceptor(sample) - } else { - sample - }; - // if wildcard, update wildcard_updates if sample.key_expr().is_wild() { self.register_wildcard_update(sample.clone()).await; @@ -523,12 +512,6 @@ impl StorageService { let sample = Sample::new(key.clone(), payload) .with_encoding(encoding) .with_timestamp(entry.timestamp); - // apply outgoing interceptor on results - let sample = if let Some(ref interceptor) = self.out_interceptor { - interceptor(sample) - } else { - sample - }; if let Err(e) = q.reply_sample(sample).res().await { log::warn!( "Storage '{}' raised an error replying a query: {}", @@ -561,12 +544,6 @@ impl StorageService { let sample = Sample::new(q.key_expr().clone(), payload) .with_encoding(encoding) .with_timestamp(entry.timestamp); - // apply outgoing interceptor on results - let sample = if let Some(ref interceptor) = self.out_interceptor { - interceptor(sample) - } else { - sample - }; if let Err(e) = q.reply_sample(sample).res().await { log::warn!( "Storage '{}' raised an error replying a query: {}",