Skip to content

Commit

Permalink
refactor: replace AsyncResolve with IntoFuture
Browse files Browse the repository at this point in the history
Into the future with `IntoFuture`! :D
  • Loading branch information
wyfo committed Apr 17, 2024
1 parent a675130 commit fc7404e
Show file tree
Hide file tree
Showing 59 changed files with 673 additions and 845 deletions.
13 changes: 4 additions & 9 deletions ci/valgrind-check/src/pub_sub/bin/z_pub_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,11 @@ async fn main() {
let sub_key_expr = KeyExpr::try_from("test/valgrind/**").unwrap();

println!("Declaring Publisher on '{pub_key_expr}'...");
let pub_session = zenoh::open(Config::default()).res().await.unwrap();
let publisher = pub_session
.declare_publisher(&pub_key_expr)
.res()
.await
.unwrap();
let pub_session = zenoh::open(Config::default()).await.unwrap();
let publisher = pub_session.declare_publisher(&pub_key_expr).await.unwrap();

println!("Declaring Subscriber on '{sub_key_expr}'...");
let sub_session = zenoh::open(Config::default()).res().await.unwrap();
let sub_session = zenoh::open(Config::default()).await.unwrap();
let _subscriber = sub_session
.declare_subscriber(&sub_key_expr)
.callback(|sample| {
Expand All @@ -48,15 +44,14 @@ async fn main() {
.unwrap_or_else(|e| format!("{}", e))
);
})
.res()
.await
.unwrap();

for idx in 0..5 {
tokio::time::sleep(Duration::from_secs(1)).await;
let buf = format!("[{idx:4}] data");
println!("Putting Data ('{}': '{}')...", &pub_key_expr, buf);
publisher.put(buf).res().await.unwrap();
publisher.put(buf).await.unwrap();
}

tokio::time::sleep(Duration::from_secs(1)).await;
Expand Down
7 changes: 2 additions & 5 deletions ci/valgrind-check/src/queryable_get/bin/z_queryable_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async fn main() {
let get_selector = Selector::try_from("test/valgrind/**").unwrap();

println!("Declaring Queryable on '{queryable_key_expr}'...");
let queryable_session = zenoh::open(Config::default()).res().await.unwrap();
let queryable_session = zenoh::open(Config::default()).await.unwrap();
let _queryable = queryable_session
.declare_queryable(&queryable_key_expr.clone())
.callback(move |query| {
Expand All @@ -38,18 +38,16 @@ async fn main() {
query.selector().key_expr(),
query.value().unwrap().payload().clone(),
)
.res()
.await
.unwrap();
});
})
.complete(true)
.res()
.await
.unwrap();

println!("Declaring Get session for '{get_selector}'...");
let get_session = zenoh::open(Config::default()).res().await.unwrap();
let get_session = zenoh::open(Config::default()).await.unwrap();

