Skip to content

Commit

Permalink
RuntimeState private
Browse files Browse the repository at this point in the history
  • Loading branch information
milyin committed Oct 10, 2023
1 parent 0164dd8 commit e0dbcf5
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 50 deletions.
30 changes: 15 additions & 15 deletions zenoh/src/net/runtime/adminspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ enum PluginDiff {

impl AdminSpace {
pub async fn start(runtime: &Runtime, plugins_mgr: plugins::PluginsManager, version: String) {
let zid_str = runtime.zid.to_string();
let metadata = runtime.metadata.clone();
let zid_str = runtime.state.zid.to_string();
let metadata = runtime.state.metadata.clone();
let root_key: OwnedKeyExpr = format!("@/router/{zid_str}").try_into().unwrap();

let mut handlers: HashMap<_, Handler> = HashMap::new();
Expand Down Expand Up @@ -121,14 +121,14 @@ impl AdminSpace {
metadata,
});
let admin = Arc::new(AdminSpace {
zid: runtime.zid,
zid: runtime.zid(),
primitives: Mutex::new(None),
mappings: Mutex::new(HashMap::new()),
handlers,
context,
});

let cfg_rx = admin.context.runtime.config.subscribe();
let cfg_rx = admin.context.runtime.state.config.subscribe();
task::spawn({
let admin = admin.clone();
async move {
Expand All @@ -139,7 +139,7 @@ impl AdminSpace {
}

let requested_plugins = {
let cfg_guard = admin.context.runtime.config.lock();
let cfg_guard = admin.context.runtime.state.config.lock();
cfg_guard.plugins().load_requests().collect::<Vec<_>>()
};
let mut diffs = Vec::new();
Expand Down Expand Up @@ -195,7 +195,7 @@ impl AdminSpace {
Ok(Some((path, plugin))) => {
active_plugins.insert(name.into(), path.into());
let mut cfg_guard =
admin.context.runtime.config.lock();
admin.context.runtime.state.config.lock();
cfg_guard.add_plugin_validator(
name,
plugin.config_checker(),
Expand All @@ -221,7 +221,7 @@ impl AdminSpace {
}
});

let primitives = runtime.router.new_primitives(admin.clone());
let primitives = runtime.state.router.new_primitives(admin.clone());
zlock!(admin.primitives).replace(primitives.clone());

primitives.send_declare(Declare {
Expand Down Expand Up @@ -283,7 +283,7 @@ impl Primitives for AdminSpace {
fn send_push(&self, msg: Push) {
trace!("recv Push {:?}", msg);
{
let conf = self.context.runtime.config.lock();
let conf = self.context.runtime.state.config.lock();
if !conf.adminspace.permissions().write {
log::error!(
"Received PUT on '{}' but adminspace.permissions.write=false in configuration",
Expand All @@ -307,7 +307,7 @@ impl Primitives for AdminSpace {
key,
json
);
if let Err(e) = (&self.context.runtime.config).insert_json5(key, json) {
if let Err(e) = (&self.context.runtime.state.config).insert_json5(key, json) {
error!(
"Error inserting conf value /@/router/{}/config/{} : {} - {}",
&self.context.zid_str, key, json, e
Expand All @@ -325,7 +325,7 @@ impl Primitives for AdminSpace {
&self.context.zid_str,
key
);
if let Err(e) = self.context.runtime.config.remove(key) {
if let Err(e) = self.context.runtime.state.config.remove(key) {
log::error!("Error deleting conf value {} : {}", msg.wire_expr, e)
}
}
Expand All @@ -338,7 +338,7 @@ impl Primitives for AdminSpace {
if let RequestBody::Query(query) = msg.payload {
let primitives = zlock!(self.primitives).as_ref().unwrap().clone();
{
let conf = self.context.runtime.config.lock();
let conf = self.context.runtime.state.config.lock();
if !conf.adminspace.permissions().read {
log::error!(
"Received GET on '{}' but adminspace.permissions.read=false in configuration",
Expand Down Expand Up @@ -528,7 +528,7 @@ fn routers_linkstate_data(context: &AdminContext, query: Query) {
.try_into()
.unwrap();

let tables = zread!(context.runtime.router.tables.tables);
let tables = zread!(context.runtime.state.router.tables.tables);

if let Err(e) = query
.reply(Ok(Sample::new(
Expand All @@ -555,7 +555,7 @@ fn peers_linkstate_data(context: &AdminContext, query: Query) {
.try_into()
.unwrap();

let tables = zread!(context.runtime.router.tables.tables);
let tables = zread!(context.runtime.state.router.tables.tables);

if let Err(e) = query
.reply(Ok(Sample::new(
Expand All @@ -578,7 +578,7 @@ fn peers_linkstate_data(context: &AdminContext, query: Query) {
}

fn subscribers_data(context: &AdminContext, query: Query) {
let tables = zread!(context.runtime.router.tables.tables);
let tables = zread!(context.runtime.state.router.tables.tables);
for sub in tables.router_subs.iter() {
let key = KeyExpr::try_from(format!(
"@/router/{}/subscriber/{}",
Expand All @@ -595,7 +595,7 @@ fn subscribers_data(context: &AdminContext, query: Query) {
}

fn queryables_data(context: &AdminContext, query: Query) {
let tables = zread!(context.runtime.router.tables.tables);
let tables = zread!(context.runtime.state.router.tables.tables);
for qabl in tables.router_qabls.iter() {
let key = KeyExpr::try_from(format!(
"@/router/{}/queryable/{}",
Expand Down
41 changes: 17 additions & 24 deletions zenoh/src/net/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use zenoh_transport::{
TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler, TransportUnicast,
};

pub struct RuntimeState {
struct RuntimeState {
zid: ZenohId,
whatami: WhatAmI,
metadata: serde_json::Value,
Expand All @@ -64,14 +64,6 @@ pub struct Runtime {
state: Arc<RuntimeState>,
}

impl std::ops::Deref for Runtime {
type Target = RuntimeState;

fn deref(&self) -> &RuntimeState {
self.state.deref()
}
}

impl Runtime {
pub async fn new(config: Config) -> ZResult<Runtime> {
let mut runtime = Runtime::init(config).await?;
Expand Down Expand Up @@ -150,7 +142,7 @@ impl Runtime {
}),
};
*handler.runtime.write().unwrap() = Some(runtime.clone());
get_mut_unchecked(&mut runtime.router.clone()).init_link_state(
get_mut_unchecked(&mut runtime.state.router.clone()).init_link_state(
runtime.clone(),
router_link_state,
peer_link_state,
Expand Down Expand Up @@ -180,7 +172,7 @@ impl Runtime {

#[inline(always)]
pub fn manager(&self) -> &TransportManager {
&self.manager
&self.state.manager
}

pub fn new_handler(&self, handler: Arc<dyn TransportEventHandler>) {
Expand All @@ -189,49 +181,49 @@ impl Runtime {

pub async fn close(&self) -> ZResult<()> {
log::trace!("Runtime::close())");
drop(self.stop_source.write().unwrap().take());
drop(self.state.stop_source.write().unwrap().take());
self.manager().close().await;
Ok(())
}

pub fn new_timestamp(&self) -> Option<uhlc::Timestamp> {
self.hlc.as_ref().map(|hlc| hlc.new_timestamp())
self.state.hlc.as_ref().map(|hlc| hlc.new_timestamp())
}

pub fn get_locators(&self) -> Vec<Locator> {
self.locators.read().unwrap().clone()
self.state.locators.read().unwrap().clone()
}

pub(crate) fn spawn<F, T>(&self, future: F) -> Option<JoinHandle<Result<T, TimedOutError>>>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
self.stop_source
self.state.stop_source
.read()
.unwrap()
.as_ref()
.map(|source| async_std::task::spawn(future.timeout_at(source.token())))
}

pub(crate) fn router(&self) -> Arc<Router> {
self.router.clone()
self.state.router.clone()
}

pub fn config(&self) -> &Notifier<Config> {
&self.config
&self.state.config
}

pub fn hlc(&self) -> Option<&HLC> {
self.hlc.as_ref().map(Arc::as_ref)
self.state.hlc.as_ref().map(Arc::as_ref)
}

pub fn zid(&self) -> ZenohId {
self.zid
self.state.zid
}

pub fn whatami(&self) -> WhatAmI {
self.whatami
self.state.whatami
}
}

Expand All @@ -248,7 +240,7 @@ impl TransportEventHandler for RuntimeTransportEventHandler {
match zread!(self.runtime).as_ref() {
Some(runtime) => {
let slave_handlers: Vec<Arc<dyn TransportPeerEventHandler>> =
zread!(runtime.transport_handlers)
zread!(runtime.state.transport_handlers)
.iter()
.filter_map(|handler| {
handler.new_unicast(peer.clone(), transport.clone()).ok()
Expand All @@ -257,7 +249,7 @@ impl TransportEventHandler for RuntimeTransportEventHandler {
Ok(Arc::new(RuntimeSession {
runtime: runtime.clone(),
endpoint: std::sync::RwLock::new(None),
main_handler: runtime.router.new_transport_unicast(transport).unwrap(),
main_handler: runtime.state.router.new_transport_unicast(transport).unwrap(),
slave_handlers,
}))
}
Expand All @@ -272,11 +264,11 @@ impl TransportEventHandler for RuntimeTransportEventHandler {
match zread!(self.runtime).as_ref() {
Some(runtime) => {
let slave_handlers: Vec<Arc<dyn TransportMulticastEventHandler>> =
zread!(runtime.transport_handlers)
zread!(runtime.state.transport_handlers)
.iter()
.filter_map(|handler| handler.new_multicast(transport.clone()).ok())
.collect();
runtime.router.new_transport_multicast(transport.clone())?;
runtime.state.router.new_transport_multicast(transport.clone())?;
Ok(Arc::new(RuntimeMuticastGroup {
runtime: runtime.clone(),
transport,
Expand Down Expand Up @@ -365,6 +357,7 @@ impl TransportMulticastEventHandler for RuntimeMuticastGroup {
Ok(Arc::new(RuntimeMuticastSession {
main_handler: self
.runtime
.state
.router
.new_peer_multicast(self.transport.clone(), peer)?,
slave_handlers,
Expand Down
24 changes: 13 additions & 11 deletions zenoh/src/net/runtime/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub enum Loop {

impl Runtime {
pub(crate) async fn start(&mut self) -> ZResult<()> {
match self.whatami {
match self.whatami() {
WhatAmI::Client => self.start_client().await,
WhatAmI::Peer => self.start_peer().await,
WhatAmI::Router => self.start_router().await,
Expand All @@ -56,7 +56,7 @@ impl Runtime {

async fn start_client(&self) -> ZResult<()> {
let (peers, scouting, addr, ifaces, timeout) = {
let guard = self.config.lock();
let guard = self.state.config.lock();
(
guard.connect().endpoints().clone(),
unwrap_or_default!(guard.scouting().multicast().enabled()),
Expand Down Expand Up @@ -110,12 +110,13 @@ impl Runtime {

async fn start_peer(&self) -> ZResult<()> {
let (listeners, peers, scouting, listen, autoconnect, addr, ifaces, delay) = {
let guard = &self.config.lock();
let guard = &self.state.config.lock();
let listeners = if guard.listen().endpoints().is_empty() {
let endpoint: EndPoint = PEER_DEFAULT_LISTENER.parse().unwrap();
let protocol = endpoint.protocol();
let mut listeners = vec![];
if self
.state
.manager
.config
.protocols
Expand Down Expand Up @@ -155,12 +156,13 @@ impl Runtime {

async fn start_router(&self) -> ZResult<()> {
let (listeners, peers, scouting, listen, autoconnect, addr, ifaces) = {
let guard = self.config.lock();
let guard = self.state.config.lock();
let listeners = if guard.listen().endpoints().is_empty() {
let endpoint: EndPoint = ROUTER_DEFAULT_LISTENER.parse().unwrap();
let protocol = endpoint.protocol();
let mut listeners = vec![];
if self
.state
.manager
.config
.protocols
Expand Down Expand Up @@ -241,10 +243,10 @@ impl Runtime {
}

pub(crate) async fn update_peers(&self) -> ZResult<()> {
let peers = { self.config.lock().connect().endpoints().clone() };
let peers = { self.state.config.lock().connect().endpoints().clone() };
let tranports = self.manager().get_transports_unicast().await;

if self.whatami == WhatAmI::Client {
if self.state.whatami == WhatAmI::Client {
for transport in tranports {
let should_close = if let Ok(Some(orch_transport)) = transport.get_callback() {
if let Some(orch_transport) = orch_transport
Expand Down Expand Up @@ -301,7 +303,7 @@ impl Runtime {
}
}

let mut locators = self.locators.write().unwrap();
let mut locators = self.state.locators.write().unwrap();
*locators = self.manager().get_locators();
for locator in &*locators {
log::info!("Zenoh can be reached at: {}", locator);
Expand Down Expand Up @@ -771,15 +773,15 @@ impl Runtime {
if let Ok(msg) = res {
log::trace!("Received {:?} from {}", msg.body, peer);
if let ScoutingBody::Scout(Scout { what, .. }) = &msg.body {
if what.matches(self.whatami) {
if what.matches(self.whatami()) {
let mut wbuf = vec![];
let mut writer = wbuf.writer();
let codec = Zenoh080::new();

let zid = self.manager().zid();
let hello: ScoutingMessage = Hello {
version: zenoh_protocol::VERSION,
whatami: self.whatami,
whatami: self.whatami(),
zid,
locators: self.get_locators(),
}
Expand Down Expand Up @@ -811,7 +813,7 @@ impl Runtime {
}

pub(super) fn closing_session(session: &RuntimeSession) {
match session.runtime.whatami {
match session.runtime.whatami() {
WhatAmI::Client => {
let runtime = session.runtime.clone();
session.runtime.spawn(async move {
Expand All @@ -827,7 +829,7 @@ impl Runtime {
}
_ => {
if let Some(endpoint) = &*zread!(session.endpoint) {
let peers = { session.runtime.config.lock().connect().endpoints().clone() };
let peers = { session.runtime.state.config.lock().connect().endpoints().clone() };
if peers.contains(endpoint) {
let endpoint = endpoint.clone();
let runtime = session.runtime.clone();
Expand Down

0 comments on commit e0dbcf5

Please sign in to comment.