Skip to content

Commit

Permalink
refactor: replace SyncResolve by Wait, deprecate old API
Browse files Browse the repository at this point in the history
  • Loading branch information
wyfo committed Apr 26, 2024
1 parent 9f45e91 commit 9b41899
Show file tree
Hide file tree
Showing 46 changed files with 344 additions and 291 deletions.
85 changes: 66 additions & 19 deletions commons/zenoh-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,64 @@ pub mod zresult {
pub use zresult::Error;
pub use zresult::ZResult as Result;

/// A resolvable execution, either sync or async
pub trait Resolvable {
type To: Sized + Send;
}

/// Trick used to mark `<Resolve as IntoFuture>::IntoFuture` bound as Send
#[doc(hidden)]
pub trait IntoSendFuture: Resolvable {
type IntoFuture: Future<Output = Self::To> + Send;
}

impl<T> IntoSendFuture for T
where
T: Resolvable + IntoFuture<Output = Self::To>,
T::IntoFuture: Send,
{
type IntoFuture = T::IntoFuture;
}

/// Synchronous execution of a resolvable
pub trait Wait: Resolvable {
/// Synchronously execute and wait
fn wait(self) -> Self::To;
}

#[deprecated = "use `.await` directly instead"]
pub trait AsyncResolve: Resolvable {
type Future: Future<Output = <Self as Resolvable>::To> + Send;

fn res_async(self) -> Self::Future;

fn res(self) -> Self::Future
where
Self: Sized,
{
self.res_async()
}
}

#[allow(deprecated)]
impl<T> AsyncResolve for T
where
T: Resolvable + IntoFuture<Output = Self::To>,
T::IntoFuture: Send,
{
type Future = T::IntoFuture;

fn res_async(self) -> Self::Future {
self.into_future()
}
}

#[deprecated = "use `.wait()` instead`"]
pub trait SyncResolve: Resolvable {
// #[deprecated = "use `.wait()` instead, see `Wait::wait`"]
fn res_sync(self) -> <Self as Resolvable>::To;

// #[deprecated = "use `.wait()` instead, see `Wait::wait`"]
fn res(self) -> <Self as Resolvable>::To
where
Self: Sized,
Expand All @@ -57,34 +96,42 @@ pub trait SyncResolve: Resolvable {
}
}

#[allow(deprecated)]
impl<T> SyncResolve for T
where
T: Wait,
{
fn res_sync(self) -> Self::To {
self.wait()
}
}

/// Zenoh's trait for resolving builder patterns.
///
/// Builder patterns in Zenoh can be resolved with [`AsyncResolve`] in async context and [`SyncResolve`] in sync context.
/// In both async and sync context calling `.res()` resolves the builder.
/// `.res()` maps to `` in async context.
/// `.res()` maps to `.res_sync()` in sync context.
/// We advise to prefer the usage of [`AsyncResolve`] and to use [`SyncResolve`] with caution.
#[must_use = "Resolvables do nothing unless you resolve them using `.res()`."]
/// Builder patterns in Zenoh can be resolved by awaiting them, in async context,
/// and [`Wait::wait`] in sync context.
/// We advise to prefer the usage of asynchronous execution, and to use synchronous one with caution
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or synchronous `.wait()` method"]
pub trait Resolve<Output>:
Resolvable<To = Output>
+ SyncResolve
+ AsyncResolve
+ IntoFuture<IntoFuture = <Self as AsyncResolve>::Future, Output = Output>
+ Wait
+ IntoSendFuture
+ IntoFuture<IntoFuture = <Self as IntoSendFuture>::IntoFuture, Output = Output>
+ Send
{
}

impl<T, Output> Resolve<Output> for T where
T: Resolvable<To = Output>
+ SyncResolve
+ AsyncResolve
+ IntoFuture<IntoFuture = <Self as AsyncResolve>::Future, Output = Output>
+ Wait
+ IntoSendFuture
+ IntoFuture<IntoFuture = <Self as IntoSendFuture>::IntoFuture, Output = Output>
+ Send
{
}

// Closure to wait
#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or synchronous `.wait()` method"]
pub struct ResolveClosure<C, To>(C)
where
To: Sized + Send,
Expand Down Expand Up @@ -117,22 +164,22 @@ where
type IntoFuture = Ready<<Self as Resolvable>::To>;

fn into_future(self) -> Self::IntoFuture {
std::future::ready(self.res_sync())
std::future::ready(self.wait())
}
}

impl<C, To> SyncResolve for ResolveClosure<C, To>
impl<C, To> Wait for ResolveClosure<C, To>
where
To: Sized + Send,
C: FnOnce() -> To + Send,
{
fn res_sync(self) -> <Self as Resolvable>::To {
fn wait(self) -> <Self as Resolvable>::To {
self.0()
}
}

// Future to wait
#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or synchronous `.wait()` method"]
pub struct ResolveFuture<F, To>(F)
where
To: Sized + Send,
Expand Down Expand Up @@ -169,12 +216,12 @@ where
}
}

