Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: replace AsyncResolve with IntoFuture #942

Merged
merged 6 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 5 additions & 10 deletions ci/valgrind-check/src/pub_sub/bin/z_pub_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//
use std::time::Duration;
use zenoh::config::Config;
use zenoh::prelude::r#async::*;
use zenoh::prelude::*;

#[tokio::main]
async fn main() {
Expand All @@ -23,15 +23,11 @@ async fn main() {
let sub_key_expr = KeyExpr::try_from("test/valgrind/**").unwrap();

println!("Declaring Publisher on '{pub_key_expr}'...");
let pub_session = zenoh::open(Config::default()).res().await.unwrap();
let publisher = pub_session
.declare_publisher(&pub_key_expr)
.res()
.await
.unwrap();
let pub_session = zenoh::open(Config::default()).await.unwrap();
let publisher = pub_session.declare_publisher(&pub_key_expr).await.unwrap();

println!("Declaring Subscriber on '{sub_key_expr}'...");
let sub_session = zenoh::open(Config::default()).res().await.unwrap();
let sub_session = zenoh::open(Config::default()).await.unwrap();
let _subscriber = sub_session
.declare_subscriber(&sub_key_expr)
.callback(|sample| {
Expand All @@ -45,15 +41,14 @@ async fn main() {
.unwrap_or_else(|e| format!("{}", e))
);
})
.res()
.await
.unwrap();

for idx in 0..5 {
tokio::time::sleep(Duration::from_secs(1)).await;
let buf = format!("[{idx:4}] data");
println!("Putting Data ('{}': '{}')...", &pub_key_expr, buf);
publisher.put(buf).res().await.unwrap();
publisher.put(buf).await.unwrap();
}

tokio::time::sleep(Duration::from_secs(1)).await;
Expand Down
9 changes: 3 additions & 6 deletions ci/valgrind-check/src/queryable_get/bin/z_queryable_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use std::convert::TryFrom;
use std::time::Duration;
use zenoh::config::Config;
use zenoh::prelude::r#async::*;
use zenoh::prelude::*;

#[tokio::main]
async fn main() {
Expand All @@ -24,7 +24,7 @@ async fn main() {
let get_selector = Selector::try_from("test/valgrind/**").unwrap();

println!("Declaring Queryable on '{queryable_key_expr}'...");
let queryable_session = zenoh::open(Config::default()).res().await.unwrap();
let queryable_session = zenoh::open(Config::default()).await.unwrap();
let _queryable = queryable_session
.declare_queryable(queryable_key_expr.clone())
.callback(move |query| {
Expand All @@ -33,18 +33,16 @@ async fn main() {
zenoh_runtime::ZRuntime::Application.block_in_place(async move {
query
.reply(queryable_key_expr, query.value().unwrap().payload().clone())
.res()
.await
.unwrap();
});
})
.complete(true)
.res()
.await
.unwrap();

println!("Declaring Get session for '{get_selector}'...");
let get_session = zenoh::open(Config::default()).res().await.unwrap();
let get_session = zenoh::open(Config::default()).await.unwrap();

for idx in 0..5 {
tokio::time::sleep(Duration::from_secs(1)).await;
Expand All @@ -53,7 +51,6 @@ async fn main() {
.get(&get_selector)
.value(idx)
.target(QueryTarget::All)
.res()
.await
.unwrap();
while let Ok(reply) = replies.recv_async().await {
Expand Down
107 changes: 82 additions & 25 deletions commons/zenoh-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
pub use lazy_static::lazy_static;
pub mod macros;

use std::future::{Future, Ready};
use std::future::{Future, IntoFuture, Ready};

// Re-exports after moving ZError/ZResult to zenoh-result
pub use zenoh_result::{bail, to_zerror, zerror};
Expand All @@ -30,12 +30,34 @@ pub mod zresult {
pub use zresult::Error;
pub use zresult::ZResult as Result;

/// A resolvable execution, either sync or async
pub trait Resolvable {
type To: Sized + Send;
}

/// Trick used to mark `<Resolve as IntoFuture>::IntoFuture` bound as Send
#[doc(hidden)]
pub trait IntoSendFuture: Resolvable {
type IntoFuture: Future<Output = Self::To> + Send;
}

impl<T> IntoSendFuture for T
where
T: Resolvable + IntoFuture<Output = Self::To>,
T::IntoFuture: Send,
{
type IntoFuture = T::IntoFuture;
}

/// Synchronous execution of a resolvable
pub trait Wait: Resolvable {
/// Synchronously execute and wait
fn wait(self) -> Self::To;
}

#[deprecated = "use `.await` directly instead"]
pub trait AsyncResolve: Resolvable {
type Future: Future<Output = <Self as Resolvable>::To> + Send;
type Future: Future<Output = Self::To> + Send;

fn res_async(self) -> Self::Future;

Expand All @@ -47,34 +69,67 @@ pub trait AsyncResolve: Resolvable {
}
}

#[allow(deprecated)]
impl<T> AsyncResolve for T
where
T: Resolvable + IntoFuture<Output = Self::To>,
T::IntoFuture: Send,
{
type Future = T::IntoFuture;

fn res_async(self) -> Self::Future {
self.into_future()
}
}

#[deprecated = "use `.wait()` instead`"]
pub trait SyncResolve: Resolvable {
fn res_sync(self) -> <Self as Resolvable>::To;
fn res_sync(self) -> Self::To;

fn res(self) -> <Self as Resolvable>::To
fn res(self) -> Self::To
where
Self: Sized,
{
self.res_sync()
}
}

#[allow(deprecated)]
impl<T> SyncResolve for T
where
T: Wait,
{
fn res_sync(self) -> Self::To {
self.wait()
}
}

/// Zenoh's trait for resolving builder patterns.
///
/// Builder patterns in Zenoh can be resolved with [`AsyncResolve`] in async context and [`SyncResolve`] in sync context.
/// In both async and sync context calling `.res()` resolves the builder.
/// `.res()` maps to `.res_async()` in async context.
/// `.res()` maps to `.res_sync()` in sync context.
/// We advise to prefer the usage of [`AsyncResolve`] and to use [`SyncResolve`] with caution.
#[must_use = "Resolvables do nothing unless you resolve them using `.res()`."]
pub trait Resolve<Output>: Resolvable<To = Output> + SyncResolve + AsyncResolve + Send {}
/// Builder patterns in Zenoh can be resolved by awaiting them, in async context,
/// and [`Wait::wait`] in sync context.
/// We advise to prefer the usage of asynchronous execution, and to use synchronous one with caution
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or synchronous `.wait()` method"]
pub trait Resolve<Output>:
Resolvable<To = Output>
+ Wait
+ IntoSendFuture
+ IntoFuture<IntoFuture = <Self as IntoSendFuture>::IntoFuture, Output = Output>
+ Send
{
}

impl<T, Output> Resolve<Output> for T where
T: Resolvable<To = Output> + SyncResolve + AsyncResolve + Send
T: Resolvable<To = Output>
+ Wait
+ IntoSendFuture
+ IntoFuture<IntoFuture = <Self as IntoSendFuture>::IntoFuture, Output = Output>
+ Send
{
}

// Closure to wait
#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or synchronous `.wait()` method"]
pub struct ResolveClosure<C, To>(C)
where
To: Sized + Send,
Expand All @@ -98,30 +153,31 @@ where
type To = To;
}

impl<C, To> AsyncResolve for ResolveClosure<C, To>
impl<C, To> IntoFuture for ResolveClosure<C, To>
where
To: Sized + Send,
C: FnOnce() -> To + Send,
{
type Future = Ready<<Self as Resolvable>::To>;
type Output = <Self as Resolvable>::To;
type IntoFuture = Ready<<Self as Resolvable>::To>;

fn res_async(self) -> Self::Future {
std::future::ready(self.res_sync())
fn into_future(self) -> Self::IntoFuture {
std::future::ready(self.wait())
}
}

impl<C, To> SyncResolve for ResolveClosure<C, To>
impl<C, To> Wait for ResolveClosure<C, To>
where
To: Sized + Send,
C: FnOnce() -> To + Send,
{
fn res_sync(self) -> <Self as Resolvable>::To {
fn wait(self) -> <Self as Resolvable>::To {
self.0()
}
}

// Future to wait
#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or synchronous `.wait()` method"]
pub struct ResolveFuture<F, To>(F)
where
To: Sized + Send,
Expand All @@ -145,24 +201,25 @@ where
type To = To;
}

impl<F, To> AsyncResolve for ResolveFuture<F, To>
impl<F, To> IntoFuture for ResolveFuture<F, To>
where
To: Sized + Send,
F: Future<Output = To> + Send,
{
type Future = F;
type Output = To;
type IntoFuture = F;

fn res_async(self) -> Self::Future {
fn into_future(self) -> Self::IntoFuture {
self.0
}
}

impl<F, To> SyncResolve for ResolveFuture<F, To>
impl<F, To> Wait for ResolveFuture<F, To>
where
To: Sized + Send,
F: Future<Output = To> + Send,
{
fn res_sync(self) -> <Self as Resolvable>::To {
fn wait(self) -> <Self as Resolvable>::To {
zenoh_runtime::ZRuntime::Application.block_in_place(self.0)
}
}
Expand Down
4 changes: 3 additions & 1 deletion commons/zenoh-core/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ macro_rules! zcondfeat {
#[macro_export]
macro_rules! ztimeout {
($f:expr) => {
tokio::time::timeout(TIMEOUT, $f).await.unwrap()
tokio::time::timeout(TIMEOUT, ::core::future::IntoFuture::into_future($f))
.await
.unwrap()
};
}
6 changes: 3 additions & 3 deletions commons/zenoh-task/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::time::Duration;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
use zenoh_core::{ResolveFuture, SyncResolve};
use zenoh_core::{ResolveFuture, Wait};
use zenoh_runtime::ZRuntime;

#[derive(Clone)]
Expand Down Expand Up @@ -111,7 +111,7 @@ impl TaskController {
/// The call blocks until all tasks yield or timeout duration expires.
/// Returns 0 in case of success, number of non terminated tasks otherwise.
pub fn terminate_all(&self, timeout: Duration) -> usize {
ResolveFuture::new(async move { self.terminate_all_async(timeout).await }).res_sync()
ResolveFuture::new(async move { self.terminate_all_async(timeout).await }).wait()
}

/// Async version of [`TaskController::terminate_all()`].
Expand Down Expand Up @@ -176,7 +176,7 @@ impl TerminatableTask {
/// Attempts to terminate the task.
/// Returns true if task completed / aborted within timeout duration, false otherwise.
pub fn terminate(self, timeout: Duration) -> bool {
ResolveFuture::new(async move { self.terminate_async(timeout).await }).res_sync()
ResolveFuture::new(async move { self.terminate_async(timeout).await }).wait()
}

/// Async version of [`TerminatableTask::terminate()`].
Expand Down
8 changes: 4 additions & 4 deletions examples/examples/z_alloc_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use zenoh::prelude::r#async::*;
use zenoh::prelude::*;

#[tokio::main]
async fn main() {
Expand Down Expand Up @@ -120,9 +120,9 @@ async fn run() -> ZResult<()> {
sbuf[0..8].fill(0);

// Declare Session and Publisher (common code)
let session = zenoh::open(Config::default()).res_async().await?;
let publisher = session.declare_publisher("my/key/expr").res_async().await?;
let session = zenoh::open(Config::default()).await?;
let publisher = session.declare_publisher("my/key/expr").await?;

// Publish SHM buffer
publisher.put(sbuf).res_async().await
publisher.put(sbuf).await
}
8 changes: 4 additions & 4 deletions examples/examples/z_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::prelude::r#async::*;
use zenoh::prelude::*;
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand All @@ -23,12 +23,12 @@ async fn main() {
let (config, key_expr) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();
let session = zenoh::open(config).await.unwrap();

println!("Deleting resources matching '{key_expr}'...");
session.delete(&key_expr).res().await.unwrap();
session.delete(&key_expr).await.unwrap();

session.close().res().await.unwrap();
session.close().await.unwrap();
}

#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)]
Expand Down
Loading