Skip to content

Commit

Permalink
thread leak investigation
Browse files Browse the repository at this point in the history
  • Loading branch information
milyin committed Dec 14, 2023
1 parent f3ec4ed commit 5a8df0f
Show file tree
Hide file tree
Showing 4 changed files with 807 additions and 3,038 deletions.
15 changes: 5 additions & 10 deletions examples/examples/z_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,24 @@
//
use clap::Parser;
use log::trace;
use zenoh::prelude::r#async::AsyncResolve;
use zenoh::query::QueryTarget;
use zenoh::selector::Selector;
use std::convert::TryFrom;
use std::time::Duration;
use zenoh::config::Config;
use zenoh::prelude::sync::SyncResolve;
use zenoh::query::QueryTarget;
use zenoh::selector::Selector;
use zenoh_examples::CommonArgs;

#[async_std::main]
async fn main() {
fn main() {
// initiate logging
env_logger::init();

let (config, selector, value, target, timeout) = parse_args();

let session = zenoh::open(config).res().await.unwrap();
let session = zenoh::open(config).res().unwrap();
drop(session);
trace!("Done");

async_std::task::sleep(Duration::from_secs(10)).await;
trace!("Sleep done");


// println!("Sending Query '{selector}'...");
// let replies = match value {
// Some(value) => session.get(&selector).with_value(value),
Expand Down
19 changes: 12 additions & 7 deletions io/zenoh-transport/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,6 @@ pub struct TransportManagerConfig {
pub protocols: Vec<String>,
}

impl Drop for TransportManagerConfig {
fn drop(&mut self) {
panic!("TransportManagerConfig DROPPED!");
}
}

pub struct TransportManagerState {
pub unicast: TransportManagerStateUnicast,
pub multicast: TransportManagerStateMulticast,
Expand Down Expand Up @@ -328,22 +322,33 @@ pub(crate) struct TransportExecutor {
sender: async_std::channel::Sender<()>,
}

// log::warn!("TransportExecutor thread {} stopped", i);
// exec.run(async move || {recv.recv().await})
impl TransportExecutor {
fn new(num_threads: usize) -> Self {
let (sender, receiver) = async_std::channel::bounded(1);
let executor = Arc::new(async_executor::Executor::new());
for i in 0..num_threads {
log::warn!("Starting TransportExecutor thread {}", i);
let exec = executor.clone();
let recv = receiver.clone();
std::thread::Builder::new()
.name(format!("zenoh-tx-{}", i))
.spawn(move || async_std::task::block_on(exec.run(recv.recv())))
.spawn(move || {
async_std::task::block_on(exec.run(async move {

Check warning on line 338 in io/zenoh-transport/src/manager.rs

View workflow job for this annotation

GitHub Actions / Run tests on ubuntu-latest

unused `Result` that must be used

Check warning on line 338 in io/zenoh-transport/src/manager.rs

View workflow job for this annotation

GitHub Actions / Run tests on macOS-latest

unused `Result` that must be used

Check warning on line 338 in io/zenoh-transport/src/manager.rs

View workflow job for this annotation

GitHub Actions / Run tests on windows-latest

unused `Result` that must be used
let r = recv.recv().await;
log::warn!("TransportExecutor thread {} stopped", i);
r
}));
log::warn!("TransportExecutor thread {} finally stopped", i);
})
.unwrap();
}
Self { executor, sender }
}

async fn stop(&self) {
log::error!("Stopping TransportExecutor");
let _ = self.sender.send(()).await;
}

Expand Down
Loading

0 comments on commit 5a8df0f

Please sign in to comment.