Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

memory leak fixes #625

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 20 additions & 20 deletions examples/examples/z_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use std::convert::TryFrom;

Check failure on line 15 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on ubuntu-latest

unused import: `std::convert::TryFrom`

Check failure on line 15 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on ubuntu-latest

unused import: `std::convert::TryFrom`

Check failure on line 15 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on macOS-latest

unused import: `std::convert::TryFrom`

Check failure on line 15 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on windows-latest

unused import: `std::convert::TryFrom`

Check failure on line 15 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on windows-latest

unused import: `std::convert::TryFrom`

Check failure on line 15 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on macOS-latest

unused import: `std::convert::TryFrom`
use std::time::Duration;
use zenoh::config::Config;
use zenoh::prelude::r#async::*;
Expand All @@ -23,31 +23,31 @@
// initiate logging
env_logger::init();

let (config, selector, value, target, timeout) = parse_args();

Check failure on line 26 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on ubuntu-latest

unused variable: `selector`

Check failure on line 26 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on ubuntu-latest

unused variable: `value`

Check failure on line 26 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on ubuntu-latest

unused variable: `target`

Check failure on line 26 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on ubuntu-latest

unused variable: `timeout`

Check failure on line 26 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on ubuntu-latest

unused variable: `selector`

Check failure on line 26 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on ubuntu-latest

unused variable: `value`

Check failure on line 26 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on ubuntu-latest

unused variable: `target`

Check failure on line 26 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on ubuntu-latest

unused variable: `timeout`

Check failure on line 26 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on macOS-latest

unused variable: `selector`

Check failure on line 26 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on macOS-latest

unused variable: `value`

Check failure on line 26 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on macOS-latest

unused variable: `target`

Check failure on line 26 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on macOS-latest

unused variable: `timeout`

Check failure on line 26 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on windows-latest

unused variable: `selector`

Check failure on line 26 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on windows-latest

unused variable: `value`

Check failure on line 26 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on windows-latest

unused variable: `target`

Check failure on line 26 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on windows-latest

unused variable: `timeout`

Check failure on line 26 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on windows-latest

unused variable: `selector`

Check failure on line 26 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on windows-latest

unused variable: `value`

Check failure on line 26 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on windows-latest

unused variable: `target`

Check failure on line 26 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on windows-latest

unused variable: `timeout`

Check failure on line 26 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on macOS-latest

unused variable: `selector`

Check failure on line 26 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on macOS-latest

unused variable: `value`

Check failure on line 26 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on macOS-latest

unused variable: `target`

Check failure on line 26 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on macOS-latest

unused variable: `timeout`

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();

Check failure on line 29 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on ubuntu-latest

unused variable: `session`

Check failure on line 29 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on ubuntu-latest

unused variable: `session`

Check failure on line 29 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on macOS-latest

unused variable: `session`

Check failure on line 29 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on windows-latest

unused variable: `session`

Check failure on line 29 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on windows-latest

unused variable: `session`

Check failure on line 29 in examples/examples/z_get.rs

View workflow job for this annotation

GitHub Actions / Run checks on macOS-latest

unused variable: `session`

println!("Sending Query '{selector}'...");
let replies = match value {
Some(value) => session.get(&selector).with_value(value),
None => session.get(&selector),
}
.target(target)
.timeout(timeout)
.res()
.await
.unwrap();
while let Ok(reply) = replies.recv_async().await {
match reply.sample {
Ok(sample) => println!(
">> Received ('{}': '{}')",
sample.key_expr.as_str(),
sample.value,
),
Err(err) => println!(">> Received (ERROR: '{}')", String::try_from(&err).unwrap()),
}
}
// println!("Sending Query '{selector}'...");
// let replies = match value {
// Some(value) => session.get(&selector).with_value(value),
// None => session.get(&selector),
// }
// .target(target)
// .timeout(timeout)
// .res()
// .await
// .unwrap();
// while let Ok(reply) = replies.recv_async().await {
// match reply.sample {
// Ok(sample) => println!(
// ">> Received ('{}': '{}')",
// sample.key_expr.as_str(),
// sample.value,
// ),
// Err(err) => println!(">> Received (ERROR: '{}')", String::try_from(&err).unwrap()),
// }
// }
}

