Skip to content

Commit

Permalink
Merge pull request #31 from worldcoin/0xkitsune/sync-to-head
Browse files Browse the repository at this point in the history
Fix: `WorldTree::sync_to_head`
  • Loading branch information
0xKitsune authored Nov 9, 2023
2 parents 8ca32ec + 54a1d2a commit e84babd
Show file tree
Hide file tree
Showing 10 changed files with 234 additions and 65 deletions.
82 changes: 82 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ edition = "2021"
[dependencies]
anyhow = "1.0.75"
axum = "0.6.20"
axum-middleware = { path = "crates/axum_middleware" }
axum-middleware = { path = "crates/axum-middleware" }
clap = { version = "4.4.7", features = ["derive"] }
common = { path = "crates/common" }
ethers = { version = "2.0.10", features = ["abigen", "ws", "ipc", "rustls", "openssl"] }
ethers-throttle = { path = "crates/ethers-throttle" }
eyre = "0.6.8"
futures = "0.3.28"
governor = "0.6.0"
hex = "0.4.3"
hyper = { version = "^0.14.27", features = ["server", "tcp", "http1", "http2"] }
metrics = "0.21.1"
Expand All @@ -28,6 +30,7 @@ thiserror = "1.0.49"
tokio = { version = "1.32.0", features = ["sync", "macros"] }
toml = "0.8.8"
tracing = "0.1.37"
url = "2.4.1"

[dev-dependencies]
reqwest = { version = "0.11.22", features = ["json"] }
Expand Down
24 changes: 23 additions & 1 deletion bin/tree_availability_service.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
use std::sync::Arc;
use std::time::Duration;

use clap::Parser;
use common::metrics::init_statsd_exporter;
use common::shutdown_tracer_provider;
use common::tracing::{init_datadog_subscriber, init_subscriber};
use ethers::providers::{Http, Provider};
use ethers::types::H160;
use ethers_throttle::ThrottledProvider;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use governor::Jitter;
use tracing::Level;
use url::Url;
use world_tree::tree::service::TreeAvailabilityService;

#[derive(Parser, Debug)]
Expand Down Expand Up @@ -52,6 +56,14 @@ struct Opts {
port: u16,
#[clap(long, help = "Enable datadog backend for instrumentation")]
datadog: bool,

#[clap(
short,
long,
help = "Request per minute limit for rpc endpoint",
default_value = "0"
)]
throttle: u32,
}

const SERVICE_NAME: &str = "tree-availability-service";
Expand All @@ -69,7 +81,17 @@ pub async fn main() -> eyre::Result<()> {
init_subscriber(Level::INFO);
}

let middleware = Arc::new(Provider::<Http>::try_from(opts.rpc_endpoint)?);
let http_provider = Http::new(Url::parse(&opts.rpc_endpoint)?);
let throttled_http_provider = ThrottledProvider::new(
http_provider,
opts.throttle,
Some(Jitter::new(
Duration::from_millis(10),
Duration::from_millis(100),
)),
);
let middleware = Arc::new(Provider::new(throttled_http_provider));

