Skip to content

Commit

Permalink
feat: bump Zenoh to 1.1.0
Browse files Browse the repository at this point in the history
This commit updates the version of Zenoh used to 1.1.0.

The bulk of the changes is due to changes in Zenoh's API which can be
summarised as follows:
- There is no longer an `async` prelude.
- There is no longer the need to call `res()` in order to make an
  asynchronous Zenoh call.
- A Zenoh `Session` internally uses an `Arc` so there is no longer
  the need to explicitly wrap them inside one.
- The `Value` structure was removed and, instead, relevant `payload()`
  method were added.
- To access the `Sample` associated with a `Reply` the method `result()`
  should be called.
- We can call `reply_err` to reply to a Query with an error.
- To access the bytes representation of a payload, the method
  `to_bytes()` was introduced.
- The `FlumeSubscriber` structure was removed from Zenoh's API. Instead,
  we use the `Subscriber<FifoChannelHandler<Sample>>`. Internally it
  still uses `flume` but that dependency is now hidden.

---

* Cargo.lock: copied the content of the `Cargo.lock` of Zenoh's 1.1.0
  release and ran `cargo build` after.

* Cargo.toml:
  - Updated and froze the version of the Zenoh crates to 1.1.4.
  - Removed the crates that were no longer needed:
    - `zenoh-protocol`,
    - `zenoh-result`,
    - `zenoh-sync`,
    - `zenoh-util`.

* zenoh-flow-commons/Cargo.toml: replaced the crate `zenoh-protocol`
  with `zenoh-config` as this is now the crate that exposes the
  `ZenohId` structure.

* zenoh-flow-commons/src/identifiers.rs:
  - `ZenohId` is now exposed in the `zenoh_config` crate.
  - `ZenohId::rand()` no longer exists and the implementation of the
    `Default` trait does the same.

* zenoh-flow-daemon/src/daemon/mod.rs:
  - There is no longer an `async` prelude.
  - The `Session` structure is exposed in the root of the `zenoh` crate.
  - A `Session` internally uses an `Arc`, meaning that we no longer need
    to wrap it inside one.

* zenoh-flow-daemon/src/daemon/queryables.rs:
  - There is no longer an `async` prelude.
  - The `Session` structure is exposed in the root of the `zenoh` crate.
  - A `Session` internally uses an `Arc`, meaning that we no longer need
    to wrap it inside one.
  - The `AsyncResolve` trait no longer exists, we can directly `await`
    a future returned by a Zenoh call.

* zenoh-flow-daemon/src/queries/instances/abort.rs:
  - There is no longer an `async` prelude.
  - The `Session` structure is exposed in the root of the `zenoh` crate.
  - The `AsyncResolve` trait no longer exists, we can directly `await`
    a future returned by a Zenoh call — no longer needing to call the
    method `res()`.

* zenoh-flow-daemon/src/queries/instances/create.rs
  - There is no longer an `async` prelude.
  - The `Value` structure was removed and we have to use instead the
    `payload()` method to attach a payload to a query.
  - The `AsyncResolve` trait no longer exists, we can directly `await`
    a future returned by a Zenoh call — no longer needing to call the
    method `res()`.
  - To access the `Sample` of a `Reply` we now have to call the
    `result()` method.

* zenoh-flow-daemon/src/queries/instances/delete.rs:
  - There is no longer an `async` prelude.
  - The `Value` structure was removed and we have to use instead the
    `payload()` method to attach a payload to a query.
  - The `AsyncResolve` trait no longer exists, we can directly `await`
    a future returned by a Zenoh call — no longer needing to call the
    method `res()`.

* zenoh-flow-daemon/src/queries/instances/mod.rs:
  - There is no longer an `async` prelude.
  - The `Query` structure is now under the `zenoh::query` module.
  - The `reply()` method now takes two parameters: a key expression
    and a payload.
  - We can now reply an error using the `reply_err()` method.

