Skip to content

Commit

Permalink
interceptors removed from plugin storage API (#859)
Browse files Browse the repository at this point in the history
  • Loading branch information
milyin authored Mar 25, 2024
1 parent 7300f4c commit 65a4d7f
Show file tree
Hide file tree
Showing 6 changed files with 3 additions and 132 deletions.
13 changes: 2 additions & 11 deletions plugins/zenoh-backend-example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,8 @@
//
use async_std::sync::RwLock;
use async_trait::async_trait;
use std::{
collections::{hash_map::Entry, HashMap},
sync::Arc,
};
use zenoh::{prelude::OwnedKeyExpr, sample::Sample, time::Timestamp, value::Value};
use std::collections::{hash_map::Entry, HashMap};
use zenoh::{prelude::OwnedKeyExpr, time::Timestamp, value::Value};
use zenoh_backend_traits::{
config::{StorageConfig, VolumeConfig},
Capability, History, Persistence, Storage, StorageInsertionResult, StoredData, Volume,
Expand Down Expand Up @@ -71,12 +68,6 @@ impl Volume for ExampleBackend {
async fn create_storage(&self, _props: StorageConfig) -> ZResult<Box<dyn Storage>> {
Ok(Box::<ExampleStorage>::default())
}
fn incoming_data_interceptor(&self) -> Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>> {
None
}
fn outgoing_data_interceptor(&self) -> Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>> {
None
}
}

#[async_trait]
Expand Down
68 changes: 1 addition & 67 deletions plugins/zenoh-backend-traits/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,6 @@
//! // The properties are the ones passed via a PUT in the admin space for Storage creation.
//! Ok(Box::new(MyStorage::new(properties).await?))
//! }
//!
//! fn incoming_data_interceptor(&self) -> Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>> {
//! // No interception point for incoming data (on PUT operations)
//! None
//! }
//!
//! fn outgoing_data_interceptor(&self) -> Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>> {
//! // No interception point for outgoing data (on GET operations)
//! None
//! }
//! }
//!
//! // Your Storage implementation
Expand Down Expand Up @@ -135,9 +125,7 @@
use async_trait::async_trait;
use const_format::concatcp;
use std::sync::Arc;
use zenoh::prelude::{KeyExpr, OwnedKeyExpr, Sample, Selector};
use zenoh::queryable::ReplyBuilder;
use zenoh::prelude::OwnedKeyExpr;
use zenoh::time::Timestamp;
use zenoh::value::Value;
pub use zenoh::Result as ZResult;
Expand Down Expand Up @@ -210,14 +198,6 @@ pub trait Volume: Send + Sync {

/// Creates a storage configured with some properties.
async fn create_storage(&self, props: StorageConfig) -> ZResult<Box<dyn Storage>>;

/// Returns an interceptor that will be called before pushing any data
/// into a storage created by this backend. `None` can be returned for no interception point.
fn incoming_data_interceptor(&self) -> Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>>;

/// Returns an interceptor that will be called before sending any reply
/// to a query from a storage created by this backend. `None` can be returned for no interception point.
fn outgoing_data_interceptor(&self) -> Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>>;
}

pub type VolumeInstance = Box<dyn Volume + 'static>;
Expand Down Expand Up @@ -282,49 +262,3 @@ pub trait Storage: Send + Sync {
/// Remember to fetch the entry corresponding to the `None` key
async fn get_all_entries(&self) -> ZResult<Vec<(Option<OwnedKeyExpr>, Timestamp)>>;
}

/// A wrapper around the [`zenoh::queryable::Query`] allowing to call the
/// OutgoingDataInterceptor (if any) before to send the reply
pub struct Query {
q: zenoh::queryable::Query,
interceptor: Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>>,
}

impl Query {
pub fn new(
q: zenoh::queryable::Query,
interceptor: Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>>,
) -> Query {
Query { q, interceptor }
}

/// The full [`Selector`] of this Query.
#[inline(always)]
pub fn selector(&self) -> Selector<'_> {
self.q.selector()
}

/// The key selector part of this Query.
#[inline(always)]
pub fn key_expr(&self) -> &KeyExpr<'static> {
self.q.key_expr()
}

/// This Query's selector parameters.
#[inline(always)]
pub fn parameters(&self) -> &str {
self.q.parameters()
}

/// Sends a Sample as a reply to this Query
pub fn reply(&self, sample: Sample) -> ReplyBuilder<'_> {
// Call outgoing intercerceptor
let sample = if let Some(ref interceptor) = self.interceptor {
interceptor(sample)
} else {
sample
};
// Send reply
self.q.reply_sample(sample)
}
}
7 changes: 0 additions & 7 deletions plugins/zenoh-plugin-storage-manager/src/backends_mgt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
use super::storages_mgt::*;
use flume::Sender;
use std::sync::Arc;
use zenoh::prelude::r#async::*;
use zenoh::Session;
use zenoh_backend_traits::config::StorageConfig;
use zenoh_backend_traits::{Capability, VolumeInstance};
Expand All @@ -23,16 +22,12 @@ use zenoh_result::ZResult;
pub struct StoreIntercept {
pub storage: Box<dyn zenoh_backend_traits::Storage>,
pub capability: Capability,
pub in_interceptor: Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>>,
pub out_interceptor: Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>>,
}