#[derive(clap::ValueEnum, Clone, Copy, Debug)]
Expand Down
1 change: 1 addition & 0 deletions io/zenoh-link-commons/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub trait LinkManagerUnicastTrait: Send + Sync {
fn get_locators(&self) -> Vec<Locator>;
}
pub type NewLinkChannelSender = flume::Sender<LinkUnicast>;
pub type WeakNewLinkChannelSender = flume::WeakSender<LinkUnicast>;
pub trait ConstructibleLinkManagerUnicast<T>: Sized {
fn new(new_link_sender: NewLinkChannelSender, config: T) -> ZResult<Self>;
}
Expand Down
60 changes: 57 additions & 3 deletions io/zenoh-transport/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use std::sync::Arc;
use std::time::Duration;
use zenoh_config::{Config, LinkRxConf, QueueConf, QueueSizeConf};
use zenoh_crypto::{BlockCipher, PseudoRng};
use zenoh_link::NewLinkChannelSender;
use zenoh_link::{NewLinkChannelSender, WeakNewLinkChannelSender};
use zenoh_protocol::{
core::{EndPoint, Field, Locator, Priority, Resolution, WhatAmI, ZenohId},
transport::BatchSize,
Expand Down Expand Up @@ -105,6 +105,12 @@ pub struct TransportManagerConfig {
pub protocols: Vec<String>,
}

impl Drop for TransportManagerConfig {
fn drop(&mut self) {
panic!("TransportManagerConfig DROPPED!");
}
}

pub struct TransportManagerState {
pub unicast: TransportManagerStateUnicast,
pub multicast: TransportManagerStateMulticast,
Expand Down Expand Up @@ -362,7 +368,51 @@ pub struct TransportManager {
pub(crate) stats: Arc<crate::stats::TransportStats>,
}

struct WeakChannelTransportManager {
pub config: Arc<TransportManagerConfig>,
pub(crate) state: Arc<TransportManagerState>,
pub(crate) prng: Arc<AsyncMutex<PseudoRng>>,
pub(crate) cipher: Arc<BlockCipher>,
pub(crate) locator_inspector: zenoh_link::LocatorInspector,
pub(crate) new_unicast_link_sender: WeakNewLinkChannelSender,
pub(crate) tx_executor: TransportExecutor,
#[cfg(feature = "stats")]
pub(crate) stats: Arc<crate::stats::TransportStats>,
}

impl WeakChannelTransportManager {
fn upgrage(&self) -> Option<TransportManager> {
self.new_unicast_link_sender
.upgrade()
.map(|new_unicast_link_sender| TransportManager {
config: self.config.clone(),
state: self.state.clone(),
prng: self.prng.clone(),
cipher: self.cipher.clone(),
locator_inspector: self.locator_inspector.clone(),
new_unicast_link_sender,
tx_executor: self.tx_executor.clone(),
#[cfg(feature = "stats")]
stats: self.stats.clone(),
})
}
}

impl TransportManager {
fn downgrade(self) -> WeakChannelTransportManager {
WeakChannelTransportManager {
config: self.config,
state: self.state,
prng: self.prng,
cipher: self.cipher,
locator_inspector: self.locator_inspector,
new_unicast_link_sender: self.new_unicast_link_sender.downgrade(),
tx_executor: self.tx_executor,
#[cfg(feature = "stats")]
stats: self.stats,
}
}

pub fn new(params: TransportManagerParams, mut prng: PseudoRng) -> TransportManager {
// Initialize the Cipher
let mut key = [0_u8; BlockCipher::BLOCK_SIZE];
Expand All @@ -387,10 +437,14 @@ impl TransportManager {

// @TODO: this should be moved into the unicast module
async_std::task::spawn({
let this = this.clone();
let this = this.clone().downgrade();
async move {
while let Ok(link) = new_unicast_link_receiver.recv_async().await {
this.handle_new_link_unicast(link).await;
if let Some(tm) = this.upgrage() {
tm.handle_new_link_unicast(link).await;
} else {
break;
}
}
}
});
Expand Down
Loading
Loading