From 55119a5001f0080bfd78a91733760a45a959674c Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Wed, 13 Mar 2024 15:07:59 +0100 Subject: [PATCH 1/4] make Sample ields pub(crate) provide accessors for external users --- examples/examples/z_get.rs | 4 +- examples/examples/z_pong.rs | 2 +- examples/examples/z_pull.rs | 6 +- examples/examples/z_storage.rs | 15 ++- examples/examples/z_sub.rs | 4 +- plugins/zenoh-plugin-example/src/lib.rs | 6 +- plugins/zenoh-plugin-rest/src/lib.rs | 22 ++--- .../src/replica/align_queryable.rs | 10 +- .../src/replica/aligner.rs | 16 ++-- .../src/replica/mod.rs | 11 ++- .../src/replica/storage.rs | 84 +++++++++-------- .../tests/operations.rs | 8 +- .../tests/wildcard.rs | 20 ++-- zenoh-ext/examples/z_query_sub.rs | 4 +- zenoh-ext/src/group.rs | 4 +- zenoh-ext/src/publication_cache.rs | 8 +- zenoh-ext/src/querying_subscriber.rs | 4 +- zenoh/src/liveliness.rs | 14 +-- zenoh/src/payload.rs | 4 +- zenoh/src/sample.rs | 93 +++++++++++++------ zenoh/src/subscriber.rs | 12 +-- zenoh/tests/attachments.rs | 4 +- zenoh/tests/events.rs | 20 ++-- zenoh/tests/interceptors.rs | 6 +- zenoh/tests/liveliness.rs | 8 +- zenoh/tests/qos.rs | 4 +- zenoh/tests/routing.rs | 4 +- zenoh/tests/session.rs | 10 +- zenoh/tests/unicity.rs | 6 +- 29 files changed, 224 insertions(+), 189 deletions(-) diff --git a/examples/examples/z_get.rs b/examples/examples/z_get.rs index 0fff95c250..dce74d367b 100644 --- a/examples/examples/z_get.rs +++ b/examples/examples/z_get.rs @@ -41,12 +41,12 @@ async fn main() { match reply.sample { Ok(sample) => { let payload = sample - .payload + .payload() .deserialize::() .unwrap_or_else(|e| format!("{}", e)); println!( ">> Received ('{}': '{}')", - sample.key_expr.as_str(), + sample.key_expr().as_str(), payload, ); } diff --git a/examples/examples/z_pong.rs b/examples/examples/z_pong.rs index 1f06c7abb9..6c333cbbeb 100644 --- a/examples/examples/z_pong.rs +++ b/examples/examples/z_pong.rs @@ -41,7 +41,7 @@ fn main() { let _sub = session .declare_subscriber(key_expr_ping) - .callback(move |sample| publisher.put(sample.payload).res().unwrap()) + .callback(move |sample| publisher.put(sample.payload().clone()).res().unwrap()) .res() .unwrap(); for _ in stdin().bytes().take_while(|b| !matches!(b, Ok(b'q'))) {} diff --git a/examples/examples/z_pull.rs b/examples/examples/z_pull.rs index ed2a90f1a6..5ba4f413bd 100644 --- a/examples/examples/z_pull.rs +++ b/examples/examples/z_pull.rs @@ -45,13 +45,13 @@ async fn main() { let subs = async { while let Ok(sample) = subscriber.recv_async().await { let payload = sample - .payload + .payload() .deserialize::() .unwrap_or_else(|e| format!("{}", e)); println!( ">> [Subscriber] Received {} ('{}': '{}')", - sample.kind, - sample.key_expr.as_str(), + sample.kind(), + sample.key_expr().as_str(), payload, ); } diff --git a/examples/examples/z_storage.rs b/examples/examples/z_storage.rs index 857181751b..ab62785f18 100644 --- a/examples/examples/z_storage.rs +++ b/examples/examples/z_storage.rs @@ -53,13 +53,12 @@ async fn main() { select!( sample = subscriber.recv_async() => { let sample = sample.unwrap(); - let payload = sample.payload.deserialize::().unwrap_or_else(|e| format!("{}", e)); - println!(">> [Subscriber] Received {} ('{}': '{}')", sample.kind, sample.key_expr.as_str(),payload); - if sample.kind == SampleKind::Delete { - stored.remove(&sample.key_expr.to_string()); - } else { - stored.insert(sample.key_expr.to_string(), sample); - } + let payload = sample.payload().deserialize::().unwrap_or_else(|e| format!("{}", e)); 
+ println!(">> [Subscriber] Received {} ('{}': '{}')", sample.kind(), sample.key_expr().as_str(),payload); + match sample.kind() { + SampleKind::Delete => stored.remove(&sample.key_expr().to_string()), + SampleKind::Put => stored.insert(sample.key_expr().to_string(), sample), + }; }, query = queryable.recv_async() => { @@ -67,7 +66,7 @@ async fn main() { println!(">> [Queryable ] Received Query '{}'", query.selector()); for (stored_name, sample) in stored.iter() { if query.selector().key_expr.intersects(unsafe {keyexpr::from_str_unchecked(stored_name)}) { - query.reply(sample.key_expr.clone(), sample.payload.clone()).res().await.unwrap(); + query.reply(sample.key_expr().clone(), sample.payload().clone()).res().await.unwrap(); } } }, diff --git a/examples/examples/z_sub.rs b/examples/examples/z_sub.rs index 195e2f7640..f2d337a7cf 100644 --- a/examples/examples/z_sub.rs +++ b/examples/examples/z_sub.rs @@ -46,8 +46,8 @@ async fn main() { select!( sample = subscriber.recv_async() => { let sample = sample.unwrap(); - let payload = sample.payload.deserialize::().unwrap_or_else(|e| format!("{}", e)); - println!(">> [Subscriber] Received {} ('{}': '{}')", sample.kind, sample.key_expr.as_str(), payload); + let payload = sample.payload().deserialize::().unwrap_or_else(|e| format!("{}", e)); + println!(">> [Subscriber] Received {} ('{}': '{}')", sample.kind(), sample.key_expr().as_str(), payload); }, _ = stdin.read_exact(&mut input).fuse() => { match input[0] { diff --git a/plugins/zenoh-plugin-example/src/lib.rs b/plugins/zenoh-plugin-example/src/lib.rs index 12cc6ffa84..04f49b4739 100644 --- a/plugins/zenoh-plugin-example/src/lib.rs +++ b/plugins/zenoh-plugin-example/src/lib.rs @@ -164,9 +164,9 @@ async fn run(runtime: Runtime, selector: KeyExpr<'_>, flag: Arc) { // on sample received by the Subscriber sample = sub.recv_async() => { let sample = sample.unwrap(); - let payload = sample.payload.deserialize::().unwrap_or_else(|e| format!("{}", e)); - info!("Received data ('{}': '{}')", sample.key_expr, payload); - stored.insert(sample.key_expr.to_string(), sample); + let payload = sample.payload().deserialize::().unwrap_or_else(|e| format!("{}", e)); + info!("Received data ('{}': '{}')", sample.key_expr(), payload); + stored.insert(sample.key_expr().to_string(), sample); }, // on query received by the Queryable query = queryable.recv_async() => { diff --git a/plugins/zenoh-plugin-rest/src/lib.rs b/plugins/zenoh-plugin-rest/src/lib.rs index 1a99d7b5a4..c689bc7d7d 100644 --- a/plugins/zenoh-plugin-rest/src/lib.rs +++ b/plugins/zenoh-plugin-rest/src/lib.rs @@ -46,7 +46,7 @@ lazy_static::lazy_static! 
{ } const RAW_KEY: &str = "_raw"; -fn payload_to_json(payload: Payload) -> String { +fn payload_to_json(payload: &Payload) -> String { payload .deserialize::() .unwrap_or_else(|_| format!(r#""{}""#, b64_std_engine.encode(payload.contiguous()))) @@ -55,10 +55,10 @@ fn payload_to_json(payload: Payload) -> String { fn sample_to_json(sample: Sample) -> String { format!( r#"{{ "key": "{}", "value": {}, "encoding": "{}", "time": "{}" }}"#, - sample.key_expr.as_str(), - payload_to_json(sample.payload), - sample.encoding, - if let Some(ts) = sample.timestamp { + sample.key_expr().as_str(), + payload_to_json(sample.payload()), + sample.encoding(), + if let Some(ts) = sample.timestamp() { ts.to_string() } else { "None".to_string() @@ -72,7 +72,7 @@ fn result_to_json(sample: Result) -> String { Err(err) => { format!( r#"{{ "key": "ERROR", "value": {}, "encoding": "{}"}}"#, - payload_to_json(err.payload), + payload_to_json(&err.payload), err.encoding, ) } @@ -100,8 +100,8 @@ async fn to_json_response(results: flume::Receiver) -> Response { fn sample_to_html(sample: Sample) -> String { format!( "
<dt>{}</dt>\n
<dd>{}</dd>
\n", - sample.key_expr.as_str(), - String::from_utf8_lossy(&sample.payload.contiguous()) + sample.key_expr().as_str(), + String::from_utf8_lossy(&sample.payload().contiguous()) ) } @@ -136,8 +136,8 @@ async fn to_raw_response(results: flume::Receiver) -> Response { Ok(reply) => match reply.sample { Ok(sample) => response( StatusCode::Ok, - Cow::from(&sample.encoding).as_ref(), - String::from_utf8_lossy(&sample.payload.contiguous()).as_ref(), + Cow::from(sample.encoding()).as_ref(), + String::from_utf8_lossy(&sample.payload().contiguous()).as_ref(), ), Err(value) => response( StatusCode::Ok, @@ -322,7 +322,7 @@ async fn query(mut req: Request<(Arc, String)>) -> tide::Result { log::trace!( "[ALIGN QUERYABLE] Received ('{}': '{}')", - sample.key_expr.as_str(), - StringOrBase64::from(sample.payload.clone()) + sample.key_expr().as_str(), + StringOrBase64::from(sample.payload()) ); - if let Some(timestamp) = sample.timestamp { + if let Some(timestamp) = sample.timestamp() { match timestamp.cmp(&logentry.timestamp) { Ordering::Greater => return None, Ordering::Less => { diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs b/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs index 03c6fa949a..b11a94e4f2 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs @@ -141,10 +141,10 @@ impl Aligner { for sample in replies { result.insert( - sample.key_expr.into(), + sample.key_expr().clone().into(), ( - sample.timestamp.unwrap(), - Value::new(sample.payload).with_encoding(sample.encoding), + sample.timestamp().unwrap().clone(), + Value::from(sample), ), ); } @@ -213,7 +213,7 @@ impl Aligner { let mut other_intervals: HashMap = HashMap::new(); // expecting sample.payload to be a vec of intervals with their checksum for each in reply_content { - match serde_json::from_str(&StringOrBase64::from(each.payload)) { + match serde_json::from_str(&StringOrBase64::from(each.payload())) { Ok((i, c)) => { other_intervals.insert(i, c); } @@ -259,7 +259,7 @@ impl Aligner { let (reply_content, mut no_err) = self.perform_query(other_rep, properties).await; let mut other_subintervals: HashMap = HashMap::new(); for each in reply_content { - match serde_json::from_str(&StringOrBase64::from(each.payload)) { + match serde_json::from_str(&StringOrBase64::from(each.payload())) { Ok((i, c)) => { other_subintervals.insert(i, c); } @@ -300,7 +300,7 @@ impl Aligner { let (reply_content, mut no_err) = self.perform_query(other_rep, properties).await; let mut other_content: HashMap> = HashMap::new(); for each in reply_content { - match serde_json::from_str(&StringOrBase64::from(each.payload)) { + match serde_json::from_str(&StringOrBase64::from(each.payload())) { Ok((i, c)) => { other_content.insert(i, c); } @@ -340,8 +340,8 @@ impl Aligner { Ok(sample) => { log::trace!( "[ALIGNER] Received ('{}': '{}')", - sample.key_expr.as_str(), - StringOrBase64::from(sample.payload.clone()) + sample.key_expr().as_str(), + StringOrBase64::from(sample.payload()) ); return_val.push(sample); } diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/mod.rs b/plugins/zenoh-plugin-storage-manager/src/replica/mod.rs index 78254213f7..5dda032029 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/mod.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/mod.rs @@ -220,16 +220,17 @@ impl Replica { continue; } }; - let from = &sample.key_expr.as_str() + let from = &sample.key_expr().as_str() 
[Replica::get_digest_key(&self.key_expr, ALIGN_PREFIX).len() + 1..]; log::trace!( "[DIGEST_SUB] From {} Received {} ('{}': '{}')", from, - sample.kind, - sample.key_expr.as_str(), - StringOrBase64::from(sample.payload.clone()) + sample.kind(), + sample.key_expr().as_str(), + StringOrBase64::from(sample.payload()) ); - let digest: Digest = match serde_json::from_str(&StringOrBase64::from(sample.payload)) { + let digest: Digest = match serde_json::from_str(&StringOrBase64::from(sample.payload())) + { Ok(digest) => digest, Err(e) => { log::error!("[DIGEST_SUB] Error in decoding the digest: {}", e); diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs index 6b48895612..895f2e1914 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs @@ -179,7 +179,7 @@ impl StorageService { }; // log error if the sample is not timestamped // This is to reduce down the line inconsistencies of having duplicate samples stored - if sample.get_timestamp().is_none() { + if sample.timestamp().is_none() { log::error!("Sample {:?} is not timestamped. Please timestamp samples meant for replicated storage.", sample); } else { @@ -271,28 +271,28 @@ impl StorageService { }; // if wildcard, update wildcard_updates - if sample.key_expr.is_wild() { + if sample.key_expr().is_wild() { self.register_wildcard_update(sample.clone()).await; } - let matching_keys = if sample.key_expr.is_wild() { - self.get_matching_keys(&sample.key_expr).await + let matching_keys = if sample.key_expr().is_wild() { + self.get_matching_keys(&sample.key_expr()).await } else { - vec![sample.key_expr.clone().into()] + vec![sample.key_expr().clone().into()] }; log::trace!( "The list of keys matching `{}` is : {:?}", - sample.key_expr, + sample.key_expr(), matching_keys ); for k in matching_keys { if !self - .is_deleted(&k.clone(), sample.get_timestamp().unwrap()) + .is_deleted(&k.clone(), sample.timestamp().unwrap()) .await && (self.capability.history.eq(&History::All) || (self.capability.history.eq(&History::Latest) - && self.is_latest(&k, sample.get_timestamp().unwrap()).await)) + && self.is_latest(&k, sample.timestamp().unwrap()).await)) { log::trace!( "Sample `{:?}` identified as neded processing for key {}", @@ -302,30 +302,30 @@ impl StorageService { // there might be the case that the actual update was outdated due to a wild card update, but not stored yet in the storage. // get the relevant wild card entry and use that value and timestamp to update the storage let sample_to_store = match self - .ovderriding_wild_update(&k, sample.get_timestamp().unwrap()) + .ovderriding_wild_update(&k, sample.timestamp().unwrap()) .await { Some(overriding_update) => { let Value { payload, encoding, .. 
} = overriding_update.data.value; - let mut sample_to_store = Sample::new(KeyExpr::from(k.clone()), payload) + let sample_to_store = Sample::new(KeyExpr::from(k.clone()), payload) .with_encoding(encoding) - .with_timestamp(overriding_update.data.timestamp); - sample_to_store.kind = overriding_update.kind; + .with_timestamp(overriding_update.data.timestamp) + .with_kind(overriding_update.kind); sample_to_store } None => { - let mut sample_to_store = - Sample::new(KeyExpr::from(k.clone()), sample.payload.clone()) - .with_encoding(sample.encoding.clone()) - .with_timestamp(sample.timestamp.unwrap()); - sample_to_store.kind = sample.kind; + let sample_to_store = + Sample::new(KeyExpr::from(k.clone()), sample.payload().clone()) + .with_encoding(sample.encoding().clone()) + .with_timestamp(sample.timestamp().unwrap().clone()) + .with_kind(sample.kind()); sample_to_store } }; - let stripped_key = match self.strip_prefix(&sample_to_store.key_expr) { + let stripped_key = match self.strip_prefix(sample_to_store.key_expr()) { Ok(stripped) => stripped, Err(e) => { log::error!("{}", e); @@ -333,24 +333,25 @@ impl StorageService { } }; let mut storage = self.storage.lock().await; - let result = if sample.kind == SampleKind::Put { - storage - .put( - stripped_key, - Value::new(sample_to_store.payload.clone()) - .with_encoding(sample_to_store.encoding.clone()), - sample_to_store.timestamp.unwrap(), - ) - .await - } else if sample.kind == SampleKind::Delete { - // register a tombstone - self.mark_tombstone(&k, sample_to_store.timestamp.unwrap()) - .await; - storage - .delete(stripped_key, sample_to_store.timestamp.unwrap()) - .await - } else { - Err("sample kind not implemented".into()) + let result = match sample.kind() { + SampleKind::Put => { + storage + .put( + stripped_key, + Value::new(sample_to_store.payload().clone()) + .with_encoding(sample_to_store.encoding().clone()), + sample_to_store.timestamp().unwrap().clone(), + ) + .await + } + SampleKind::Delete => { + // register a tombstone + self.mark_tombstone(&k, sample_to_store.timestamp().unwrap().clone()) + .await; + storage + .delete(stripped_key, sample_to_store.timestamp().unwrap().clone()) + .await + } }; drop(storage); if self.replication.is_some() @@ -362,7 +363,7 @@ impl StorageService { .as_ref() .unwrap() .log_propagation - .send((k.clone(), *sample_to_store.get_timestamp().unwrap())); + .send((k.clone(), sample_to_store.timestamp().unwrap().clone())); match sending { Ok(_) => (), Err(e) => { @@ -395,15 +396,16 @@ impl StorageService { async fn register_wildcard_update(&self, sample: Sample) { // @TODO: change into a better store that does incremental writes - let key = sample.clone().key_expr; + let key = sample.key_expr().clone(); let mut wildcards = self.wildcard_updates.write().await; + let timestamp = sample.timestamp().unwrap().clone(); wildcards.insert( &key, Update { - kind: sample.kind, + kind: sample.kind(), data: StoredData { - value: Value::new(sample.payload).with_encoding(sample.encoding), - timestamp: sample.timestamp.unwrap(), + value: Value::from(sample), + timestamp, }, }, ); diff --git a/plugins/zenoh-plugin-storage-manager/tests/operations.rs b/plugins/zenoh-plugin-storage-manager/tests/operations.rs index 81029e2fa7..36162f01c2 100644 --- a/plugins/zenoh-plugin-storage-manager/tests/operations.rs +++ b/plugins/zenoh-plugin-storage-manager/tests/operations.rs @@ -101,7 +101,7 @@ async fn test_updates_in_order() { // expects exactly one sample let data = get_data(&session, "operation/test/a").await; 
assert_eq!(data.len(), 1); - assert_eq!(StringOrBase64::from(data[0].payload.clone()).as_str(), "1"); + assert_eq!(StringOrBase64::from(data[0].payload()).as_str(), "1"); put_data( &session, @@ -117,7 +117,7 @@ async fn test_updates_in_order() { // expects exactly one sample let data = get_data(&session, "operation/test/b").await; assert_eq!(data.len(), 1); - assert_eq!(StringOrBase64::from(data[0].payload.clone()).as_str(), "2"); + assert_eq!(StringOrBase64::from(data[0].payload()).as_str(), "2"); delete_data( &session, @@ -136,8 +136,8 @@ async fn test_updates_in_order() { // expects exactly one sample let data = get_data(&session, "operation/test/b").await; assert_eq!(data.len(), 1); - assert_eq!(StringOrBase64::from(data[0].payload.clone()).as_str(), "2"); - assert_eq!(data[0].key_expr.as_str(), "operation/test/b"); + assert_eq!(StringOrBase64::from(data[0].payload()).as_str(), "2"); + assert_eq!(data[0].key_expr().as_str(), "operation/test/b"); drop(storage); } diff --git a/plugins/zenoh-plugin-storage-manager/tests/wildcard.rs b/plugins/zenoh-plugin-storage-manager/tests/wildcard.rs index 4808ec246f..5a71dc23f0 100644 --- a/plugins/zenoh-plugin-storage-manager/tests/wildcard.rs +++ b/plugins/zenoh-plugin-storage-manager/tests/wildcard.rs @@ -117,8 +117,8 @@ async fn test_wild_card_in_order() { // expected single entry let data = get_data(&session, "wild/test/*").await; assert_eq!(data.len(), 1); - assert_eq!(data[0].key_expr.as_str(), "wild/test/a"); - assert_eq!(StringOrBase64::from(data[0].payload.clone()).as_str(), "2"); + assert_eq!(data[0].key_expr().as_str(), "wild/test/a"); + assert_eq!(StringOrBase64::from(data[0].payload()).as_str(), "2"); put_data( &session, @@ -134,10 +134,10 @@ async fn test_wild_card_in_order() { // expected two entries let data = get_data(&session, "wild/test/*").await; assert_eq!(data.len(), 2); - assert!(["wild/test/a", "wild/test/b"].contains(&data[0].key_expr.as_str())); - assert!(["wild/test/a", "wild/test/b"].contains(&data[1].key_expr.as_str())); - assert!(["2", "3"].contains(&StringOrBase64::from(data[0].payload.clone()).as_str())); - assert!(["2", "3"].contains(&StringOrBase64::from(data[1].payload.clone()).as_str())); + assert!(["wild/test/a", "wild/test/b"].contains(&data[0].key_expr().as_str())); + assert!(["wild/test/a", "wild/test/b"].contains(&data[1].key_expr().as_str())); + assert!(["2", "3"].contains(&StringOrBase64::from(data[0].payload()).as_str())); + assert!(["2", "3"].contains(&StringOrBase64::from(data[1].payload()).as_str())); put_data( &session, @@ -153,10 +153,10 @@ async fn test_wild_card_in_order() { // expected two entries let data = get_data(&session, "wild/test/*").await; assert_eq!(data.len(), 2); - assert!(["wild/test/a", "wild/test/b"].contains(&data[0].key_expr.as_str())); - assert!(["wild/test/a", "wild/test/b"].contains(&data[1].key_expr.as_str())); - assert_eq!(StringOrBase64::from(data[0].payload.clone()).as_str(), "4"); - assert_eq!(StringOrBase64::from(data[1].payload.clone()).as_str(), "4"); + assert!(["wild/test/a", "wild/test/b"].contains(&data[0].key_expr().as_str())); + assert!(["wild/test/a", "wild/test/b"].contains(&data[1].key_expr().as_str())); + assert_eq!(StringOrBase64::from(data[0].payload()).as_str(), "4"); + assert_eq!(StringOrBase64::from(data[1].payload()).as_str(), "4"); delete_data( &session, diff --git a/zenoh-ext/examples/z_query_sub.rs b/zenoh-ext/examples/z_query_sub.rs index 80efc0854f..8c1307d712 100644 --- a/zenoh-ext/examples/z_query_sub.rs +++ b/zenoh-ext/examples/z_query_sub.rs @@ 
-60,8 +60,8 @@ async fn main() { select!( sample = subscriber.recv_async() => { let sample = sample.unwrap(); - let payload = sample.payload.deserialize::().unwrap_or_else(|e| format!("{}", e)); - println!(">> [Subscriber] Received {} ('{}': '{}')", sample.kind, sample.key_expr.as_str(), payload); + let payload = sample.payload().deserialize::().unwrap_or_else(|e| format!("{}", e)); + println!(">> [Subscriber] Received {} ('{}': '{}')", sample.kind(), sample.key_expr().as_str(), payload); }, _ = stdin.read_exact(&mut input).fuse() => { diff --git a/zenoh-ext/src/group.rs b/zenoh-ext/src/group.rs index 75a435e8f4..41007d8b87 100644 --- a/zenoh-ext/src/group.rs +++ b/zenoh-ext/src/group.rs @@ -248,7 +248,7 @@ async fn net_event_handler(z: Arc, state: Arc) { .await .unwrap(); while let Ok(s) = sub.recv_async().await { - match bincode::deserialize::(&(s.payload.contiguous())) { + match bincode::deserialize::(&(s.payload().contiguous())) { Ok(evt) => match evt { GroupNetEvent::Join(je) => { log::debug!("Member join: {:?}", &je.member); @@ -308,7 +308,7 @@ async fn net_event_handler(z: Arc, state: Arc) { match reply.sample { Ok(sample) => { match bincode::deserialize::( - &sample.payload.contiguous(), + &sample.payload().contiguous(), ) { Ok(m) => { let mut expiry = Instant::now(); diff --git a/zenoh-ext/src/publication_cache.rs b/zenoh-ext/src/publication_cache.rs index 1c9a286800..85cb96cce2 100644 --- a/zenoh-ext/src/publication_cache.rs +++ b/zenoh-ext/src/publication_cache.rs @@ -180,9 +180,9 @@ impl<'a> PublicationCache<'a> { sample = sub_recv.recv_async() => { if let Ok(sample) = sample { let queryable_key_expr: KeyExpr<'_> = if let Some(prefix) = &queryable_prefix { - prefix.join(&sample.key_expr).unwrap().into() + prefix.join(sample.key_expr()).unwrap().into() } else { - sample.key_expr.clone() + sample.key_expr().clone() }; if let Some(queue) = cache.get_mut(queryable_key_expr.as_keyexpr()) { @@ -207,7 +207,7 @@ impl<'a> PublicationCache<'a> { if !query.selector().key_expr.as_str().contains('*') { if let Some(queue) = cache.get(query.selector().key_expr.as_keyexpr()) { for sample in queue { - if let (Ok(Some(time_range)), Some(timestamp)) = (query.selector().time_range(), sample.timestamp) { + if let (Ok(Some(time_range)), Some(timestamp)) = (query.selector().time_range(), sample.timestamp()) { if !time_range.contains(timestamp.get_time().to_system_time()){ continue; } @@ -221,7 +221,7 @@ impl<'a> PublicationCache<'a> { for (key_expr, queue) in cache.iter() { if query.selector().key_expr.intersects(unsafe{ keyexpr::from_str_unchecked(key_expr) }) { for sample in queue { - if let (Ok(Some(time_range)), Some(timestamp)) = (query.selector().time_range(), sample.timestamp) { + if let (Ok(Some(time_range)), Some(timestamp)) = (query.selector().time_range(), sample.timestamp()) { if !time_range.contains(timestamp.get_time().to_system_time()){ continue; } diff --git a/zenoh-ext/src/querying_subscriber.rs b/zenoh-ext/src/querying_subscriber.rs index 2c89ec82ae..470f795f2b 100644 --- a/zenoh-ext/src/querying_subscriber.rs +++ b/zenoh-ext/src/querying_subscriber.rs @@ -304,8 +304,8 @@ impl MergeQueue { } fn push(&mut self, sample: Sample) { - if let Some(ts) = sample.timestamp { - self.timstamped.entry(ts).or_insert(sample); + if let Some(ts) = sample.timestamp() { + self.timstamped.entry(ts.clone()).or_insert(sample); } else { self.untimestamped.push_back(sample); } diff --git a/zenoh/src/liveliness.rs b/zenoh/src/liveliness.rs index 9f14866363..d4229db4cc 100644 --- a/zenoh/src/liveliness.rs 
+++ b/zenoh/src/liveliness.rs @@ -131,9 +131,9 @@ impl<'a> Liveliness<'a> { /// let session = zenoh::open(config::peer()).res().await.unwrap(); /// let subscriber = session.liveliness().declare_subscriber("key/expression").res().await.unwrap(); /// while let Ok(sample) = subscriber.recv_async().await { - /// match sample.kind { - /// SampleKind::Put => println!("New liveliness: {}", sample.key_expr), - /// SampleKind::Delete => println!("Lost liveliness: {}", sample.key_expr), + /// match sample.kind() { + /// SampleKind::Put => println!("New liveliness: {}", sample.key_expr()), + /// SampleKind::Delete => println!("Lost liveliness: {}", sample.key_expr()), /// } /// } /// # }) @@ -169,7 +169,7 @@ impl<'a> Liveliness<'a> { /// let replies = session.liveliness().get("key/expression").res().await.unwrap(); /// while let Ok(reply) = replies.recv_async().await { /// if let Ok(sample) = reply.sample { - /// println!(">> Liveliness token {}", sample.key_expr); + /// println!(">> Liveliness token {}", sample.key_expr()); /// } /// } /// # }) @@ -425,7 +425,7 @@ impl<'a, 'b> LivelinessSubscriberBuilder<'a, 'b, DefaultHandler> { /// let session = zenoh::open(config::peer()).res().await.unwrap(); /// let subscriber = session /// .declare_subscriber("key/expression") - /// .callback(|sample| { println!("Received: {} {:?}", sample.key_expr, sample.payload); }) + /// .callback(|sample| { println!("Received: {} {:?}", sample.key_expr(), sample.payload()); }) /// .res() /// .await /// .unwrap(); @@ -499,7 +499,7 @@ impl<'a, 'b> LivelinessSubscriberBuilder<'a, 'b, DefaultHandler> { /// .await /// .unwrap(); /// while let Ok(sample) = subscriber.recv_async().await { - /// println!("Received: {} {:?}", sample.key_expr, sample.payload); + /// println!("Received: {} {:?}", sample.key_expr(), sample.payload()); /// } /// # }) /// ``` @@ -593,7 +593,7 @@ where /// .unwrap(); /// while let Ok(token) = tokens.recv_async().await { /// match token.sample { -/// Ok(sample) => println!("Alive token ('{}')", sample.key_expr.as_str()), +/// Ok(sample) => println!("Alive token ('{}')", sample.key_expr().as_str()), /// Err(err) => println!("Received (ERROR: '{:?}')", err.payload), /// } /// } diff --git a/zenoh/src/payload.rs b/zenoh/src/payload.rs index f499db50da..62f40f9294 100644 --- a/zenoh/src/payload.rs +++ b/zenoh/src/payload.rs @@ -579,8 +579,8 @@ impl std::fmt::Display for StringOrBase64 { } } -impl From for StringOrBase64 { - fn from(v: Payload) -> Self { +impl From<&Payload> for StringOrBase64 { + fn from(v: &Payload) -> Self { use base64::{engine::general_purpose::STANDARD as b64_std_engine, Engine}; match v.deserialize::() { Ok(s) => StringOrBase64::String(s), diff --git a/zenoh/src/sample.rs b/zenoh/src/sample.rs index 9c68b460d9..1ac04313ab 100644 --- a/zenoh/src/sample.rs +++ b/zenoh/src/sample.rs @@ -363,38 +363,18 @@ pub use attachment::{Attachment, AttachmentBuilder, AttachmentIterator}; #[non_exhaustive] #[derive(Clone, Debug)] pub struct Sample { - /// The key expression on which this Sample was published. - pub key_expr: KeyExpr<'static>, - /// The payload of this Sample. - pub payload: Payload, - /// The kind of this Sample. - pub kind: SampleKind, - /// The encoding of this sample - pub encoding: Encoding, - /// The [`Timestamp`] of this Sample. - pub timestamp: Option, - /// Quality of service settings this sample was sent with. 
- pub qos: QoS, + pub(crate) key_expr: KeyExpr<'static>, + pub(crate) payload: Payload, + pub(crate) kind: SampleKind, + pub(crate) encoding: Encoding, + pub(crate) timestamp: Option, + pub(crate) qos: QoS, #[cfg(feature = "unstable")] - ///
- /// 🔬 - /// This API has been marked as unstable: it works as advertised, but we may change it in a future release. - /// To use it, you must enable zenoh's unstable feature flag. - ///
- /// - /// Infos on the source of this Sample. - pub source_info: SourceInfo, + pub(crate) source_info: SourceInfo, #[cfg(feature = "unstable")] - ///
- /// 🔬 - /// This API has been marked as unstable: it works as advertised, but we may change it in a future release. - /// To use it, you must enable zenoh's unstable feature flag. - ///
- /// - /// A map of key-value pairs, where each key and value are byte-slices. - pub attachment: Option, + pub(crate) attachment: Option, } impl Sample { @@ -471,19 +451,67 @@ impl Sample { self } + /// Gets the key expression on which this Sample was published. + #[inline] + pub fn key_expr(&self) -> &KeyExpr<'static> { + &self.key_expr + } + + /// Gets the payload of this Sample. + #[inline] + pub fn payload(&self) -> &Payload { + &self.payload + } + + /// Gets the kind of this Sample. + #[inline] + pub fn kind(&self) -> SampleKind { + self.kind + } + + /// Sets the kind of this Sample. + #[inline] + #[doc(hidden)] + #[zenoh_macros::unstable] + pub fn with_kind(mut self, kind: SampleKind) -> Self { + self.kind = kind; + self + } + + /// Gets the encoding of this sample + #[inline] + pub fn encoding(&self) -> &Encoding { + &self.encoding + } + /// Gets the timestamp of this Sample. #[inline] - pub fn get_timestamp(&self) -> Option<&Timestamp> { + pub fn timestamp(&self) -> Option<&Timestamp> { self.timestamp.as_ref() } /// Sets the timestamp of this Sample. #[inline] + #[doc(hidden)] + #[zenoh_macros::unstable] pub fn with_timestamp(mut self, timestamp: Timestamp) -> Self { self.timestamp = Some(timestamp); self } + /// Gets the quality of service settings this Sample was sent with. + #[inline] + pub fn qos(&self) -> &QoS { + &self.qos + } + + /// Gets infos on the source of this Sample. + #[zenoh_macros::unstable] + #[inline] + pub fn source_info(&self) -> &SourceInfo { + &self.source_info + } + /// Sets the source info of this Sample. #[zenoh_macros::unstable] #[inline] @@ -506,17 +534,22 @@ impl Sample { } } + /// Gets the sample attachment: a map of key-value pairs, where each key and value are byte-slices. #[zenoh_macros::unstable] + #[inline] pub fn attachment(&self) -> Option<&Attachment> { self.attachment.as_ref() } + /// Gets the mutable sample attachment: a map of key-value pairs, where each key and value are byte-slices. 
#[zenoh_macros::unstable] + #[inline] pub fn attachment_mut(&mut self) -> &mut Option { &mut self.attachment } - #[allow(clippy::result_large_err)] + #[inline] + #[doc(hidden)] #[zenoh_macros::unstable] pub fn with_attachment(mut self, attachment: Attachment) -> Self { self.attachment = Some(attachment); diff --git a/zenoh/src/subscriber.rs b/zenoh/src/subscriber.rs index e276d0c6d0..d4c3257472 100644 --- a/zenoh/src/subscriber.rs +++ b/zenoh/src/subscriber.rs @@ -67,7 +67,7 @@ impl fmt::Debug for SubscriberState { /// let session = zenoh::open(config::peer()).res().await.unwrap(); /// let subscriber = session /// .declare_subscriber("key/expression") -/// .callback(|sample| { println!("Received: {} {:?}", sample.key_expr, sample.payload) }) +/// .callback(|sample| { println!("Received: {} {:?}", sample.key_expr(), sample.payload()) }) /// .res() /// .await /// .unwrap(); @@ -100,7 +100,7 @@ pub(crate) struct SubscriberInner<'a> { /// let session = zenoh::open(config::peer()).res().await.unwrap(); /// let subscriber = session /// .declare_subscriber("key/expression") -/// .callback(|sample| { println!("Received: {} {:?}", sample.key_expr, sample.payload); }) +/// .callback(|sample| { println!("Received: {} {:?}", sample.key_expr(), sample.payload()); }) /// .pull_mode() /// .res() /// .await @@ -123,7 +123,7 @@ impl<'a> PullSubscriberInner<'a> { /// let session = zenoh::open(config::peer()).res().await.unwrap(); /// let subscriber = session /// .declare_subscriber("key/expression") - /// .callback(|sample| { println!("Received: {} {:?}", sample.key_expr, sample.payload); }) + /// .callback(|sample| { println!("Received: {} {:?}", sample.key_expr(), sample.payload()); }) /// .pull_mode() /// .res() /// .await @@ -332,7 +332,7 @@ impl<'a, 'b, Mode> SubscriberBuilder<'a, 'b, Mode, DefaultHandler> { /// let session = zenoh::open(config::peer()).res().await.unwrap(); /// let subscriber = session /// .declare_subscriber("key/expression") - /// .callback(|sample| { println!("Received: {} {:?}", sample.key_expr, sample.payload); }) + /// .callback(|sample| { println!("Received: {} {:?}", sample.key_expr(), sample.payload()); }) /// .res() /// .await /// .unwrap(); @@ -407,7 +407,7 @@ impl<'a, 'b, Mode> SubscriberBuilder<'a, 'b, Mode, DefaultHandler> { /// .await /// .unwrap(); /// while let Ok(sample) = subscriber.recv_async().await { - /// println!("Received: {} {:?}", sample.key_expr, sample.payload); + /// println!("Received: {} {:?}", sample.key_expr(), sample.payload()); /// } /// # }) /// ``` @@ -636,7 +636,7 @@ where /// .await /// .unwrap(); /// while let Ok(sample) = subscriber.recv_async().await { -/// println!("Received: {} {:?}", sample.key_expr, sample.payload); +/// println!("Received: {} {:?}", sample.key_expr(), sample.payload()); /// } /// # }) /// ``` diff --git a/zenoh/tests/attachments.rs b/zenoh/tests/attachments.rs index 0e7c1c0de7..38d03b0a84 100644 --- a/zenoh/tests/attachments.rs +++ b/zenoh/tests/attachments.rs @@ -9,9 +9,9 @@ fn pubsub() { .callback(|sample| { println!( "{}", - std::str::from_utf8(&sample.payload.contiguous()).unwrap() + std::str::from_utf8(&sample.payload().contiguous()).unwrap() ); - for (k, v) in &sample.attachment.unwrap() { + for (k, v) in sample.attachment().unwrap() { assert!(k.iter().rev().zip(v.as_slice()).all(|(k, v)| k == v)) } }) diff --git a/zenoh/tests/events.rs b/zenoh/tests/events.rs index 0ea775784a..5823b16150 100644 --- a/zenoh/tests/events.rs +++ b/zenoh/tests/events.rs @@ -69,15 +69,15 @@ fn zenoh_events() { let sample = 
ztimeout!(sub1.recv_async()); assert!(sample.is_ok()); - let key_expr = sample.as_ref().unwrap().key_expr.as_str(); + let key_expr = sample.as_ref().unwrap().key_expr().as_str(); assert!(key_expr.eq(&format!("@/session/{zid}/transport/unicast/{zid2}"))); - assert!(sample.as_ref().unwrap().kind == SampleKind::Put); + assert!(sample.as_ref().unwrap().kind() == SampleKind::Put); let sample = ztimeout!(sub2.recv_async()); assert!(sample.is_ok()); - let key_expr = sample.as_ref().unwrap().key_expr.as_str(); + let key_expr = sample.as_ref().unwrap().key_expr().as_str(); assert!(key_expr.starts_with(&format!("@/session/{zid}/transport/unicast/{zid2}/link/"))); - assert!(sample.as_ref().unwrap().kind == SampleKind::Put); + assert!(sample.as_ref().unwrap().kind() == SampleKind::Put); let replies: Vec = ztimeout!(session .get(format!("@/session/{zid}/transport/unicast/*")) @@ -87,7 +87,7 @@ fn zenoh_events() { .collect(); assert!(replies.len() == 1); assert!(replies[0].sample.is_ok()); - let key_expr = replies[0].sample.as_ref().unwrap().key_expr.as_str(); + let key_expr = replies[0].sample.as_ref().unwrap().key_expr().as_str(); assert!(key_expr.eq(&format!("@/session/{zid}/transport/unicast/{zid2}"))); let replies: Vec = ztimeout!(session @@ -98,22 +98,22 @@ fn zenoh_events() { .collect(); assert!(replies.len() == 1); assert!(replies[0].sample.is_ok()); - let key_expr = replies[0].sample.as_ref().unwrap().key_expr.as_str(); + let key_expr = replies[0].sample.as_ref().unwrap().key_expr().as_str(); assert!(key_expr.starts_with(&format!("@/session/{zid}/transport/unicast/{zid2}/link/"))); close_session(session2).await; let sample = ztimeout!(sub1.recv_async()); assert!(sample.is_ok()); - let key_expr = sample.as_ref().unwrap().key_expr.as_str(); + let key_expr = sample.as_ref().unwrap().key_expr().as_str(); assert!(key_expr.eq(&format!("@/session/{zid}/transport/unicast/{zid2}"))); - assert!(sample.as_ref().unwrap().kind == SampleKind::Delete); + assert!(sample.as_ref().unwrap().kind() == SampleKind::Delete); let sample = ztimeout!(sub2.recv_async()); assert!(sample.is_ok()); - let key_expr = sample.as_ref().unwrap().key_expr.as_str(); + let key_expr = sample.as_ref().unwrap().key_expr().as_str(); assert!(key_expr.starts_with(&format!("@/session/{zid}/transport/unicast/{zid2}/link/"))); - assert!(sample.as_ref().unwrap().kind == SampleKind::Delete); + assert!(sample.as_ref().unwrap().kind() == SampleKind::Delete); sub2.undeclare().res().await.unwrap(); sub1.undeclare().res().await.unwrap(); diff --git a/zenoh/tests/interceptors.rs b/zenoh/tests/interceptors.rs index 2a5c30e7b8..1f502138e4 100644 --- a/zenoh/tests/interceptors.rs +++ b/zenoh/tests/interceptors.rs @@ -83,9 +83,9 @@ fn downsampling_by_keyexpr_impl(egress: bool) { .callback(move |sample| { let mut count = zlock!(total_count_clone); *count += 1; - if sample.key_expr.as_str() == "test/downsamples_by_keyexp/r100" { + if sample.key_expr().as_str() == "test/downsamples_by_keyexp/r100" { zlock!(counter_r100).tick(); - } else if sample.key_expr.as_str() == "test/downsamples_by_keyexp/r50" { + } else if sample.key_expr().as_str() == "test/downsamples_by_keyexp/r50" { zlock!(counter_r50).tick(); } }) @@ -191,7 +191,7 @@ fn downsampling_by_interface_impl(egress: bool) { .callback(move |sample| { let mut count = zlock!(total_count_clone); *count += 1; - if sample.key_expr.as_str() == "test/downsamples_by_interface/r100" { + if sample.key_expr().as_str() == "test/downsamples_by_interface/r100" { zlock!(counter_r100).tick(); } }) diff --git 
a/zenoh/tests/liveliness.rs b/zenoh/tests/liveliness.rs index 96cca533df..c55eed4bc4 100644 --- a/zenoh/tests/liveliness.rs +++ b/zenoh/tests/liveliness.rs @@ -72,14 +72,14 @@ fn zenoh_liveliness() { .res_async()) .unwrap(); let sample = ztimeout!(replies.recv_async()).unwrap().sample.unwrap(); - assert!(sample.kind == SampleKind::Put); - assert!(sample.key_expr.as_str() == "zenoh_liveliness_test"); + assert!(sample.kind() == SampleKind::Put); + assert!(sample.key_expr().as_str() == "zenoh_liveliness_test"); assert!(ztimeout!(replies.recv_async()).is_err()); let sample = ztimeout!(sub.recv_async()).unwrap(); - assert!(sample.kind == SampleKind::Put); - assert!(sample.key_expr.as_str() == "zenoh_liveliness_test"); + assert!(sample.kind() == SampleKind::Put); + assert!(sample.key_expr().as_str() == "zenoh_liveliness_test"); drop(token); diff --git a/zenoh/tests/qos.rs b/zenoh/tests/qos.rs index 475d8d7a1b..24119e7b1e 100644 --- a/zenoh/tests/qos.rs +++ b/zenoh/tests/qos.rs @@ -52,13 +52,13 @@ fn pubsub() { task::sleep(SLEEP).await; ztimeout!(publisher1.put("qos").res_async()).unwrap(); - let qos = ztimeout!(subscriber.recv_async()).unwrap().qos; + let qos = ztimeout!(subscriber.recv_async()).unwrap().qos().clone(); assert_eq!(qos.priority(), Priority::DataHigh); assert_eq!(qos.congestion_control(), CongestionControl::Drop); ztimeout!(publisher2.put("qos").res_async()).unwrap(); - let qos = ztimeout!(subscriber.recv_async()).unwrap().qos; + let qos = ztimeout!(subscriber.recv_async()).unwrap().qos().clone(); assert_eq!(qos.priority(), Priority::DataLow); assert_eq!(qos.congestion_control(), CongestionControl::Block); diff --git a/zenoh/tests/routing.rs b/zenoh/tests/routing.rs index 82053b4f1d..06a8f5da45 100644 --- a/zenoh/tests/routing.rs +++ b/zenoh/tests/routing.rs @@ -58,7 +58,7 @@ impl Task { let sub = ztimeout!(session.declare_subscriber(ke).res_async())?; let mut counter = 0; while let Ok(sample) = sub.recv_async().await { - let recv_size = sample.payload.len(); + let recv_size = sample.payload().len(); if recv_size != *expected_size { bail!("Received payload size {recv_size} mismatches the expected {expected_size}"); } @@ -91,7 +91,7 @@ impl Task { while let Ok(reply) = replies.recv_async().await { match reply.sample { Ok(sample) => { - let recv_size = sample.payload.len(); + let recv_size = sample.payload().len(); if recv_size != *expected_size { bail!("Received payload size {recv_size} mismatches the expected {expected_size}"); } diff --git a/zenoh/tests/session.rs b/zenoh/tests/session.rs index 077c58298d..e3f5e2df63 100644 --- a/zenoh/tests/session.rs +++ b/zenoh/tests/session.rs @@ -95,7 +95,7 @@ async fn test_session_pubsub(peer01: &Session, peer02: &Session, reliability: Re let sub = ztimeout!(peer01 .declare_subscriber(key_expr) .callback(move |sample| { - assert_eq!(sample.payload.len(), size); + assert_eq!(sample.payload().len(), size); c_msgs.fetch_add(1, Ordering::Relaxed); }) .res_async()) @@ -198,8 +198,8 @@ async fn test_session_qryrep(peer01: &Session, peer02: &Session, reliability: Re let rs = ztimeout!(peer02.get(selector).res_async()).unwrap(); while let Ok(s) = ztimeout!(rs.recv_async()) { let s = s.sample.unwrap(); - assert_eq!(s.kind, SampleKind::Put); - assert_eq!(s.payload.len(), size); + assert_eq!(s.kind(), SampleKind::Put); + assert_eq!(s.payload().len(), size); cnt += 1; } } @@ -216,8 +216,8 @@ async fn test_session_qryrep(peer01: &Session, peer02: &Session, reliability: Re let rs = ztimeout!(peer02.get(selector).res_async()).unwrap(); while let Ok(s) = 
ztimeout!(rs.recv_async()) { let s = s.sample.unwrap(); - assert_eq!(s.kind, SampleKind::Delete); - assert_eq!(s.payload.len(), 0); + assert_eq!(s.kind(), SampleKind::Delete); + assert_eq!(s.payload().len(), 0); cnt += 1; } } diff --git a/zenoh/tests/unicity.rs b/zenoh/tests/unicity.rs index def0dffe33..8eb007b0c0 100644 --- a/zenoh/tests/unicity.rs +++ b/zenoh/tests/unicity.rs @@ -114,7 +114,7 @@ async fn test_unicity_pubsub(s01: &Session, s02: &Session, s03: &Session) { let sub1 = ztimeout!(s01 .declare_subscriber(key_expr) .callback(move |sample| { - assert_eq!(sample.payload.len(), size); + assert_eq!(sample.payload().len(), size); c_msgs1.fetch_add(1, Ordering::Relaxed); }) .res_async()) @@ -126,7 +126,7 @@ async fn test_unicity_pubsub(s01: &Session, s02: &Session, s03: &Session) { let sub2 = ztimeout!(s02 .declare_subscriber(key_expr) .callback(move |sample| { - assert_eq!(sample.payload.len(), size); + assert_eq!(sample.payload().len(), size); c_msgs2.fetch_add(1, Ordering::Relaxed); }) .res_async()) @@ -232,7 +232,7 @@ async fn test_unicity_qryrep(s01: &Session, s02: &Session, s03: &Session) { for _ in 0..msg_count { let rs = ztimeout!(s03.get(key_expr).res_async()).unwrap(); while let Ok(s) = ztimeout!(rs.recv_async()) { - assert_eq!(s.sample.unwrap().payload.len(), size); + assert_eq!(s.sample.unwrap().payload().len(), size); cnt += 1; } } From f3af52ac0f1787d3eff29ef82c5f00e695c249e2 Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Wed, 13 Mar 2024 15:17:37 +0100 Subject: [PATCH 2/4] format and clippy --- examples/examples/z_get_liveliness.rs | 2 +- examples/examples/z_sub_liveliness.rs | 6 ++-- .../src/replica/align_queryable.rs | 5 +--- .../src/replica/aligner.rs | 5 +--- .../src/replica/storage.rs | 29 ++++++++----------- zenoh-ext/src/querying_subscriber.rs | 2 +- zenoh/tests/qos.rs | 4 +-- 7 files changed, 21 insertions(+), 32 deletions(-) diff --git a/examples/examples/z_get_liveliness.rs b/examples/examples/z_get_liveliness.rs index 036dc0ab98..66de570356 100644 --- a/examples/examples/z_get_liveliness.rs +++ b/examples/examples/z_get_liveliness.rs @@ -37,7 +37,7 @@ async fn main() { .unwrap(); while let Ok(reply) = replies.recv_async().await { match reply.sample { - Ok(sample) => println!(">> Alive token ('{}')", sample.key_expr.as_str(),), + Ok(sample) => println!(">> Alive token ('{}')", sample.key_expr().as_str(),), Err(err) => { let payload = err .payload diff --git a/examples/examples/z_sub_liveliness.rs b/examples/examples/z_sub_liveliness.rs index 52ba53875c..02e2e71ba4 100644 --- a/examples/examples/z_sub_liveliness.rs +++ b/examples/examples/z_sub_liveliness.rs @@ -46,13 +46,13 @@ async fn main() { select!( sample = subscriber.recv_async() => { let sample = sample.unwrap(); - match sample.kind { + match sample.kind() { SampleKind::Put => println!( ">> [LivelinessSubscriber] New alive token ('{}')", - sample.key_expr.as_str()), + sample.key_expr().as_str()), SampleKind::Delete => println!( ">> [LivelinessSubscriber] Dropped token ('{}')", - sample.key_expr.as_str()), + sample.key_expr().as_str()), } }, diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/align_queryable.rs b/plugins/zenoh-plugin-storage-manager/src/replica/align_queryable.rs index fc361d77f2..32be4a5534 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/align_queryable.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/align_queryable.rs @@ -180,10 +180,7 @@ impl AlignQueryable { let entry = entry.unwrap(); result.push(AlignData::Data( 
OwnedKeyExpr::from(entry.key_expr().clone()), - ( - Value::from(entry), - each.timestamp, - ), + (Value::from(entry), each.timestamp), )); } } diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs b/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs index b11a94e4f2..fb46b78082 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs @@ -142,10 +142,7 @@ impl Aligner { for sample in replies { result.insert( sample.key_expr().clone().into(), - ( - sample.timestamp().unwrap().clone(), - Value::from(sample), - ), + (*sample.timestamp().unwrap(), Value::from(sample)), ); } (result, no_err) diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs index 895f2e1914..0708dcabd9 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs @@ -276,7 +276,7 @@ impl StorageService { } let matching_keys = if sample.key_expr().is_wild() { - self.get_matching_keys(&sample.key_expr()).await + self.get_matching_keys(sample.key_expr()).await } else { vec![sample.key_expr().clone().into()] }; @@ -309,20 +309,15 @@ impl StorageService { let Value { payload, encoding, .. } = overriding_update.data.value; - let sample_to_store = Sample::new(KeyExpr::from(k.clone()), payload) + Sample::new(KeyExpr::from(k.clone()), payload) .with_encoding(encoding) .with_timestamp(overriding_update.data.timestamp) - .with_kind(overriding_update.kind); - sample_to_store - } - None => { - let sample_to_store = - Sample::new(KeyExpr::from(k.clone()), sample.payload().clone()) - .with_encoding(sample.encoding().clone()) - .with_timestamp(sample.timestamp().unwrap().clone()) - .with_kind(sample.kind()); - sample_to_store + .with_kind(overriding_update.kind) } + None => Sample::new(KeyExpr::from(k.clone()), sample.payload().clone()) + .with_encoding(sample.encoding().clone()) + .with_timestamp(*sample.timestamp().unwrap()) + .with_kind(sample.kind()), }; let stripped_key = match self.strip_prefix(sample_to_store.key_expr()) { @@ -340,16 +335,16 @@ impl StorageService { stripped_key, Value::new(sample_to_store.payload().clone()) .with_encoding(sample_to_store.encoding().clone()), - sample_to_store.timestamp().unwrap().clone(), + *sample_to_store.timestamp().unwrap(), ) .await } SampleKind::Delete => { // register a tombstone - self.mark_tombstone(&k, sample_to_store.timestamp().unwrap().clone()) + self.mark_tombstone(&k, *sample_to_store.timestamp().unwrap()) .await; storage - .delete(stripped_key, sample_to_store.timestamp().unwrap().clone()) + .delete(stripped_key, *sample_to_store.timestamp().unwrap()) .await } }; @@ -363,7 +358,7 @@ impl StorageService { .as_ref() .unwrap() .log_propagation - .send((k.clone(), sample_to_store.timestamp().unwrap().clone())); + .send((k.clone(), *sample_to_store.timestamp().unwrap())); match sending { Ok(_) => (), Err(e) => { @@ -398,7 +393,7 @@ impl StorageService { // @TODO: change into a better store that does incremental writes let key = sample.key_expr().clone(); let mut wildcards = self.wildcard_updates.write().await; - let timestamp = sample.timestamp().unwrap().clone(); + let timestamp = *sample.timestamp().unwrap(); wildcards.insert( &key, Update { diff --git a/zenoh-ext/src/querying_subscriber.rs b/zenoh-ext/src/querying_subscriber.rs index 470f795f2b..480e490fdd 100644 --- a/zenoh-ext/src/querying_subscriber.rs +++ 
b/zenoh-ext/src/querying_subscriber.rs @@ -305,7 +305,7 @@ impl MergeQueue { fn push(&mut self, sample: Sample) { if let Some(ts) = sample.timestamp() { - self.timstamped.entry(ts.clone()).or_insert(sample); + self.timstamped.entry(*ts).or_insert(sample); } else { self.untimestamped.push_back(sample); } diff --git a/zenoh/tests/qos.rs b/zenoh/tests/qos.rs index 24119e7b1e..1a9df306b2 100644 --- a/zenoh/tests/qos.rs +++ b/zenoh/tests/qos.rs @@ -52,13 +52,13 @@ fn pubsub() { task::sleep(SLEEP).await; ztimeout!(publisher1.put("qos").res_async()).unwrap(); - let qos = ztimeout!(subscriber.recv_async()).unwrap().qos().clone(); + let qos = *ztimeout!(subscriber.recv_async()).unwrap().qos(); assert_eq!(qos.priority(), Priority::DataHigh); assert_eq!(qos.congestion_control(), CongestionControl::Drop); ztimeout!(publisher2.put("qos").res_async()).unwrap(); - let qos = ztimeout!(subscriber.recv_async()).unwrap().qos().clone(); + let qos = *ztimeout!(subscriber.recv_async()).unwrap().qos(); assert_eq!(qos.priority(), Priority::DataLow); assert_eq!(qos.congestion_control(), CongestionControl::Block); From 0ca41e817044e80a6c422122f46aa3e60821ce64 Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Wed, 13 Mar 2024 15:26:19 +0100 Subject: [PATCH 3/4] mark remaining sample-mutating methods as unstable and hidden --- zenoh/src/sample.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/zenoh/src/sample.rs b/zenoh/src/sample.rs index 1ac04313ab..9b9c55822e 100644 --- a/zenoh/src/sample.rs +++ b/zenoh/src/sample.rs @@ -520,10 +520,12 @@ impl Sample { self } - #[inline] /// Ensure that an associated Timestamp is present in this Sample. /// If not, a new one is created with the current system time and 0x00 as id. /// Get the timestamp of this sample (either existing one or newly created) + #[inline] + #[doc(hidden)] + #[zenoh_macros::unstable] pub fn ensure_timestamp(&mut self) -> &Timestamp { if let Some(ref timestamp) = self.timestamp { timestamp @@ -542,8 +544,9 @@ impl Sample { } /// Gets the mutable sample attachment: a map of key-value pairs, where each key and value are byte-slices. - #[zenoh_macros::unstable] #[inline] + #[doc(hidden)] + #[zenoh_macros::unstable] pub fn attachment_mut(&mut self) -> &mut Option { &mut self.attachment } From d73da7d70fef25d76bf94945792da0f0adffed0b Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Fri, 15 Mar 2024 11:22:00 +0100 Subject: [PATCH 4/4] clippy --- plugins/zenoh-plugin-rest/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/plugins/zenoh-plugin-rest/src/lib.rs b/plugins/zenoh-plugin-rest/src/lib.rs index e0df8f286b..e2718f6579 100644 --- a/plugins/zenoh-plugin-rest/src/lib.rs +++ b/plugins/zenoh-plugin-rest/src/lib.rs @@ -344,7 +344,6 @@ async fn query(mut req: Request<(Arc, String)>) -> tide::Result