Skip to content

Commit

Permalink
Router reorg (#587)
Browse files Browse the repository at this point in the history
* Code move

* Code move

* Use RoutingContext type

* Structs split

* Renaming

* Visibility

* Move ingress/egress filters out of pubsub

* Make hat abstract

* Abstract missing close_face fn

* Duplicate hat

* Move Primitives

* Move link_id into HatFace

* Change face initialization

* Interceptors

* Interceptor types renaming

* Rename RoutingContext

* Add RoutingContext and LoggerInterceptor

* Interceptors can access the Config at construction

* Split linkstate and p2p peer hats

* Simplify HatTrait init function

* Hats cleanup

* Reintroduce routes precomputation

* Improve routes precomputation

* Reintroduce matching pulls precomputation

* Perf improvements

* Perf improvements

* Remove files wrongly reintroduced by merge

* Fix complete_n build

* Remove useless checks

* Fix OAM handling

* Remove commented code

* Simplified routes computation hats api

* Move matching pulls computation out of hats

* Fix query routes update

* Fix copy-paste error

* Renaming

* Add missing query routes deactivations

* Refactor code

* Improve perfromances

* Remove unnecessary clippy alllows

* Code format

* Rename TREES_COMPUTATION_DELAY constant

* Address review comment

* Reexpose subsribers and queryables in adminspace

* Reexpose linkstate graphs in adminspace

* Fix bug propagating subscriptions

* Reintroduce tables tests

* Remove unneeded clippy allows

* Improve TODO comments

* Improve TODO comments
  • Loading branch information
OlivierHecart authored Jan 31, 2024
1 parent 71e224e commit 5601669
Show file tree
Hide file tree
Showing 42 changed files with 12,392 additions and 6,262 deletions.
1 change: 0 additions & 1 deletion io/zenoh-transport/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
pub mod common;
pub mod manager;
pub mod multicast;
pub mod primitives;
pub mod unicast;

#[cfg(feature = "stats")]
Expand Down
58 changes: 0 additions & 58 deletions io/zenoh-transport/src/primitives/demux.rs

This file was deleted.

130 changes: 0 additions & 130 deletions io/zenoh-transport/src/primitives/mux.rs

This file was deleted.

3 changes: 1 addition & 2 deletions zenoh/src/key_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ use zenoh_protocol::{
network::{declare, DeclareBody, Mapping, UndeclareKeyExpr},
};
use zenoh_result::ZResult;
use zenoh_transport::primitives::Primitives;

use crate::{prelude::Selector, Session, Undeclarable};
use crate::{net::primitives::Primitives, prelude::Selector, Session, Undeclarable};

#[derive(Clone, Debug)]
pub(crate) enum KeyExprInner<'a> {
Expand Down
4 changes: 2 additions & 2 deletions zenoh/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
#[doc(hidden)]
pub(crate) mod codec;
#[doc(hidden)]
pub(crate) mod primitives;
#[doc(hidden)]
pub(crate) mod protocol;
#[doc(hidden)]
pub(crate) mod routing;
#[doc(hidden)]
pub mod runtime;
#[doc(hidden)]
pub(crate) use zenoh_transport as transport;

#[cfg(test)]
pub(crate) mod tests;
95 changes: 95 additions & 0 deletions zenoh/src/net/primitives/demux.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use super::Primitives;
use crate::net::routing::{
dispatcher::face::Face,
interceptor::{InterceptorTrait, InterceptorsChain},
RoutingContext,
};
use std::any::Any;
use zenoh_link::Link;
use zenoh_protocol::network::{NetworkBody, NetworkMessage};
use zenoh_result::ZResult;
use zenoh_transport::unicast::TransportUnicast;
use zenoh_transport::TransportPeerEventHandler;

pub struct DeMux {
face: Face,
pub(crate) transport: Option<TransportUnicast>,
pub(crate) interceptor: InterceptorsChain,
}

impl DeMux {
pub(crate) fn new(
face: Face,
transport: Option<TransportUnicast>,
interceptor: InterceptorsChain,
) -> Self {
Self {
face,
transport,
interceptor,
}
}
}

impl TransportPeerEventHandler for DeMux {
#[inline]
fn handle_message(&self, mut msg: NetworkMessage) -> ZResult<()> {
if !self.interceptor.interceptors.is_empty() {
let ctx = RoutingContext::new_in(msg, self.face.clone());
let ctx = match self.interceptor.intercept(ctx) {
Some(ctx) => ctx,
None => return Ok(()),
};
msg = ctx.msg;
}

match msg.body {
NetworkBody::Push(m) => self.face.send_push(m),
NetworkBody::Declare(m) => self.face.send_declare(m),
NetworkBody::Request(m) => self.face.send_request(m),
NetworkBody::Response(m) => self.face.send_response(m),
NetworkBody::ResponseFinal(m) => self.face.send_response_final(m),
NetworkBody::OAM(m) => {
if let Some(transport) = self.transport.as_ref() {
let ctrl_lock = zlock!(self.face.tables.ctrl_lock);
let mut tables = zwrite!(self.face.tables.tables);
ctrl_lock.handle_oam(&mut tables, &self.face.tables, m, transport)?
}
}
}

Ok(())
}

fn new_link(&self, _link: Link) {}

fn del_link(&self, _link: Link) {}

fn closing(&self) {
self.face.send_close();
if let Some(transport) = self.transport.as_ref() {
let ctrl_lock = zlock!(self.face.tables.ctrl_lock);
let mut tables = zwrite!(self.face.tables.tables);
let _ = ctrl_lock.closing(&mut tables, &self.face.tables, transport);
}
}

fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
self
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ pub use demux::*;
pub use mux::*;
use zenoh_protocol::network::{Declare, Push, Request, Response, ResponseFinal};

use super::routing::RoutingContext;

pub trait Primitives: Send + Sync {
fn send_declare(&self, msg: Declare);

Expand All @@ -32,15 +34,23 @@ pub trait Primitives: Send + Sync {
fn send_close(&self);
}

#[derive(Default)]
pub struct DummyPrimitives;
pub(crate) trait EPrimitives: Send + Sync {
fn send_declare(&self, ctx: RoutingContext<Declare>);

fn send_push(&self, msg: Push);

fn send_request(&self, ctx: RoutingContext<Request>);

impl DummyPrimitives {
pub fn new() -> Self {
Self
}
fn send_response(&self, ctx: RoutingContext<Response>);

fn send_response_final(&self, ctx: RoutingContext<ResponseFinal>);

fn send_close(&self);
}

#[derive(Default)]
pub struct DummyPrimitives;

impl Primitives for DummyPrimitives {
fn send_declare(&self, _msg: Declare) {}

Expand All @@ -54,3 +64,17 @@ impl Primitives for DummyPrimitives {

fn send_close(&self) {}
}

impl EPrimitives for DummyPrimitives {
fn send_declare(&self, _ctx: RoutingContext<Declare>) {}

fn send_push(&self, _msg: Push) {}

fn send_request(&self, _ctx: RoutingContext<Request>) {}

fn send_response(&self, _ctx: RoutingContext<Response>) {}

fn send_response_final(&self, _ctx: RoutingContext<ResponseFinal>) {}

fn send_close(&self) {}
}
Loading

0 comments on commit 5601669

Please sign in to comment.