Skip to content

Commit

Permalink
- break cyclic reference in Mux->Face->State->Mux
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisBiryukov91 committed Feb 7, 2024
1 parent ae0811e commit a84e419
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 10 deletions.
16 changes: 8 additions & 8 deletions zenoh/src/net/primitives/mux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//
use super::{EPrimitives, Primitives};
use crate::net::routing::{
dispatcher::face::Face,
dispatcher::face::{Face, WeakFace},
interceptor::{InterceptorTrait, InterceptorsChain},
RoutingContext,
};
Expand All @@ -25,7 +25,7 @@ use zenoh_transport::{multicast::TransportMulticast, unicast::TransportUnicast};

pub struct Mux {
pub handler: TransportUnicast,
pub(crate) face: OnceLock<Face>,
pub(crate) face: OnceLock<WeakFace>,
pub(crate) interceptor: InterceptorsChain,
}

Expand All @@ -48,7 +48,7 @@ impl Primitives for Mux {
};
if self.interceptor.interceptors.is_empty() {
let _ = self.handler.schedule(msg);
} else if let Some(face) = self.face.get() {
} else if let Some(face) = self.face.get().and_then(|f| f.upgrade()) {
let ctx = RoutingContext::new_out(msg, face.clone());
if let Some(ctx) = self.interceptor.intercept(ctx) {
let _ = self.handler.schedule(ctx.msg);
Expand All @@ -66,7 +66,7 @@ impl Primitives for Mux {
};
if self.interceptor.interceptors.is_empty() {
let _ = self.handler.schedule(msg);
} else if let Some(face) = self.face.get() {
} else if let Some(face) = self.face.get().and_then(|f| f.upgrade()) {
let ctx = RoutingContext::new_out(msg, face.clone());
if let Some(ctx) = self.interceptor.intercept(ctx) {
let _ = self.handler.schedule(ctx.msg);
Expand All @@ -84,7 +84,7 @@ impl Primitives for Mux {
};
if self.interceptor.interceptors.is_empty() {
let _ = self.handler.schedule(msg);
} else if let Some(face) = self.face.get() {
} else if let Some(face) = self.face.get().and_then(|f| f.upgrade()) {
let ctx = RoutingContext::new_out(msg, face.clone());
if let Some(ctx) = self.interceptor.intercept(ctx) {
let _ = self.handler.schedule(ctx.msg);
Expand All @@ -102,7 +102,7 @@ impl Primitives for Mux {
};
if self.interceptor.interceptors.is_empty() {
let _ = self.handler.schedule(msg);
} else if let Some(face) = self.face.get() {
} else if let Some(face) = self.face.get().and_then(|f| f.upgrade()) {
let ctx = RoutingContext::new_out(msg, face.clone());
if let Some(ctx) = self.interceptor.intercept(ctx) {
let _ = self.handler.schedule(ctx.msg);
Expand All @@ -120,7 +120,7 @@ impl Primitives for Mux {
};
if self.interceptor.interceptors.is_empty() {
let _ = self.handler.schedule(msg);
} else if let Some(face) = self.face.get() {
} else if let Some(face) = self.face.get().and_then(|f| f.upgrade()) {
let ctx = RoutingContext::new_out(msg, face.clone());
if let Some(ctx) = self.interceptor.intercept(ctx) {
let _ = self.handler.schedule(ctx.msg);
Expand Down Expand Up @@ -161,7 +161,7 @@ impl EPrimitives for Mux {
};
if self.interceptor.interceptors.is_empty() {
let _ = self.handler.schedule(msg);
} else if let Some(face) = self.face.get() {
} else if let Some(face) = self.face.get().and_then(|f| f.upgrade()) {
let ctx = RoutingContext::new_out(msg, face.clone());
if let Some(ctx) = self.interceptor.intercept(ctx) {
let _ = self.handler.schedule(ctx.msg);
Expand Down
33 changes: 32 additions & 1 deletion zenoh/src/net/routing/dispatcher/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::net::primitives::Primitives;
use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use std::sync::{Arc, Weak};
use zenoh_protocol::zenoh::RequestBody;
use zenoh_protocol::{
core::{ExprId, WhatAmI, ZenohId},
Expand Down Expand Up @@ -108,12 +108,43 @@ impl fmt::Display for FaceState {
}
}

#[derive(Clone)]
pub struct WeakFace {
pub(crate) tables: Weak<TablesLock>,
pub(crate) state: Weak<FaceState>,
}

impl WeakFace {
pub fn new() -> WeakFace {
WeakFace {
tables: Weak::new(),
state: Weak::new()
}
}

pub fn upgrade(&self) -> Option<Face> {
Some(Face {
tables: self.tables.upgrade()?,
state: self.state.upgrade()?
})
}
}

#[derive(Clone)]
pub struct Face {
pub(crate) tables: Arc<TablesLock>,
pub(crate) state: Arc<FaceState>,
}

impl Face {
pub fn downgrade(&self) -> WeakFace {
WeakFace {
tables: Arc::downgrade(&self.tables),
state: Arc::downgrade(&self.state)
}
}
}

impl Primitives for Face {
fn send_declare(&self, msg: zenoh_protocol::network::Declare) {
let ctrl_lock = zlock!(self.tables.ctrl_lock);
Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/net/routing/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl Router {
state: newface,
};

let _ = mux.face.set(face.clone());
let _ = mux.face.set(Face::downgrade(&face));

ctrl_lock.new_transport_unicast_face(&mut tables, &self.tables, &mut face, &transport)?;

Expand Down

0 comments on commit a84e419

Please sign in to comment.