Skip to content

Commit

Permalink
Merge pull request #330 from moka-rs/fix-get-with-in-future-cache
Browse files Browse the repository at this point in the history
Fix memory leak in `get_with` and friend methods of `future::Cache`
  • Loading branch information
tatsuya6502 authored Oct 3, 2023
2 parents 0b160cd + 233e53d commit b3074fc
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 40 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# Moka Cache — Change Log

## Version 0.12.1

### Fixed

- Fixed memory leak in `future::Cache` that occurred when `get_with()`,
`entry().or_insert_with()`, and similar methods were used ([#329][gh-issue-0329]).
- This bug was introduced in `v0.12.0`. Versions prior to `v0.12.0` do not
have this bug.


## Version 0.12.0

> **Note**
Expand Down Expand Up @@ -702,6 +712,7 @@ The minimum supported Rust version (MSRV) is now 1.51.0 (Mar 25, 2021).
[gh-Swatinem]: https://github.com/Swatinem
[gh-tinou98]: https://github.com/tinou98

[gh-issue-0329]: https://github.com/moka-rs/moka/issues/329/
[gh-issue-0322]: https://github.com/moka-rs/moka/issues/322/
[gh-issue-0255]: https://github.com/moka-rs/moka/issues/255/
[gh-issue-0252]: https://github.com/moka-rs/moka/issues/252/
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "moka"
version = "0.12.0"
version = "0.12.1"
edition = "2018"
# Rust 1.65 was released on Nov 3, 2022.
rust-version = "1.65"
Expand Down
2 changes: 1 addition & 1 deletion src/future/base_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ where

// TODO: Instead using Arc<AtomicU8> to check if the actual operation was
// insert or update, check the return value of insert_with_or_modify. If it
// is_some, the value was inserted, otherwise the value was updated.
// is_some, the value was updated, otherwise the value was inserted.

// Since the cache (cht::SegmentedHashMap) employs optimistic locking
// strategy, insert_with_or_modify() may get an insert/modify operation
Expand Down
61 changes: 38 additions & 23 deletions src/future/value_initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ where
S: BuildHasher,
{
is_waiter_value_set: bool,
cht_key: (Arc<K>, TypeId),
hash: u64,
w_key: (Arc<K>, TypeId),
w_hash: u64,
waiters: TrioArc<WaiterMap<K, V, S>>,
write_lock: RwLockWriteGuard<'a, WaiterValue<V>>,
}
Expand All @@ -71,15 +71,15 @@ where
S: BuildHasher,
{
fn new(
cht_key: (Arc<K>, TypeId),
hash: u64,
w_key: (Arc<K>, TypeId),
w_hash: u64,
waiters: TrioArc<WaiterMap<K, V, S>>,
write_lock: RwLockWriteGuard<'a, WaiterValue<V>>,
) -> Self {
Self {
is_waiter_value_set: false,
cht_key,
hash,
w_key,
w_hash,
waiters,
write_lock,
}
Expand All @@ -103,7 +103,7 @@ where
// has been aborted. Remove our waiter to prevent the issue described in
// https://github.com/moka-rs/moka/issues/59
*self.write_lock = WaiterValue::EnclosingFutureAborted;
remove_waiter(&self.waiters, self.cht_key.clone(), self.hash);
remove_waiter(&self.waiters, self.w_key.clone(), self.w_hash);
self.is_waiter_value_set = true;
}
}
Expand Down Expand Up @@ -147,8 +147,8 @@ where
#[allow(clippy::too_many_arguments)]
pub(crate) async fn try_init_or_read<'a, C, I, O, E>(
&'a self,
key: &Arc<K>,
hash: u64,
c_key: &Arc<K>,
c_hash: u64,
type_id: TypeId,
cache: &C,
ignore_if: Arc<Mutex<Option<I>>>,
Expand All @@ -169,35 +169,35 @@ where
const MAX_RETRIES: usize = 200;
let mut retries = 0;

let cht_key = (Arc::clone(key), type_id);
let (w_key, w_hash) = waiter_key_hash(&self.waiters, c_key, type_id);

loop {
let waiter = TrioArc::new(RwLock::new(WaiterValue::Computing));
let lock = waiter.write().await;

match try_insert_waiter(&self.waiters, cht_key.clone(), hash, &waiter) {
match try_insert_waiter(&self.waiters, w_key.clone(), w_hash, &waiter) {
None => {
// Our waiter was inserted.

// Create a guard. This will ensure to remove our waiter when the
// enclosing future has been aborted:
// https://github.com/moka-rs/moka/issues/59
let mut waiter_guard = WaiterGuard::new(
cht_key.clone(),
hash,
w_key.clone(),
w_hash,
TrioArc::clone(&self.waiters),
lock,
);

// Check if the value has already been inserted by other thread.
if let Some(value) = cache
.get_without_recording(key, hash, ignore_if.lock().await.as_mut())
.get_without_recording(c_key, c_hash, ignore_if.lock().await.as_mut())
.await
{
// Yes. Set the waiter value, remove our waiter, and return
// the existing value.
waiter_guard.set_waiter_value(WaiterValue::Ready(Ok(value.clone())));
remove_waiter(&self.waiters, cht_key, hash);
remove_waiter(&self.waiters, w_key, w_hash);
return InitResult::ReadExisting(value);
}

Expand All @@ -209,7 +209,7 @@ where
Ok(value) => {
let (waiter_val, init_res) = match post_init(value) {
Ok(value) => {
cache.insert(Arc::clone(key), hash, value.clone()).await;
cache.insert(Arc::clone(c_key), c_hash, value.clone()).await;
(
WaiterValue::Ready(Ok(value.clone())),
InitResult::Initialized(value),
Expand All @@ -224,14 +224,14 @@ where
}
};
waiter_guard.set_waiter_value(waiter_val);
remove_waiter(&self.waiters, cht_key, hash);
remove_waiter(&self.waiters, w_key, w_hash);
return init_res;
}
// Panicked.
Err(payload) => {
waiter_guard.set_waiter_value(WaiterValue::InitFuturePanicked);
// Remove the waiter so that others can retry.
remove_waiter(&self.waiters, cht_key, hash);
remove_waiter(&self.waiters, w_key, w_hash);
resume_unwind(payload);
}
} // The lock will be unlocked here.
Expand Down Expand Up @@ -311,27 +311,42 @@ where
}

#[inline]
fn remove_waiter<K, V, S>(waiter_map: &WaiterMap<K, V, S>, cht_key: (Arc<K>, TypeId), hash: u64)
fn remove_waiter<K, V, S>(waiter_map: &WaiterMap<K, V, S>, w_key: (Arc<K>, TypeId), w_hash: u64)
where
(Arc<K>, TypeId): Eq + Hash,
S: BuildHasher,
{
waiter_map.remove(hash, |k| k == &cht_key);
waiter_map.remove(w_hash, |k| k == &w_key);
}

#[inline]
fn try_insert_waiter<K, V, S>(
waiter_map: &WaiterMap<K, V, S>,
cht_key: (Arc<K>, TypeId),
hash: u64,
w_key: (Arc<K>, TypeId),
w_hash: u64,
waiter: &Waiter<V>,
) -> Option<Waiter<V>>
where
(Arc<K>, TypeId): Eq + Hash,
S: BuildHasher,
{
let waiter = TrioArc::clone(waiter);
waiter_map.insert_if_not_present(cht_key, hash, waiter)
waiter_map.insert_if_not_present(w_key, w_hash, waiter)
}

#[inline]
fn waiter_key_hash<K, V, S>(
waiter_map: &WaiterMap<K, V, S>,
c_key: &Arc<K>,
type_id: TypeId,
) -> ((Arc<K>, TypeId), u64)
where
(Arc<K>, TypeId): Eq + Hash,
S: BuildHasher,
{
let w_key = (Arc::clone(c_key), type_id);
let w_hash = waiter_map.hash(&w_key);
(w_key, w_hash)
}

fn panic_if_retry_exhausted_for_panicking(retries: usize, max: usize) {
Expand Down
28 changes: 14 additions & 14 deletions src/sync/value_initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,21 +68,21 @@ where
const MAX_RETRIES: usize = 200;
let mut retries = 0;

let (cht_key, hash) = self.cht_key_hash(key, type_id);
let (w_key, w_hash) = self.waiter_key_hash(key, type_id);

loop {
let waiter = TrioArc::new(RwLock::new(None));
let mut lock = waiter.write();

match self.try_insert_waiter(cht_key.clone(), hash, &waiter) {
match self.try_insert_waiter(w_key.clone(), w_hash, &waiter) {
None => {
// Our waiter was inserted.
// Check if the value has already been inserted by other thread.
if let Some(value) = get() {
// Yes. Set the waiter value, remove our waiter, and return
// the existing value.
*lock = Some(Ok(value.clone()));
self.remove_waiter(cht_key, hash);
self.remove_waiter(w_key, w_hash);
return InitResult::ReadExisting(value);
}

Expand All @@ -106,14 +106,14 @@ where
}
};
*lock = waiter_val;
self.remove_waiter(cht_key, hash);
self.remove_waiter(w_key, w_hash);
return init_res;
}
// Panicked.
Err(payload) => {
*lock = None;
// Remove the waiter so that others can retry.
self.remove_waiter(cht_key, hash);
self.remove_waiter(w_key, w_hash);
resume_unwind(payload);
}
} // The write lock will be unlocked here.
Expand Down Expand Up @@ -184,25 +184,25 @@ where
}

#[inline]
fn remove_waiter(&self, cht_key: (Arc<K>, TypeId), hash: u64) {
self.waiters.remove(hash, |k| k == &cht_key);
fn remove_waiter(&self, w_key: (Arc<K>, TypeId), w_hash: u64) {
self.waiters.remove(w_hash, |k| k == &w_key);
}

#[inline]
fn try_insert_waiter(
&self,
cht_key: (Arc<K>, TypeId),
hash: u64,
w_key: (Arc<K>, TypeId),
w_hash: u64,
waiter: &Waiter<V>,
) -> Option<Waiter<V>> {
let waiter = TrioArc::clone(waiter);
self.waiters.insert_if_not_present(cht_key, hash, waiter)
self.waiters.insert_if_not_present(w_key, w_hash, waiter)
}

#[inline]
fn cht_key_hash(&self, key: &Arc<K>, type_id: TypeId) -> ((Arc<K>, TypeId), u64) {
let cht_key = (Arc::clone(key), type_id);
let hash = self.waiters.hash(&cht_key);
(cht_key, hash)
fn waiter_key_hash(&self, c_key: &Arc<K>, type_id: TypeId) -> ((Arc<K>, TypeId), u64) {
let w_key = (Arc::clone(c_key), type_id);
let w_hash = self.waiters.hash(&w_key);
(w_key, w_hash)
}
}
2 changes: 1 addition & 1 deletion src/sync_base/base_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ where

// TODO: Instead using Arc<AtomicU8> to check if the actual operation was
// insert or update, check the return value of insert_with_or_modify. If it
// is_some, the value was inserted, otherwise the value was updated.
// is_some, the value was updated, otherwise the value was inserted.

// Since the cache (cht::SegmentedHashMap) employs optimistic locking
// strategy, insert_with_or_modify() may get an insert/modify operation
Expand Down

0 comments on commit b3074fc

Please sign in to comment.