Skip to content

Commit

Permalink
samples, plugins updated
Browse files Browse the repository at this point in the history
  • Loading branch information
milyin committed Mar 24, 2024
1 parent cc580a5 commit 2708402
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 17 deletions.
13 changes: 12 additions & 1 deletion plugins/zenoh-plugin-example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::sync::{
use zenoh::plugins::{RunningPluginTrait, ZenohPlugin};
use zenoh::prelude::r#async::*;
use zenoh::runtime::Runtime;
use zenoh::sample_builder::SampleBuilderTrait;
use zenoh_core::zlock;
use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl};
use zenoh_result::{bail, ZResult};
Expand Down Expand Up @@ -174,7 +175,17 @@ async fn run(runtime: Runtime, selector: KeyExpr<'_>, flag: Arc<AtomicBool>) {
info!("Handling query '{}'", query.selector());
for (key_expr, sample) in stored.iter() {
if query.selector().key_expr.intersects(unsafe{keyexpr::from_str_unchecked(key_expr)}) {
query.reply_sample(sample.clone()).res().await.unwrap();
let reply = query
.reply_sample(sample.key_expr().clone().into_owned())
.with_timestamp_opt(sample.timestamp().cloned());
#[cfg(feature = "unstable")]
let reply = reply
.with_attachment_opt(sample.attachment())
.with_source_info(sample.source_info());
match sample.kind() {
SampleKind::Put => reply.put(sample.payload().clone()).res().await.unwrap(),
SampleKind::Delete => reply.delete().res().await.unwrap(),
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use std::str;
use std::str::FromStr;
use zenoh::payload::StringOrBase64;
use zenoh::prelude::r#async::*;
use zenoh::sample_builder::PutSampleBuilderTrait;
use zenoh::sample_builder::SampleBuilderTrait;
use zenoh::time::Timestamp;
use zenoh::Session;

Expand Down
4 changes: 2 additions & 2 deletions plugins/zenoh-plugin-storage-manager/src/replica/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
use zenoh::buffers::ZBuf;
use zenoh::prelude::r#async::*;
use zenoh::query::ConsolidationMode;
use zenoh::sample_builder::{SampleBuilderTrait, SampleUpdater};
use zenoh::sample_builder::{PutSampleBuilderTrait, SampleBuilder, SampleBuilderTrait};
use zenoh::time::{new_reception_timestamp, Timestamp, NTP64};
use zenoh::{Result as ZResult, Session};
use zenoh_backend_traits::config::{GarbageCollectionConfig, StorageConfig};
Expand Down Expand Up @@ -368,7 +368,7 @@ impl StorageService {
}
};
let sample = if sample.timestamp().is_none() {
SampleUpdater::from(sample).with_timestamp(new_reception_timestamp()).res_sync()
SampleBuilder::from(sample).with_timestamp(new_reception_timestamp()).res_sync()
} else {
sample
};
Expand Down
25 changes: 19 additions & 6 deletions zenoh-ext/src/publication_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::convert::TryInto;
use std::future::Ready;
use zenoh::prelude::r#async::*;
use zenoh::queryable::{Query, Queryable};
use zenoh::sample_builder::SampleBuilderTrait;
use zenoh::subscriber::FlumeSubscriber;
use zenoh::SessionRef;
use zenoh_core::{AsyncResolve, Resolvable, SyncResolve};
Expand Down Expand Up @@ -116,6 +117,22 @@ pub struct PublicationCache<'a> {
_stoptx: Sender<bool>,
}

async fn reply_sample(query: &Query, sample: &Sample) {
let reply = query
.reply_sample(sample.key_expr().clone().into_owned())
.with_timestamp_opt(sample.timestamp().cloned());
#[cfg(feature = "unstable")]
let reply = reply
.with_attachment_opt(sample.attachment())
.with_source_info(sample.source_info());
if let Err(e) = match sample.kind() {
SampleKind::Put => reply.put(sample.payload().clone()).res_async().await,
SampleKind::Delete => reply.delete().res_async().await,
} {
log::warn!("Error replying to query: {}", e);
}
}

impl<'a> PublicationCache<'a> {
fn new(conf: PublicationCacheBuilder<'a, '_, '_>) -> ZResult<PublicationCache<'a>> {
let key_expr = conf.pub_key_expr?;
Expand Down Expand Up @@ -212,9 +229,7 @@ impl<'a> PublicationCache<'a> {
continue;
}
}
if let Err(e) = query.reply_sample(sample.clone()).res_async().await {
log::warn!("Error replying to query: {}", e);
}
reply_sample(&query, sample).await;
}
}
} else {
Expand All @@ -226,9 +241,7 @@ impl<'a> PublicationCache<'a> {
continue;
}
}
if let Err(e) = query.reply_sample(sample.clone()).res_async().await {
log::warn!("Error replying to query: {}", e);
}
reply_sample(&query, sample).await;
}
}
}
Expand Down
24 changes: 16 additions & 8 deletions zenoh/src/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,22 @@ impl SyncResolve for ReplyDelBuilder<'_> {
}
}

impl<'a> AsyncResolve for ReplyBuilder<'a> {
type Future = Ready<Self::To>;

fn res_async(self) -> Self::Future {
std::future::ready(self.res_sync())
}
}

impl<'a> AsyncResolve for ReplyDelBuilder<'a> {
type Future = Ready<Self::To>;

fn res_async(self) -> Self::Future {
std::future::ready(self.res_sync())
}
}

impl Query {
fn _reply_sample(&self, sample: Sample) -> ZResult<()> {
if !self._accepts_any_replies().unwrap_or(false)
Expand Down Expand Up @@ -583,14 +599,6 @@ impl Query {
}
}

impl<'a> AsyncResolve for ReplyBuilder<'a> {
type Future = Ready<Self::To>;

fn res_async(self) -> Self::Future {
std::future::ready(self.res_sync())
}
}

impl<'a> Resolvable for ReplyErrBuilder<'a> {
type To = ZResult<()>;
}
Expand Down

0 comments on commit 2708402

Please sign in to comment.