Skip to content

Commit

Permalink
refactor: replace AsyncResolve with IntoFuture (#942)
Browse files Browse the repository at this point in the history
* refactor: replace `AsyncResolve` with `IntoFuture`, `SyncResolve` with `Wait`, and deprecate old API

* fix: fix shared memory

* fix: fix remaining test

* fix: fix remaining test

* Update examples/examples/z_get.rs

Co-authored-by: Luca Cominardi <[email protected]>

* fix: put back (A)SyncResolve in prelude

---------

Co-authored-by: Luca Cominardi <[email protected]>
  • Loading branch information
wyfo and Mallets authored Apr 29, 2024
1 parent 1021261 commit fcb0545
Show file tree
Hide file tree
Showing 81 changed files with 1,148 additions and 1,268 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 5 additions & 10 deletions ci/valgrind-check/src/pub_sub/bin/z_pub_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//
use std::time::Duration;
use zenoh::config::Config;
use zenoh::prelude::r#async::*;
use zenoh::prelude::*;

#[tokio::main]
async fn main() {
Expand All @@ -23,15 +23,11 @@ async fn main() {
let sub_key_expr = KeyExpr::try_from("test/valgrind/**").unwrap();

println!("Declaring Publisher on '{pub_key_expr}'...");
let pub_session = zenoh::open(Config::default()).res().await.unwrap();
let publisher = pub_session
.declare_publisher(&pub_key_expr)
.res()
.await
.unwrap();
let pub_session = zenoh::open(Config::default()).await.unwrap();
let publisher = pub_session.declare_publisher(&pub_key_expr).await.unwrap();

println!("Declaring Subscriber on '{sub_key_expr}'...");
let sub_session = zenoh::open(Config::default()).res().await.unwrap();
let sub_session = zenoh::open(Config::default()).await.unwrap();
let _subscriber = sub_session
.declare_subscriber(&sub_key_expr)
.callback(|sample| {
Expand All @@ -45,15 +41,14 @@ async fn main() {
.unwrap_or_else(|e| format!("{}", e))
);
})
.res()
.await
.unwrap();

for idx in 0..5 {
tokio::time::sleep(Duration::from_secs(1)).await;
let buf = format!("[{idx:4}] data");
println!("Putting Data ('{}': '{}')...", &pub_key_expr, buf);
publisher.put(buf).res().await.unwrap();
publisher.put(buf).await.unwrap();
}

tokio::time::sleep(Duration::from_secs(1)).await;
Expand Down
9 changes: 3 additions & 6 deletions ci/valgrind-check/src/queryable_get/bin/z_queryable_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use std::convert::TryFrom;
use std::time::Duration;
use zenoh::config::Config;
use zenoh::prelude::r#async::*;
use zenoh::prelude::*;

#[tokio::main]
async fn main() {
Expand All @@ -24,7 +24,7 @@ async fn main() {
let get_selector = Selector::try_from("test/valgrind/**").unwrap();

println!("Declaring Queryable on '{queryable_key_expr}'...");
let queryable_session = zenoh::open(Config::default()).res().await.unwrap();
let queryable_session = zenoh::open(Config::default()).await.unwrap();
let _queryable = queryable_session
.declare_queryable(queryable_key_expr.clone())
.callback(move |query| {
Expand All @@ -33,18 +33,16 @@ async fn main() {
zenoh_runtime::ZRuntime::Application.block_in_place(async move {
query
.reply(queryable_key_expr, query.value().unwrap().payload().clone())
.res()
.await
.unwrap();
});
})
.complete(true)
.res()
.await
.unwrap();

println!("Declaring Get session for '{get_selector}'...");
let get_session = zenoh::open(Config::default()).res().await.unwrap();
let get_session = zenoh::open(Config::default()).await.unwrap();

for idx in 0..5 {
tokio::time::sleep(Duration::from_secs(1)).await;
Expand All @@ -53,7 +51,6 @@ async fn main() {
.get(&get_selector)
.value(idx)
.target(QueryTarget::All)
.res()
.await
.unwrap();
while let Ok(reply) = replies.recv_async().await {
Expand Down
107 changes: 82 additions & 25 deletions commons/zenoh-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
pub use lazy_static::lazy_static;
pub mod macros;

use std::future::{Future, Ready};
use std::future::{Future, IntoFuture, Ready};

// Re-exports after moving ZError/ZResult to zenoh-result
pub use zenoh_result::{bail, to_zerror, zerror};
Expand All @@ -30,12 +30,34 @@ pub mod zresult {
pub use zresult::Error;
pub use zresult::ZResult as Result;

/// A resolvable execution, either sync or async
pub trait Resolvable {
type To: Sized + Send;
}

/// Trick used to mark `<Resolve as IntoFuture>::IntoFuture` bound as Send
#[doc(hidden)]
pub trait IntoSendFuture: Resolvable {
type IntoFuture: Future<Output = Self::To> + Send;
}

impl<T> IntoSendFuture for T
where
T: Resolvable + IntoFuture<Output = Self::To>,
T::IntoFuture: Send,
{
type IntoFuture = T::IntoFuture;
}

/// Synchronous execution of a resolvable
pub trait Wait: Resolvable {
/// Synchronously execute and wait
fn wait(self) -> Self::To;
}

#[deprecated = "use `.await` directly instead"]
pub trait AsyncResolve: Resolvable {
type Future: Future<Output = <Self as Resolvable>::To> + Send;
type Future: Future<Output = Self::To> + Send;

fn res_async(self) -> Self::Future;

Expand All @@ -47,34 +69,67 @@ pub trait AsyncResolve: Resolvable {
}
}

#[allow(deprecated)]
impl<T> AsyncResolve for T
where
T: Resolvable + IntoFuture<Output = Self::To>,
T::IntoFuture: Send,
{
type Future = T::IntoFuture;

fn res_async(self) -> Self::Future {
self.into_future()
}
}

#[deprecated = "use `.wait()` instead`"]
pub trait SyncResolve: Resolvable {
fn res_sync(self) -> <Self as Resolvable>::To;
fn res_sync(self) -> Self::To;

fn res(self) -> <Self as Resolvable>::To
fn res(self) -> Self::To
where
Self: Sized,
{
self.res_sync()
}
}

#[allow(deprecated)]
impl<T> SyncResolve for T
where
T: Wait,
{
fn res_sync(self) -> Self::To {
self.wait()
}
}

/// Zenoh's trait for resolving builder patterns.
///
/// Builder patterns in Zenoh can be resolved with [`AsyncResolve`] in async context and [`SyncResolve`] in sync context.
/// In both async and sync context calling `.res()` resolves the builder.
/// `.res()` maps to `.res_async()` in async context.
/// `.res()` maps to `.res_sync()` in sync context.
/// We advise to prefer the usage of [`AsyncResolve`] and to use [`SyncResolve`] with caution.
#[must_use = "Resolvables do nothing unless you resolve them using `.res()`."]
pub trait Resolve<Output>: Resolvable<To = Output> + SyncResolve + AsyncResolve + Send {}
/// Builder patterns in Zenoh can be resolved by awaiting them, in async context,
/// and [`Wait::wait`] in sync context.
/// We advise to prefer the usage of asynchronous execution, and to use synchronous one with caution
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or synchronous `.wait()` method"]
pub trait Resolve<Output>:
Resolvable<To = Output>
+ Wait
+ IntoSendFuture
+ IntoFuture<IntoFuture = <Self as IntoSendFuture>::IntoFuture, Output = Output>
+ Send
{
}

impl<T, Output> Resolve<Output> for T where
T: Resolvable<To = Output> + SyncResolve + AsyncResolve + Send
T: Resolvable<To = Output>
+ Wait
+ IntoSendFuture
+ IntoFuture<IntoFuture = <Self as IntoSendFuture>::IntoFuture, Output = Output>
+ Send
{
}

// Closure to wait
#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or synchronous `.wait()` method"]
pub struct ResolveClosure<C, To>(C)
where
To: Sized + Send,
Expand All @@ -98,30 +153,31 @@ where
type To = To;
}

impl<C, To> AsyncResolve for ResolveClosure<C, To>
impl<C, To> IntoFuture for ResolveClosure<C, To>
where
To: Sized + Send,
C: FnOnce() -> To + Send,
{
type Future = Ready<<Self as Resolvable>::To>;
type Output = <Self as Resolvable>::To;
type IntoFuture = Ready<<Self as Resolvable>::To>;

fn res_async(self) -> Self::Future {
std::future::ready(self.res_sync())
fn into_future(self) -> Self::IntoFuture {
std::future::ready(self.wait())
}
}

impl<C, To> SyncResolve for ResolveClosure<C, To>
impl<C, To> Wait for ResolveClosure<C, To>
where
To: Sized + Send,
C: FnOnce() -> To + Send,
{
fn res_sync(self) -> <Self as Resolvable>::To {
fn wait(self) -> <Self as Resolvable>::To {
self.0()
}
}

// Future to wait
#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or synchronous `.wait()` method"]
pub struct ResolveFuture<F, To>(F)
where
To: Sized + Send,
Expand All @@ -145,24 +201,25 @@ where
type To = To;
}

impl<F, To> AsyncResolve for ResolveFuture<F, To>
impl<F, To> IntoFuture for ResolveFuture<F, To>
where
To: Sized + Send,
F: Future<Output = To> + Send,
{
type Future = F;
type Output = To;
type IntoFuture = F;

fn res_async(self) -> Self::Future {
fn into_future(self) -> Self::IntoFuture {
self.0
}
}

impl<F, To> SyncResolve for ResolveFuture<F, To>
impl<F, To> Wait for ResolveFuture<F, To>
where
To: Sized + Send,
F: Future<Output = To> + Send,
{
fn res_sync(self) -> <Self as Resolvable>::To {
fn wait(self) -> <Self as Resolvable>::To {
zenoh_runtime::ZRuntime::Application.block_in_place(self.0)
}
}
Expand Down
4 changes: 3 additions & 1 deletion commons/zenoh-core/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ macro_rules! zcondfeat {
#[macro_export]
macro_rules! ztimeout {
($f:expr) => {
tokio::time::timeout(TIMEOUT, $f).await.unwrap()
tokio::time::timeout(TIMEOUT, ::core::future::IntoFuture::into_future($f))
.await
.unwrap()
};
}
6 changes: 3 additions & 3 deletions commons/zenoh-task/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::time::Duration;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
use zenoh_core::{ResolveFuture, SyncResolve};
use zenoh_core::{ResolveFuture, Wait};
use zenoh_runtime::ZRuntime;

#[derive(Clone)]
Expand Down Expand Up @@ -111,7 +111,7 @@ impl TaskController {
/// The call blocks until all tasks yield or timeout duration expires.
/// Returns 0 in case of success, number of non terminated tasks otherwise.
pub fn terminate_all(&self, timeout: Duration) -> usize {
ResolveFuture::new(async move { self.terminate_all_async(timeout).await }).res_sync()
ResolveFuture::new(async move { self.terminate_all_async(timeout).await }).wait()
}

/// Async version of [`TaskController::terminate_all()`].
Expand Down Expand Up @@ -176,7 +176,7 @@ impl TerminatableTask {
/// Attempts to terminate the task.
/// Returns true if task completed / aborted within timeout duration, false otherwise.
pub fn terminate(self, timeout: Duration) -> bool {
ResolveFuture::new(async move { self.terminate_async(timeout).await }).res_sync()
ResolveFuture::new(async move { self.terminate_async(timeout).await }).wait()
}

/// Async version of [`TerminatableTask::terminate()`].
Expand Down
8 changes: 4 additions & 4 deletions examples/examples/z_alloc_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use zenoh::prelude::r#async::*;
use zenoh::prelude::*;

#[tokio::main]
async fn main() {
Expand Down Expand Up @@ -120,9 +120,9 @@ async fn run() -> ZResult<()> {
sbuf[0..8].fill(0);

// Declare Session and Publisher (common code)
let session = zenoh::open(Config::default()).res_async().await?;
let publisher = session.declare_publisher("my/key/expr").res_async().await?;
let session = zenoh::open(Config::default()).await?;
let publisher = session.declare_publisher("my/key/expr").await?;

// Publish SHM buffer
publisher.put(sbuf).res_async().await
publisher.put(sbuf).await
}
8 changes: 4 additions & 4 deletions examples/examples/z_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::prelude::r#async::*;
use zenoh::prelude::*;
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand All @@ -23,12 +23,12 @@ async fn main() {
let (config, key_expr) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();
let session = zenoh::open(config).await.unwrap();

println!("Deleting resources matching '{key_expr}'...");
session.delete(&key_expr).res().await.unwrap();
session.delete(&key_expr).await.unwrap();

session.close().res().await.unwrap();
session.close().await.unwrap();
}

#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)]
Expand Down
Loading

0 comments on commit fcb0545

Please sign in to comment.