From e0dbcf561f6d3b379476237cb532f2b5e21d1126 Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Tue, 10 Oct 2023 08:44:04 +0000 Subject: [PATCH] RuntimeState private --- zenoh/src/net/runtime/adminspace.rs | 30 ++++++++++---------- zenoh/src/net/runtime/mod.rs | 41 +++++++++++---------------- zenoh/src/net/runtime/orchestrator.rs | 24 +++++++++------- 3 files changed, 45 insertions(+), 50 deletions(-) diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index 0eb099a098..92ab103bb6 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -66,8 +66,8 @@ enum PluginDiff { impl AdminSpace { pub async fn start(runtime: &Runtime, plugins_mgr: plugins::PluginsManager, version: String) { - let zid_str = runtime.zid.to_string(); - let metadata = runtime.metadata.clone(); + let zid_str = runtime.state.zid.to_string(); + let metadata = runtime.state.metadata.clone(); let root_key: OwnedKeyExpr = format!("@/router/{zid_str}").try_into().unwrap(); let mut handlers: HashMap<_, Handler> = HashMap::new(); @@ -121,14 +121,14 @@ impl AdminSpace { metadata, }); let admin = Arc::new(AdminSpace { - zid: runtime.zid, + zid: runtime.zid(), primitives: Mutex::new(None), mappings: Mutex::new(HashMap::new()), handlers, context, }); - let cfg_rx = admin.context.runtime.config.subscribe(); + let cfg_rx = admin.context.runtime.state.config.subscribe(); task::spawn({ let admin = admin.clone(); async move { @@ -139,7 +139,7 @@ impl AdminSpace { } let requested_plugins = { - let cfg_guard = admin.context.runtime.config.lock(); + let cfg_guard = admin.context.runtime.state.config.lock(); cfg_guard.plugins().load_requests().collect::>() }; let mut diffs = Vec::new(); @@ -195,7 +195,7 @@ impl AdminSpace { Ok(Some((path, plugin))) => { active_plugins.insert(name.into(), path.into()); let mut cfg_guard = - admin.context.runtime.config.lock(); + admin.context.runtime.state.config.lock(); cfg_guard.add_plugin_validator( name, plugin.config_checker(), @@ -221,7 +221,7 @@ impl AdminSpace { } }); - let primitives = runtime.router.new_primitives(admin.clone()); + let primitives = runtime.state.router.new_primitives(admin.clone()); zlock!(admin.primitives).replace(primitives.clone()); primitives.send_declare(Declare { @@ -283,7 +283,7 @@ impl Primitives for AdminSpace { fn send_push(&self, msg: Push) { trace!("recv Push {:?}", msg); { - let conf = self.context.runtime.config.lock(); + let conf = self.context.runtime.state.config.lock(); if !conf.adminspace.permissions().write { log::error!( "Received PUT on '{}' but adminspace.permissions.write=false in configuration", @@ -307,7 +307,7 @@ impl Primitives for AdminSpace { key, json ); - if let Err(e) = (&self.context.runtime.config).insert_json5(key, json) { + if let Err(e) = (&self.context.runtime.state.config).insert_json5(key, json) { error!( "Error inserting conf value /@/router/{}/config/{} : {} - {}", &self.context.zid_str, key, json, e @@ -325,7 +325,7 @@ impl Primitives for AdminSpace { &self.context.zid_str, key ); - if let Err(e) = self.context.runtime.config.remove(key) { + if let Err(e) = self.context.runtime.state.config.remove(key) { log::error!("Error deleting conf value {} : {}", msg.wire_expr, e) } } @@ -338,7 +338,7 @@ impl Primitives for AdminSpace { if let RequestBody::Query(query) = msg.payload { let primitives = zlock!(self.primitives).as_ref().unwrap().clone(); { - let conf = self.context.runtime.config.lock(); + let conf = self.context.runtime.state.config.lock(); if !conf.adminspace.permissions().read { log::error!( "Received GET on '{}' but adminspace.permissions.read=false in configuration", @@ -528,7 +528,7 @@ fn routers_linkstate_data(context: &AdminContext, query: Query) { .try_into() .unwrap(); - let tables = zread!(context.runtime.router.tables.tables); + let tables = zread!(context.runtime.state.router.tables.tables); if let Err(e) = query .reply(Ok(Sample::new( @@ -555,7 +555,7 @@ fn peers_linkstate_data(context: &AdminContext, query: Query) { .try_into() .unwrap(); - let tables = zread!(context.runtime.router.tables.tables); + let tables = zread!(context.runtime.state.router.tables.tables); if let Err(e) = query .reply(Ok(Sample::new( @@ -578,7 +578,7 @@ fn peers_linkstate_data(context: &AdminContext, query: Query) { } fn subscribers_data(context: &AdminContext, query: Query) { - let tables = zread!(context.runtime.router.tables.tables); + let tables = zread!(context.runtime.state.router.tables.tables); for sub in tables.router_subs.iter() { let key = KeyExpr::try_from(format!( "@/router/{}/subscriber/{}", @@ -595,7 +595,7 @@ fn subscribers_data(context: &AdminContext, query: Query) { } fn queryables_data(context: &AdminContext, query: Query) { - let tables = zread!(context.runtime.router.tables.tables); + let tables = zread!(context.runtime.state.router.tables.tables); for qabl in tables.router_qabls.iter() { let key = KeyExpr::try_from(format!( "@/router/{}/queryable/{}", diff --git a/zenoh/src/net/runtime/mod.rs b/zenoh/src/net/runtime/mod.rs index c776e229d4..fbfe24aadc 100644 --- a/zenoh/src/net/runtime/mod.rs +++ b/zenoh/src/net/runtime/mod.rs @@ -46,7 +46,7 @@ use zenoh_transport::{ TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler, TransportUnicast, }; -pub struct RuntimeState { +struct RuntimeState { zid: ZenohId, whatami: WhatAmI, metadata: serde_json::Value, @@ -64,14 +64,6 @@ pub struct Runtime { state: Arc, } -impl std::ops::Deref for Runtime { - type Target = RuntimeState; - - fn deref(&self) -> &RuntimeState { - self.state.deref() - } -} - impl Runtime { pub async fn new(config: Config) -> ZResult { let mut runtime = Runtime::init(config).await?; @@ -150,7 +142,7 @@ impl Runtime { }), }; *handler.runtime.write().unwrap() = Some(runtime.clone()); - get_mut_unchecked(&mut runtime.router.clone()).init_link_state( + get_mut_unchecked(&mut runtime.state.router.clone()).init_link_state( runtime.clone(), router_link_state, peer_link_state, @@ -180,7 +172,7 @@ impl Runtime { #[inline(always)] pub fn manager(&self) -> &TransportManager { - &self.manager + &self.state.manager } pub fn new_handler(&self, handler: Arc) { @@ -189,17 +181,17 @@ impl Runtime { pub async fn close(&self) -> ZResult<()> { log::trace!("Runtime::close())"); - drop(self.stop_source.write().unwrap().take()); + drop(self.state.stop_source.write().unwrap().take()); self.manager().close().await; Ok(()) } pub fn new_timestamp(&self) -> Option { - self.hlc.as_ref().map(|hlc| hlc.new_timestamp()) + self.state.hlc.as_ref().map(|hlc| hlc.new_timestamp()) } pub fn get_locators(&self) -> Vec { - self.locators.read().unwrap().clone() + self.state.locators.read().unwrap().clone() } pub(crate) fn spawn(&self, future: F) -> Option>> @@ -207,7 +199,7 @@ impl Runtime { F: Future + Send + 'static, T: Send + 'static, { - self.stop_source + self.state.stop_source .read() .unwrap() .as_ref() @@ -215,23 +207,23 @@ impl Runtime { } pub(crate) fn router(&self) -> Arc { - self.router.clone() + self.state.router.clone() } pub fn config(&self) -> &Notifier { - &self.config + &self.state.config } pub fn hlc(&self) -> Option<&HLC> { - self.hlc.as_ref().map(Arc::as_ref) + self.state.hlc.as_ref().map(Arc::as_ref) } pub fn zid(&self) -> ZenohId { - self.zid + self.state.zid } pub fn whatami(&self) -> WhatAmI { - self.whatami + self.state.whatami } } @@ -248,7 +240,7 @@ impl TransportEventHandler for RuntimeTransportEventHandler { match zread!(self.runtime).as_ref() { Some(runtime) => { let slave_handlers: Vec> = - zread!(runtime.transport_handlers) + zread!(runtime.state.transport_handlers) .iter() .filter_map(|handler| { handler.new_unicast(peer.clone(), transport.clone()).ok() @@ -257,7 +249,7 @@ impl TransportEventHandler for RuntimeTransportEventHandler { Ok(Arc::new(RuntimeSession { runtime: runtime.clone(), endpoint: std::sync::RwLock::new(None), - main_handler: runtime.router.new_transport_unicast(transport).unwrap(), + main_handler: runtime.state.router.new_transport_unicast(transport).unwrap(), slave_handlers, })) } @@ -272,11 +264,11 @@ impl TransportEventHandler for RuntimeTransportEventHandler { match zread!(self.runtime).as_ref() { Some(runtime) => { let slave_handlers: Vec> = - zread!(runtime.transport_handlers) + zread!(runtime.state.transport_handlers) .iter() .filter_map(|handler| handler.new_multicast(transport.clone()).ok()) .collect(); - runtime.router.new_transport_multicast(transport.clone())?; + runtime.state.router.new_transport_multicast(transport.clone())?; Ok(Arc::new(RuntimeMuticastGroup { runtime: runtime.clone(), transport, @@ -365,6 +357,7 @@ impl TransportMulticastEventHandler for RuntimeMuticastGroup { Ok(Arc::new(RuntimeMuticastSession { main_handler: self .runtime + .state .router .new_peer_multicast(self.transport.clone(), peer)?, slave_handlers, diff --git a/zenoh/src/net/runtime/orchestrator.rs b/zenoh/src/net/runtime/orchestrator.rs index ccd2e68f6a..d0b420306d 100644 --- a/zenoh/src/net/runtime/orchestrator.rs +++ b/zenoh/src/net/runtime/orchestrator.rs @@ -47,7 +47,7 @@ pub enum Loop { impl Runtime { pub(crate) async fn start(&mut self) -> ZResult<()> { - match self.whatami { + match self.whatami() { WhatAmI::Client => self.start_client().await, WhatAmI::Peer => self.start_peer().await, WhatAmI::Router => self.start_router().await, @@ -56,7 +56,7 @@ impl Runtime { async fn start_client(&self) -> ZResult<()> { let (peers, scouting, addr, ifaces, timeout) = { - let guard = self.config.lock(); + let guard = self.state.config.lock(); ( guard.connect().endpoints().clone(), unwrap_or_default!(guard.scouting().multicast().enabled()), @@ -110,12 +110,13 @@ impl Runtime { async fn start_peer(&self) -> ZResult<()> { let (listeners, peers, scouting, listen, autoconnect, addr, ifaces, delay) = { - let guard = &self.config.lock(); + let guard = &self.state.config.lock(); let listeners = if guard.listen().endpoints().is_empty() { let endpoint: EndPoint = PEER_DEFAULT_LISTENER.parse().unwrap(); let protocol = endpoint.protocol(); let mut listeners = vec![]; if self + .state .manager .config .protocols @@ -155,12 +156,13 @@ impl Runtime { async fn start_router(&self) -> ZResult<()> { let (listeners, peers, scouting, listen, autoconnect, addr, ifaces) = { - let guard = self.config.lock(); + let guard = self.state.config.lock(); let listeners = if guard.listen().endpoints().is_empty() { let endpoint: EndPoint = ROUTER_DEFAULT_LISTENER.parse().unwrap(); let protocol = endpoint.protocol(); let mut listeners = vec![]; if self + .state .manager .config .protocols @@ -241,10 +243,10 @@ impl Runtime { } pub(crate) async fn update_peers(&self) -> ZResult<()> { - let peers = { self.config.lock().connect().endpoints().clone() }; + let peers = { self.state.config.lock().connect().endpoints().clone() }; let tranports = self.manager().get_transports_unicast().await; - if self.whatami == WhatAmI::Client { + if self.state.whatami == WhatAmI::Client { for transport in tranports { let should_close = if let Ok(Some(orch_transport)) = transport.get_callback() { if let Some(orch_transport) = orch_transport @@ -301,7 +303,7 @@ impl Runtime { } } - let mut locators = self.locators.write().unwrap(); + let mut locators = self.state.locators.write().unwrap(); *locators = self.manager().get_locators(); for locator in &*locators { log::info!("Zenoh can be reached at: {}", locator); @@ -771,7 +773,7 @@ impl Runtime { if let Ok(msg) = res { log::trace!("Received {:?} from {}", msg.body, peer); if let ScoutingBody::Scout(Scout { what, .. }) = &msg.body { - if what.matches(self.whatami) { + if what.matches(self.whatami()) { let mut wbuf = vec![]; let mut writer = wbuf.writer(); let codec = Zenoh080::new(); @@ -779,7 +781,7 @@ impl Runtime { let zid = self.manager().zid(); let hello: ScoutingMessage = Hello { version: zenoh_protocol::VERSION, - whatami: self.whatami, + whatami: self.whatami(), zid, locators: self.get_locators(), } @@ -811,7 +813,7 @@ impl Runtime { } pub(super) fn closing_session(session: &RuntimeSession) { - match session.runtime.whatami { + match session.runtime.whatami() { WhatAmI::Client => { let runtime = session.runtime.clone(); session.runtime.spawn(async move { @@ -827,7 +829,7 @@ impl Runtime { } _ => { if let Some(endpoint) = &*zread!(session.endpoint) { - let peers = { session.runtime.config.lock().connect().endpoints().clone() }; + let peers = { session.runtime.state.config.lock().connect().endpoints().clone() }; if peers.contains(endpoint) { let endpoint = endpoint.clone(); let runtime = session.runtime.clone();