Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

interceptors removed from plugin storage API #859

Merged
merged 1 commit into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 2 additions & 11 deletions plugins/zenoh-backend-example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,8 @@
//
use async_std::sync::RwLock;
use async_trait::async_trait;
use std::{
collections::{hash_map::Entry, HashMap},
sync::Arc,
};
use zenoh::{prelude::OwnedKeyExpr, sample::Sample, time::Timestamp, value::Value};
use std::collections::{hash_map::Entry, HashMap};
use zenoh::{prelude::OwnedKeyExpr, time::Timestamp, value::Value};
use zenoh_backend_traits::{
config::{StorageConfig, VolumeConfig},
Capability, History, Persistence, Storage, StorageInsertionResult, StoredData, Volume,
Expand Down Expand Up @@ -71,12 +68,6 @@ impl Volume for ExampleBackend {
async fn create_storage(&self, _props: StorageConfig) -> ZResult<Box<dyn Storage>> {
Ok(Box::<ExampleStorage>::default())
}
fn incoming_data_interceptor(&self) -> Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>> {
None
}
fn outgoing_data_interceptor(&self) -> Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>> {
None
}
}

#[async_trait]
Expand Down
68 changes: 1 addition & 67 deletions plugins/zenoh-backend-traits/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,6 @@
//! // The properties are the ones passed via a PUT in the admin space for Storage creation.
//! Ok(Box::new(MyStorage::new(properties).await?))
//! }
//!
//! fn incoming_data_interceptor(&self) -> Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>> {
//! // No interception point for incoming data (on PUT operations)
//! None
//! }
//!
//! fn outgoing_data_interceptor(&self) -> Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>> {
//! // No interception point for outgoing data (on GET operations)
//! None
//! }
//! }
//!
//! // Your Storage implementation
Expand Down Expand Up @@ -135,9 +125,7 @@

use async_trait::async_trait;
use const_format::concatcp;
use std::sync::Arc;
use zenoh::prelude::{KeyExpr, OwnedKeyExpr, Sample, Selector};
use zenoh::queryable::ReplyBuilder;
use zenoh::prelude::OwnedKeyExpr;
use zenoh::time::Timestamp;
use zenoh::value::Value;
pub use zenoh::Result as ZResult;
Expand Down Expand Up @@ -210,14 +198,6 @@ pub trait Volume: Send + Sync {

/// Creates a storage configured with some properties.
async fn create_storage(&self, props: StorageConfig) -> ZResult<Box<dyn Storage>>;

/// Returns an interceptor that will be called before pushing any data
/// into a storage created by this backend. `None` can be returned for no interception point.
fn incoming_data_interceptor(&self) -> Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>>;

/// Returns an interceptor that will be called before sending any reply
/// to a query from a storage created by this backend. `None` can be returned for no interception point.
fn outgoing_data_interceptor(&self) -> Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>>;
}

pub type VolumeInstance = Box<dyn Volume + 'static>;
Expand Down Expand Up @@ -282,49 +262,3 @@ pub trait Storage: Send + Sync {
/// Remember to fetch the entry corresponding to the `None` key
async fn get_all_entries(&self) -> ZResult<Vec<(Option<OwnedKeyExpr>, Timestamp)>>;
}

/// A wrapper around the [`zenoh::queryable::Query`] allowing to call the
/// OutgoingDataInterceptor (if any) before to send the reply
pub struct Query {
q: zenoh::queryable::Query,
interceptor: Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>>,
}

impl Query {
pub fn new(
q: zenoh::queryable::Query,
interceptor: Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>>,
) -> Query {
Query { q, interceptor }
}

/// The full [`Selector`] of this Query.
#[inline(always)]
pub fn selector(&self) -> Selector<'_> {
self.q.selector()
}

/// The key selector part of this Query.
#[inline(always)]
pub fn key_expr(&self) -> &KeyExpr<'static> {
self.q.key_expr()
}

/// This Query's selector parameters.
#[inline(always)]
pub fn parameters(&self) -> &str {
self.q.parameters()
}

/// Sends a Sample as a reply to this Query
pub fn reply(&self, sample: Sample) -> ReplyBuilder<'_> {
// Call outgoing intercerceptor
let sample = if let Some(ref interceptor) = self.interceptor {
interceptor(sample)
} else {
sample
};
// Send reply
self.q.reply_sample(sample)
}
}
7 changes: 0 additions & 7 deletions plugins/zenoh-plugin-storage-manager/src/backends_mgt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
use super::storages_mgt::*;
use flume::Sender;
use std::sync::Arc;
use zenoh::prelude::r#async::*;
use zenoh::Session;
use zenoh_backend_traits::config::StorageConfig;
use zenoh_backend_traits::{Capability, VolumeInstance};
Expand All @@ -23,16 +22,12 @@ use zenoh_result::ZResult;
pub struct StoreIntercept {
pub storage: Box<dyn zenoh_backend_traits::Storage>,
pub capability: Capability,
pub in_interceptor: Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>>,
pub out_interceptor: Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>>,
}

