Skip to content

Commit

Permalink
Merge pull request #25 from worldcoin/0xkitsune/syncing
Browse files Browse the repository at this point in the history
Fix: Syncing
  • Loading branch information
0xKitsune authored Nov 3, 2023
2 parents 8049299 + 9a34906 commit 2c8df91
Show file tree
Hide file tree
Showing 11 changed files with 91 additions and 43 deletions.
34 changes: 24 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ edition = "2021"

[workspace]
members = [
"crates/common"
,
"crates/common",
"crates/sequencer",
"crates/state_bridge",
"crates/tree_availability"]
Expand All @@ -18,7 +17,6 @@ common = { path = "./crates/common" }
ethers = "2.0.10"
eyre = "0.6.8"
futures = "0.3.28"
reqwest = "0.11.22"
serde = "1.0.188"
state_bridge = { path = "crates/state_bridge" }
tokio = { version = "1.33.0", features = ["full"] }
Expand Down
12 changes: 10 additions & 2 deletions bin/tree_availability_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ struct Opts {
address: H160,
#[clap(short, long, help = "Creation block of the World Tree")]
creation_block: u64,
#[clap(
short,
long,
help = "Maximum window size when scanning blocks for TreeChanged events",
default_value = "1000"
)]
window_size: u64,
#[clap(short, long, help = "Ethereum RPC endpoint")]
rpc_endpoint: String,
#[clap(
Expand All @@ -41,7 +48,6 @@ struct Opts {
default_value = "8080"
)]
port: u16,

