From d9e10e8eb8cbcb20e51ec8b64a62a80811b84d8c Mon Sep 17 00:00:00 2001 From: Palash Nigam Date: Thu, 28 Nov 2024 21:16:40 +0530 Subject: [PATCH] fix: Use Arc to reduce clones and improve memory usage in dedupe Fixes: #3028 --- src/core/data_loader/dedupe.rs | 38 ++++++++++++++++++---------------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/src/core/data_loader/dedupe.rs b/src/core/data_loader/dedupe.rs index 4ab458aa4e1..fefd74c618a 100644 --- a/src/core/data_loader/dedupe.rs +++ b/src/core/data_loader/dedupe.rs @@ -25,25 +25,25 @@ pub struct Dedupe { /// Represents the current state of the operation. enum State { /// Means that the operation has been executed and the result is stored. - Ready(Value), + Ready(Arc), /// Means that the operation is in progress and the result can be sent via /// the stored sender whenever it's available in the future. - Pending(Weak>), + Pending(Weak>>), } /// Represents the next steps enum Step { /// The operation has been executed and the result must be returned. - Return(Value), + Return(Arc), /// The operation is in progress and the result must be awaited on the /// receiver. - Await(broadcast::Receiver), + Await(broadcast::Receiver>), /// The operation needs to be executed and the result needs to be sent to /// the provided sender. - Init(Arc>), + Init(Arc>>), } impl Dedupe { @@ -51,7 +51,7 @@ impl Dedupe { Self { cache: Arc::new(Mutex::new(HashMap::new())), size, persist } } - pub async fn dedupe<'a, Fn, Fut>(&'a self, key: &'a K, or_else: Fn) -> V + pub async fn dedupe<'a, Fn, Fut>(&'a self, key: &'a K, or_else: Fn) -> Arc where Fn: FnOnce() -> Fut, Fut: Future, @@ -70,14 +70,14 @@ impl Dedupe { } }, Step::Init(tx) => { - let value = or_else().await; + let value = Arc::new(or_else().await); let mut guard = self.cache.lock().unwrap(); if self.persist { - guard.insert(key.to_owned(), State::Ready(value.clone())); + guard.insert(key.to_owned(), State::Ready(Arc::clone(&value))); } else { guard.remove(key); } - let _ = tx.send(value.clone()); + let _ = tx.send(Arc::clone(&value)); value } }; @@ -91,7 +91,7 @@ impl Dedupe { if let Some(state) = this.get(key) { match state { - State::Ready(value) => return Step::Return(value.clone()), + State::Ready(value) => return Step::Return(Arc::clone(value)), State::Pending(tx) => { // We can upgrade from Weak to Arc only in case when // original tx is still alive @@ -120,15 +120,17 @@ impl DedupeResult { pub fn new(persist: bool) -> Self { Self(Dedupe::new(1, persist)) } -} -impl DedupeResult { pub async fn dedupe<'a, Fn, Fut>(&'a self, key: &'a K, or_else: Fn) -> Result where Fn: FnOnce() -> Fut, Fut: Future>, { - self.0.dedupe(key, or_else).await + let result = self.0.dedupe(key, or_else).await; + match Arc::try_unwrap(result) { + Ok(result) => result, + Err(arc) => (*arc).clone(), + } } } @@ -148,7 +150,7 @@ mod tests { async fn test_no_key() { let cache = Arc::new(Dedupe::::new(1000, true)); let actual = cache.dedupe(&1, || Box::pin(async { 1 })).await; - assert_eq!(actual, 1); + assert_eq!(*actual, 1); } #[tokio::test] @@ -157,7 +159,7 @@ mod tests { cache.dedupe(&1, || Box::pin(async { 1 })).await; let actual = cache.dedupe(&1, || Box::pin(async { 2 })).await; - assert_eq!(actual, 1); + assert_eq!(*actual, 1); } #[tokio::test] @@ -169,7 +171,7 @@ mod tests { } let actual = cache.dedupe(&1, || Box::pin(async { 2 })).await; - assert_eq!(actual, 0); + assert_eq!(*actual, 0); } #[tokio::test] @@ -190,7 +192,7 @@ mod tests { }); let (a, b) = join!(a, b); - assert_eq!(a, b); + assert_eq!(*a, *b); } async fn compute_value(counter: Arc) -> String { @@ -283,7 +285,7 @@ mod tests { task_1.abort(); let actual = task_2.await.unwrap(); - assert_eq!(actual, 200) + assert_eq!(*actual, 200) } // TODO: This is a failing test