pub(crate) async fn create_and_start_storage(
admin_key: String,
config: StorageConfig,
backend: &VolumeInstance,
in_interceptor: Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>>,
out_interceptor: Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>>,
zenoh: Arc<Session>,
) -> ZResult<Sender<StorageMessage>> {
log::trace!("Create storage '{}'", &admin_key);
Expand All @@ -41,8 +36,6 @@ pub(crate) async fn create_and_start_storage(
let store_intercept = StoreIntercept {
storage,
capability,
in_interceptor,
out_interceptor,
};

start_storage(store_intercept, config, admin_key, zenoh).await
Expand Down
4 changes: 0 additions & 4 deletions plugins/zenoh-plugin-storage-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,10 @@ impl StorageRuntimeInner {
volume_id,
backend.name()
);
let in_interceptor = backend.instance().incoming_data_interceptor();
let out_interceptor = backend.instance().outgoing_data_interceptor();
let stopper = async_std::task::block_on(create_and_start_storage(
admin_key,
storage.clone(),
backend.instance(),
in_interceptor,
out_interceptor,
self.session.clone(),
))?;
self.storages
Expand Down
20 changes: 0 additions & 20 deletions plugins/zenoh-plugin-storage-manager/src/memory_backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,26 +61,6 @@ impl Volume for MemoryBackend {
log::debug!("Create Memory Storage with configuration: {:?}", properties);
Ok(Box::new(MemoryStorage::new(properties).await?))
}

fn incoming_data_interceptor(&self) -> Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>> {
// By default: no interception point
None
// To test interceptors, uncomment this line:
// Some(Arc::new(|sample| {
// trace!(">>>> IN INTERCEPTOR FOR {:?}", sample);
// sample
// }))
}

fn outgoing_data_interceptor(&self) -> Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>> {
// By default: no interception point
None
// To test interceptors, uncomment this line:
// Some(Arc::new(|sample| {
// trace!("<<<< OUT INTERCEPTOR FOR {:?}", sample);
// sample
// }))
}
}

impl Drop for MemoryBackend {
Expand Down
23 changes: 0 additions & 23 deletions plugins/zenoh-plugin-storage-manager/src/replica/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ pub struct StorageService {
capability: Capability,
tombstones: Arc<RwLock<KeBoxTree<Timestamp, NonWild, KeyedSetProvider>>>,
wildcard_updates: Arc<RwLock<KeBoxTree<Update, UnknownWildness, KeyedSetProvider>>>,
in_interceptor: Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>>,
out_interceptor: Option<Arc<dyn Fn(Sample) -> Sample + Send + Sync>>,
replication: Option<ReplicationService>,
}

Expand All @@ -85,8 +83,6 @@ impl StorageService {
capability: store_intercept.capability,
tombstones: Arc::new(RwLock::new(KeBoxTree::default())),
wildcard_updates: Arc::new(RwLock::new(KeBoxTree::default())),
in_interceptor: store_intercept.in_interceptor,
out_interceptor: store_intercept.out_interceptor,
replication,
};
if storage_service
Expand Down Expand Up @@ -263,13 +259,6 @@ impl StorageService {
// the trimming during PUT and GET should be handled by the plugin
async fn process_sample(&self, sample: Sample) {
log::trace!("[STORAGE] Processing sample: {:?}", sample);
// Call incoming data interceptor (if any)
let sample = if let Some(ref interceptor) = self.in_interceptor {
interceptor(sample)
} else {
sample
};

// if wildcard, update wildcard_updates
if sample.key_expr().is_wild() {
self.register_wildcard_update(sample.clone()).await;
Expand Down Expand Up @@ -523,12 +512,6 @@ impl StorageService {
let sample = Sample::new(key.clone(), payload)
.with_encoding(encoding)
.with_timestamp(entry.timestamp);
// apply outgoing interceptor on results
let sample = if let Some(ref interceptor) = self.out_interceptor {
interceptor(sample)
} else {
sample
};
if let Err(e) = q.reply_sample(sample).res().await {
log::warn!(
"Storage '{}' raised an error replying a query: {}",
Expand Down Expand Up @@ -561,12 +544,6 @@ impl StorageService {
let sample = Sample::new(q.key_expr().clone(), payload)
.with_encoding(encoding)
.with_timestamp(entry.timestamp);
// apply outgoing interceptor on results
let sample = if let Some(ref interceptor) = self.out_interceptor {
interceptor(sample)
} else {
sample
};
if let Err(e) = q.reply_sample(sample).res().await {
log::warn!(
"Storage '{}' raised an error replying a query: {}",
Expand Down