Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tracing to tree_availability #21

Merged
merged 42 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
c9fab14
updating server middleware
0xKitsune Oct 27, 2023
426b0c3
Merge branch 'main' into 0xkitsune/tracing
0xKitsune Oct 27, 2023
03884c6
added logging to tree_data
0xKitsune Oct 27, 2023
4fc52f2
updated tracing
0xKitsune Oct 28, 2023
ea0fdc4
add tracing-opentelemetry
0xKitsune Oct 29, 2023
54f4355
added open telemetry and datadog dependencies
0xKitsune Oct 29, 2023
a33530a
added tracing mod to common crate
0xKitsune Oct 29, 2023
dbd10bb
updated tracing mod in common
0xKitsune Oct 30, 2023
7bb11b7
updated tracing in common mod
0xKitsune Oct 30, 2023
ae5489a
added logging middleware for server, updated common::tracing
0xKitsune Oct 30, 2023
053d86e
cargo sort
0xKitsune Oct 30, 2023
f4b0870
cargo clippy
0xKitsune Oct 30, 2023
c20c8fa
cargo fmt
0xKitsune Oct 30, 2023
45b5acb
patched syncing
0xKitsune Oct 31, 2023
0476f43
added window size as a param for tree avail service
0xKitsune Nov 1, 2023
f8f6f99
updated block scanner logic
0xKitsune Nov 1, 2023
e3beba3
added reqwest client feature for otel dd dependency, enabling async r…
0xKitsune Nov 1, 2023
f91a1a9
added rt-tokio for dd batch exporter
0xKitsune Nov 1, 2023
71459ad
updated service name
0xKitsune Nov 1, 2023
108ff3a
fix: typo
0xKitsune Nov 1, 2023
3ffcec4
fixed merge conflicts
0xKitsune Nov 1, 2023
a01f26b
added abi mod to state bridge
0xKitsune Nov 1, 2023
16a8fe6
fixed compilation errors
0xKitsune Nov 1, 2023
17a6f49
updated docs
0xKitsune Nov 1, 2023
f12ba1d
updated world tree root docs
0xKitsune Nov 1, 2023
e984074
updating state bridge service binary
0xKitsune Nov 1, 2023
9868ddf
added todos
0xKitsune Nov 2, 2023
3d12d1e
Merge pull request #26 from worldcoin/0xkitsune/docs
0xKitsune Nov 2, 2023
deb337c
fixed comment, fixed typo
0xKitsune Nov 2, 2023
232d369
updated tracing fmt
0xKitsune Nov 2, 2023
0680afe
enabled middleware for server
0xKitsune Nov 2, 2023
833de71
added instrumentation
0xKitsune Nov 2, 2023
8049299
cargo clippy
0xKitsune Nov 2, 2023
768bdbf
updated default window size
0xKitsune Nov 2, 2023
c7dbea4
added todo to remove `deleteIdentities` with batch size from abi
0xKitsune Nov 2, 2023
6184014
fixed merge conflicts
0xKitsune Nov 2, 2023
3116960
cargo clippy
0xKitsune Nov 2, 2023
9a34906
fixing merge conflicts
0xKitsune Nov 2, 2023
2c8df91
Merge pull request #25 from worldcoin/0xkitsune/syncing
0xKitsune Nov 3, 2023
ef2db44
updated dd subscriber
0xKitsune Nov 3, 2023
c081a17
disabled dd for now
0xKitsune Nov 3, 2023
300dc74
Merge branch '0xkitsune/tracing' of https://github.com/worldcoin/iden…
0xKitsune Nov 3, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
808 changes: 535 additions & 273 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 2 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,17 @@ edition = "2021"

[workspace]
members = [
"crates/common"
,
"crates/common",
"crates/sequencer",
"crates/state_bridge",
"crates/tree_availability"]

