Skip to content

Commit

Permalink
fix: Use Arc to reduce clones and improve memory usage in dedupe
Browse files Browse the repository at this point in the history
Fixes: #3028
  • Loading branch information
palash25 committed Nov 29, 2024
1 parent ceefed9 commit d9e10e8
Showing 1 changed file with 20 additions and 18 deletions.
38 changes: 20 additions & 18 deletions src/core/data_loader/dedupe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,33 +25,33 @@ pub struct Dedupe<Key, Value> {
/// Represents the current state of the operation.
enum State<Value> {
/// Means that the operation has been executed and the result is stored.
Ready(Value),
Ready(Arc<Value>),

/// Means that the operation is in progress and the result can be sent via
/// the stored sender whenever it's available in the future.
Pending(Weak<broadcast::Sender<Value>>),
Pending(Weak<broadcast::Sender<Arc<Value>>>),
}

/// Represents the next steps
enum Step<Value> {
/// The operation has been executed and the result must be returned.
Return(Value),
Return(Arc<Value>),

/// The operation is in progress and the result must be awaited on the
/// receiver.
Await(broadcast::Receiver<Value>),
Await(broadcast::Receiver<Arc<Value>>),

/// The operation needs to be executed and the result needs to be sent to
/// the provided sender.
Init(Arc<broadcast::Sender<Value>>),
Init(Arc<broadcast::Sender<Arc<Value>>>),
}

impl<K: Key, V: Value> Dedupe<K, V> {
pub fn new(size: usize, persist: bool) -> Self {
Self { cache: Arc::new(Mutex::new(HashMap::new())), size, persist }
}

pub async fn dedupe<'a, Fn, Fut>(&'a self, key: &'a K, or_else: Fn) -> V
pub async fn dedupe<'a, Fn, Fut>(&'a self, key: &'a K, or_else: Fn) -> Arc<V>
where
Fn: FnOnce() -> Fut,
Fut: Future<Output = V>,
Expand All @@ -70,14 +70,14 @@ impl<K: Key, V: Value> Dedupe<K, V> {
}
},
Step::Init(tx) => {
let value = or_else().await;
let value = Arc::new(or_else().await);
let mut guard = self.cache.lock().unwrap();
if self.persist {
guard.insert(key.to_owned(), State::Ready(value.clone()));
guard.insert(key.to_owned(), State::Ready(Arc::clone(&value)));
} else {
guard.remove(key);
}
let _ = tx.send(value.clone());
let _ = tx.send(Arc::clone(&value));
value
}
};
Expand All @@ -91,7 +91,7 @@ impl<K: Key, V: Value> Dedupe<K, V> {

if let Some(state) = this.get(key) {
match state {
State::Ready(value) => return Step::Return(value.clone()),
State::Ready(value) => return Step::Return(Arc::clone(value)),
State::Pending(tx) => {
// We can upgrade from Weak to Arc only in case when
// original tx is still alive
Expand Down Expand Up @@ -120,15 +120,17 @@ impl<K: Key, V: Value, E: Value> DedupeResult<K, V, E> {
pub fn new(persist: bool) -> Self {
Self(Dedupe::new(1, persist))
}
}

impl<K: Key, V: Value, E: Value> DedupeResult<K, V, E> {
pub async fn dedupe<'a, Fn, Fut>(&'a self, key: &'a K, or_else: Fn) -> Result<V, E>
where
Fn: FnOnce() -> Fut,
Fut: Future<Output = Result<V, E>>,
{
self.0.dedupe(key, or_else).await
let result = self.0.dedupe(key, or_else).await;
match Arc::try_unwrap(result) {
Ok(result) => result,
Err(arc) => (*arc).clone(),
}
}
}

Expand All @@ -148,7 +150,7 @@ mod tests {
async fn test_no_key() {
let cache = Arc::new(Dedupe::<u64, u64>::new(1000, true));
let actual = cache.dedupe(&1, || Box::pin(async { 1 })).await;
assert_eq!(actual, 1);
assert_eq!(*actual, 1);
}

#[tokio::test]
Expand All @@ -157,7 +159,7 @@ mod tests {
cache.dedupe(&1, || Box::pin(async { 1 })).await;

let actual = cache.dedupe(&1, || Box::pin(async { 2 })).await;
assert_eq!(actual, 1);
assert_eq!(*actual, 1);
}

#[tokio::test]
Expand All @@ -169,7 +171,7 @@ mod tests {
}

let actual = cache.dedupe(&1, || Box::pin(async { 2 })).await;
assert_eq!(actual, 0);
assert_eq!(*actual, 0);
}

#[tokio::test]
Expand All @@ -190,7 +192,7 @@ mod tests {
});
let (a, b) = join!(a, b);

assert_eq!(a, b);
assert_eq!(*a, *b);
}

async fn compute_value(counter: Arc<AtomicUsize>) -> String {
Expand Down Expand Up @@ -283,7 +285,7 @@ mod tests {
task_1.abort();

let actual = task_2.await.unwrap();
assert_eq!(actual, 200)
assert_eq!(*actual, 200)
}

// TODO: This is a failing test
Expand Down

0 comments on commit d9e10e8

Please sign in to comment.