impl<F, To> SyncResolve for ResolveFuture<F, To>
impl<F, To> Wait for ResolveFuture<F, To>
where
To: Sized + Send,
F: Future<Output = To> + Send,
{
fn res_sync(self) -> <Self as Resolvable>::To {
fn wait(self) -> <Self as Resolvable>::To {
zenoh_runtime::ZRuntime::Application.block_in_place(self.0)
}
}
Expand Down
6 changes: 3 additions & 3 deletions commons/zenoh-task/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::time::Duration;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
use zenoh_core::{ResolveFuture, SyncResolve};
use zenoh_core::{ResolveFuture, Wait};
use zenoh_runtime::ZRuntime;

#[derive(Clone)]
Expand Down Expand Up @@ -111,7 +111,7 @@ impl TaskController {
/// The call blocks until all tasks yield or timeout duration expires.
/// Returns 0 in case of success, number of non terminated tasks otherwise.
pub fn terminate_all(&self, timeout: Duration) -> usize {
ResolveFuture::new(async move { self.terminate_all_async(timeout).await }).res_sync()
ResolveFuture::new(async move { self.terminate_all_async(timeout).await }).wait()
}

/// Async version of [`TaskController::terminate_all()`].
Expand Down Expand Up @@ -176,7 +176,7 @@ impl TerminatableTask {
/// Attempts to terminate the task.
/// Returns true if task completed / aborted within timeout duration, false otherwise.
pub fn terminate(self, timeout: Duration) -> bool {
ResolveFuture::new(async move { self.terminate_async(timeout).await }).res_sync()
ResolveFuture::new(async move { self.terminate_async(timeout).await }).wait()
}

/// Async version of [`TerminatableTask::terminate()`].
Expand Down
10 changes: 5 additions & 5 deletions examples/examples/z_ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,20 @@ fn main() {
zenoh_util::init_log_from_env();

let (config, warmup, size, n, express) = parse_args();
let session = zenoh::open(config).res().unwrap();
let session = zenoh::open(config).wait().unwrap();

// The key expression to publish data on
let key_expr_ping = keyexpr::new("test/ping").unwrap();

// The key expression to wait the response back
let key_expr_pong = keyexpr::new("test/pong").unwrap();

let sub = session.declare_subscriber(key_expr_pong).res().unwrap();
let sub = session.declare_subscriber(key_expr_pong).wait().unwrap();
let publisher = session
.declare_publisher(key_expr_ping)
.congestion_control(CongestionControl::Block)
.express(express)
.res()
.wait()
.unwrap();

let data: Payload = (0usize..size)
Expand All @@ -51,15 +51,15 @@ fn main() {
let now = Instant::now();
while now.elapsed() < warmup {
let data = data.clone();
publisher.put(data).res().unwrap();
publisher.put(data).wait().unwrap();

let _ = sub.recv();
}

for _ in 0..n {
let data = data.clone();
let write_time = Instant::now();
publisher.put(data).res().unwrap();
publisher.put(data).wait().unwrap();

let _ = sub.recv();
let ts = write_time.elapsed().as_micros();
Expand Down
8 changes: 4 additions & 4 deletions examples/examples/z_pong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ fn main() {

let (config, express) = parse_args();

let session = zenoh::open(config).res().unwrap().into_arc();
let session = zenoh::open(config).wait().unwrap().into_arc();

// The key expression to read the data from
let key_expr_ping = keyexpr::new("test/ping").unwrap();
Expand All @@ -35,13 +35,13 @@ fn main() {
.declare_publisher(key_expr_pong)
.congestion_control(CongestionControl::Block)
.express(express)
.res()
.wait()
.unwrap();

let _sub = session
.declare_subscriber(key_expr_ping)
.callback(move |sample| publisher.put(sample.payload().clone()).res().unwrap())
.res()
.callback(move |sample| publisher.put(sample.payload().clone()).wait().unwrap())
.wait()
.unwrap();
std::thread::park();
}
Expand Down
6 changes: 3 additions & 3 deletions examples/examples/z_pub_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,21 @@ fn main() {
.collect::<Vec<u8>>()
.into();

let session = zenoh::open(args.common).res().unwrap();
let session = zenoh::open(args.common).wait().unwrap();

let publisher = session
.declare_publisher("test/thr")
.congestion_control(CongestionControl::Block)
.priority(prio)
.express(args.express)
.res()
.wait()
.unwrap();

println!("Press CTRL-C to quit...");
let mut count: usize = 0;
let mut start = std::time::Instant::now();
loop {
publisher.put(data.clone()).res().unwrap();
publisher.put(data.clone()).wait().unwrap();

if args.print {
if count < args.number {
Expand Down
4 changes: 2 additions & 2 deletions examples/examples/z_sub_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ fn main() {
// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
config.transport.shared_memory.set_enabled(true).unwrap();

let session = zenoh::open(config).res().unwrap();
let session = zenoh::open(config).wait().unwrap();

let key_expr = "test/thr";

Expand All @@ -91,7 +91,7 @@ fn main() {
std::process::exit(0)
}
})
.res()
.wait()
.unwrap();

println!("Press CTRL-C to quit...");
Expand Down
4 changes: 2 additions & 2 deletions io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use tokio::io::unix::AsyncFd;
use tokio::io::Interest;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use zenoh_core::{zasyncread, zasyncwrite, ResolveFuture, SyncResolve};
use zenoh_core::{zasyncread, zasyncwrite, ResolveFuture, Wait};
use zenoh_protocol::core::{EndPoint, Locator};
use zenoh_protocol::transport::BatchSize;
use zenoh_runtime::ZRuntime;
Expand Down Expand Up @@ -331,7 +331,7 @@ impl UnicastPipeListener {

fn stop_listening(self) {
self.token.cancel();
let _ = ResolveFuture::new(self.handle).res_sync();
let _ = ResolveFuture::new(self.handle).wait();
}
}

Expand Down
2 changes: 1 addition & 1 deletion plugins/zenoh-plugin-example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::sync::{
};
use tracing::{debug, info};
use zenoh::plugins::{RunningPluginTrait, ZenohPlugin};
use zenoh::prelude::r#async::*;
use zenoh::prelude::*;
use zenoh::runtime::Runtime;
use zenoh_core::zlock;
use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl};
Expand Down
2 changes: 1 addition & 1 deletion plugins/zenoh-plugin-rest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use tide::sse::Sender;
use tide::{Request, Response, Server, StatusCode};
use zenoh::payload::StringOrBase64;
use zenoh::plugins::{RunningPluginTrait, ZenohPlugin};
use zenoh::prelude::r#async::*;
use zenoh::prelude::*;
use zenoh::query::{QueryConsolidation, Reply};
use zenoh::runtime::Runtime;
use zenoh::selector::TIME_RANGE_KEY;
Expand Down
2 changes: 1 addition & 1 deletion plugins/zenoh-plugin-storage-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl StorageRuntimeInner {
let plugins_manager = PluginsManager::dynamic(lib_loader.clone(), BACKEND_LIB_PREFIX)
.declare_static_plugin::<MemoryBackend>();

let session = Arc::new(zenoh::init(runtime.clone()).res_sync()?);
let session = Arc::new(zenoh::init(runtime.clone()).wait()?);

// After this moment result should be only Ok. Failure of loading of one voulme or storage should not affect others.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use async_std::sync::RwLock;
use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::Arc;
use zenoh::prelude::r#async::*;
use zenoh::prelude::*;
use zenoh::time::Timestamp;
use zenoh_backend_traits::config::{StorageConfig, VolumeConfig};
use zenoh_backend_traits::*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::collections::{BTreeSet, HashMap, HashSet};
use std::str;
use std::str::FromStr;
use zenoh::payload::StringOrBase64;
use zenoh::prelude::r#async::*;
use zenoh::prelude::*;
use zenoh::time::Timestamp;
use zenoh::Session;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::collections::{HashMap, HashSet};
use std::str;
use zenoh::key_expr::{KeyExpr, OwnedKeyExpr};
use zenoh::payload::StringOrBase64;
use zenoh::prelude::r#async::*;
use zenoh::prelude::*;
use zenoh::sample::builder::SampleBuilder;
use zenoh::time::Timestamp;
use zenoh::Session;
Expand Down
2 changes: 1 addition & 1 deletion plugins/zenoh-plugin-storage-manager/src/replica/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::str::FromStr;
use std::time::{Duration, SystemTime};
use urlencoding::encode;
use zenoh::payload::StringOrBase64;
use zenoh::prelude::r#async::*;
use zenoh::prelude::*;
use zenoh::time::Timestamp;
use zenoh::Session;
use zenoh_backend_traits::config::{ReplicaConfig, StorageConfig};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::str::{self, FromStr};
use std::time::{SystemTime, UNIX_EPOCH};
use zenoh::buffers::buffer::SplitBuffer;
use zenoh::buffers::ZBuf;
use zenoh::prelude::r#async::*;
use zenoh::prelude::*;
use zenoh::query::{ConsolidationMode, QueryTarget};
use zenoh::sample::builder::SampleBuilder;
use zenoh::sample::{Sample, SampleKind};
Expand Down
2 changes: 1 addition & 1 deletion plugins/zenoh-plugin-storage-manager/tests/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::thread::sleep;

use async_std::task;
use zenoh::payload::StringOrBase64;
use zenoh::prelude::r#async::*;
use zenoh::prelude::*;
use zenoh::query::Reply;
use zenoh::{prelude::Config, time::Timestamp};
use zenoh_core::zasync_executor_init;
Expand Down
Loading

0 comments on commit 9b41899

Please sign in to comment.