Skip to content

Commit

Permalink
Merge pull request #858 from eclipse-zenoh/sample_api_rework
Browse files Browse the repository at this point in the history
Sample api rework
  • Loading branch information
milyin authored Apr 6, 2024
2 parents 5ee2bdb + 232177f commit b330289
Show file tree
Hide file tree
Showing 22 changed files with 1,243 additions and 647 deletions.
50 changes: 25 additions & 25 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 8 additions & 9 deletions examples/examples/z_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,14 @@ async fn main() {
let session = zenoh::open(config).res().await.unwrap();

println!("Sending Query '{selector}'...");
let replies = match value {
Some(value) => session.get(&selector).with_value(value),
None => session.get(&selector),
}
.target(target)
.timeout(timeout)
.res()
.await
.unwrap();
let replies = session
.get(&selector)
.value(value)
.target(target)
.timeout(timeout)
.res()
.await
.unwrap();
while let Ok(reply) = replies.recv_async().await {
match reply.sample {
Ok(sample) => {
Expand Down
4 changes: 2 additions & 2 deletions examples/examples/z_pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ async fn main() {
println!("Putting Data ('{}': '{}')...", &key_expr, buf);
let mut put = publisher.put(buf);
if let Some(attachment) = &attachment {
put = put.with_attachment(
put = put.attachment(Some(
attachment
.split('&')
.map(|pair| split_once(pair, '='))
.collect(),
)
))
}
put.res().await.unwrap();
}
Expand Down
10 changes: 2 additions & 8 deletions plugins/zenoh-plugin-rest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ async fn query(mut req: Request<(Arc<Session>, String)>) -> tide::Result<Respons
.content_type()
.map(|m| Encoding::from(m.to_string()))
.unwrap_or_default();
query = query.with_value(Value::from(body).with_encoding(encoding));
query = query.payload(body).encoding(encoding);
}
match query.res().await {
Ok(receiver) => {
Expand Down Expand Up @@ -463,13 +463,7 @@ async fn write(mut req: Request<(Arc<Session>, String)>) -> tide::Result<Respons
// @TODO: Define the right congestion control value
let session = &req.state().0;
let res = match method_to_kind(req.method()) {
SampleKind::Put => {
session
.put(&key_expr, bytes)
.with_encoding(encoding)
.res()
.await
}
SampleKind::Put => session.put(&key_expr, bytes).encoding(encoding).res().await,
SampleKind::Delete => session.delete(&key_expr).res().await,
};
match res {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ impl AlignQueryable {
AlignData::Data(k, (v, ts)) => {
query
.reply(k, v.payload)
.with_encoding(v.encoding)
.with_timestamp(ts)
.encoding(v.encoding)
.timestamp(ts)
.res()
.await
.unwrap();
Expand Down
8 changes: 5 additions & 3 deletions plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::str;
use zenoh::key_expr::{KeyExpr, OwnedKeyExpr};
use zenoh::payload::StringOrBase64;
use zenoh::prelude::r#async::*;
use zenoh::sample::builder::SampleBuilder;
use zenoh::time::Timestamp;
use zenoh::Session;

Expand Down Expand Up @@ -108,9 +109,10 @@ impl Aligner {
let Value {
payload, encoding, ..
} = value;
let sample = Sample::new(key, payload)
.with_encoding(encoding)
.with_timestamp(ts);
let sample = SampleBuilder::put(key, payload)
.encoding(encoding)
.timestamp(ts)
.into();
log::debug!("[ALIGNER] Adding {:?} to storage", sample);
self.tx_sample.send_async(sample).await.unwrap_or_else(|e| {
log::error!("[ALIGNER] Error adding sample to storage: {}", e)
Expand Down
73 changes: 40 additions & 33 deletions plugins/zenoh-plugin-storage-manager/src/replica/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@ use futures::select;
use std::collections::{HashMap, HashSet};
use std::str::{self, FromStr};
use std::time::{SystemTime, UNIX_EPOCH};
use zenoh::buffers::buffer::SplitBuffer;
use zenoh::buffers::ZBuf;
use zenoh::prelude::r#async::*;
use zenoh::query::ConsolidationMode;
use zenoh::time::{Timestamp, NTP64};
use zenoh::query::{ConsolidationMode, QueryTarget};
use zenoh::sample::builder::SampleBuilder;
use zenoh::sample::{Sample, SampleKind};
use zenoh::time::{new_reception_timestamp, Timestamp, NTP64};
use zenoh::value::Value;
use zenoh::{Result as ZResult, Session};
use zenoh_backend_traits::config::{GarbageCollectionConfig, StorageConfig};
use zenoh_backend_traits::{Capability, History, Persistence, StorageInsertionResult, StoredData};
Expand Down Expand Up @@ -219,14 +223,15 @@ impl StorageService {
select!(
// on sample for key_expr
sample = storage_sub.recv_async() => {
let mut sample = match sample {
let sample = match sample {
Ok(sample) => sample,
Err(e) => {
log::error!("Error in sample: {}", e);
continue;
}
};
sample.ensure_timestamp();
let timestamp = sample.timestamp().cloned().unwrap_or(new_reception_timestamp());
let sample = SampleBuilder::from(sample).timestamp(timestamp).into();
self.process_sample(sample).await;
},
// on query on key_expr
Expand Down Expand Up @@ -290,23 +295,25 @@ impl StorageService {
);
// there might be the case that the actual update was outdated due to a wild card update, but not stored yet in the storage.
// get the relevant wild card entry and use that value and timestamp to update the storage
let sample_to_store = match self
let sample_to_store: Sample = if let Some(update) = self
.ovderriding_wild_update(&k, sample.timestamp().unwrap())
.await
{
Some(overriding_update) => {
let Value {
payload, encoding, ..
} = overriding_update.data.value;
Sample::new(KeyExpr::from(k.clone()), payload)
.with_encoding(encoding)
.with_timestamp(overriding_update.data.timestamp)
.with_kind(overriding_update.kind)
match update.kind {
SampleKind::Put => {
SampleBuilder::put(KeyExpr::from(k.clone()), update.data.value.payload)
.encoding(update.data.value.encoding)
.timestamp(update.data.timestamp)
.into()
}
SampleKind::Delete => SampleBuilder::delete(KeyExpr::from(k.clone()))
.timestamp(update.data.timestamp)
.into(),
}
None => Sample::new(KeyExpr::from(k.clone()), sample.payload().clone())
.with_encoding(sample.encoding().clone())
.with_timestamp(*sample.timestamp().unwrap())
.with_kind(sample.kind()),
} else {
SampleBuilder::from(sample.clone())
.keyexpr(k.clone())
.into()
};

let stripped_key = match self.strip_prefix(sample_to_store.key_expr()) {
Expand All @@ -323,7 +330,7 @@ impl StorageService {
.put(
stripped_key,
Value::new(sample_to_store.payload().clone())
.with_encoding(sample_to_store.encoding().clone()),
.encoding(sample_to_store.encoding().clone()),
*sample_to_store.timestamp().unwrap(),
)
.await
Expand Down Expand Up @@ -506,13 +513,13 @@ impl StorageService {
match storage.get(stripped_key, q.parameters()).await {
Ok(stored_data) => {
for entry in stored_data {
let Value {
payload, encoding, ..
} = entry.value;
let sample = Sample::new(key.clone(), payload)
.with_encoding(encoding)
.with_timestamp(entry.timestamp);
if let Err(e) = q.reply_sample(sample).res().await {
if let Err(e) = q
.reply(key.clone(), entry.value.payload)
.encoding(entry.value.encoding)
.timestamp(entry.timestamp)
.res()
.await
{
log::warn!(
"Storage '{}' raised an error replying a query: {}",
self.name,
Expand All @@ -538,13 +545,13 @@ impl StorageService {
match storage.get(stripped_key, q.parameters()).await {
Ok(stored_data) => {
for entry in stored_data {
let Value {
payload, encoding, ..
} = entry.value;
let sample = Sample::new(q.key_expr().clone(), payload)
.with_encoding(encoding)
.with_timestamp(entry.timestamp);
if let Err(e) = q.reply_sample(sample).res().await {
if let Err(e) = q
.reply(q.key_expr().clone(), entry.value.payload)
.encoding(entry.value.encoding)
.timestamp(entry.timestamp)
.res()
.await
{
log::warn!(
"Storage '{}' raised an error replying a query: {}",
self.name,
Expand Down Expand Up @@ -692,7 +699,7 @@ fn construct_update(data: String) -> Update {
for slice in result.3 {
payload.push_zslice(slice.to_vec().into());
}
let value = Value::new(payload).with_encoding(result.2);
let value = Value::new(payload).encoding(result.2);
let data = StoredData {
value,
timestamp: Timestamp::from_str(&result.1).unwrap(), // @TODO: remove the unwrap()
Expand Down
Loading

0 comments on commit b330289

Please sign in to comment.