#[clap(long, help = "Enable datadog backend for instrumentation")]
datadog: bool,
}
Expand All @@ -51,7 +57,7 @@ pub async fn main() -> eyre::Result<()> {
let opts = Opts::parse();

if opts.datadog {
init_datadog_subscriber("tree_availability_service", Level::INFO);
init_datadog_subscriber("tree-availability-service", Level::INFO);
} else {
init_subscriber(Level::INFO);
}
Expand All @@ -63,13 +69,15 @@ pub async fn main() -> eyre::Result<()> {
opts.tree_history_size,
opts.address,
opts.creation_block,
opts.window_size,
middleware,
)
.serve(opts.port)
.await;

let mut handles = handles.into_iter().collect::<FuturesUnordered<_>>();
while let Some(result) = handles.next().await {
tracing::error!("TreeAvailabilityError: {:?}", result);
result??;
}

Expand Down
4 changes: 2 additions & 2 deletions crates/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ ethers = { version = "2.0.10", features = [
eyre = "0.6.8"
serde = "1.0.189"
metrics = "0.21.1"
opentelemetry = "0.20.0"
opentelemetry-datadog = "0.8.0"
opentelemetry = { version = "0.20.0", features = ["rt-tokio"] }
opentelemetry-datadog = {verison = "0.8.0", features = ["reqwest-client"]}
tracing = "0.1.40"
tracing-opentelemetry = "0.21.0"
tracing-subscriber = {version = "0.3.17", features = ["env-filter"]}
Expand Down
2 changes: 2 additions & 0 deletions crates/tree_availability/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ impl<M: Middleware> TreeAvailabilityService<M> {
tree_history_size: usize,
world_tree_address: H160,
world_tree_creation_block: u64,
window_size: u64,
middleware: Arc<M>,
) -> Self {
let tree = PoseidonTree::<Canonical>::new_with_dense_prefix(
Expand All @@ -64,6 +65,7 @@ impl<M: Middleware> TreeAvailabilityService<M> {
tree_history_size,
world_tree_address,
world_tree_creation_block,
window_size,
middleware,
));

Expand Down
3 changes: 3 additions & 0 deletions crates/tree_availability/src/world_tree/abi.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use ethers::middleware::contract::abigen;

//TODO: Update staging deployment and remove `deleteIdentities` containing a batch size from the abi below
abigen!(
IWorldIDIdentityManager,
r#"[
event TreeChanged(uint256 indexed preRoot, uint8 indexed kind, uint256 indexed postRoot)
function registerIdentities(uint256[8] calldata insertionProof, uint256 preRoot, uint32 startIndex, uint256[] calldata identityCommitments, uint256 postRoot) external
function deleteIdentities(uint256[8] calldata deletionProof, bytes calldata packedDeletionIndices, uint256 preRoot, uint256 postRoot) external
function deleteIdentities(uint256[8] calldata deletionProof, uint32 batchSize, bytes calldata packedDeletionIndices, uint256 preRoot, uint256 postRoot) external
]"#;
);
14 changes: 7 additions & 7 deletions crates/tree_availability/src/world_tree/block_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub struct BlockScanner<M> {
/// The onchain data provider
middleware: M,
/// The block from which to start parsing a given event
current_block: AtomicU64,
pub last_synced_block: AtomicU64,
/// The maximum block range to parse
window_size: u64,
}
Expand All @@ -27,7 +27,7 @@ where
) -> Self {
Self {
middleware,
current_block: AtomicU64::new(current_block),
last_synced_block: AtomicU64::new(current_block),
window_size,
}
}
Expand All @@ -45,16 +45,16 @@ where
) -> Result<Vec<Log>, M::Error> {
let latest_block = self.middleware.get_block_number().await?.as_u64();

let current_block = self.current_block.load(Ordering::SeqCst);
let last_synced_block = self.last_synced_block.load(Ordering::SeqCst);

if current_block >= latest_block {
if last_synced_block >= latest_block {
return Ok(Vec::new());
}

let from_block = current_block;
let from_block = last_synced_block + 1;
let to_block = latest_block.min(from_block + self.window_size);

tracing::info!(?current_block, ?latest_block, "Scanning blocks");
tracing::info!(?from_block, ?to_block, "Scanning blocks");

let logs = self
.middleware
Expand All @@ -68,7 +68,7 @@ where
})
.await?;

self.current_block.store(to_block, Ordering::SeqCst);
self.last_synced_block.store(to_block, Ordering::SeqCst);

tracing::info!(?to_block, "Current block updated");

Expand Down
3 changes: 2 additions & 1 deletion crates/tree_availability/src/world_tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,15 @@ impl<M: Middleware> WorldTree<M> {
tree_history_size: usize,
address: H160,
creation_block: u64,
window_size: u64,
middleware: Arc<M>,
) -> Self {
Self {
tree_data: Arc::new(TreeData::new(tree, tree_history_size)),
tree_updater: Arc::new(TreeUpdater::new(
address,
creation_block,
window_size,
middleware,
)),
synced: Arc::new(AtomicBool::new(false)),
Expand All @@ -79,7 +81,6 @@ impl<M: Middleware> WorldTree<M> {
loop {
tree_updater.sync_to_head(&tree_data).await?;

// Sleep a little to unblock the executor
tokio::time::sleep(Duration::from_secs(5)).await;
}
})
Expand Down
3 changes: 3 additions & 0 deletions crates/tree_availability/src/world_tree/tree_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use crate::server::InclusionProof;
pub struct TreeData {
/// A canonical in-memory representation of the World Tree.
pub tree: RwLock<PoseidonTree<Derived>>,
/// Depth of the merkle tree.
pub depth: usize,
/// The number of historical tree roots to cache for serving older proofs.
pub tree_history_size: usize,
/// Cache of historical tree state, used to serve proofs against older roots. If the cache becomes larger than `tree_history_size`, the oldest roots are removed on a FIFO basis.
Expand All @@ -26,6 +28,7 @@ impl TreeData {
) -> Self {
Self {
tree_history_size,
depth: tree.depth(),
tree: RwLock::new(tree.derived()),
tree_history: RwLock::new(VecDeque::new()),
}
Expand Down
Loading

0 comments on commit 2c8df91

Please sign in to comment.