[dependencies]
clap = { version = "4.4.6", features = ["derive"] }
common = { path = "./crates/common" }
ethers = "2.0.10"
eyre = "0.6.8"
futures = "0.3.28"
metrics = "0.21.1"
reqwest = "0.11.22"
serde = "1.0.188"
state_bridge = { path = "crates/state_bridge" }
tokio = { version = "1.33.0", features = ["full"] }
Expand Down
177 changes: 47 additions & 130 deletions bin/state_bridge_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,46 +10,30 @@ use ethers::prelude::{
SignerMiddleware, H160,
};
use ethers::providers::Middleware;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use state_bridge::bridge::{IBridgedWorldID, IStateBridge, StateBridge};
use state_bridge::root::IWorldIDIdentityManager;
use state_bridge::abi::{
IBridgedWorldID, IStateBridge, IWorldIDIdentityManager,
};
use state_bridge::bridge::StateBridge;
use state_bridge::StateBridgeService;
use tracing::info;

// The state bridge service propagates roots from the world tree. Frequency of root propagation is specified
// by the relaying_period. This service will not propagate roots that have already been propagated before.
#[derive(Parser, Debug)]
#[clap(
name = "State Bridge Service",
about = "The state bridge service propagates roots according to the specified relaying_period by calling the propagateRoot() method on each specified World ID StateBridge. The state bridge service will also make sure that it doesn't propagate roots that have already been propagated and have finalized on the BridgedWorldID side."
about = "The state bridge service listens to root changes from the WorldIdIdentityManager and propagates them to each of the corresponding Layer 2s specified in the configuration file."
)]
struct Options {
#[clap(long, help = "Path to the TOML state bridge service config file")]
struct Opts {
#[clap(
short,
long,
help = "Path to the TOML state bridge service config file"
)]
config: PathBuf,
}

// Converts a u64 into a Duration using Duration::from_secs
mod duration_seconds {
use std::time::Duration;

use serde::{Deserialize, Deserializer, Serializer};

pub fn serialize<S>(duration: &Duration, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
s.serialize_u64(duration.as_secs())
}

pub fn deserialize<'de, D>(d: D) -> Result<Duration, D::Error>
where
D: Deserializer<'de>,
{
let secs = u64::deserialize(d)?;
Ok(Duration::from_secs(secs))
}
}