pub(crate) async fn create_and_start_storage(
admin_key: String,
config: StorageConfig,
backend: &VolumeInstance,
in_interceptor: Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>>,
out_interceptor: Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>>,
zenoh: Arc<Session>,
) -> ZResult<Sender<StorageMessage>> {
log::trace!("Create storage '{}'", &admin_key);
Expand All @@ -41,8 +36,6 @@ pub(crate) async fn create_and_start_storage(
let store_intercept = StoreIntercept {
storage,
capability,
in_interceptor,
out_interceptor,
};

start_storage(store_intercept, config, admin_key, zenoh).await
Expand Down
4 changes: 0 additions & 4 deletions plugins/zenoh-plugin-storage-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,10 @@ impl StorageRuntimeInner {
volume_id,
backend.name()
);
let in_interceptor = backend.instance().incoming_data_interceptor();
let out_interceptor = backend.instance().outgoing_data_interceptor();
let stopper = async_std::task::block_on(create_and_start_storage(
admin_key,
storage.clone(),
backend.instance(),
in_interceptor,
out_interceptor,
self.session.clone(),
))?;
self.storages
Expand Down
20 changes: 0 additions & 20 deletions plugins/zenoh-plugin-storage-manager/src/memory_backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,26 +61,6 @@ impl Volume for MemoryBackend {
log::debug!("Create Memory Storage with configuration: {:?}", properties);
Ok(Box::new(MemoryStorage::new(properties).await?))
}

fn incoming_data_interceptor(&self) -> Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>> {
// By default: no interception point
None
// To test interceptors, uncomment this line:
// Some(Arc::new(|sample| {
// trace!(">>>> IN INTERCEPTOR FOR {:?}", sample);
// sample
// }))
}

fn outgoing_data_interceptor(&self) -> Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>> {
// By default: no interception point
None
// To test interceptors, uncomment this line:
// Some(Arc::new(|sample| {
// trace!("<<<< OUT INTERCEPTOR FOR {:?}", sample);
// sample
// }))
}
}

impl Drop for MemoryBackend {
Expand Down
23 changes: 0 additions & 23 deletions plugins/zenoh-plugin-storage-manager/src/replica/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ pub struct StorageService {
capability: Capability,
tombstones: Arc<RwLock<KeBoxTree<Timestamp, NonWild, KeyedSetProvider>>>,
wildcard_updates: Arc<RwLock<KeBoxTree<Update, UnknownWildness, KeyedSetProvider>>>,
in_interceptor: Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>>,
out_interceptor: Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>>,
replication: Option<ReplicationService>,
}

Expand All @@ -85,8 +83,6 @@ impl StorageService {
capability: store_intercept.capability,
tombstones: Arc::new(RwLock::new(KeBoxTree::default())),
wildcard_updates: Arc::new(RwLock::new(KeBoxTree::default())),
in_interceptor: store_intercept.in_interceptor,
out_interceptor: store_intercept.out_interceptor,
replication,
};
if storage_service
Expand Down Expand Up @@ -263,13 +259,6 @@ impl StorageService {
// the trimming during PUT and GET should be handled by the plugin
async fn process_sample(&self, sample: Sample) {
log::trace!("[STORAGE] Processing sample: {:?}", sample);
// Call incoming data interceptor (if any)
let sample = if let Some(ref interceptor) = self.in_interceptor {
interceptor(sample)
} else {
sample
};

// if wildcard, update wildcard_updates
if sample.key_expr().is_wild() {
self.register_wildcard_update(sample.clone()).await;
Expand Down Expand Up @@ -523,12 +512,6 @@ impl StorageService {
let sample = Sample::new(key.clone(), payload)
.with_encoding(encoding)
.with_timestamp(entry.timestamp);
// apply outgoing interceptor on results
let sample = if let Some(ref interceptor) = self.out_interceptor {
interceptor(sample)
} else {
sample
};
if let Err(e) = q.reply_sample(sample).res().await {
log::warn!(
"Storage '{}' raised an error replying a query: {}",
Expand Down Expand Up @@ -561,12 +544,6 @@ impl StorageService {
let sample = Sample::new(q.key_expr().clone(), payload)
.with_encoding(encoding)
.with_timestamp(entry.timestamp);
// apply outgoing interceptor on results
let sample = if let Some(ref interceptor) = self.out_interceptor {
interceptor(sample)
} else {
sample
};
if let Err(e) = q.reply_sample(sample).res().await {
log::warn!(
"Storage '{}' raised an error replying a query: {}",
Expand Down

0 comments on commit 65a4d7f

Please sign in to comment.