Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sample accessors #822

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/examples/z_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ async fn main() {
match reply.sample {
Ok(sample) => {
let payload = sample
.payload
.payload()
.deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e));
println!(
">> Received ('{}': '{}')",
sample.key_expr.as_str(),
sample.key_expr().as_str(),
payload,
);
}
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_get_liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn main() {
.unwrap();
while let Ok(reply) = replies.recv_async().await {
match reply.sample {
Ok(sample) => println!(">> Alive token ('{}')", sample.key_expr.as_str(),),
Ok(sample) => println!(">> Alive token ('{}')", sample.key_expr().as_str(),),
Err(err) => {
let payload = err
.payload
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_pong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ fn main() {

let _sub = session
.declare_subscriber(key_expr_ping)
.callback(move |sample| publisher.put(sample.payload).res().unwrap())
.callback(move |sample| publisher.put(sample.payload().clone()).res().unwrap())
.res()
.unwrap();
std::thread::park();
Expand Down
6 changes: 3 additions & 3 deletions examples/examples/z_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ async fn main() {
.pull_mode()
.callback(|sample| {
let payload = sample
.payload
.payload()
.deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e));
println!(
">> [Subscriber] Received {} ('{}': '{}')",
sample.kind,
sample.key_expr.as_str(),
sample.kind(),
sample.key_expr().as_str(),
payload,
);
})
Expand Down
15 changes: 7 additions & 8 deletions examples/examples/z_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,21 +48,20 @@ async fn main() {
select!(
sample = subscriber.recv_async() => {
let sample = sample.unwrap();
let payload = sample.payload.deserialize::<String>().unwrap_or_else(|e| format!("{}", e));
println!(">> [Subscriber] Received {} ('{}': '{}')", sample.kind, sample.key_expr.as_str(),payload);
if sample.kind == SampleKind::Delete {
stored.remove(&sample.key_expr.to_string());
} else {
stored.insert(sample.key_expr.to_string(), sample);
}
let payload = sample.payload().deserialize::<String>().unwrap_or_else(|e| format!("{}", e));
println!(">> [Subscriber] Received {} ('{}': '{}')", sample.kind(), sample.key_expr().as_str(),payload);
match sample.kind() {
SampleKind::Delete => stored.remove(&sample.key_expr().to_string()),
SampleKind::Put => stored.insert(sample.key_expr().to_string(), sample),
};
},

query = queryable.recv_async() => {
let query = query.unwrap();
println!(">> [Queryable ] Received Query '{}'", query.selector());
for (stored_name, sample) in stored.iter() {
if query.selector().key_expr.intersects(unsafe {keyexpr::from_str_unchecked(stored_name)}) {
query.reply(sample.key_expr.clone(), sample.payload.clone()).res().await.unwrap();
query.reply(sample.key_expr().clone(), sample.payload().clone()).res().await.unwrap();
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions examples/examples/z_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ async fn main() {
println!("Press CTRL-C to quit...");
while let Ok(sample) = subscriber.recv_async().await {
let payload = sample
.payload
.payload()
.deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e));
println!(
">> [Subscriber] Received {} ('{}': '{}')",
sample.kind,
sample.key_expr.as_str(),
sample.kind(),
sample.key_expr().as_str(),
payload
);
}
Expand Down
6 changes: 3 additions & 3 deletions examples/examples/z_sub_liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ async fn main() {

println!("Press CTRL-C to quit...");
while let Ok(sample) = subscriber.recv_async().await {
match sample.kind {
match sample.kind() {
SampleKind::Put => println!(
">> [LivelinessSubscriber] New alive token ('{}')",
sample.key_expr.as_str()
sample.key_expr().as_str()
),
SampleKind::Delete => println!(
">> [LivelinessSubscriber] Dropped token ('{}')",
sample.key_expr.as_str()
sample.key_expr().as_str()
),
}
}
Expand Down
6 changes: 3 additions & 3 deletions plugins/zenoh-plugin-example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,9 @@ async fn run(runtime: Runtime, selector: KeyExpr<'_>, flag: Arc<AtomicBool>) {
// on sample received by the Subscriber
sample = sub.recv_async() => {
let sample = sample.unwrap();
let payload = sample.payload.deserialize::<String>().unwrap_or_else(|e| format!("{}", e));
info!("Received data ('{}': '{}')", sample.key_expr, payload);
stored.insert(sample.key_expr.to_string(), sample);
let payload = sample.payload().deserialize::<String>().unwrap_or_else(|e| format!("{}", e));
info!("Received data ('{}': '{}')", sample.key_expr(), payload);
stored.insert(sample.key_expr().to_string(), sample);
},
// on query received by the Queryable
query = queryable.recv_async() => {
Expand Down
29 changes: 14 additions & 15 deletions plugins/zenoh-plugin-rest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub fn base64_encode(data: &[u8]) -> String {
general_purpose::STANDARD.encode(data)
}

fn payload_to_json(payload: Payload, encoding: &Encoding) -> serde_json::Value {
fn payload_to_json(payload: &Payload, encoding: &Encoding) -> serde_json::Value {
match payload.is_empty() {
// If the value is empty return a JSON null
true => serde_json::Value::Null,
Expand All @@ -81,21 +81,21 @@ fn payload_to_json(payload: Payload, encoding: &Encoding) -> serde_json::Value {
}
}

fn sample_to_json(sample: Sample) -> JSONSample {
fn sample_to_json(sample: &Sample) -> JSONSample {
JSONSample {
key: sample.key_expr.as_str().to_string(),
value: payload_to_json(sample.payload, &sample.encoding),
encoding: sample.encoding.to_string(),
time: sample.timestamp.map(|ts| ts.to_string()),
key: sample.key_expr().as_str().to_string(),
value: payload_to_json(sample.payload(), sample.encoding()),
encoding: sample.encoding().to_string(),
time: sample.timestamp().map(|ts| ts.to_string()),
}
}

fn result_to_json(sample: Result<Sample, Value>) -> JSONSample {
match sample {
Ok(sample) => sample_to_json(sample),
Ok(sample) => sample_to_json(&sample),
Err(err) => JSONSample {
key: "ERROR".into(),
value: payload_to_json(err.payload, &err.encoding),
value: payload_to_json(&err.payload, &err.encoding),
encoding: err.encoding.to_string(),
time: None,
},
Expand Down Expand Up @@ -123,8 +123,8 @@ async fn to_json_response(results: flume::Receiver<Reply>) -> Response {
fn sample_to_html(sample: Sample) -> String {
format!(
"<dt>{}</dt>\n<dd>{}</dd>\n",
sample.key_expr.as_str(),
String::from_utf8_lossy(&sample.payload.contiguous())
sample.key_expr().as_str(),
String::from_utf8_lossy(&sample.payload().contiguous())
)
}

Expand Down Expand Up @@ -159,8 +159,8 @@ async fn to_raw_response(results: flume::Receiver<Reply>) -> Response {
Ok(reply) => match reply.sample {
Ok(sample) => response(
StatusCode::Ok,
Cow::from(&sample.encoding).as_ref(),
String::from_utf8_lossy(&sample.payload.contiguous()).as_ref(),
Cow::from(sample.encoding()).as_ref(),
String::from_utf8_lossy(&sample.payload().contiguous()).as_ref(),
),
Err(value) => response(
StatusCode::Ok,
Expand Down Expand Up @@ -344,12 +344,11 @@ async fn query(mut req: Request<(Arc<Session>, String)>) -> tide::Result<Respons
.unwrap();
loop {
let sample = sub.recv_async().await.unwrap();
let kind = sample.kind;
let json_sample =
serde_json::to_string(&sample_to_json(sample)).unwrap_or("{}".into());
serde_json::to_string(&sample_to_json(&sample)).unwrap_or("{}".into());

match sender
.send(&kind.to_string(), json_sample, None)
.send(&sample.kind().to_string(), json_sample, None)
.timeout(std::time::Duration::new(10, 0))
.await
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,8 @@ impl AlignQueryable {
if entry.is_some() {
let entry = entry.unwrap();
result.push(AlignData::Data(
OwnedKeyExpr::from(entry.key_expr),
(
Value::new(entry.payload).with_encoding(entry.encoding),
each.timestamp,
),
OwnedKeyExpr::from(entry.key_expr().clone()),
(Value::from(entry), each.timestamp),
));
}
}
Expand Down Expand Up @@ -238,10 +235,10 @@ impl AlignQueryable {
Ok(sample) => {
log::trace!(
"[ALIGN QUERYABLE] Received ('{}': '{}')",
sample.key_expr.as_str(),
StringOrBase64::from(sample.payload.clone())
sample.key_expr().as_str(),
StringOrBase64::from(sample.payload())
);
if let Some(timestamp) = sample.timestamp {
if let Some(timestamp) = sample.timestamp() {
match timestamp.cmp(&logentry.timestamp) {
Ordering::Greater => return None,
Ordering::Less => {
Expand Down
17 changes: 7 additions & 10 deletions plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,8 @@ impl Aligner {

for sample in replies {
result.insert(
sample.key_expr.into(),
(
sample.timestamp.unwrap(),
Value::new(sample.payload).with_encoding(sample.encoding),
),
sample.key_expr().clone().into(),
(*sample.timestamp().unwrap(), Value::from(sample)),
);
}
(result, no_err)
Expand Down Expand Up @@ -213,7 +210,7 @@ impl Aligner {
let mut other_intervals: HashMap<u64, u64> = HashMap::new();
// expecting sample.payload to be a vec of intervals with their checksum
for each in reply_content {
match serde_json::from_str(&StringOrBase64::from(each.payload)) {
match serde_json::from_str(&StringOrBase64::from(each.payload())) {
Ok((i, c)) => {
other_intervals.insert(i, c);
}
Expand Down Expand Up @@ -259,7 +256,7 @@ impl Aligner {
let (reply_content, mut no_err) = self.perform_query(other_rep, properties).await;
let mut other_subintervals: HashMap<u64, u64> = HashMap::new();
for each in reply_content {
match serde_json::from_str(&StringOrBase64::from(each.payload)) {
match serde_json::from_str(&StringOrBase64::from(each.payload())) {
Ok((i, c)) => {
other_subintervals.insert(i, c);
}
Expand Down Expand Up @@ -300,7 +297,7 @@ impl Aligner {
let (reply_content, mut no_err) = self.perform_query(other_rep, properties).await;
let mut other_content: HashMap<u64, Vec<LogEntry>> = HashMap::new();
for each in reply_content {
match serde_json::from_str(&StringOrBase64::from(each.payload)) {
match serde_json::from_str(&StringOrBase64::from(each.payload())) {
Ok((i, c)) => {
other_content.insert(i, c);
}
Expand Down Expand Up @@ -340,8 +337,8 @@ impl Aligner {
Ok(sample) => {
log::trace!(
"[ALIGNER] Received ('{}': '{}')",
sample.key_expr.as_str(),
StringOrBase64::from(sample.payload.clone())
sample.key_expr().as_str(),
StringOrBase64::from(sample.payload())
);
return_val.push(sample);
}
Expand Down
11 changes: 6 additions & 5 deletions plugins/zenoh-plugin-storage-manager/src/replica/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,16 +220,17 @@ impl Replica {
continue;
}
};
let from = &sample.key_expr.as_str()
let from = &sample.key_expr().as_str()
[Replica::get_digest_key(&self.key_expr, ALIGN_PREFIX).len() + 1..];
log::trace!(
"[DIGEST_SUB] From {} Received {} ('{}': '{}')",
from,
sample.kind,
sample.key_expr.as_str(),
StringOrBase64::from(sample.payload.clone())
sample.kind(),
sample.key_expr().as_str(),
StringOrBase64::from(sample.payload())
);
let digest: Digest = match serde_json::from_str(&StringOrBase64::from(sample.payload)) {
let digest: Digest = match serde_json::from_str(&StringOrBase64::from(sample.payload()))
{
Ok(digest) => digest,
Err(e) => {
log::error!("[DIGEST_SUB] Error in decoding the digest: {}", e);
Expand Down
Loading