Skip to content

Commit

Permalink
sender link downgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
milyin committed Dec 14, 2023
1 parent e0abf9d commit e92af28
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 3 deletions.
1 change: 1 addition & 0 deletions io/zenoh-link-commons/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub trait LinkManagerUnicastTrait: Send + Sync {
fn get_locators(&self) -> Vec<Locator>;
}
pub type NewLinkChannelSender = flume::Sender<LinkUnicast>;
pub type WeakNewLinkChannelSender = flume::WeakSender<LinkUnicast>;
pub trait ConstructibleLinkManagerUnicast<T>: Sized {
fn new(new_link_sender: NewLinkChannelSender, config: T) -> ZResult<Self>;
}
Expand Down
60 changes: 57 additions & 3 deletions io/zenoh-transport/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use std::sync::Arc;
use std::time::Duration;
use zenoh_config::{Config, LinkRxConf, QueueConf, QueueSizeConf};
use zenoh_crypto::{BlockCipher, PseudoRng};
use zenoh_link::NewLinkChannelSender;
use zenoh_link::{NewLinkChannelSender, WeakNewLinkChannelSender};
use zenoh_protocol::{
core::{EndPoint, Field, Locator, Priority, Resolution, WhatAmI, ZenohId},
transport::BatchSize,
Expand Down Expand Up @@ -105,6 +105,12 @@ pub struct TransportManagerConfig {
pub protocols: Vec<String>,
}

impl Drop for TransportManagerConfig {
fn drop(&mut self) {
panic!("TransportManagerConfig DROPPED!");
}
}

pub struct TransportManagerState {
pub unicast: TransportManagerStateUnicast,
pub multicast: TransportManagerStateMulticast,
Expand Down Expand Up @@ -362,7 +368,51 @@ pub struct TransportManager {
pub(crate) stats: Arc<crate::stats::TransportStats>,
}

struct WeakChannelTransportManager {
pub config: Arc<TransportManagerConfig>,
pub(crate) state: Arc<TransportManagerState>,
pub(crate) prng: Arc<AsyncMutex<PseudoRng>>,
pub(crate) cipher: Arc<BlockCipher>,
pub(crate) locator_inspector: zenoh_link::LocatorInspector,
pub(crate) new_unicast_link_sender: WeakNewLinkChannelSender,
pub(crate) tx_executor: TransportExecutor,
#[cfg(feature = "stats")]
pub(crate) stats: Arc<crate::stats::TransportStats>,
}

impl WeakChannelTransportManager {
fn upgrage(&self) -> Option<TransportManager> {
self.new_unicast_link_sender
.upgrade()
.map(|new_unicast_link_sender| TransportManager {
config: self.config.clone(),
state: self.state.clone(),
prng: self.prng.clone(),
cipher: self.cipher.clone(),
locator_inspector: self.locator_inspector.clone(),
new_unicast_link_sender,
tx_executor: self.tx_executor.clone(),
#[cfg(feature = "stats")]
stats: self.stats.clone(),
})
}
}

impl TransportManager {
fn downgrade(self) -> WeakChannelTransportManager {
WeakChannelTransportManager {
config: self.config,
state: self.state,
prng: self.prng,
cipher: self.cipher,
locator_inspector: self.locator_inspector,
new_unicast_link_sender: self.new_unicast_link_sender.downgrade(),
tx_executor: self.tx_executor,
#[cfg(feature = "stats")]
stats: self.stats,
}
}

pub fn new(params: TransportManagerParams, mut prng: PseudoRng) -> TransportManager {
// Initialize the Cipher
let mut key = [0_u8; BlockCipher::BLOCK_SIZE];
Expand All @@ -387,10 +437,14 @@ impl TransportManager {

// @TODO: this should be moved into the unicast module
async_std::task::spawn({
let this = this.clone();
let this = this.clone().downgrade();
async move {
while let Ok(link) = new_unicast_link_receiver.recv_async().await {
this.handle_new_link_unicast(link).await;
if let Some(tm) = this.upgrage() {
tm.handle_new_link_unicast(link).await;
} else {
break;
}
}
}
});
Expand Down

0 comments on commit e92af28

Please sign in to comment.