* zenoh-flow-daemon/src/queries/instances/start.rs:
  - There is no longer an `async` prelude.
  - The `Query` structure is now under the `zenoh::query` module.
  - The `Session` structure is exposed in the root of the `zenoh` crate.
  - We can now reply an error using the `reply_err()` method.
  - The `reply()` method now takes two parameters: a key expression
    and a payload.
  - The `Value` structure was removed and we have to use instead the
    `payload()` method to attach a payload to a query.
  - The `AsyncResolve` trait no longer exists, we can directly `await`
    a future returned by a Zenoh call — no longer needing to call the
    method `res()`.

* zenoh-flow-daemon/src/queries/mod.rs:
  - There is no longer an `async` prelude.
  - The `Query` structure is now under the `zenoh::query` module.
  - The check of the encoding was removed as it was superfluous, not
    providing much added benefits.
  - The `Value` structure was removed and we have to use instead the
    `payload()` method to access the payload of a query.
  - To access the bytes representation of a payload we can now call the
    `to_bytes()` method.

* zenoh-flow-daemon/src/queries/runtime.rs:
  - There is no longer an `async` prelude.
  - The `Query` structure is now under the `zenoh::query` module.
  - The `AsyncResolve` trait no longer exists, we can directly `await`
    a future returned by a Zenoh call — no longer needing to call the
    method `res()`.
  - The `reply()` method now takes two parameters: a key expression
    and a payload.
  - We can now reply an error using the `reply_err()` method.

* zenoh-flow-runtime/src/lib.rs:
  - The `client`, `empty` and `peer` function no longer exists.
  - There is no longer an `async` prelude.
  - The `open` method, `Config` and `Session` structures were moved at
    the root of the `zenoh` crate.

* zenoh-flow-runtime/src/runners/builtin/zenoh/sink.rs:
  - There is no longer an `async` prelude.
  - The `Session` structure is exposed in the root of the `zenoh` crate.
  - The `OwnedKeyExpr` structure is now exposed under the
    `zenoh::key_expr` module.
  - The `Publisher` structure is now exposed under the `zenoh::pubsub`
    module.
  - A `Session` internally uses an `Arc`, meaning that we no longer need
    to wrap it inside one.
  - The `AsyncResolve` trait no longer exists, we can directly `await`
    a future returned by a Zenoh call — no longer needing to call the
    method `res()`.

* zenoh-flow-runtime/src/runners/builtin/zenoh/source.rs:
  - There is no longer an `async` prelude.
  - The `FlumeSubscriber` no longer exists and was instead replaced with
    the `FifoChannelHandler`.
  - The `OwnedKeyExpr` structure is now exposed under the
    `zenoh::key_expr` module.
  - The `Publisher` structure is now exposed under the `zenoh::pubsub`
    module.
  - The `Sample` structure is now exposed under the `zenoh::sample`
    module.
  - The `Session` structure is exposed in the root of the `zenoh` crate.
  - A `Session` internally uses an `Arc`, meaning that we no longer need
    to wrap it inside one.
  - The `AsyncResolve` trait no longer exists, we can directly `await`
    a future returned by a Zenoh call — no longer needing to call the
    method `res()`.
  - To access the bytes representation of a payload we can now call the
    `to_bytes()` method.

* zenoh-flow-runtime/src/runners/connectors.rs:
  - There is no longer an `async` prelude.
  - The `FlumeSubscriber` no longer exists and was instead replaced with
    the `FifoChannelHandler`.
  - The `OwnedKeyExpr` structure is now exposed under the
    `zenoh::key_expr` module.
  - The `Publisher` structure is now exposed under the `zenoh::pubsub`
    module.
  - The `Sample` structure is now exposed under the `zenoh::sample`
    module.
  - The `Session` structure is exposed in the root of the `zenoh` crate.
  - A `Session` internally uses an `Arc`, meaning that we no longer need
    to wrap it inside one.
  - The `AsyncResolve` trait no longer exists, we can directly `await`
    a future returned by a Zenoh call — no longer needing to call the
    method `res()`.
  - To "read" the bytes representation of a payload we can now call the
    `reader()` method.

