Merge pull request #822 from DenisBiryukov91/release/protocol_changes/sample_accessors

Sample accessors
milyin authored Mar 15, 2024
2 parents 622b230 + d73da7d commit 2e9db3f
Showing 31 changed files with 235 additions and 209 deletions.
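The change is the same throughout the diff: code that read `Sample` fields directly (`sample.key_expr`, `sample.payload`, `sample.kind`, `sample.encoding`, `sample.timestamp`) now calls accessor methods of the same names. As a rough sketch of the new style, not part of the diff itself, a subscriber loop like the one in `z_sub.rs` reads as follows; the key expression and the already-open session are placeholders/assumptions.

```rust
use zenoh::prelude::r#async::*;
use zenoh::Session;

// Sketch only: assumes an already-open zenoh Session; "demo/example/**" is a
// placeholder key expression, and errors are simply unwrapped.
async fn print_samples(session: &Session) {
    let subscriber = session
        .declare_subscriber("demo/example/**")
        .res()
        .await
        .unwrap();
    while let Ok(sample) = subscriber.recv_async().await {
        // Formerly field accesses (sample.payload, sample.kind, sample.key_expr),
        // now accessor methods.
        let payload = sample
            .payload()
            .deserialize::<String>()
            .unwrap_or_else(|e| format!("{}", e));
        println!(
            ">> Received {} ('{}': '{}')",
            sample.kind(),
            sample.key_expr().as_str(),
            payload
        );
    }
}
```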
4 changes: 2 additions & 2 deletions examples/examples/z_get.rs
@@ -41,12 +41,12 @@ async fn main() {
         match reply.sample {
             Ok(sample) => {
                 let payload = sample
-                    .payload
+                    .payload()
                     .deserialize::<String>()
                     .unwrap_or_else(|e| format!("{}", e));
                 println!(
                     ">> Received ('{}': '{}')",
-                    sample.key_expr.as_str(),
+                    sample.key_expr().as_str(),
                     payload,
                 );
             }
2 changes: 1 addition & 1 deletion examples/examples/z_get_liveliness.rs
@@ -37,7 +37,7 @@ async fn main() {
         .unwrap();
     while let Ok(reply) = replies.recv_async().await {
         match reply.sample {
-            Ok(sample) => println!(">> Alive token ('{}')", sample.key_expr.as_str(),),
+            Ok(sample) => println!(">> Alive token ('{}')", sample.key_expr().as_str(),),
             Err(err) => {
                 let payload = err
                     .payload
2 changes: 1 addition & 1 deletion examples/examples/z_pong.rs
@@ -40,7 +40,7 @@ fn main() {

     let _sub = session
         .declare_subscriber(key_expr_ping)
-        .callback(move |sample| publisher.put(sample.payload).res().unwrap())
+        .callback(move |sample| publisher.put(sample.payload().clone()).res().unwrap())
         .res()
         .unwrap();
     std::thread::park();
6 changes: 3 additions & 3 deletions examples/examples/z_pull.rs
@@ -35,13 +35,13 @@ async fn main() {
         .pull_mode()
         .callback(|sample| {
             let payload = sample
-                .payload
+                .payload()
                 .deserialize::<String>()
                 .unwrap_or_else(|e| format!("{}", e));
             println!(
                 ">> [Subscriber] Received {} ('{}': '{}')",
-                sample.kind,
-                sample.key_expr.as_str(),
+                sample.kind(),
+                sample.key_expr().as_str(),
                 payload,
             );
         })
15 changes: 7 additions & 8 deletions examples/examples/z_storage.rs
@@ -48,21 +48,20 @@ async fn main() {
         select!(
             sample = subscriber.recv_async() => {
                 let sample = sample.unwrap();
-                let payload = sample.payload.deserialize::<String>().unwrap_or_else(|e| format!("{}", e));
-                println!(">> [Subscriber] Received {} ('{}': '{}')", sample.kind, sample.key_expr.as_str(),payload);
-                if sample.kind == SampleKind::Delete {
-                    stored.remove(&sample.key_expr.to_string());
-                } else {
-                    stored.insert(sample.key_expr.to_string(), sample);
-                }
+                let payload = sample.payload().deserialize::<String>().unwrap_or_else(|e| format!("{}", e));
+                println!(">> [Subscriber] Received {} ('{}': '{}')", sample.kind(), sample.key_expr().as_str(),payload);
+                match sample.kind() {
+                    SampleKind::Delete => stored.remove(&sample.key_expr().to_string()),
+                    SampleKind::Put => stored.insert(sample.key_expr().to_string(), sample),
+                };
             },

             query = queryable.recv_async() => {
                 let query = query.unwrap();
                 println!(">> [Queryable ] Received Query '{}'", query.selector());
                 for (stored_name, sample) in stored.iter() {
                     if query.selector().key_expr.intersects(unsafe {keyexpr::from_str_unchecked(stored_name)}) {
-                        query.reply(sample.key_expr.clone(), sample.payload.clone()).res().await.unwrap();
+                        query.reply(sample.key_expr().clone(), sample.payload().clone()).res().await.unwrap();
                     }
                 }
             }
6 changes: 3 additions & 3 deletions examples/examples/z_sub.rs
@@ -38,13 +38,13 @@ async fn main() {
     println!("Press CTRL-C to quit...");
     while let Ok(sample) = subscriber.recv_async().await {
         let payload = sample
-            .payload
+            .payload()
             .deserialize::<String>()
             .unwrap_or_else(|e| format!("{}", e));
         println!(
             ">> [Subscriber] Received {} ('{}': '{}')",
-            sample.kind,
-            sample.key_expr.as_str(),
+            sample.kind(),
+            sample.key_expr().as_str(),
             payload
         );
     }
6 changes: 3 additions & 3 deletions examples/examples/z_sub_liveliness.rs
@@ -37,14 +37,14 @@ async fn main() {

     println!("Press CTRL-C to quit...");
     while let Ok(sample) = subscriber.recv_async().await {
-        match sample.kind {
+        match sample.kind() {
             SampleKind::Put => println!(
                 ">> [LivelinessSubscriber] New alive token ('{}')",
-                sample.key_expr.as_str()
+                sample.key_expr().as_str()
             ),
             SampleKind::Delete => println!(
                 ">> [LivelinessSubscriber] Dropped token ('{}')",
-                sample.key_expr.as_str()
+                sample.key_expr().as_str()
             ),
         }
     }
6 changes: 3 additions & 3 deletions plugins/zenoh-plugin-example/src/lib.rs
@@ -164,9 +164,9 @@ async fn run(runtime: Runtime, selector: KeyExpr<'_>, flag: Arc<AtomicBool>) {
             // on sample received by the Subscriber
             sample = sub.recv_async() => {
                 let sample = sample.unwrap();
-                let payload = sample.payload.deserialize::<String>().unwrap_or_else(|e| format!("{}", e));
-                info!("Received data ('{}': '{}')", sample.key_expr, payload);
-                stored.insert(sample.key_expr.to_string(), sample);
+                let payload = sample.payload().deserialize::<String>().unwrap_or_else(|e| format!("{}", e));
+                info!("Received data ('{}': '{}')", sample.key_expr(), payload);
+                stored.insert(sample.key_expr().to_string(), sample);
             },
             // on query received by the Queryable
             query = queryable.recv_async() => {
29 changes: 14 additions & 15 deletions plugins/zenoh-plugin-rest/src/lib.rs
@@ -61,7 +61,7 @@ pub fn base64_encode(data: &[u8]) -> String {
     general_purpose::STANDARD.encode(data)
 }

-fn payload_to_json(payload: Payload, encoding: &Encoding) -> serde_json::Value {
+fn payload_to_json(payload: &Payload, encoding: &Encoding) -> serde_json::Value {
     match payload.is_empty() {
         // If the value is empty return a JSON null
         true => serde_json::Value::Null,
@@ -81,21 +81,21 @@ fn payload_to_json(payload: Payload, encoding: &Encoding) -> serde_json::Value {
     }
 }

-fn sample_to_json(sample: Sample) -> JSONSample {
+fn sample_to_json(sample: &Sample) -> JSONSample {
     JSONSample {
-        key: sample.key_expr.as_str().to_string(),
-        value: payload_to_json(sample.payload, &sample.encoding),
-        encoding: sample.encoding.to_string(),
-        time: sample.timestamp.map(|ts| ts.to_string()),
+        key: sample.key_expr().as_str().to_string(),
+        value: payload_to_json(sample.payload(), sample.encoding()),
+        encoding: sample.encoding().to_string(),
+        time: sample.timestamp().map(|ts| ts.to_string()),
     }
 }

 fn result_to_json(sample: Result<Sample, Value>) -> JSONSample {
     match sample {
-        Ok(sample) => sample_to_json(sample),
+        Ok(sample) => sample_to_json(&sample),
         Err(err) => JSONSample {
             key: "ERROR".into(),
-            value: payload_to_json(err.payload, &err.encoding),
+            value: payload_to_json(&err.payload, &err.encoding),
             encoding: err.encoding.to_string(),
             time: None,
         },
@@ -123,8 +123,8 @@ async fn to_json_response(results: flume::Receiver<Reply>) -> Response {
 fn sample_to_html(sample: Sample) -> String {
     format!(
         "<dt>{}</dt>\n<dd>{}</dd>\n",
-        sample.key_expr.as_str(),
-        String::from_utf8_lossy(&sample.payload.contiguous())
+        sample.key_expr().as_str(),
+        String::from_utf8_lossy(&sample.payload().contiguous())
     )
 }
@@ -159,8 +159,8 @@ async fn to_raw_response(results: flume::Receiver<Reply>) -> Response {
         Ok(reply) => match reply.sample {
             Ok(sample) => response(
                 StatusCode::Ok,
-                Cow::from(&sample.encoding).as_ref(),
-                String::from_utf8_lossy(&sample.payload.contiguous()).as_ref(),
+                Cow::from(sample.encoding()).as_ref(),
+                String::from_utf8_lossy(&sample.payload().contiguous()).as_ref(),
             ),
             Err(value) => response(
                 StatusCode::Ok,
@@ -344,12 +344,11 @@ async fn query(mut req: Request<(Arc<Session>, String)>) -> tide::Result<Response> {
         .unwrap();
     loop {
         let sample = sub.recv_async().await.unwrap();
-        let kind = sample.kind;
         let json_sample =
-            serde_json::to_string(&sample_to_json(sample)).unwrap_or("{}".into());
+            serde_json::to_string(&sample_to_json(&sample)).unwrap_or("{}".into());

         match sender
-            .send(&kind.to_string(), json_sample, None)
+            .send(&sample.kind().to_string(), json_sample, None)
             .timeout(std::time::Duration::new(10, 0))
             .await
         {
plugins/zenoh-plugin-storage-manager/src/replica/align_queryable.rs
@@ -179,11 +179,8 @@ impl AlignQueryable {
                 if entry.is_some() {
                     let entry = entry.unwrap();
                     result.push(AlignData::Data(
-                        OwnedKeyExpr::from(entry.key_expr),
-                        (
-                            Value::new(entry.payload).with_encoding(entry.encoding),
-                            each.timestamp,
-                        ),
+                        OwnedKeyExpr::from(entry.key_expr().clone()),
+                        (Value::from(entry), each.timestamp),
                     ));
                 }
             }
@@ -238,10 +235,10 @@ impl AlignQueryable {
             Ok(sample) => {
                 log::trace!(
                     "[ALIGN QUERYABLE] Received ('{}': '{}')",
-                    sample.key_expr.as_str(),
-                    StringOrBase64::from(sample.payload.clone())
+                    sample.key_expr().as_str(),
+                    StringOrBase64::from(sample.payload())
                 );
-                if let Some(timestamp) = sample.timestamp {
+                if let Some(timestamp) = sample.timestamp() {
                     match timestamp.cmp(&logentry.timestamp) {
                         Ordering::Greater => return None,
                         Ordering::Less => {
17 changes: 7 additions & 10 deletions plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs
@@ -141,11 +141,8 @@ impl Aligner {

         for sample in replies {
             result.insert(
-                sample.key_expr.into(),
-                (
-                    sample.timestamp.unwrap(),
-                    Value::new(sample.payload).with_encoding(sample.encoding),
-                ),
+                sample.key_expr().clone().into(),
+                (*sample.timestamp().unwrap(), Value::from(sample)),
             );
         }
         (result, no_err)
@@ -213,7 +210,7 @@ impl Aligner {
         let mut other_intervals: HashMap<u64, u64> = HashMap::new();
         // expecting sample.payload to be a vec of intervals with their checksum
         for each in reply_content {
-            match serde_json::from_str(&StringOrBase64::from(each.payload)) {
+            match serde_json::from_str(&StringOrBase64::from(each.payload())) {
                 Ok((i, c)) => {
                     other_intervals.insert(i, c);
                 }
@@ -259,7 +256,7 @@ impl Aligner {
         let (reply_content, mut no_err) = self.perform_query(other_rep, properties).await;
         let mut other_subintervals: HashMap<u64, u64> = HashMap::new();
         for each in reply_content {
-            match serde_json::from_str(&StringOrBase64::from(each.payload)) {
+            match serde_json::from_str(&StringOrBase64::from(each.payload())) {
                 Ok((i, c)) => {
                     other_subintervals.insert(i, c);
                 }
@@ -300,7 +297,7 @@ impl Aligner {
         let (reply_content, mut no_err) = self.perform_query(other_rep, properties).await;
         let mut other_content: HashMap<u64, Vec<LogEntry>> = HashMap::new();
         for each in reply_content {
-            match serde_json::from_str(&StringOrBase64::from(each.payload)) {
+            match serde_json::from_str(&StringOrBase64::from(each.payload())) {
                 Ok((i, c)) => {
                     other_content.insert(i, c);
                 }
@@ -340,8 +337,8 @@ impl Aligner {
             Ok(sample) => {
                 log::trace!(
                     "[ALIGNER] Received ('{}': '{}')",
-                    sample.key_expr.as_str(),
-                    StringOrBase64::from(sample.payload.clone())
+                    sample.key_expr().as_str(),
+                    StringOrBase64::from(sample.payload())
                 );
                 return_val.push(sample);
             }
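One more pattern in the storage-manager hunks above: where the old code built a `Value` by hand with `Value::new(sample.payload).with_encoding(sample.encoding)`, the new code converts the whole sample with `Value::from(sample)`. That conversion takes the sample by value while the new accessors only borrow, so anything still needed afterwards is cloned or copied out first, as in this sketch; the `keep` helper and its `store` parameter are made up for illustration and are not part of the diff.

```rust
use zenoh::key_expr::OwnedKeyExpr;
use zenoh::sample::Sample;
use zenoh::time::Timestamp;
use zenoh::value::Value;

// Sketch only: `keep` and `store` are illustrative; `sample` is a zenoh Sample
// received from a subscriber or a query reply.
fn keep(sample: Sample, store: &mut Vec<(OwnedKeyExpr, Option<Timestamp>, Value)>) {
    // Borrow and clone what is still needed before the sample is consumed...
    let key: OwnedKeyExpr = sample.key_expr().clone().into();
    let ts = sample.timestamp().cloned();
    // ...then turn the sample itself into a Value (this consumes `sample`).
    let value = Value::from(sample);
    store.push((key, ts, value));
}
```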
11 changes: 6 additions & 5 deletions plugins/zenoh-plugin-storage-manager/src/replica/mod.rs
@@ -220,16 +220,17 @@ impl Replica {
                     continue;
                 }
             };
-            let from = &sample.key_expr.as_str()
+            let from = &sample.key_expr().as_str()
                 [Replica::get_digest_key(&self.key_expr, ALIGN_PREFIX).len() + 1..];
             log::trace!(
                 "[DIGEST_SUB] From {} Received {} ('{}': '{}')",
                 from,
-                sample.kind,
-                sample.key_expr.as_str(),
-                StringOrBase64::from(sample.payload.clone())
+                sample.kind(),
+                sample.key_expr().as_str(),
+                StringOrBase64::from(sample.payload())
             );
-            let digest: Digest = match serde_json::from_str(&StringOrBase64::from(sample.payload)) {
+            let digest: Digest = match serde_json::from_str(&StringOrBase64::from(sample.payload()))
+            {
                 Ok(digest) => digest,
                 Err(e) => {
                     log::error!("[DIGEST_SUB] Error in decoding the digest: {}", e);