for idx in 0..5 {
tokio::time::sleep(Duration::from_secs(1)).await;
Expand All @@ -58,7 +56,6 @@ async fn main() {
.get(&get_selector)
.value(idx)
.target(QueryTarget::All)
.res()
.await
.unwrap();
while let Ok(reply) = replies.recv_async().await {
Expand Down
48 changes: 30 additions & 18 deletions commons/zenoh-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
pub use lazy_static::lazy_static;
pub mod macros;

use std::future::{Future, Ready};
use std::future::{Future, IntoFuture, Ready};

// Re-exports after moving ZError/ZResult to zenoh-result
pub use zenoh_result::{bail, to_zerror, zerror};
Expand All @@ -36,15 +36,14 @@ pub trait Resolvable {

pub trait AsyncResolve: Resolvable {
type Future: Future<Output = <Self as Resolvable>::To> + Send;
}

fn res_async(self) -> Self::Future;

fn res(self) -> Self::Future
where
Self: Sized,
{
self.res_async()
}
impl<T> AsyncResolve for T
where
T: Resolvable + IntoFuture<Output = Self::To>,
T::IntoFuture: Send,
{
type Future = T::IntoFuture;
}

pub trait SyncResolve: Resolvable {
Expand All @@ -62,14 +61,25 @@ pub trait SyncResolve: Resolvable {
///
/// Builder patterns in Zenoh can be resolved with [`AsyncResolve`] in async context and [`SyncResolve`] in sync context.
/// In both async and sync context calling `.res()` resolves the builder.
/// `.res()` maps to `.res_async()` in async context.
/// `.res()` maps to `` in async context.
/// `.res()` maps to `.res_sync()` in sync context.
/// We advise to prefer the usage of [`AsyncResolve`] and to use [`SyncResolve`] with caution.
#[must_use = "Resolvables do nothing unless you resolve them using `.res()`."]
pub trait Resolve<Output>: Resolvable<To = Output> + SyncResolve + AsyncResolve + Send {}
pub trait Resolve<Output>:
Resolvable<To = Output>
+ SyncResolve
+ AsyncResolve
+ IntoFuture<IntoFuture = <Self as AsyncResolve>::Future, Output = Output>
+ Send
{
}

impl<T, Output> Resolve<Output> for T where
T: Resolvable<To = Output> + SyncResolve + AsyncResolve + Send
T: Resolvable<To = Output>
+ SyncResolve
+ AsyncResolve
+ IntoFuture<IntoFuture = <Self as AsyncResolve>::Future, Output = Output>
+ Send
{
}

Expand Down Expand Up @@ -98,14 +108,15 @@ where
type To = To;
}

impl<C, To> AsyncResolve for ResolveClosure<C, To>
impl<C, To> IntoFuture for ResolveClosure<C, To>
where
To: Sized + Send,
C: FnOnce() -> To + Send,
{
type Future = Ready<<Self as Resolvable>::To>;
type Output = <Self as Resolvable>::To;
type IntoFuture = Ready<<Self as Resolvable>::To>;

fn res_async(self) -> Self::Future {
fn into_future(self) -> Self::IntoFuture {
std::future::ready(self.res_sync())
}
}
Expand Down Expand Up @@ -145,14 +156,15 @@ where
type To = To;
}

impl<F, To> AsyncResolve for ResolveFuture<F, To>
impl<F, To> IntoFuture for ResolveFuture<F, To>
where
To: Sized + Send,
F: Future<Output = To> + Send,
{
type Future = F;
type Output = To;
type IntoFuture = F;

fn res_async(self) -> Self::Future {
fn into_future(self) -> Self::IntoFuture {
self.0
}
}
Expand Down
4 changes: 3 additions & 1 deletion commons/zenoh-core/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ macro_rules! zcondfeat {
#[macro_export]
macro_rules! ztimeout {
($f:expr) => {
tokio::time::timeout(TIMEOUT, $f).await.unwrap()
tokio::time::timeout(TIMEOUT, ::core::future::IntoFuture::into_future($f))
.await
.unwrap()
};
}
6 changes: 3 additions & 3 deletions examples/examples/z_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ async fn main() {
let (config, key_expr) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();
let session = zenoh::open(config).await.unwrap();

println!("Deleting resources matching '{key_expr}'...");
session.delete(&key_expr).res().await.unwrap();
session.delete(&key_expr).await.unwrap();

session.close().res().await.unwrap();
session.close().await.unwrap();
}

#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)]
Expand Down
6 changes: 3 additions & 3 deletions examples/examples/z_forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ async fn main() {
let (config, key_expr, forward) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();
let session = zenoh::open(config).await.unwrap();

println!("Declaring Subscriber on '{key_expr}'...");
let mut subscriber = session.declare_subscriber(&key_expr).res().await.unwrap();
let mut subscriber = session.declare_subscriber(&key_expr).await.unwrap();
println!("Declaring Publisher on '{forward}'...");
let publisher = session.declare_publisher(&forward).res().await.unwrap();
let publisher = session.declare_publisher(&forward).await.unwrap();
println!("Forwarding data from '{key_expr}' to '{forward}'...");
subscriber.forward(publisher).await.unwrap();
}
Expand Down
3 changes: 1 addition & 2 deletions examples/examples/z_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async fn main() {
let (config, selector, value, target, timeout) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();
let session = zenoh::open(config).await.unwrap();

println!("Sending Query '{selector}'...");
let replies = session
Expand All @@ -37,7 +37,6 @@ async fn main() {
.value(value)
.target(target)
.timeout(timeout)
.res()
.await
.unwrap();
while let Ok(reply) = replies.recv_async().await {
Expand Down
3 changes: 1 addition & 2 deletions examples/examples/z_get_liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,13 @@ async fn main() {
let (config, key_expr, timeout) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();
let session = zenoh::open(config).await.unwrap();

println!("Sending Liveliness Query '{key_expr}'...");
let replies = session
.liveliness()
.get(&key_expr)
.timeout(timeout)
.res()
.await
.unwrap();
while let Ok(reply) = replies.recv_async().await {
Expand Down
8 changes: 4 additions & 4 deletions examples/examples/z_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@ async fn main() {
let config = parse_args();

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();
let session = zenoh::open(config).await.unwrap();

let info = session.info();
println!("zid: {}", info.zid().res().await);
println!("zid: {}", info.zid().await);
println!(
"routers zid: {:?}",
info.routers_zid().res().await.collect::<Vec<ZenohId>>()
info.routers_zid().await.collect::<Vec<ZenohId>>()
);
println!(
"peers zid: {:?}",
info.peers_zid().res().await.collect::<Vec<ZenohId>>()
info.peers_zid().await.collect::<Vec<ZenohId>>()
);
}

Expand Down
13 changes: 3 additions & 10 deletions examples/examples/z_liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,18 @@ async fn main() {
let (config, key_expr) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();
let session = zenoh::open(config).await.unwrap();

println!("Declaring LivelinessToken on '{}'...", &key_expr);
let mut token = Some(
session
.liveliness()
.declare_token(&key_expr)
.res()
.await
.unwrap(),
);
let mut token = Some(session.liveliness().declare_token(&key_expr).await.unwrap());

println!("Press CTRL-C to undeclare LivelinessToken and quit...");
std::thread::park();
// LivelinessTokens are automatically closed when dropped
// Use the code below to manually undeclare it if needed
if let Some(token) = token.take() {
println!("Undeclaring LivelinessToken...");
token.undeclare().res().await.unwrap();
token.undeclare().await.unwrap();
};
}

Expand Down
11 changes: 3 additions & 8 deletions examples/examples/z_pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,17 @@ async fn main() {
let (config, key_expr, value, attachment) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();
let session = zenoh::open(config).await.unwrap();

println!("Declaring Publisher on '{key_expr}'...");
let publisher = session.declare_publisher(&key_expr).res().await.unwrap();
let publisher = session.declare_publisher(&key_expr).await.unwrap();

println!("Press CTRL-C to quit...");
for idx in 0..u32::MAX {
tokio::time::sleep(Duration::from_secs(1)).await;
let buf = format!("[{idx:4}] {value}");
println!("Putting Data ('{}': '{}')...", &key_expr, buf);
publisher
.put(buf)
.attachment(&attachment)
.res()
.await
.unwrap();
publisher.put(buf).attachment(&attachment).await.unwrap();
}
}

Expand Down
6 changes: 3 additions & 3 deletions examples/examples/z_pub_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ async fn main() -> Result<(), zenoh::Error> {
config.transport.shared_memory.set_enabled(true).unwrap();

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();
let session = zenoh::open(config).await.unwrap();

println!("Creating Shared Memory Manager...");
let id = session.zid();
let mut shm = SharedMemoryManager::make(id.to_string(), N * 1024).unwrap();

println!("Allocating Shared Memory Buffer...");
let publisher = session.declare_publisher(&path).res().await.unwrap();
let publisher = session.declare_publisher(&path).await.unwrap();

println!("Press CTRL-C to quit...");
for idx in 0..(K * N as u32) {
Expand Down Expand Up @@ -88,7 +88,7 @@ async fn main() -> Result<(), zenoh::Error> {
path,
String::from_utf8_lossy(&slice[0..slice_len])
);
publisher.put(sbuf.clone()).res().await?;
publisher.put(sbuf.clone()).await?;
if idx % K == 0 {
let freed = shm.garbage_collect();
println!("The Gargabe collector freed {freed} bytes");
Expand Down
6 changes: 3 additions & 3 deletions examples/examples/z_pub_shm_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async fn main() {
// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
config.transport.shared_memory.set_enabled(true).unwrap();

let z = zenoh::open(config).res().await.unwrap();
let z = zenoh::open(config).await.unwrap();
let id = z.zid();
let mut shm = SharedMemoryManager::make(id.to_string(), sm_size).unwrap();
let mut buf = shm.alloc(size).unwrap();
Expand All @@ -40,11 +40,11 @@ async fn main() {

let publisher = z.declare_publisher("test/thr")
// Make sure to not drop messages because of congestion control
.congestion_control(CongestionControl::Block).res().await.unwrap();
.congestion_control(CongestionControl::Block).await.unwrap();

println!("Press CTRL-C to quit...");
loop {
publisher.put(buf.clone()).res().await.unwrap();
publisher.put(buf.clone()).await.unwrap();
}
}

Expand Down
3 changes: 1 addition & 2 deletions examples/examples/z_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@ async fn main() {
let (config, key_expr, size, interval) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();
let session = zenoh::open(config).await.unwrap();

println!("Declaring Subscriber on '{key_expr}'...");
let subscriber = session
.declare_subscriber(&key_expr)
.with(RingChannel::new(size))
.res()
.await
.unwrap();

Expand Down
Loading

0 comments on commit fc7404e

Please sign in to comment.