* zenoh-flow-runtime/src/runtime/builder.rs:
  - The `Session` structure is exposed in the root of the `zenoh` crate.
  - There is no longer an `async` prelude.
  - A `Session` internally uses an `Arc`, meaning that we no longer need
    to wrap it inside one.
  - The `zenoh::config::peer()` function no longer exists and the
    implementation of the `Default` trait now yields the same result.
  - The `AsyncResolve` trait no longer exists, we can directly `await`
    a future returned by a Zenoh call — no longer needing to call the
    method `res_async()`.

* zenoh-flow-runtime/src/runtime/mod.rs:
  - A `Session` internally uses an `Arc`, meaning that we no longer need
    to wrap it inside one.
  - Due to the previous change, the method `session()` now returns a
    reference that a caller can `clone()` — yielding the same result.

* zenoh-plugin-zenoh-flow/Cargo.toml: removed the no longer needed
  crates `zenoh-result` and `zenoh-util`.

* zenoh-plugin-zenoh-flow/src/lib.rs:
  - A `Session` internally uses an `Arc`, meaning that we no longer need
    to wrap it inside one. That rendes the import to `std::sync::Arc`
    unneeded.
  - There is no longer an `async` prelude.
  - The `zenoh_result::zerror` macro was moved inside the
    `zenoh::internal` module.
  - The `Runtime` and `RunningPlugin` were, respectively, moved inside
    the modules `zenoh::internal::runtime` and
    `zenoh::internal::plugins`.
  - The `AsyncResolve` trait no longer exists, we can directly `await`
    a future returned by a Zenoh call — no longer needing to call the
    method `res()`.
  - The signature of the method `adminspace_getter` of the
    `RunningPluginTrait` changed to accept a `KeyExpr` instead of a
    `Selector`.
  - The `Result` structure was moved in the root of the `zenoh` crate.
  - The `Response` structure was moved in the `zenoh::internal::plugins`
    module.

* zfctl/Cargo.toml: removed the unneeded crate `zenoh-util`.

* zfctl/src/daemon_command.rs:
  - There is no longer an `async` prelude.
  - The `Session` structure is exposed in the root of the `zenoh` crate.
  - The `Config` structure is exposed in the root of the `zenoh` crate.
  - The `zenoh::config::peer()` function no longer exists and the
    implementation of the `Default` trait now yields the same result.
  - The `AsyncResolve` trait no longer exists, we can directly `await`
    a future returned by a Zenoh call — no longer needing to call the
    method `res_async()`.
  - A `Session` internally uses an `Arc`, meaning that we no longer need
    to wrap it inside one.

* zfctl/src/instance_command.rs:
  - There is no longer an `async` prelude.
  - The enumeration `ConsolidationMode` is now exposed in the
    `zenoh::query` module.
  - The `Session` structure is exposed in the root of the `zenoh` crate.
  - The `Value` structure was removed and we have to use instead the
    `payload()` method to set the payload of a query.
  - The `AsyncResolve` trait no longer exists, we can directly `await`
    a future returned by a Zenoh call — no longer needing to call the
    method `res()`.
  - To access the `Sample` of a `Reply` we now have to call the
    `result()` method.
  - To access the bytes representation of a payload we can now call the
    `to_bytes()` method.
  - To access the `replier_id` we now have to call the corresponding
    getter method.

* zfctl/src/main.rs:
  - There is no longer an `async` prelude.
  - The `Config` structure is exposed in the root of the `zenoh` crate.
  - The `zenoh::config::peer()` function no longer exists and the
    implementation of the `Default` trait now yields the same result.
  - The `AsyncResolve` trait no longer exists, we can directly `await`
    a future returned by a Zenoh call — no longer needing to call the
    method `res()`.

* zfctl/src/run_local.rs:
  - The `AsyncResolve` trait no longer exists, we can directly `await`
    a future returned by a Zenoh call — no longer needing to call the
    method `res_async()`.
  - A `Session` internally uses an `Arc`, meaning that we no longer need
    to wrap it inside one.

* zfctl/src/runtime_command.rs:
  - There is no longer an `async` prelude.
  - The `Session` structure is exposed in the root of the `zenoh` crate.
  - The `Value` structure was removed and we have to use instead the
    `payload()` method to attach a payload to a query.
  - The `AsyncResolve` trait no longer exists, we can directly `await`
    a future returned by a Zenoh call — no longer needing to call the
    method `res()`.
  - To access the `Sample` of a `Reply` we now have to call the
    `result()` method.
  - To access the bytes representation of a payload we can now call the
    `to_bytes()` method.

