Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory leaks #708

Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ members = [
"commons/zenoh-result",
"commons/zenoh-shm",
"commons/zenoh-sync",
"commons/zenoh-task",
"commons/zenoh-util",
"examples",
"io/zenoh-link",
Expand Down Expand Up @@ -172,6 +173,7 @@ zenoh-util = { version = "0.11.0-dev", path = "commons/zenoh-util" }
zenoh-crypto = { version = "0.11.0-dev", path = "commons/zenoh-crypto" }
zenoh-codec = { version = "0.11.0-dev", path = "commons/zenoh-codec" }
zenoh-sync = { version = "0.11.0-dev", path = "commons/zenoh-sync" }
zenoh-task = { version = "0.11.0-dev", path = "commons/zenoh-task" }
zenoh-collections = { version = "0.11.0-dev", path = "commons/zenoh-collections", default-features = false }
zenoh-macros = { version = "0.11.0-dev", path = "commons/zenoh-macros" }
zenoh-plugin-trait = { version = "0.11.0-dev", path = "plugins/zenoh-plugin-trait", default-features = false }
Expand Down
36 changes: 36 additions & 0 deletions commons/zenoh-task/Cargo.toml
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An attempt of finer grain executor and task management is being proposed in #566 .
Moreover, every time a new crate is added it should be also added in the right order for publication in the CI.

Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#
# Copyright (c) 2023 ZettaScale Technology
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
# which is available at https://www.apache.org/licenses/LICENSE-2.0.
#
# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
#
# Contributors:
# ZettaScale Zenoh Team, <[email protected]>
#
[package]
rust-version = { workspace = true }
name = "zenoh-task"
version = { workspace = true }
repository = { workspace = true }
homepage = { workspace = true }
authors = [
"Denis <[email protected]>"
]
edition = { workspace = true }
license = { workspace = true }
categories = { workspace = true }
description = "Internal crate for zenoh."
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-std = { workspace = true, features = ["default", "unstable"] }
tokio = { workspace = true, features = ["default", "sync"] }
futures = { workspace = true }
uuid = { workspace = true }

[dev-dependencies]
async-std = { workspace = true, features = ["default", "unstable", "attributes"] }
8 changes: 8 additions & 0 deletions commons/zenoh-task/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# ⚠️ WARNING ⚠️

This crate is intended for Zenoh's internal use.

- [Click here for Zenoh's main repository](https://github.com/eclipse-zenoh/zenoh)
- [Click here for Zenoh's documentation](https://zenoh.io)


88 changes: 88 additions & 0 deletions commons/zenoh-task/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

//! ⚠️ WARNING ⚠️
//!
//! This module is intended for Zenoh's internal use.
//!
//! [Click here for Zenoh's documentation](../zenoh/index.html)

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::future::Future;
use std::ops::DerefMut;
use std::sync::Arc;
use std::sync::Mutex;
use tokio::task::{self, JoinHandle};
use uuid::Uuid;

#[derive(Clone)]
pub struct TaskController {
running_task_id_to_handle: Arc<Mutex<HashMap<Uuid, Option<JoinHandle<()>>>>>,
}

impl TaskController {
pub fn new() -> TaskController {
TaskController {
running_task_id_to_handle: Arc::new(Mutex::new(
HashMap::<Uuid, Option<JoinHandle<()>>>::new(),
)),
}
}

/// Spawns a task (similarly to task::spawn) that can be later terminated by call to terminate_all()
/// Task output is ignored
pub fn spawn<F, T>(&self, future: F)
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
let mut tasks = self.running_task_id_to_handle.lock().unwrap();
let id = TaskController::get_next_task_id(tasks.deref_mut());
let tasks_mutex = self.running_task_id_to_handle.clone();
let jh = task::spawn(futures::FutureExt::map(future, move |_| {
tasks_mutex.lock().unwrap().remove(&id);
()
}));
tasks.insert(id, Some(jh));
}

fn get_next_task_id(hm: &mut HashMap<Uuid, Option<JoinHandle<()>>>) -> Uuid {
loop {
let uuid = Uuid::new_v4();
match hm.entry(uuid.clone()) {
Entry::Occupied(_) => {
continue;
}
Entry::Vacant(v) => {
v.insert(None);
return uuid;
}
}
}
}

/// Terminates all prevously spawned tasks
pub fn terminate_all(&self) {
let tasks: Vec<(Uuid, Option<JoinHandle<()>>)> = self
.running_task_id_to_handle
.lock()
.unwrap()
.drain()
.collect();
for (_id, jh) in tasks {
let _ = jh.unwrap().abort();
}
}
}
1 change: 1 addition & 0 deletions io/zenoh-transport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ zenoh-protocol = { workspace = true }
zenoh-result = { workspace = true }
zenoh-shm = { workspace = true, optional = true }
zenoh-sync = { workspace = true }
zenoh-task = { workspace = true }
zenoh-util = { workspace = true }

[dev-dependencies]
Expand Down
24 changes: 20 additions & 4 deletions io/zenoh-transport/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ use crate::multicast::manager::{
TransportManagerBuilderMulticast, TransportManagerConfigMulticast,
TransportManagerStateMulticast,
};
use async_std::channel::RecvError;
use async_std::{sync::Mutex as AsyncMutex, task};
use rand::{RngCore, SeedableRng};
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use zenoh_config::{Config, LinkRxConf, QueueConf, QueueSizeConf};
use zenoh_crypto::{BlockCipher, PseudoRng};
Expand All @@ -33,6 +34,7 @@ use zenoh_protocol::{
VERSION,
};
use zenoh_result::{bail, ZResult};
use zenoh_task::TaskController;

/// # Examples
/// ```
Expand Down Expand Up @@ -320,25 +322,36 @@ impl Default for TransportManagerBuilder {
pub(crate) struct TransportExecutor {
executor: Arc<async_executor::Executor<'static>>,
sender: async_std::channel::Sender<()>,
exec_handles: Arc<Mutex<Vec<std::thread::JoinHandle<Result<(), RecvError>>>>>,
}

impl TransportExecutor {
fn new(num_threads: usize) -> Self {
let (sender, receiver) = async_std::channel::bounded(1);
let executor = Arc::new(async_executor::Executor::new());
let mut exec_handles = Vec::new();
for i in 0..num_threads {
let exec = executor.clone();
let recv = receiver.clone();
std::thread::Builder::new()
let h = std::thread::Builder::new()
.name(format!("zenoh-tx-{}", i))
.spawn(move || async_std::task::block_on(exec.run(recv.recv())))
.unwrap();
exec_handles.push(h);
}
Self {
executor,
sender,
exec_handles: Arc::new(Mutex::new(exec_handles)),
}
Self { executor, sender }
}

async fn stop(&self) {
let _ = self.sender.send(()).await;
let mut eh = self.exec_handles.lock().unwrap();
for h in eh.drain(0..) {
let _ = h.join();
}
}

pub(crate) fn spawn<T: Send + 'static>(
Expand All @@ -360,6 +373,7 @@ pub struct TransportManager {
pub(crate) tx_executor: TransportExecutor,
#[cfg(feature = "stats")]
pub(crate) stats: Arc<crate::stats::TransportStats>,
pub(crate) task_controller: TaskController,
}

impl TransportManager {
Expand All @@ -383,10 +397,11 @@ impl TransportManager {
tx_executor: TransportExecutor::new(tx_threads),
#[cfg(feature = "stats")]
stats: std::sync::Arc::new(crate::stats::TransportStats::default()),
task_controller: TaskController::new(),
};

// @TODO: this should be moved into the unicast module
async_std::task::spawn({
this.task_controller.spawn({
let this = this.clone();
async move {
while let Ok(link) = new_unicast_link_receiver.recv_async().await {
Expand All @@ -412,6 +427,7 @@ impl TransportManager {
}

pub async fn close(&self) {
self.task_controller.terminate_all();
self.close_unicast().await;
self.tx_executor.stop().await;
}
Expand Down
1 change: 1 addition & 0 deletions zenoh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ zenoh-protocol = { workspace = true, features = ["std"] }
zenoh-result = { workspace = true }
zenoh-shm = { workspace = true, optional = true }
zenoh-sync = { workspace = true }
zenoh-task = { workspace = true }
zenoh-transport = { workspace = true }
zenoh-util = { workspace = true }

Expand Down
10 changes: 10 additions & 0 deletions zenoh/src/net/routing/dispatcher/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,16 @@ impl Resource {
})
}

pub fn close(res: &mut Arc<Resource>) {
let r = get_mut_unchecked(res);
for (_s, c) in &mut r.childs {
Self::close(c);
}
r.parent.take();
r.childs.clear();
r.nonwild_prefix.take();
}

pub fn clean(res: &mut Arc<Resource>) {
let mut resclone = res.clone();
let mutres = get_mut_unchecked(&mut resclone);
Expand Down
3 changes: 3 additions & 0 deletions zenoh/src/net/routing/hat/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,9 @@ impl HatBaseTrait for HatCode {
fn info(&self, _tables: &Tables, _kind: WhatAmI) -> String {
"graph {}".to_string()
}

fn close(&self, _tables: &mut Tables) {
}
}

struct HatContext {}
Expand Down
6 changes: 6 additions & 0 deletions zenoh/src/net/routing/hat/linkstate_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,12 @@ impl HatBaseTrait for HatCode {
_ => "graph {}".to_string(),
}
}

fn close(&self, tables: &mut Tables) {
hat_mut!(tables).peer_qabls.clear();
hat_mut!(tables).peer_subs.clear();
hat_mut!(tables).peers_net.take();
}
}

struct HatContext {
Expand Down
1 change: 1 addition & 0 deletions zenoh/src/net/routing/hat/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ pub(crate) trait HatBaseTrait {
) -> ZResult<()>;

fn close_face(&self, tables: &TablesLock, face: &mut Arc<FaceState>);
fn close(&self, tables: &mut Tables);
}

pub(crate) trait HatPubSubTrait {
Expand Down
4 changes: 4 additions & 0 deletions zenoh/src/net/routing/hat/p2p_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,10 @@ impl HatBaseTrait for HatCode {
fn info(&self, _tables: &Tables, _kind: WhatAmI) -> String {
"graph {}".to_string()
}

fn close(&self, tables: &mut Tables) {
hat_mut!(tables).gossip.take();
}
}

struct HatContext {}
Expand Down
9 changes: 9 additions & 0 deletions zenoh/src/net/routing/hat/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,15 @@ impl HatBaseTrait for HatCode {
_ => "graph {}".to_string(),
}
}

fn close(&self, tables: &mut Tables) {
hat_mut!(tables).peer_qabls.clear();
hat_mut!(tables).router_qabls.clear();
hat_mut!(tables).peer_subs.clear();
hat_mut!(tables).router_subs.clear();
hat_mut!(tables).peers_net.take();
hat_mut!(tables).routers_net.take();
}
}

struct HatContext {
Expand Down
Loading
Loading