Skip to content

Commit

Permalink
Fixes and simplifications for RequestSharing (#3086)
Browse files Browse the repository at this point in the history
## Fixed Cache Size Metric
The `RequestSharing` component was the prime suspect for a significant
memory leak due to its metric displaying that the internal cache grows
indefinitely.

After wasting waaay too much time on this it appears that the metric
simply has a bug. If somebody calls `shared_or_else()` and there is
already an **outdated** `WeakShared` item in the cache that has not been
removed by GC we'd:
1. build a new future
2. `insert()` it into the cache
3. increase the metrics for the cache size by 1

But in this specific edge case the `insert()` would actually overwrite
an existing item in the cache and not add a new one so increasing the
metrics by 1 is actually over counting the number of cached items. Over
time this accumulated more and more and made it look like the component
had a serious memory leak.

The easy fix is to just not use `inc()` and `sub()` for adjusting the
cache size metrics but rather `set(cache.len())` since that is
absolutely fool proof.


## Removed 1 `RequestSharing` instance
The `TradeEstimator` has a `RequestSharing` and contains a `TradeFinder`
which also contains a `RequestSharing` instance. This double caching is
overly complex and due to that I already suspected it 3 times that it
might be related to a memory leak. Now only the `TradeFinder` uses
`RequestSharing` which is the component that actually sends the HTTP
requests.
  • Loading branch information
MartinquaXD authored Oct 29, 2024
1 parent f808db2 commit 2a3c2a4
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 29 deletions.
1 change: 0 additions & 1 deletion crates/shared/src/price_estimation/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ impl ExternalPriceEstimator {
timeout,
)),
rate_limiter,
driver.to_string(),
))
}

Expand Down
28 changes: 8 additions & 20 deletions crates/shared/src/price_estimation/trade_finder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,9 @@ use {
PriceEstimationError,
Query,
},
crate::{
request_sharing::RequestSharing,
trade_finding::{TradeError, TradeFinding},
},
crate::trade_finding::{TradeError, TradeFinding},
anyhow::{anyhow, Result},
futures::future::{BoxFuture, FutureExt as _},
futures::future::FutureExt,
rate_limit::RateLimiter,
std::sync::Arc,
};
Expand All @@ -26,7 +23,6 @@ use {
#[derive(Clone)]
pub struct TradeEstimator {
inner: Arc<Inner>,
sharing: RequestSharing<Arc<Query>, BoxFuture<'static, Result<Estimate, PriceEstimationError>>>,
rate_limiter: Arc<RateLimiter>,
}

Expand All @@ -38,17 +34,12 @@ struct Inner {
}

impl TradeEstimator {
pub fn new(
finder: Arc<dyn TradeFinding>,
rate_limiter: Arc<RateLimiter>,
label: String,
) -> Self {
pub fn new(finder: Arc<dyn TradeFinding>, rate_limiter: Arc<RateLimiter>) -> Self {
Self {
inner: Arc::new(Inner {
finder,
verifier: None,
}),
sharing: RequestSharing::labelled(format!("estimator_{}", label)),
rate_limiter,
}
}
Expand All @@ -62,14 +53,11 @@ impl TradeEstimator {
}

async fn estimate(&self, query: Arc<Query>) -> Result<Estimate, PriceEstimationError> {
let fut = move |query: &Arc<Query>| {
rate_limited(
self.rate_limiter.clone(),
self.inner.clone().estimate(query.clone()),
)
.boxed()
};
self.sharing.shared_or_else(query, fut).await
rate_limited(
self.rate_limiter.clone(),
self.inner.clone().estimate(query.clone()),
)
.await
}
}

Expand Down
8 changes: 3 additions & 5 deletions crates/shared/src/request_sharing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,11 @@ where

fn collect_garbage(cache: &Cache<Request, Fut>, label: &str) {
let mut cache = cache.lock().unwrap();
let len_before = cache.len() as u64;
cache.retain(|_request, weak| weak.upgrade().is_some());
Metrics::get()
.request_sharing_cached_items
.with_label_values(&[label])
.sub(len_before - cache.len() as u64);
.set(cache.len() as u64);
}

fn spawn_gc(cache: Cache<Request, Fut>, label: String) {
Expand All @@ -76,11 +75,10 @@ where

impl<A, B: Future> Drop for RequestSharing<A, B> {
fn drop(&mut self) {
let cache = self.in_flight.lock().unwrap();
Metrics::get()
.request_sharing_cached_items
.with_label_values(&[&self.request_label])
.sub(cache.len() as u64);
.set(0);
}
}

Expand Down Expand Up @@ -130,7 +128,7 @@ where
Metrics::get()
.request_sharing_cached_items
.with_label_values(&[&self.request_label])
.inc();
.set(in_flight.len() as u64);
shared
}
}
Expand Down
6 changes: 3 additions & 3 deletions crates/shared/src/trade_finding/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
use {
crate::{
price_estimation::{PriceEstimationError, Query},
request_sharing::RequestSharing,
request_sharing::{BoxRequestSharing, RequestSharing},
trade_finding::{Interaction, Quote, Trade, TradeError, TradeFinding},
},
anyhow::{anyhow, Context},
ethrpc::block_stream::CurrentBlockWatcher,
futures::{future::BoxFuture, FutureExt},
futures::FutureExt,
reqwest::{header, Client},
url::Url,
};
Expand All @@ -20,7 +20,7 @@ pub struct ExternalTradeFinder {
/// Utility to make sure no 2 identical requests are in-flight at the same
/// time. Instead of issuing a duplicated request this awaits the
/// response of the in-flight request.
sharing: RequestSharing<Query, BoxFuture<'static, Result<Trade, PriceEstimationError>>>,
sharing: BoxRequestSharing<Query, Result<Trade, PriceEstimationError>>,

/// Client to issue http requests with.
client: Client,
Expand Down

0 comments on commit 2a3c2a4

Please sign in to comment.