Skip to content

Commit

Permalink
Merge branch 'master' of github.com:eclipse-zenoh/zenoh into plugin_p…
Browse files Browse the repository at this point in the history
…rivates
  • Loading branch information
DariusIMP committed Nov 27, 2023
2 parents 86f512a + 8ebf8b8 commit f51b337
Show file tree
Hide file tree
Showing 14 changed files with 1,092 additions and 55 deletions.
10 changes: 6 additions & 4 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,14 @@
z_storage -k demo/**
```

### z_pub_shm & z_sub_shm
### z_pub_shm & z_sub

A pub/sub example involving the shared-memory feature.
Note that on subscriber side, the same `z_sub` example than for non-shared-memory example is used.

Typical Subscriber usage:
```bash
z_sub_shm
z_sub
```

Typical Publisher usage:
Expand Down Expand Up @@ -188,16 +189,17 @@
z_ping 1024
```

### z_pub_shm_thr & z_sub_shm_thr
### z_pub_shm_thr & z_sub_thr

Pub/Sub throughput test involving the shared-memory feature.
This example allows performing throughput measurements between a publisher performing
put operations with the shared-memory feature and a subscriber receiving notifications
of those puts.
Note that on subscriber side, the same `z_sub_thr` example than for non-shared-memory example is used.

Typical Subscriber usage:
```bash
z_sub_shm_thr
z_sub_thr
```

Typical Publisher usage:
Expand Down
9 changes: 7 additions & 2 deletions examples/examples/z_pub_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ async fn main() -> Result<(), zenoh::Error> {
// Initiate logging
env_logger::init();

let (config, path, value) = parse_args();
let (mut config, path, value) = parse_args();

// A probing procedure for shared memory is performed upon session opening. To enable `z_pub_shm` to operate
// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
config.transport.shared_memory.set_enabled(true).unwrap();

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();
Expand All @@ -39,6 +44,7 @@ async fn main() -> Result<(), zenoh::Error> {
let publisher = session.declare_publisher(&path).res().await.unwrap();

for idx in 0..(K * N as u32) {
sleep(Duration::from_secs(1)).await;
let mut sbuf = match shm.alloc(1024) {
Ok(buf) => buf,
Err(_) => {
Expand Down Expand Up @@ -88,7 +94,6 @@ async fn main() -> Result<(), zenoh::Error> {
let defrag = shm.defragment();
println!("De-framented {defrag} bytes");
}
// sleep(Duration::from_millis(100)).await;
// Dropping the SharedMemoryBuf means to free it.
drop(sbuf);
}
Expand Down
7 changes: 6 additions & 1 deletion examples/examples/z_pub_shm_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@ use zenoh::shm::SharedMemoryManager;
async fn main() {
// initiate logging
env_logger::init();
let (config, sm_size, size) = parse_args();
let (mut config, sm_size, size) = parse_args();

// A probing procedure for shared memory is performed upon session opening. To enable `z_pub_shm_thr` to operate
// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
config.transport.shared_memory.set_enabled(true).unwrap();

let z = zenoh::open(config).res().await.unwrap();
let id = z.zid();
Expand Down
7 changes: 6 additions & 1 deletion examples/examples/z_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ async fn main() {
// Initiate logging
env_logger::init();

let (config, key_expr) = parse_args();
let (mut config, key_expr) = parse_args();

// A probing procedure for shared memory is performed upon session opening. To enable `z_pub_shm` to operate
// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
config.transport.shared_memory.set_enabled(true).unwrap();

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();
Expand Down
7 changes: 6 additions & 1 deletion examples/examples/z_sub_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,12 @@ fn main() {
// initiate logging
env_logger::init();

let (config, m, n) = parse_args();
let (mut config, m, n) = parse_args();

// A probing procedure for shared memory is performed upon session opening. To enable `z_pub_shm_thr` to operate
// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
config.transport.shared_memory.set_enabled(true).unwrap();

let session = zenoh::open(config).res().unwrap();

Expand Down
61 changes: 40 additions & 21 deletions zenoh-ext/src/querying_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,9 @@ impl<'a, Receiver> FetchingSubscriber<'a, Receiver> {

let key_expr = conf.key_expr?;

// declare subscriber at first
// register fetch handler
let handler = register_handler(state.clone(), callback.clone());
// declare subscriber
let subscriber = match conf.session.clone() {
SessionRef::Borrow(session) => match conf.key_space.into() {
crate::KeySpace::User => session
Expand Down Expand Up @@ -707,15 +709,15 @@ impl<'a, Receiver> FetchingSubscriber<'a, Receiver> {
},
};

let mut fetch_subscriber = FetchingSubscriber {
let fetch_subscriber = FetchingSubscriber {
subscriber,
callback,
state,
receiver,
};

// start fetch
fetch_subscriber.fetch(conf.fetch).res_sync()?;
// run fetch
run_fetch(conf.fetch, handler)?;

Ok(fetch_subscriber)
}
Expand Down Expand Up @@ -776,7 +778,7 @@ impl<'a, Receiver> FetchingSubscriber<'a, Receiver> {
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
TryIntoSample,
>(
&mut self,
&self,
fetch: Fetch,
) -> impl Resolve<ZResult<()>>
where
Expand Down Expand Up @@ -882,22 +884,8 @@ where
<TryIntoSample as TryInto<Sample>>::Error: Into<zenoh_core::Error>,
{
fn res_sync(self) -> <Self as Resolvable>::To {
zlock!(self.state).pending_fetches += 1;
// pending fetches will be decremented in RepliesHandler drop()
let handler = RepliesHandler {
state: self.state,
callback: self.callback,
};

log::debug!("Fetch");
(self.fetch)(Box::new(move |s: TryIntoSample| match s.try_into() {
Ok(s) => {
let mut state = zlock!(handler.state);
log::trace!("Fetched sample received: push it to merge_queue");
state.merge_queue.push(s);
}
Err(e) => log::debug!("Received error fetching data: {}", e.into()),
}))
let handler = register_handler(self.state, self.callback);
run_fetch(self.fetch, handler)
}
}

Expand All @@ -913,3 +901,34 @@ where
std::future::ready(self.res_sync())
}
}

fn register_handler(
state: Arc<Mutex<InnerState>>,
callback: Arc<dyn Fn(Sample) + Send + Sync>,
) -> RepliesHandler {
zlock!(state).pending_fetches += 1;
// pending fetches will be decremented in RepliesHandler drop()
RepliesHandler { state, callback }
}

fn run_fetch<
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
TryIntoSample,
>(
fetch: Fetch,
handler: RepliesHandler,
) -> ZResult<()>
where
TryIntoSample: TryInto<Sample>,
<TryIntoSample as TryInto<Sample>>::Error: Into<zenoh_core::Error>,
{
log::debug!("Fetch data for FetchingSubscriber");
(fetch)(Box::new(move |s: TryIntoSample| match s.try_into() {
Ok(s) => {
let mut state = zlock!(handler.state);
log::trace!("Fetched sample received: push it to merge_queue");
state.merge_queue.push(s);
}
Err(e) => log::debug!("Received error fetching data: {}", e.into()),
}))
}
11 changes: 10 additions & 1 deletion zenoh/src/net/routing/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub struct FaceState {
pub(super) id: usize,
pub(super) zid: ZenohId,
pub(super) whatami: WhatAmI,
pub(super) local: bool,
#[cfg(feature = "stats")]
pub(super) stats: Option<Arc<TransportStats>>,
pub(super) primitives: Arc<dyn Primitives + Send + Sync>,
Expand All @@ -47,10 +48,12 @@ pub struct FaceState {
}

impl FaceState {
pub(super) fn new(
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
id: usize,
zid: ZenohId,
whatami: WhatAmI,
local: bool,
#[cfg(feature = "stats")] stats: Option<Arc<TransportStats>>,
primitives: Arc<dyn Primitives + Send + Sync>,
link_id: usize,
Expand All @@ -60,6 +63,7 @@ impl FaceState {
id,
zid,
whatami,
local,
#[cfg(feature = "stats")]
stats,
primitives,
Expand All @@ -76,6 +80,11 @@ impl FaceState {
})
}

#[inline]
pub fn is_local(&self) -> bool {
self.local
}

#[inline]
#[allow(clippy::trivially_copy_pass_by_ref)]
pub(super) fn get_mapping(
Expand Down
54 changes: 33 additions & 21 deletions zenoh/src/net/routing/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1421,7 +1421,10 @@ pub(super) fn compute_data_routes_(tables: &Tables, res: &Arc<Resource>) -> Data
compute_data_route(tables, &mut expr, Some(idx.index()), WhatAmI::Router);
}

routes.peer_data_route = Some(compute_data_route(tables, &mut expr, None, WhatAmI::Peer));
if !tables.full_net(WhatAmI::Peer) {
routes.peer_data_route =
Some(compute_data_route(tables, &mut expr, None, WhatAmI::Peer));
}
}
if (tables.whatami == WhatAmI::Router || tables.whatami == WhatAmI::Peer)
&& tables.full_net(WhatAmI::Peer)
Expand Down Expand Up @@ -1479,8 +1482,10 @@ pub(crate) fn compute_data_routes(tables: &mut Tables, res: &mut Arc<Resource>)
compute_data_route(tables, &mut expr, Some(idx.index()), WhatAmI::Router);
}

res_mut.context_mut().peer_data_route =
Some(compute_data_route(tables, &mut expr, None, WhatAmI::Peer));
if !tables.full_net(WhatAmI::Peer) {
res_mut.context_mut().peer_data_route =
Some(compute_data_route(tables, &mut expr, None, WhatAmI::Peer));
}
}
if (tables.whatami == WhatAmI::Router || tables.whatami == WhatAmI::Peer)
&& tables.full_net(WhatAmI::Peer)
Expand Down Expand Up @@ -1593,75 +1598,75 @@ macro_rules! treat_timestamp {
}

#[inline]
fn get_data_route(
pub(crate) fn get_data_route(
tables: &Tables,
face: &FaceState,
whatami: WhatAmI,
link_id: usize,
res: &Option<Arc<Resource>>,
expr: &mut RoutingExpr,
routing_context: u64,
) -> Arc<Route> {
match tables.whatami {
WhatAmI::Router => match face.whatami {
WhatAmI::Router => match whatami {
WhatAmI::Router => {
let routers_net = tables.routers_net.as_ref().unwrap();
let local_context = routers_net.get_local_context(routing_context, face.link_id);
let local_context = routers_net.get_local_context(routing_context, link_id);
res.as_ref()
.and_then(|res| res.routers_data_route(local_context))
.unwrap_or_else(|| {
compute_data_route(tables, expr, Some(local_context), face.whatami)
compute_data_route(tables, expr, Some(local_context), whatami)
})
}
WhatAmI::Peer => {
if tables.full_net(WhatAmI::Peer) {
let peers_net = tables.peers_net.as_ref().unwrap();
let local_context = peers_net.get_local_context(routing_context, face.link_id);
let local_context = peers_net.get_local_context(routing_context, link_id);
res.as_ref()
.and_then(|res| res.peers_data_route(local_context))
.unwrap_or_else(|| {
compute_data_route(tables, expr, Some(local_context), face.whatami)
compute_data_route(tables, expr, Some(local_context), whatami)
})
} else {
res.as_ref()
.and_then(|res| res.peer_data_route())
.unwrap_or_else(|| compute_data_route(tables, expr, None, face.whatami))
.unwrap_or_else(|| compute_data_route(tables, expr, None, whatami))
}
}
_ => res
.as_ref()
.and_then(|res| res.routers_data_route(0))
.unwrap_or_else(|| compute_data_route(tables, expr, None, face.whatami)),
.unwrap_or_else(|| compute_data_route(tables, expr, None, whatami)),
},
WhatAmI::Peer => {
if tables.full_net(WhatAmI::Peer) {
match face.whatami {
match whatami {
WhatAmI::Router | WhatAmI::Peer => {
let peers_net = tables.peers_net.as_ref().unwrap();
let local_context =
peers_net.get_local_context(routing_context, face.link_id);
let local_context = peers_net.get_local_context(routing_context, link_id);
res.as_ref()
.and_then(|res| res.peers_data_route(local_context))
.unwrap_or_else(|| {
compute_data_route(tables, expr, Some(local_context), face.whatami)
compute_data_route(tables, expr, Some(local_context), whatami)
})
}
_ => res
.as_ref()
.and_then(|res| res.peers_data_route(0))
.unwrap_or_else(|| compute_data_route(tables, expr, None, face.whatami)),
.unwrap_or_else(|| compute_data_route(tables, expr, None, whatami)),
}
} else {
res.as_ref()
.and_then(|res| match face.whatami {
.and_then(|res| match whatami {
WhatAmI::Client => res.client_data_route(),
_ => res.peer_data_route(),
})
.unwrap_or_else(|| compute_data_route(tables, expr, None, face.whatami))
.unwrap_or_else(|| compute_data_route(tables, expr, None, whatami))
}
}
_ => res
.as_ref()
.and_then(|res| res.client_data_route())
.unwrap_or_else(|| compute_data_route(tables, expr, None, face.whatami)),
.unwrap_or_else(|| compute_data_route(tables, expr, None, whatami)),
}
}

Expand Down Expand Up @@ -1779,7 +1784,14 @@ pub fn full_reentrant_route_data(
== *tables.elect_router(expr.full_expr(), tables.get_router_links(face.zid))
{
let res = Resource::get_resource(&prefix, expr.suffix);
let route = get_data_route(&tables, face, &res, &mut expr, routing_context);
let route = get_data_route(
&tables,
face.whatami,
face.link_id,
&res,
&mut expr,
routing_context,
);
let matching_pulls = get_matching_pulls(&tables, &res, &mut expr);

if !(route.is_empty() && matching_pulls.is_empty()) {
Expand Down
Loading

0 comments on commit f51b337

Please sign in to comment.