Skip to content

Commit

Permalink
Global flow rework (#89)
Browse files Browse the repository at this point in the history
* add block manager and logic

* refacto block manager

* start logic on collection manager

* add todo for storage

* add event manager logic

* add token manager logic

* cleanup

* update env example

* fix typo

* impl suggestions

* move logic to crate instead of main

* cleanup

* switch starknet to workspace
  • Loading branch information
glihm authored Sep 13, 2023
1 parent 634fcbb commit 3871416
Show file tree
Hide file tree
Showing 29 changed files with 951 additions and 1,321 deletions.
26 changes: 14 additions & 12 deletions .env.example
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
RPC_PROVIDER=
RUST_BACKTRACE=
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_DEFAULT_REGION=
RUST_LOG=
IS_DEV=
INDEXER_VERSION=1
START_BLOCK=862297
# If END_BLOCK commented -> latest block and poll head of the chain.
#END_BLOCK=862120

# indexer start block
START_BLOCK=
RUST_BACKTRACE=0
RUST_LOG=0
IS_DEV=true

# kinesis stream names
KINESIS_TRANSFER_STREAM_NAME=
KINESIS_COLLECTION_STREAM_NAME=

# dynamo db table names
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_DEFAULT_REGION=

LAMBDA_FUNCTION_NAME=

ARK_INDEXER_TABLE_NAME=
ARK_TOKENS_EVENTS_TABLE_NAME=
ARK_COLLECTIONS_TABLE_NAME=
ARK_TOKENS_TABLE_NAME=
ARK_BLOCKS_TABLE_NAME=
ARK_TOKENS_OWNERS_TABLE_NAME=

IPFS_GATEWAY_URI=
35 changes: 34 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 16 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ members = [
"crates/ark-starknet",
"crates/ark-tables",
"crates/ark-storage",
"crates/ark-core",
"crates/arkchain-indexer",
]

Expand All @@ -13,6 +14,15 @@ name = "ark-indexer"
version = "0.1.0"
edition = "2021"

[workspace.dependencies]
ark-db = { path = "./crates/ark-db" }
ark-starknet = { path = "./crates/ark-starknet" }
ark-metadata = { path = "./crates/ark-metadata" }
ark-storage = { path = "./crates/ark-storage" }
async-trait = "0.1.73"
starknet = "0.5.0"

# CLEAN DEPENDENCIES BASED ON MAIN ONLY.
[dependencies]
chrono = "0.4.19"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
Expand All @@ -23,7 +33,6 @@ anyhow = "1.0"
tokio = { version = "1", features = ["full"] }
reqwest = { version = "0.11", features = ["json"] }
serde_json = "1.0"
starknet = "0.5.0"
dotenv = "0.15.0"
serde = "1.0.164"
aws-config = "0.56.0"
Expand All @@ -35,8 +44,9 @@ aws-types = "0.56"
num-bigint = { version = "0.4.3", default-features = false }
log = "0.4"
simple_logger = "4.2.0"
ark-db = { path = "./crates/ark-db" }
ark-starknet = { path = "./crates/ark-starknet" }
ark-metadata = { path = "./crates/ark-metadata" }
ark-storage = { path = "./crates/ark-storage" }
openssl = { version = "0.10", features = ["vendored"] }
openssl = { version = "0.10", features = ["vendored"] }
ark-db.workspace = true
ark-starknet.workspace = true
ark-metadata.workspace = true
ark-storage.workspace = true
ark-core = { path = "./crates/ark-core" }
35 changes: 35 additions & 0 deletions crates/ark-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
[package]
name = "ark-core"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
chrono = "0.4.19"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
tracing = "0.1"
tracing-log = "0.1"
regex = "1.5.4"
anyhow = "1.0"
tokio = { version = "1", features = ["full"] }
reqwest = { version = "0.11", features = ["json"] }
serde_json = "1.0"
dotenv = "0.15.0"
serde = "1.0.164"
aws-config = "0.56.0"
aws-sdk-dynamodb = "0.29.0"
clap = "4.3.8"
structopt = { version = "0.3", default-features = false }
hex = "0.4.3"
aws-types = "0.56"
num-bigint = { version = "0.4.3", default-features = false }
log = "0.4"
simple_logger = "4.2.0"
openssl = { version = "0.10", features = ["vendored"] }

ark-db.workspace = true
ark-starknet.workspace = true
ark-metadata.workspace = true
ark-storage.workspace = true
starknet.workspace = true
142 changes: 142 additions & 0 deletions crates/ark-core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
mod managers;

use anyhow::Result;
use ark_starknet::client::{StarknetClient, StarknetClientHttp};
use ark_storage::storage_manager::StorageManager;
use dotenv::dotenv;
use managers::collection_manager::ContractType;
use managers::{BlockManager, CollectionManager, EventManager, TokenManager};
use starknet::core::types::*;
use std::env;
use tokio::time::{self, Duration};
use tracing::{span, Level};
use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter, Registry};