let handles = TreeAvailabilityService::new(
opts.tree_depth,
opts.dense_prefix_depth,
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
12 changes: 12 additions & 0 deletions crates/ethers-throttle/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "ethers-throttle"
version = "0.1.0"
edition = "2021"

[dependencies]
async-trait = "0.1.74"
ethers = "2.0.10"
eyre = "0.6.8"
governor = "0.6.0"
serde = { version = "1.0.189", features = ["derive"] }

68 changes: 68 additions & 0 deletions crates/ethers-throttle/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use std::num::NonZeroU32;
use std::sync::Arc;

use async_trait::async_trait;
use ethers::providers::{JsonRpcClient, ProviderError};
use governor::clock::{QuantaClock, QuantaInstant};
use governor::middleware::NoOpMiddleware;
use governor::state::{InMemoryState, NotKeyed};
use governor::{Jitter, Quota, RateLimiter};
use serde::de::DeserializeOwned;
use serde::Serialize;

pub type Throttle = RateLimiter<
NotKeyed,
InMemoryState,
QuantaClock,
NoOpMiddleware<QuantaInstant>,
>;

#[derive(Clone, Debug)]
pub struct ThrottledProvider<P: JsonRpcClient> {
throttle: Arc<Throttle>,
jitter: Option<Jitter>,
inner: P,
}

impl<P: JsonRpcClient> ThrottledProvider<P> {
pub fn new(
provider: P,
requests_per_second: u32,
jitter: Option<Jitter>,
) -> Self {
let throttle = Arc::new(RateLimiter::direct(Quota::per_second(
NonZeroU32::new(requests_per_second)
.expect("Could not initialize NonZeroU32"),
)));

ThrottledProvider {
throttle,
jitter,
inner: provider,
}
}
}

#[async_trait]
impl<P: JsonRpcClient> JsonRpcClient for ThrottledProvider<P> {
type Error = P::Error;

/// Sends a request with the provided JSON-RPC and parameters serialized as JSON
async fn request<T, R>(
&self,
method: &str,
params: T,
) -> Result<R, Self::Error>
where
T: std::fmt::Debug + Serialize + Send + Sync,
R: DeserializeOwned + Send,
{
if let Some(jitter) = self.jitter {
self.throttle.until_ready_with_jitter(jitter).await;
} else {
self.throttle.until_ready().await;
}

self.inner.request(method, params).await
}
}
60 changes: 26 additions & 34 deletions src/tree/block_scanner.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use std::sync::atomic::{AtomicU64, Ordering};

use ethers::providers::Middleware;
use ethers::types::{
Address, BlockNumber, Filter, FilterBlockOption, Log, Topic, ValueOrArray,
};
use ethers::types::{BlockNumber, Filter, Log};

/// The `BlockScanner` utility tool enables allows parsing arbitrary onchain events
pub struct BlockScanner<M> {
Expand All @@ -13,6 +11,8 @@ pub struct BlockScanner<M> {
pub last_synced_block: AtomicU64,
/// The maximum block range to parse
window_size: u64,
//TODO:
filter: Filter,
}

impl<M> BlockScanner<M>
Expand All @@ -24,53 +24,45 @@ where
middleware: M,
window_size: u64,
current_block: u64,
filter: Filter,
) -> Self {
Self {
middleware,
last_synced_block: AtomicU64::new(current_block),
window_size,
filter,
}
}

/// Retrieves events matching the specified address and topics from the last synced block to the latest block.
///
/// # Arguments
///
/// * `address` - Optional address to target when fetching logs.
/// * `topics` - Optional topics to target when fetching logs, enabling granular filtering when looking for specific event signatures or topic values.
pub async fn next(
&self,
address: Option<ValueOrArray<Address>>,
topics: [Option<Topic>; 4],
) -> Result<Vec<Log>, M::Error> {
/// Retrieves events matching the specified address and topics from the last synced block to the latest block, stepping by `window_size`.
pub async fn next(&self) -> Result<Vec<Log>, M::Error> {
let latest_block = self.middleware.get_block_number().await?.as_u64();
let mut last_synced_block =
self.last_synced_block.load(Ordering::SeqCst);
let mut logs = Vec::new();

let last_synced_block = self.last_synced_block.load(Ordering::SeqCst);
while last_synced_block < latest_block {
let from_block = last_synced_block + 1;
let to_block = (from_block + self.window_size).min(latest_block);

if last_synced_block >= latest_block {
return Ok(Vec::new());
}
tracing::info!(?from_block, ?to_block, "Scanning blocks");

let from_block = last_synced_block + 1;
let to_block = latest_block.min(from_block + self.window_size);
let filter = self
.filter
.clone()
.from_block(BlockNumber::Number(from_block.into()))
.to_block(BlockNumber::Number(to_block.into()));

tracing::info!(?from_block, ?to_block, "Scanning blocks");
//TODO: can probably also use futures ordered here to get all of the logs quickly
logs.extend(self.middleware.get_logs(&filter).await?);

let logs = self
.middleware
.get_logs(&Filter {
block_option: FilterBlockOption::Range {
from_block: Some(BlockNumber::Number(from_block.into())),
to_block: Some(BlockNumber::Number(to_block.into())),
},
address,
topics,
})
.await?;
last_synced_block = to_block;
}

self.last_synced_block.store(to_block, Ordering::SeqCst);
self.last_synced_block
.store(last_synced_block, Ordering::SeqCst);

tracing::info!(?to_block, "Last synced block updated");
tracing::info!(?last_synced_block, "Last synced block updated");

Ok(logs)
}
Expand Down
Loading

0 comments on commit e84babd

Please sign in to comment.