* zfctl/src/utils.rs:
  - There is no longer an `async` prelude.
  - The `Session` structure is exposed in the root of the `zenoh` crate.
  - The enumeration `ConsolidationMode` is now exposed in the
    `zenoh::query` module.
  - The `Value` structure was removed and we have to use instead the
    `payload()` method to attach a payload to a query.
  - The `AsyncResolve` trait no longer exists, we can directly `await`
    a future returned by a Zenoh call — no longer needing to call the
    method `res()`.
  - To access the `Sample` of a `Reply` we now have to call the
    `result()` method.
  - To access the bytes representation of a payload we can now call the
    `to_bytes()` method.

Signed-off-by: Julien Loudet <[email protected]>
  • Loading branch information
J-Loudet committed Dec 13, 2024
1 parent ae35925 commit 6688448
Show file tree
Hide file tree
Showing 28 changed files with 1,303 additions and 1,043 deletions.
1,936 changes: 1,131 additions & 805 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 6 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,22 +58,18 @@ tracing-subscriber = { version = "0.3" }
uhlc = "0.6"
url = { version = "2.2", features = ["serde"] }
uuid = { version = "1.1", features = ["serde", "v4"] }
zenoh = { version = "0.11.0-rc.3", features = ["unstable", "plugins"] }
zenoh-collections = { version = "0.11.0-rc.3" }
zenoh-core = { version = "0.11.0-rc.3" }
zenoh-ext = { version = "0.11.0-rc.3" }
# ⚠️ To update the version of Zenoh, first *manually* copy the `Cargo.lock` from
# the targeted version, then run `cargo build` and finally commit the updates.
zenoh = { version = "=1.1.0", features = ["unstable", "internal", "plugins"] }
zenoh-config = { version = "=1.1.0" }
zenoh-flow-commons = { path = "./zenoh-flow-commons" }
zenoh-flow-daemon = { path = "./zenoh-flow-daemon" }
zenoh-flow-descriptors = { path = "./zenoh-flow-descriptors" }
zenoh-flow-nodes = { path = "./zenoh-flow-nodes" }
zenoh-flow-records = { path = "./zenoh-flow-records" }
zenoh-flow-runtime = { path = "./zenoh-flow-runtime" }
zenoh-keyexpr = { version = "0.11.0-rc.3" }
zenoh-plugin-trait = { version = "0.11.0-rc.3" }
zenoh-protocol = { version = "0.11.0-rc.3" }
zenoh-result = "0.11.0-rc.3"
zenoh-sync = { version = "0.11.0-rc.3" }
zenoh-util = { version = "0.11.0-rc.3" }
zenoh-plugin-trait = { version = "=1.1.0" }
zenoh-keyexpr = { version = "=1.1.0" }

[profile.dev]
debug = true
Expand Down
2 changes: 1 addition & 1 deletion zenoh-flow-commons/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,4 @@ serde_yaml = { workspace = true }
tracing = { workspace = true }
uuid = { workspace = true }
zenoh-keyexpr = { workspace = true }
zenoh-protocol = { workspace = true }
zenoh-config = { workspace = true }
7 changes: 3 additions & 4 deletions zenoh-flow-commons/src/identifiers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::{fmt::Display, ops::Deref, str::FromStr, sync::Arc};
use anyhow::anyhow;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use zenoh_protocol::core::ZenohId;
use zenoh_config::ZenohId;

use crate::deserialize::deserialize_id;

Expand Down Expand Up @@ -116,10 +116,9 @@ pub struct RuntimeId(ZenohId);

impl RuntimeId {
/// Generate a new random identifier, guaranteed (with a high probability) to be unique.
///
/// This internally calls [ZenohId::rand].
pub fn rand() -> Self {
Self(ZenohId::rand())
// NOTE: The `Default` trait implementation internally calls `rand()`.
Self(ZenohId::default())
}
}