pub async fn indexer_main_loop<T: StorageManager>(storage: T) -> Result<()> {
dotenv().ok();

init_tracing();

let rpc_provider = env::var("RPC_PROVIDER").expect("RPC_PROVIDER must be set");
let sn_client = StarknetClientHttp::new(&rpc_provider.clone())?;
let block_manager = BlockManager::new(&storage, &sn_client);
let mut event_manager = EventManager::new(&storage, &sn_client);
let mut token_manager = TokenManager::new(&storage, &sn_client);
let mut collection_manager = CollectionManager::new(&storage, &sn_client);

let (from_block, to_block, poll_head_of_chain) = block_manager.get_block_range();

let mut current_u64 = sn_client.block_id_to_u64(&from_block).await?;
let mut to_u64 = sn_client.block_id_to_u64(&to_block).await?;

loop {
log::trace!("Indexing block: {} {}", current_u64, to_u64);

to_u64 = check_range(&sn_client, current_u64, to_u64, poll_head_of_chain).await;
if current_u64 > to_u64 {
continue;
}

if !block_manager.check_candidate(current_u64) {
continue;
}

let block_ts = sn_client.block_time(BlockId::Number(current_u64)).await?;

let blocks_events = sn_client
.fetch_events(
BlockId::Number(current_u64),
BlockId::Number(current_u64),
event_manager.keys_selector(),
)
.await?;

for (_, events) in blocks_events {
for e in events {
let contract_address = e.from_address;

let contract_info =
match collection_manager.identify_contract(contract_address).await {
Ok(info) => info,
Err(e) => {
log::error!("Can't identify contract {contract_address}: {:?}", e);
continue;
}
};

let contract_type = contract_info.r#type;
if contract_type == ContractType::Other {
continue;
}

let token_event = match event_manager
.format_event(&e, contract_type, block_ts)
.await
{
Ok(te) => te,
Err(err) => {
log::error!("Can't format event {:?}\nevent: {:?}", err, e);
continue;
}
};

match token_manager.format_token(&token_event).await {
Ok(()) => (),
Err(err) => {
log::error!("Can't format token {:?}\ntevent: {:?}", err, token_event);
continue;
}
}
}
}

current_u64 += 1;
}
}

async fn check_range(
client: &StarknetClientHttp,
current: u64,
to: u64,
poll_head_of_chain: bool,
) -> u64 {
if current >= to {
if !poll_head_of_chain {
// TODO: can print some stats here if necessary.
log::info!("End of indexing block range");
std::process::exit(0);
}

// TODO: make this duration configurable (DELAY_HEAD_OF_CHAIN).
// But we are at HOC, so for now the block interval is 3 min.
// However, we want the block as soon as it's mined.
time::sleep(Duration::from_secs(1)).await;

// Head of the chain requested -> check the last block and continue
// indexing loop.
return client
.block_number()
.await
.expect("Can't fetch last block number");

Check warning on line 120 in crates/ark-core/src/lib.rs

View workflow job for this annotation

GitHub Actions / clippy

unneeded `return` statement

warning: unneeded `return` statement --> crates/ark-core/src/lib.rs:117:9 | 117 | / return client 118 | | .block_number() 119 | | .await 120 | | .expect("Can't fetch last block number"); | |____________________________________________________^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_return help: remove `return` | 117 ~ client 118 + .block_number() 119 + .await 120 ~ .expect("Can't fetch last block number") |
} else {
return to;

Check warning on line 122 in crates/ark-core/src/lib.rs

View workflow job for this annotation

GitHub Actions / clippy

unneeded `return` statement

warning: unneeded `return` statement --> crates/ark-core/src/lib.rs:122:9 | 122 | return to; | ^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_return help: remove `return` | 122 - return to; 122 + to |
}
}

fn init_tracing() {
// Initialize the LogTracer to convert `log` records to `tracing` events
tracing_log::LogTracer::init().expect("Setting log tracer failed.");

// Create the layers
let env_filter = EnvFilter::from_default_env();
let fmt_layer = fmt::layer();

// Combine layers and set as global default
let subscriber = Registry::default().with(env_filter).with(fmt_layer);

tracing::subscriber::set_global_default(subscriber)
.expect("Setting default subscriber failed.");

let main_span = span!(Level::TRACE, "main");
let _main_guard = main_span.enter();
}
Loading

0 comments on commit 3871416

Please sign in to comment.