#[derive(Deserialize, Serialize, Debug, Clone, PartialEq)]
struct BridgeConfig {
name: String,
Expand All @@ -58,14 +42,24 @@ struct BridgeConfig {
bridged_rpc_url: String,
}

// The config TOML file defines all the necessary parameters to spawn a state bridge service.
// rpc_url - HTTP rpc url for an Ethereum node (string)
// private_key - pk to an address that will call the propagateRoot() method on the StateBridge contract (string)
// world_id_address - WorldIDIdentityManager contract address (string)
// bridge_pair_addresses - List of StateBridge and BridgedWorldID contract address pairs (strings)
// bridged_world_id_addresses - List of BridgedWorldID contract addresses (strings)
// relaying_period: propagateRoot() call period time in seconds (u64)
// block_confirmations - Number of block confirmations required for the propagateRoot call on the StateBridge contract (optional number)
//TODO: lets update this to be a yaml file and then we can do something like the following:
// rpc_url: ""
// private_key: ""
// world_id_address: ""
// block_confirmations: ""
// state_bridges:
// optimism:
// state_bridge_address: ""
// bridged_world_id_address: ""
// bridged_rpc_url: ""
// relaying_period_seconds: 5
//
// arbitrum:
// state_bridge_address: ""
// bridged_world_id_address: ""
// bridged_rpc_url: ""
// relaying_period_seconds: 5

#[derive(Deserialize, Serialize, Debug, Clone)]
struct Config {
// RPC URL for the HTTP provider (World ID IdentityManager)
Expand All @@ -77,37 +71,25 @@ struct Config {
// List of `StateBridge` and `BridgedWorldID` pair addresses
bridge_configs: Vec<BridgeConfig>,
// `propagateRoot()` call period time in seconds
#[serde(with = "duration_seconds")]
relaying_period_seconds: Duration,
// Number of block confirmations required for the `propagateRoot` call on the `StateBridge`
// contract
#[serde(default = "default_block_confirmations")]
block_confirmations: usize,
}

fn default_block_confirmations() -> usize {
1usize
block_confirmations: Option<usize>,
}

#[tokio::main]
async fn main() -> eyre::Result<()> {
let options = Options::parse();

let contents = fs::read_to_string(&options.config)?;

let opts = Opts::parse();
let contents = fs::read_to_string(&opts.config)?;
let config: Config = toml::from_str(&contents)?;

let rpc_url = config.rpc_url;

let block_confirmations = config.block_confirmations;

spawn_state_bridge_service(
rpc_url,
config.rpc_url,
config.private_key,
config.world_id_address,
config.bridge_configs,
config.relaying_period_seconds,
block_confirmations,
config.block_confirmations.unwrap_or(0),
)
.await?;

Expand All @@ -127,15 +109,13 @@ async fn spawn_state_bridge_service(

let chain_id = provider.get_chainid().await?.as_u64();

let wallet = private_key
.parse::<LocalWallet>()
.expect("couldn't instantiate wallet from private key")
.with_chain_id(chain_id);
let wallet = private_key.parse::<LocalWallet>()?.with_chain_id(chain_id);
let wallet_address = wallet.address();

let middleware = SignerMiddleware::new(provider, wallet);
let middleware = NonceManagerMiddleware::new(middleware, wallet_address);
let middleware = Arc::new(middleware);
let signer_middleware = SignerMiddleware::new(provider, wallet);
let nonce_manager_middleware =
NonceManagerMiddleware::new(signer_middleware, wallet_address);
let middleware = Arc::new(nonce_manager_middleware);

let world_id_interface =
IWorldIDIdentityManager::new(world_id_address, middleware.clone());
Expand Down Expand Up @@ -187,75 +167,12 @@ async fn spawn_state_bridge_service(
info!("Added a bridge to {} to the state-bridge-service", name);
}

state_bridge_service.spawn().await?;

Ok(())
}

#[cfg(test)]
mod tests {
use std::str::FromStr;
use std::time::Duration;

use super::{Config, H160};
use crate::BridgeConfig;
let handles = state_bridge_service.spawn().await?;

#[tokio::test]
async fn test_deserialize_toml() -> eyre::Result<()> {
let config: Config = toml::from_str(
r#"
rpc_url = "127.0.0.1:8545"
private_key = "4c0883a69102937d6231471b5dbb6204fe5129617082792ae468d01a3f362318"
world_id_address = "0x3f0BF744bb79A0b919f7DED73724ec20c43572B9"
bridge_configs = [
[
"Optimism",
"0x3f0BF744bb79A0b919f7DED73724ec20c43572B9",
"0x4f0BF744bb79A0b919f7DED73724ec20c43572B9",
"127.0.0.1:8545",
]
]
relaying_period_seconds = 5
"#)
.expect("couldn't deserialize toml-encoded string");

assert_eq!(
config.rpc_url,
String::from("127.0.0.1:8545"),
"RPC didn't match"
);
assert_eq!(config.private_key, String::from("4c0883a69102937d6231471b5dbb6204fe5129617082792ae468d01a3f362318"), "private key didn't match");
assert_eq!(
config.world_id_address,
H160::from_str("0x3f0BF744bb79A0b919f7DED73724ec20c43572B9")
.unwrap(),
"World ID address didn't match"
);
let bridged_configs: Vec<BridgeConfig> = vec![BridgeConfig {
name: "Optimism".to_string(),
state_bridge_address: H160::from_str(
"0x3f0BF744bb79A0b919f7DED73724ec20c43572B9",
)
.unwrap(),
bridged_world_id_address: H160::from_str(
"0x4f0BF744bb79A0b919f7DED73724ec20c43572B9",
)
.unwrap(),
bridged_rpc_url: "127.0.0.1:8545".to_string(),
}];

assert_eq!(config.bridge_configs, bridged_configs);
// assert it uses serde default 1 block confirmation
assert_eq!(
config.block_confirmations, 1usize,
"block confirmations didn't match"
);
assert_eq!(
config.relaying_period_seconds,
Duration::from_secs(5u64),
"relaying period didn't match"
);

Ok(())
let mut handles = handles.into_iter().collect::<FuturesUnordered<_>>();
while let Some(result) = handles.next().await {
result??;
}

Ok(())
}
20 changes: 20 additions & 0 deletions bin/tree_availability_service.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::sync::Arc;

use clap::Parser;
use common::tracing::init_subscriber;
use ethers::providers::{Http, Provider};
use ethers::types::H160;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use tracing::Level;
use tree_availability::TreeAvailabilityService;

#[derive(Parser, Debug)]
Expand All @@ -30,6 +32,13 @@ struct Opts {
address: H160,
#[clap(short, long, help = "Creation block of the World Tree")]
creation_block: u64,
#[clap(
short,
long,
help = "Maximum window size when scanning blocks for TreeChanged events",
default_value = "1000"
)]
window_size: u64,
#[clap(short, long, help = "Ethereum RPC endpoint")]
rpc_endpoint: String,
#[clap(
Expand All @@ -39,26 +48,37 @@ struct Opts {
default_value = "8080"
)]
port: u16,
#[clap(long, help = "Enable datadog backend for instrumentation")]
datadog: bool,
}

#[tokio::main]
pub async fn main() -> eyre::Result<()> {
let opts = Opts::parse();

if opts.datadog {
todo!("Initialize datadog tracing backend");
// init_datadog_subscriber("tree-availability-service", Level::INFO);
} else {
init_subscriber(Level::INFO);
}

let middleware = Arc::new(Provider::<Http>::try_from(opts.rpc_endpoint)?);
let handles = TreeAvailabilityService::new(
opts.tree_depth,
opts.dense_prefix_depth,
opts.tree_history_size,
opts.address,
opts.creation_block,
opts.window_size,
middleware,
)
.serve(opts.port)
.await;

let mut handles = handles.into_iter().collect::<FuturesUnordered<_>>();
while let Some(result) = handles.next().await {
tracing::error!("TreeAvailabilityError: {:?}", result);
result??;
}

Expand Down
10 changes: 9 additions & 1 deletion crates/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,12 @@ ethers = { version = "2.0.10", features = [
] }
eyre = "0.6.8"
serde = "1.0.189"
tracing = "0.1.39"
metrics = "0.21.1"
opentelemetry = { version = "0.20.0", features = ["rt-tokio"] }
opentelemetry-datadog = {verison = "0.8.0", features = ["reqwest-client"]}
tracing = "0.1.40"
tracing-opentelemetry = "0.21.0"
tracing-subscriber = {version = "0.3.17", features = ["env-filter"]}
tokio = "1.33.0"
http = "0.2.9"
opentelemetry-http = "0.9.0"
1 change: 1 addition & 0 deletions crates/common/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod test_utilities;
pub mod tracing;
55 changes: 55 additions & 0 deletions crates/common/src/tracing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use tracing::Level;
use tracing_opentelemetry::OpenTelemetrySpanExt;
use tracing_subscriber::filter::EnvFilter;
use tracing_subscriber::fmt;
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

pub fn init_subscriber(level: Level) {
let fmt_layer = fmt::layer().with_target(false).with_level(true);

let filter = EnvFilter::from_default_env().add_directive(level.into());

tracing_subscriber::registry()
.with(filter)
.with(fmt_layer)
.init();
}

pub fn init_datadog_subscriber(service_name: &str, level: Level) {
let tracer = opentelemetry_datadog::new_pipeline()
.with_service_name(service_name)
.with_api_version(opentelemetry_datadog::ApiVersion::Version05)
.install_batch(opentelemetry::runtime::Tokio)
.expect("Could not initialize tracer");

let otel_layer = tracing_opentelemetry::OpenTelemetryLayer::new(tracer);

let filter = tracing_subscriber::filter::EnvFilter::from_default_env()
.add_directive(level.into());

let fmt_layer = fmt::layer().with_target(false).with_level(true);

tracing_subscriber::registry()
.with(filter)
.with(fmt_layer)
.with(otel_layer)
.init();
}

pub fn trace_from_headers(headers: &http::HeaderMap) {
tracing::Span::current().set_parent(
opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.extract(&opentelemetry_http::HeaderExtractor(headers))
}),
);
}

pub fn trace_to_headers(headers: &mut http::HeaderMap) {
opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.inject_context(
&tracing::Span::current().context(),
&mut opentelemetry_http::HeaderInjector(headers),
);
});
}
Loading
Loading