Expand Down
14 changes: 9 additions & 5 deletions zenoh-flow-daemon/src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ mod queryables;
use std::sync::Arc;

use flume::{Receiver, Sender};
use zenoh::prelude::r#async::*;
use zenoh::Session;
use zenoh_flow_commons::Result;
pub use zenoh_flow_runtime::{Extension, Extensions, Runtime};

Expand Down Expand Up @@ -70,7 +70,7 @@ impl Daemon {
/// [runtime]: Runtime
/// [configuration]: ZenohFlowConfiguration
pub async fn spawn_from_config(
zenoh_session: Arc<Session>,
zenoh_session: Session,
configuration: ZenohFlowConfiguration,
) -> Result<Self> {
let extensions = configuration.extensions.unwrap_or_default();
Expand Down Expand Up @@ -116,9 +116,13 @@ impl Daemon {
// TODO: Clean everything up before aborting.
}

if let Err(e) =
queryables::spawn_instances_queryable(session, runtime.clone(), abort_rx, abort_ack_tx)
.await
if let Err(e) = queryables::spawn_instances_queryable(
session.clone(),
runtime.clone(),
abort_rx,
abort_ack_tx,
)
.await
{
tracing::error!(
"The Zenoh-Flow daemon encountered a fatal error:\n{:?}\nAborting",
Expand Down
18 changes: 5 additions & 13 deletions zenoh-flow-daemon/src/daemon/queryables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;
use anyhow::bail;
use flume::{Receiver, Sender};
use futures::select;
use zenoh::prelude::r#async::*;
use zenoh::Session;
use zenoh_flow_commons::Result;
use zenoh_flow_runtime::Runtime;

Expand All @@ -27,17 +27,13 @@ use crate::queries::{

/// Spawns an async task to answer queries received on `zenoh-flow/{runtime_id}/instances`.
pub(crate) async fn spawn_instances_queryable(
zenoh_session: Arc<Session>,
zenoh_session: Session,
runtime: Arc<Runtime>,
abort_rx: Receiver<()>,
abort_ack_tx: Sender<()>,
) -> Result<()> {
let ke_instances = selectors::selector_instances(runtime.id());
let queryable = match zenoh_session
.declare_queryable(ke_instances.clone())
.res()
.await
{
let queryable = match zenoh_session.declare_queryable(ke_instances.clone()).await {
Ok(queryable) => {
tracing::trace!("declared queryable: {}", ke_instances);
queryable
Expand Down Expand Up @@ -89,18 +85,14 @@ pub(crate) async fn spawn_instances_queryable(
}

pub(crate) async fn spawn_runtime_queryable(
zenoh_session: Arc<Session>,
zenoh_session: Session,
runtime: Arc<Runtime>,
abort_rx: Receiver<()>,
abort_ack_tx: Sender<()>,
) -> Result<()> {
let ke_runtime = selectors::selector_runtimes(runtime.id());

let queryable = match zenoh_session
.declare_queryable(ke_runtime.clone())
.res()
.await
{
let queryable = match zenoh_session.declare_queryable(ke_runtime.clone()).await {
Ok(queryable) => {
tracing::trace!("declared queryable < {} >", ke_runtime);
queryable
Expand Down
11 changes: 3 additions & 8 deletions zenoh-flow-daemon/src/queries/instances/abort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::sync::Arc;

use zenoh::prelude::r#async::*;
use zenoh::Session;
use zenoh_flow_commons::{InstanceId, RuntimeId};
use zenoh_flow_runtime::Runtime;

Expand All @@ -27,7 +27,7 @@ pub(crate) fn abort(runtime: Arc<Runtime>, origin: Origin, instance_id: Instance
match runtime.try_get_record(&instance_id).await {
Ok(record) => {
query_abort(
&runtime.session(),
runtime.session(),
record
.mapping()
.keys()
Expand Down Expand Up @@ -74,12 +74,7 @@ pub(crate) async fn query_abort(
for runtime_id in runtimes {
let selector = selectors::selector_instances(runtime_id);

if let Err(e) = session
.get(selector)
.with_value(abort_query.clone())
.res()
.await
{
if let Err(e) = session.get(selector).payload(abort_query.clone()).await {
tracing::error!(
"Sending abort query to runtime < {} > failed with error: {:?}",
runtime_id,
Expand Down
6 changes: 2 additions & 4 deletions zenoh-flow-daemon/src/queries/instances/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use std::sync::Arc;

use anyhow::Context;
use zenoh::prelude::r#async::*;
use zenoh_flow_commons::{InstanceId, Result};
use zenoh_flow_descriptors::FlattenedDataFlowDescriptor;
use zenoh_flow_records::DataFlowRecord;
Expand Down Expand Up @@ -134,8 +133,7 @@ Query:
runtime
.session()
.get(&selector)
.with_value(payload.clone())
.res()
.payload(payload.clone())
.await,
r#"Zenoh query on < {} > failed"#,
selector
Expand All @@ -148,7 +146,7 @@ Query:
);

rollback_if_err!(
reply.sample,
reply.result(),
"Runtime < {} > failed to load data flow instance < {} >",
&runtime_id,
&instance_id
Expand Down
11 changes: 3 additions & 8 deletions zenoh-flow-daemon/src/queries/instances/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::sync::Arc;

use async_std::task::JoinHandle;
use zenoh::prelude::r#async::*;
use zenoh::Session;
use zenoh_flow_commons::{InstanceId, RuntimeId};
use zenoh_flow_runtime::{DataFlowErr, Runtime};

Expand All @@ -39,12 +39,7 @@ pub(crate) async fn query_delete(

// NOTE: No need to process the request, as, even if the query failed, this is not something we want to recover
// from.
if let Err(e) = session
.get(selector)
.with_value(delete_query.clone())
.res()
.await
{
if let Err(e) = session.get(selector).payload(delete_query.clone()).await {
tracing::error!(
"Sending delete query to runtime < {} > failed with error: {:?}",
runtime_id,
Expand All @@ -69,7 +64,7 @@ pub(crate) fn delete_instance(
match runtime.try_get_record(&instance_id).await {
Ok(record) => {
query_delete(
&runtime.session(),
runtime.session(),
record
.mapping()
.keys()
Expand Down
22 changes: 8 additions & 14 deletions zenoh-flow-daemon/src/queries/instances/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::{fmt::Debug, sync::Arc};

use anyhow::anyhow;
use serde::{Deserialize, Serialize};
use zenoh::{prelude::r#async::*, queryable::Query};
use zenoh::query::Query;
use zenoh_flow_commons::{InstanceId, Result};
use zenoh_flow_descriptors::FlattenedDataFlowDescriptor;
use zenoh_flow_records::DataFlowRecord;
Expand All @@ -48,20 +48,14 @@ pub enum Origin {
}

async fn reply<T: Serialize + Debug>(query: Query, data: Result<T>) -> Result<()> {
let sample = match data {
Ok(data) => match serde_json::to_vec(&data) {
Ok(payload) => Ok(Sample::new(query.key_expr().clone(), payload)),
Err(e) => Err(Value::from(e.to_string())),
match data {
Ok(payload) => match serde_json::to_vec(&payload) {
Ok(payload) => query.reply(query.key_expr(), payload).await,
Err(e) => query.reply_err(e.to_string()).await,
},

Err(e) => Err(Value::from(e.to_string())),
};

query
.reply(sample)
.res()
.await
.map_err(|e| anyhow!("Failed to send reply: {:?}", e))
Err(e) => query.reply_err(e.to_string()).await,
}
.map_err(|e| anyhow!("Failed to send reply: {e:?}"))
}

/// The available interactions to manipulate a data flow instance.
Expand Down
17 changes: 5 additions & 12 deletions zenoh-flow-daemon/src/queries/instances/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::sync::Arc;

use anyhow::bail;
use zenoh::{prelude::r#async::*, queryable::Query};
use zenoh::{bytes::ZBytes, query::Query, Session};
use zenoh_flow_commons::{InstanceId, Result, RuntimeId};
use zenoh_flow_runtime::Runtime;

Expand Down Expand Up @@ -51,7 +51,7 @@ Caused by:
message,
e
);
if let Err(e) = query.reply(Err(Value::from(message))).res().await {
if let Err(e) = query.reply_err(message).await {
tracing::error!(
"Failed to reply (error) to query on < {} >: {:?}",
query.key_expr(),
Expand All @@ -74,7 +74,7 @@ Caused by:
if matches!(origin, Origin::Client) {
return_if_err!(
query_start(
&runtime.session(),
runtime.session(),
record
.mapping()
.keys()
Expand All @@ -97,10 +97,7 @@ Caused by:

tracing::trace!("Successfully started instance < {} >", instance_id);
return_if_err!(
query
.reply(Ok(Sample::new(query.key_expr().clone(), Value::empty())))
.res()
.await,
query.reply(query.key_expr(), ZBytes::default()).await,
"Failed to reply (success) to query on < {} >",
query.key_expr()
);
Expand Down Expand Up @@ -169,11 +166,7 @@ Caused by:
let selector = selectors::selector_instances(runtime_id);

rollback_if_err!(
session
.get(selector)
.with_value(start_query.clone())
.res()
.await,
session.get(selector).payload(start_query.clone()).await,
"Query `start` on runtime < {} > failed",
runtime_id
);
Expand Down
21 changes: 4 additions & 17 deletions zenoh-flow-daemon/src/queries/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub(crate) mod selectors;

use anyhow::{anyhow, bail};
use serde::Deserialize;
use zenoh::{prelude::*, queryable::Query};
use zenoh::query::Query;
use zenoh_flow_commons::Result;
pub use zenoh_flow_runtime::InstanceStatus;

Expand All @@ -48,22 +48,9 @@ pub use self::{
///
/// After these checks, the method `process` is called on the variant of `InstancesQuery`.
pub(crate) async fn validate_query<T: for<'a> Deserialize<'a>>(query: &Query) -> Result<T> {
let value = match query.value() {
Some(value) => value,
None => {
bail!("Received empty payload");
}
let Some(payload) = query.payload() else {
bail!("Received Query with empty payload")
};

if ![
Encoding::APP_OCTET_STREAM,
Encoding::APP_JSON,
Encoding::TEXT_JSON,
]
.contains(&value.encoding)
{
bail!("Encoding < {} > is not supported", value.encoding);
}

serde_json::from_slice::<T>(&value.payload.contiguous()).map_err(|e| anyhow!("{:?}", e))
serde_json::from_slice::<T>(&payload.to_bytes()).map_err(|e| anyhow!("{:?}", e))
}
12 changes: 5 additions & 7 deletions zenoh-flow-daemon/src/queries/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::{collections::HashMap, sync::Arc};

use serde::{Deserialize, Serialize};
use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind};
use zenoh::{prelude::r#async::*, queryable::Query};
use zenoh::query::Query;
use zenoh_flow_commons::{InstanceId, RuntimeId};
use zenoh_flow_runtime::{InstanceState, Runtime};

Expand Down Expand Up @@ -103,12 +103,10 @@ impl RuntimesQuery {
}
};

let sample = match payload {
Ok(payload) => Ok(Sample::new(query.key_expr().clone(), payload)),
Err(e) => Err(Value::from(e.to_string())),
};

if let Err(e) = query.reply(sample).res().await {
if let Err(e) = match payload {
Ok(payload) => query.reply(query.key_expr(), payload).await,
Err(e) => query.reply_err(e.to_string()).await,
} {
tracing::error!(
r#"Failed to reply to query < {} >:
Caused by:
Expand Down
6 changes: 1 addition & 5 deletions zenoh-flow-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,5 @@ pub use runtime::{DataFlowErr, Runtime, RuntimeBuilder};
/// A re-export of the Zenoh structures needed to open a [Session](zenoh::Session) asynchronously.
#[cfg(feature = "zenoh")]
pub mod zenoh {
pub use zenoh::{
config::{client, empty, peer},
open,
prelude::{r#async::AsyncResolve, Config, Session},
};
pub use zenoh::{open, Config, Session};
}
Loading

0 comments on commit 6688448

Please sign in to comment.