diff --git a/.env.example b/.env.example index 5a1d50428..bcea61db5 100644 --- a/.env.example +++ b/.env.example @@ -1,23 +1,25 @@ RPC_PROVIDER= -RUST_BACKTRACE= -AWS_ACCESS_KEY_ID= -AWS_SECRET_ACCESS_KEY= -AWS_DEFAULT_REGION= -RUST_LOG= -IS_DEV= +INDEXER_VERSION=1 +START_BLOCK=862297 +# If END_BLOCK commented -> latest block and poll head of the chain. +#END_BLOCK=862120 -# indexer start block -START_BLOCK= +RUST_BACKTRACE=0 +RUST_LOG=0 +IS_DEV=true -# kinesis stream names KINESIS_TRANSFER_STREAM_NAME= KINESIS_COLLECTION_STREAM_NAME= -# dynamo db table names +AWS_ACCESS_KEY_ID= +AWS_SECRET_ACCESS_KEY= +AWS_DEFAULT_REGION= + +LAMBDA_FUNCTION_NAME= + +ARK_INDEXER_TABLE_NAME= ARK_TOKENS_EVENTS_TABLE_NAME= ARK_COLLECTIONS_TABLE_NAME= ARK_TOKENS_TABLE_NAME= ARK_BLOCKS_TABLE_NAME= ARK_TOKENS_OWNERS_TABLE_NAME= - -IPFS_GATEWAY_URI= \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index f60de2c6c..1423c6f9e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -118,6 +118,38 @@ version = "1.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" +[[package]] +name = "ark-core" +version = "0.1.0" +dependencies = [ + "anyhow", + "ark-db", + "ark-metadata", + "ark-starknet", + "ark-storage", + "aws-config", + "aws-sdk-dynamodb", + "aws-types", + "chrono", + "clap 4.4.2", + "dotenv", + "hex", + "log", + "num-bigint", + "openssl", + "regex", + "reqwest", + "serde", + "serde_json", + "simple_logger", + "starknet", + "structopt", + "tokio", + "tracing", + "tracing-log", + "tracing-subscriber", +] + [[package]] name = "ark-db" version = "0.1.0" @@ -179,6 +211,7 @@ name = "ark-indexer" version = "0.1.0" dependencies = [ "anyhow", + "ark-core", "ark-db", "ark-metadata", "ark-starknet", @@ -198,7 +231,6 @@ dependencies = [ "serde", "serde_json", "simple_logger", - "starknet", "structopt", "tokio", "tracing", @@ -235,6 +267,7 @@ version = "0.1.0" dependencies = [ "anyhow", "ark-db", + "async-trait", "dotenv", "hex", "log", diff --git a/Cargo.toml b/Cargo.toml index 5e0714460..d5f71f0ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "crates/ark-starknet", "crates/ark-tables", "crates/ark-storage", + "crates/ark-core", "crates/arkchain-indexer", ] @@ -13,6 +14,15 @@ name = "ark-indexer" version = "0.1.0" edition = "2021" +[workspace.dependencies] +ark-db = { path = "./crates/ark-db" } +ark-starknet = { path = "./crates/ark-starknet" } +ark-metadata = { path = "./crates/ark-metadata" } +ark-storage = { path = "./crates/ark-storage" } +async-trait = "0.1.73" +starknet = "0.5.0" + +# CLEAN DEPENDENCIES BASED ON MAIN ONLY. 
[dependencies] chrono = "0.4.19" tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } @@ -23,7 +33,6 @@ anyhow = "1.0" tokio = { version = "1", features = ["full"] } reqwest = { version = "0.11", features = ["json"] } serde_json = "1.0" -starknet = "0.5.0" dotenv = "0.15.0" serde = "1.0.164" aws-config = "0.56.0" @@ -35,8 +44,9 @@ aws-types = "0.56" num-bigint = { version = "0.4.3", default-features = false } log = "0.4" simple_logger = "4.2.0" -ark-db = { path = "./crates/ark-db" } -ark-starknet = { path = "./crates/ark-starknet" } -ark-metadata = { path = "./crates/ark-metadata" } -ark-storage = { path = "./crates/ark-storage" } -openssl = { version = "0.10", features = ["vendored"] } \ No newline at end of file +openssl = { version = "0.10", features = ["vendored"] } +ark-db.workspace = true +ark-starknet.workspace = true +ark-metadata.workspace = true +ark-storage.workspace = true +ark-core = { path = "./crates/ark-core" } \ No newline at end of file diff --git a/crates/ark-core/Cargo.toml b/crates/ark-core/Cargo.toml new file mode 100644 index 000000000..e967914a6 --- /dev/null +++ b/crates/ark-core/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "ark-core" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +chrono = "0.4.19" +tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } +tracing = "0.1" +tracing-log = "0.1" +regex = "1.5.4" +anyhow = "1.0" +tokio = { version = "1", features = ["full"] } +reqwest = { version = "0.11", features = ["json"] } +serde_json = "1.0" +dotenv = "0.15.0" +serde = "1.0.164" +aws-config = "0.56.0" +aws-sdk-dynamodb = "0.29.0" +clap = "4.3.8" +structopt = { version = "0.3", default-features = false } +hex = "0.4.3" +aws-types = "0.56" +num-bigint = { version = "0.4.3", default-features = false } +log = "0.4" +simple_logger = "4.2.0" +openssl = { version = "0.10", features = ["vendored"] } + +ark-db.workspace = true +ark-starknet.workspace = true +ark-metadata.workspace = true +ark-storage.workspace = true +starknet.workspace = true diff --git a/crates/ark-core/src/lib.rs b/crates/ark-core/src/lib.rs new file mode 100644 index 000000000..59d11320e --- /dev/null +++ b/crates/ark-core/src/lib.rs @@ -0,0 +1,142 @@ +mod managers; + +use anyhow::Result; +use ark_starknet::client::{StarknetClient, StarknetClientHttp}; +use ark_storage::storage_manager::StorageManager; +use dotenv::dotenv; +use managers::collection_manager::ContractType; +use managers::{BlockManager, CollectionManager, EventManager, TokenManager}; +use starknet::core::types::*; +use std::env; +use tokio::time::{self, Duration}; +use tracing::{span, Level}; +use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter, Registry}; + +pub async fn indexer_main_loop(storage: T) -> Result<()> { + dotenv().ok(); + + init_tracing(); + + let rpc_provider = env::var("RPC_PROVIDER").expect("RPC_PROVIDER must be set"); + let sn_client = StarknetClientHttp::new(&rpc_provider.clone())?; + let block_manager = BlockManager::new(&storage, &sn_client); + let mut event_manager = EventManager::new(&storage, &sn_client); + let mut token_manager = TokenManager::new(&storage, &sn_client); + let mut collection_manager = CollectionManager::new(&storage, &sn_client); + + let (from_block, to_block, poll_head_of_chain) = block_manager.get_block_range(); + + let mut current_u64 = sn_client.block_id_to_u64(&from_block).await?; + let mut to_u64 = 
sn_client.block_id_to_u64(&to_block).await?; + + loop { + log::trace!("Indexing block: {} {}", current_u64, to_u64); + + to_u64 = check_range(&sn_client, current_u64, to_u64, poll_head_of_chain).await; + if current_u64 > to_u64 { + continue; + } + + if !block_manager.check_candidate(current_u64) { + continue; + } + + let block_ts = sn_client.block_time(BlockId::Number(current_u64)).await?; + + let blocks_events = sn_client + .fetch_events( + BlockId::Number(current_u64), + BlockId::Number(current_u64), + event_manager.keys_selector(), + ) + .await?; + + for (_, events) in blocks_events { + for e in events { + let contract_address = e.from_address; + + let contract_info = + match collection_manager.identify_contract(contract_address).await { + Ok(info) => info, + Err(e) => { + log::error!("Can't identify contract {contract_address}: {:?}", e); + continue; + } + }; + + let contract_type = contract_info.r#type; + if contract_type == ContractType::Other { + continue; + } + + let token_event = match event_manager + .format_event(&e, contract_type, block_ts) + .await + { + Ok(te) => te, + Err(err) => { + log::error!("Can't format event {:?}\nevent: {:?}", err, e); + continue; + } + }; + + match token_manager.format_token(&token_event).await { + Ok(()) => (), + Err(err) => { + log::error!("Can't format token {:?}\ntevent: {:?}", err, token_event); + continue; + } + } + } + } + + current_u64 += 1; + } +} + +async fn check_range( + client: &StarknetClientHttp, + current: u64, + to: u64, + poll_head_of_chain: bool, +) -> u64 { + if current >= to { + if !poll_head_of_chain { + // TODO: can print some stats here if necessary. + log::info!("End of indexing block range"); + std::process::exit(0); + } + + // TODO: make this duration configurable (DELAY_HEAD_OF_CHAIN). + // But we are at HOC, so for now the block interval is 3 min. + // However, we want the block as soon as it's mined. + time::sleep(Duration::from_secs(1)).await; + + // Head of the chain requested -> check the last block and continue + // indexing loop. + return client + .block_number() + .await + .expect("Can't fetch last block number"); + } else { + return to; + } +} + +fn init_tracing() { + // Initialize the LogTracer to convert `log` records to `tracing` events + tracing_log::LogTracer::init().expect("Setting log tracer failed."); + + // Create the layers + let env_filter = EnvFilter::from_default_env(); + let fmt_layer = fmt::layer(); + + // Combine layers and set as global default + let subscriber = Registry::default().with(env_filter).with(fmt_layer); + + tracing::subscriber::set_global_default(subscriber) + .expect("Setting default subscriber failed."); + + let main_span = span!(Level::TRACE, "main"); + let _main_guard = main_span.enter(); +} diff --git a/crates/ark-core/src/managers/block_manager.rs b/crates/ark-core/src/managers/block_manager.rs new file mode 100644 index 000000000..f4f3dc4e5 --- /dev/null +++ b/crates/ark-core/src/managers/block_manager.rs @@ -0,0 +1,96 @@ +use ark_starknet::client::StarknetClient; +use ark_storage::storage_manager::StorageManager; +use starknet::core::types::*; + +use std::env; + +#[derive(Debug)] +pub struct BlockManager<'a, T: StorageManager, C: StarknetClient> { + storage: &'a T, + client: &'a C, + indexer_version: u64, +} + +// TODO: this struct must come from Storage crate. +#[derive(Debug, PartialEq)] +pub enum BlockIndexingStatus { + None, + Processing, + Terminated, +} + +// TODO: this struct must come from Storage crate. 
+pub struct BlockInfo { + pub indexer_version: u64, + pub status: BlockIndexingStatus, +} + +impl<'a, T: StorageManager, C: StarknetClient> BlockManager<'a, T, C> { + pub fn new(storage: &'a T, client: &'a C) -> Self { + let v: &u64 = &env::var("INDEXER_VERSION") + .expect("INDEXER_VERSION env var is missing") + .parse() + .expect("INDEXER_VERSION env var is invalid"); + + Self { + storage, + client, + indexer_version: *v, + } + } + + /// Returns the block range to be fetched during this run. + pub fn get_block_range(&self) -> (BlockId, BlockId, bool) { + let (from_block, to_block) = self + .client + .parse_block_range( + &env::var("START_BLOCK").expect("START_BLOCK env variable is missing"), + &env::var("END_BLOCK").unwrap_or("latest".to_string()), + ) + .expect("Can't parse block range from env"); + + let is_head_of_chain = to_block == BlockId::Tag(BlockTag::Latest); + log::debug!( + "Indexing range: {:?} {:?} (head of chain: {})", + from_block, + to_block, + is_head_of_chain + ); + + (from_block, to_block, is_head_of_chain) + } + + /// Returns true if the given block number must be indexed. + /// False otherwise. + pub fn check_candidate(&self, block_number: u64) -> bool { + let do_force: &bool = &env::var("FORCE_MODE") + .unwrap_or("false".to_string()) + .parse() + .unwrap_or(false); + + if *do_force { + log::debug!("Block #{} forced", block_number); + // TODO: self.storage.clean_block(block_number); + return true; + } + + // TODO: self.storage.get_block_info(...); + let info = BlockInfo { + indexer_version: 0, + status: BlockIndexingStatus::None, + }; + + if info.status == BlockIndexingStatus::None { + return true; + } + + if info.indexer_version > self.indexer_version { + log::debug!("Block #{} new version", block_number); + // TODO: self.storage.clean_block(block_number); + return true; + } + + log::debug!("Block #{} not candidate", block_number); + false + } +} diff --git a/crates/ark-core/src/managers/collection_manager.rs b/crates/ark-core/src/managers/collection_manager.rs new file mode 100644 index 000000000..570a7992e --- /dev/null +++ b/crates/ark-core/src/managers/collection_manager.rs @@ -0,0 +1,177 @@ +use anyhow::{anyhow, Result}; +use ark_starknet::client::StarknetClient; +use ark_storage::storage_manager::StorageManager; + +use serde::{Deserialize, Serialize}; +use starknet::core::types::{BlockId, BlockTag, FieldElement}; +use starknet::core::utils::{get_selector_from_name, parse_cairo_short_string}; + +use std::collections::HashMap; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum ContractType { + Other, + ERC721, + ERC1155, +} + +impl ToString for ContractType { + fn to_string(&self) -> String { + match self { + ContractType::Other => "other".to_string(), + ContractType::ERC721 => "erc721".to_string(), + ContractType::ERC1155 => "erc1155".to_string(), + } + } +} + +// TODO: this struct must come from Storage crate. +#[derive(Debug, Clone)] +pub struct ContractInfo { + pub name: String, + pub symbol: String, + pub r#type: ContractType, +} + +pub struct CollectionManager<'a, T: StorageManager, C: StarknetClient> { + storage: &'a T, + client: &'a C, + /// A cache with contract address mapped to it's type. + cache: HashMap, +} + +impl<'a, T: StorageManager, C: StarknetClient> CollectionManager<'a, T, C> { + /// Initializes a new instance. 
+    pub fn new(storage: &'a T, client: &'a C) -> Self {
+        Self {
+            storage,
+            client,
+            cache: HashMap::new(),
+        }
+    }
+
+    /// Gets the contract info from the local cache, or fetches it from the DB.
+    fn get_cached_or_fetch_info(&mut self, address: FieldElement) -> Result<ContractInfo> {
+        match self.cache.get(&address) {
+            Some(info) => Ok(info.clone()),
+            None => {
+                log::trace!("Cache miss for contract {address}");
+                // TODO: self.storage.get_contract_info();
+                // If no info available -> return error.
+                // For now, return error to simulate it's not available.
+                Err(anyhow!("Info not found in storage for contract {address}"))
+            }
+        }
+    }
+
+    /// Identifies a contract from its address only.
+    pub async fn identify_contract(&mut self, address: FieldElement) -> Result<ContractInfo> {
+        // The cache is more efficient than formatting to check the BLACKLIST.
+        match self.get_cached_or_fetch_info(address) {
+            Ok(info) => Ok(info),
+            Err(_) => {
+                // Can't find info, try to identify with calls.
+                let contract_type = self.get_contract_type(address).await?;
+
+                log::info!(
+                    "New contract identified [{:#064x}] : {}",
+                    address,
+                    contract_type.to_string()
+                );
+
+                let info = ContractInfo {
+                    name: String::new(),
+                    symbol: String::new(),
+                    r#type: contract_type,
+                };
+
+                self.cache.insert(address, info.clone());
+                // TODO: self.storage.register_contract_info(...);
+
+                Ok(info)
+            }
+        }
+    }
+
+    pub async fn get_contract_type(&self, contract_address: FieldElement) -> Result<ContractType> {
+        let block = BlockId::Tag(BlockTag::Latest);
+        let token_uri_cairo_0 = self
+            .get_contract_property_string(
+                contract_address,
+                "tokenURI",
+                vec![FieldElement::ONE, FieldElement::ZERO],
+                block,
+            )
+            .await
+            .unwrap_or("undefined".to_string());
+
+        let token_uri = self
+            .get_contract_property_string(
+                contract_address,
+                "token_uri",
+                vec![FieldElement::ONE, FieldElement::ZERO],
+                block,
+            )
+            .await
+            .unwrap_or("undefined".to_string());
+
+        let uri_result = self
+            .get_contract_property_string(contract_address, "uri", vec![], block)
+            .await
+            .unwrap_or("undefined".to_string());
+
+        if (token_uri_cairo_0 != "undefined" && !token_uri_cairo_0.is_empty())
+            || (token_uri != "undefined" && !token_uri.is_empty())
+        {
+            Ok(ContractType::ERC721)
+        } else if uri_result != "undefined" {
+            Ok(ContractType::ERC1155)
+        } else {
+            Ok(ContractType::Other)
+        }
+    }
+
+    pub async fn get_contract_property_string(
+        &self,
+        contract_address: FieldElement,
+        selector_name: &str,
+        calldata: Vec<FieldElement>,
+        block: BlockId,
+    ) -> Result<String> {
+        let response = self
+            .client
+            .call_contract(
+                contract_address,
+                get_selector_from_name(selector_name)?,
+                calldata,
+                block,
+            )
+            .await?;
+
+        decode_string_array(&response)
+    }
+}
+
+pub fn decode_string_array(string_array: &Vec<FieldElement>) -> Result<String> {
+    match string_array.len() {
+        0 => Ok("".to_string()),
+        1 => Ok(parse_cairo_short_string(&string_array[0])?),
+        2 => Ok(format!(
+            "{}{}",
+            parse_cairo_short_string(&string_array[0])?,
+            parse_cairo_short_string(&string_array[1])?,
+        )),
+        _ => {
+            // The first element is the length of the string,
+            // we can skip it as it's implicitly given by the vector itself.
+            let mut result = String::new();
+
+            for s in &string_array[1..] {
+                result.push_str(&parse_cairo_short_string(s)?);
+            }
+
+            Ok(result)
+        }
+    }
+}
diff --git a/crates/ark-core/src/managers/event_manager.rs b/crates/ark-core/src/managers/event_manager.rs
new file mode 100644
index 000000000..467ef8c66
--- /dev/null
+++ b/crates/ark-core/src/managers/event_manager.rs
@@ -0,0 +1,116 @@
+use crate::ContractType;
+use anyhow::{anyhow, Result};
+use ark_starknet::client::StarknetClient;
+use ark_storage::storage_manager::StorageManager;
+use ark_storage::types::{EventType, TokenEvent, TokenId};
+use log::info;
+use starknet::core::types::{EmittedEvent, FieldElement};
+use starknet::macros::selector;
+
+const TRANSFER_SELECTOR: FieldElement = selector!("Transfer");
+
+#[derive(Debug)]
+pub struct EventManager<'a, T: StorageManager, C: StarknetClient> {
+    storage: &'a T,
+    client: &'a C,
+    token_event: TokenEvent,
+}
+
+impl<'a, T: StorageManager, C: StarknetClient> EventManager<'a, T, C> {
+    /// Initializes a new instance.
+    pub fn new(storage: &'a T, client: &'a C) -> Self {
+        EventManager {
+            storage,
+            client,
+            token_event: TokenEvent::default(),
+        }
+    }
+
+    /// Returns the selectors used to filter events.
+    pub fn keys_selector(&self) -> Option<Vec<Vec<FieldElement>>> {
+        return Some(vec![vec![TRANSFER_SELECTOR]]);
+    }
+
+    /// Formats a token event based on the event content.
+    /// Returns the token event if the event was identified,
+    /// an Err otherwise.
+    pub async fn format_event(
+        &mut self,
+        event: &EmittedEvent,
+        contract_type: ContractType,
+        timestamp: u64,
+    ) -> Result<TokenEvent> {
+        self.reset_event();
+
+        // As Cairo didn't have keys before, we first check if the data
+        // contains the info. If not, we check into the keys, skipping the first
+        // element which is the selector.
+        let event_info = if let Some(d_info) = Self::get_event_info_from_felts(&event.data) {
+            d_info
+        } else if let Some(k_info) = Self::get_event_info_from_felts(&event.keys[1..]) {
+            k_info
+        } else {
+            log::warn!("Can't find event data in this event");
+            return Err(anyhow!("Can't format event"));
+        };
+
+        let (from, to, token_id) = event_info;
+
+        // TODO: why do we need this entry 2 times for the felt and the string?
+        self.token_event.from_address_field_element = from;
+        self.token_event.from_address = format!("{:#064x}", from);
+        self.token_event.to_address_field_element = to;
+        self.token_event.to_address = format!("{:#064x}", to);
+        self.token_event.contract_address = format!("{:#064x}", event.from_address);
+        self.token_event.transaction_hash = format!("{:#064x}", event.transaction_hash);
+        self.token_event.token_id = token_id.clone();
+        self.token_event.formated_token_id = self.token_event.token_id.format();
+        self.token_event.block_number = event.block_number;
+        self.token_event.timestamp = timestamp;
+        self.token_event.contract_type = contract_type.to_string();
+        self.token_event.event_type = Self::get_event_type(from, to);
+
+        info!("Event identified: {:?}", self.token_event.event_type);
+
+        // TODO: self.storage.register_event(self.token_event);
+        // TODO: check depending on event type if it's a create/update etc...?
+
+        Ok(self.token_event.clone())
+    }
+
+    pub fn get_event_type(from: FieldElement, to: FieldElement) -> EventType {
+        if from == FieldElement::ZERO {
+            EventType::Mint
+        } else if to == FieldElement::ZERO {
+            EventType::Burn
+        } else {
+            EventType::Transfer
+        }
+    }
+
+    /// Returns the event info from a vector of felts.
+    /// Event info is (from, to, token_id).
+    ///
+    /// This method considers that the info of the
+    /// event starts at index 0 of the input vector.
+ fn get_event_info_from_felts( + felts: &[FieldElement], + ) -> Option<(FieldElement, FieldElement, TokenId)> { + if felts.len() < 4 { + return None; + } + let from = felts[0]; + let to = felts[1]; + + let token_id = TokenId { + low: felts[2], + high: felts[3], + }; + + Some((from, to, token_id)) + } + + fn reset_event(&mut self) { + self.token_event = TokenEvent::default(); + } +} diff --git a/crates/ark-core/src/managers/mod.rs b/crates/ark-core/src/managers/mod.rs new file mode 100644 index 000000000..3628a293c --- /dev/null +++ b/crates/ark-core/src/managers/mod.rs @@ -0,0 +1,11 @@ +pub mod collection_manager; +pub use collection_manager::CollectionManager; + +pub mod event_manager; +pub use event_manager::EventManager; + +pub mod token_manager; +pub use token_manager::TokenManager; + +pub mod block_manager; +pub use block_manager::BlockManager; diff --git a/crates/ark-core/src/managers/token_manager.rs b/crates/ark-core/src/managers/token_manager.rs new file mode 100644 index 000000000..b9fe3f4bf --- /dev/null +++ b/crates/ark-core/src/managers/token_manager.rs @@ -0,0 +1,108 @@ +use anyhow::{anyhow, Result}; +use ark_starknet::client::StarknetClient; +use ark_storage::storage_manager::StorageManager; +use ark_storage::types::{EventType, TokenEvent, TokenFromEvent}; +use starknet::core::types::*; +use starknet::macros::selector; + +#[derive(Debug)] +pub struct TokenManager<'a, T: StorageManager, C: StarknetClient> { + storage: &'a T, + client: &'a C, + // TODO: Same as event manager, we should use the stack instead. + // check with @kwiss. + token: TokenFromEvent, +} + +impl<'a, T: StorageManager, C: StarknetClient> TokenManager<'a, T, C> { + /// Initializes a new instance. + pub fn new(storage: &'a T, client: &'a C) -> Self { + Self { + storage, + client, + token: TokenFromEvent::default(), + } + } + + /// Formats a token registry from the token event data. + pub async fn format_token(&mut self, event: &TokenEvent) -> Result<()> { + self.reset_token(); + + self.token.address = event.contract_address.clone(); + self.token.padded_token_id = event.padded_token_id.clone(); + self.token.from_address = event.from_address.clone(); + self.token.to_address = event.to_address.clone(); + self.token.timestamp = event.timestamp.clone(); + self.token.mint_transaction_hash = if event.event_type == EventType::Mint { + Some(event.transaction_hash.clone()) + } else { + None + }; + self.token.block_number_minted = if event.event_type == EventType::Mint { + Some(event.block_number) + } else { + None + }; + + // TODO: @kwiss, do we want a default value in case we can't get the token owner? + // or do we want to return an error and abort before saving in the storage? + let token_owner = self + .get_token_owner( + FieldElement::from_hex_be(&event.contract_address) + .expect("Contract address bad format"), + FieldElement::from(event.token_id.low), + FieldElement::from(event.token_id.high), + ) + .await?[0]; + + self.token.owner = format!("{:#064x}", token_owner); + + log::trace!( + "Registering token: {} {}", + event.token_id.format().token_id, + event.contract_address + ); + + // TODO: self.storage.register_token(self.token.clone()).await?; + + Ok(()) + } + + /// + pub fn reset_token(&mut self) { + self.token = TokenFromEvent::default(); + } + + /// Retrieves the token owner for the last block. 
+    pub async fn get_token_owner(
+        &self,
+        contract_address: FieldElement,
+        token_id_low: FieldElement,
+        token_id_high: FieldElement,
+    ) -> Result<Vec<FieldElement>> {
+        let block = BlockId::Tag(BlockTag::Latest);
+
+        match self
+            .client
+            .call_contract(
+                contract_address,
+                selector!("owner_of"),
+                vec![token_id_low, token_id_high],
+                block,
+            )
+            .await
+        {
+            Ok(res) => Ok(res),
+            Err(_) => self
+                .client
+                .call_contract(
+                    contract_address,
+                    selector!("ownerOf"),
+                    vec![token_id_low, token_id_high],
+                    block,
+                )
+                .await
+                .map_err(|_| anyhow!("Failed to get token owner from chain")),
+        }
+    }
+}
diff --git a/crates/ark-db/src/token/update.rs b/crates/ark-db/src/token/update.rs
index 19e69e501..68ae6cf18 100644
--- a/crates/ark-db/src/token/update.rs
+++ b/crates/ark-db/src/token/update.rs
@@ -94,7 +94,9 @@ pub async fn update_token_listing(
         .table_name(token_table)
         .key("address", AttributeValue::S(collection_address.clone()))
         .key("token_id", AttributeValue::S(padded_token_id))
-        .update_expression("SET listing_price = :price, listing_status = :status, order_hash = :order_hash")
+        .update_expression(
+            "SET listing_price = :price, listing_status = :status, order_hash = :order_hash",
+        )
         .expression_attribute_values(":status", AttributeValue::S(status))
         .expression_attribute_values(":price", AttributeValue::S(price))
         .expression_attribute_values(":order_hash", AttributeValue::S(order_hash));
diff --git a/crates/ark-starknet/Cargo.toml b/crates/ark-starknet/Cargo.toml
index 26d44f6bc..93eea986c 100644
--- a/crates/ark-starknet/Cargo.toml
+++ b/crates/ark-starknet/Cargo.toml
@@ -15,4 +15,5 @@ hex = "0.4.3"
 num-bigint = { version = "0.4.3", default-features = false }
 url = "2.3.1"
 regex = "1.9.1"
-serde = "1.0"
\ No newline at end of file
+serde = "1.0"
+async-trait.workspace = true
diff --git a/crates/ark-starknet/src/client.rs b/crates/ark-starknet/src/client.rs
index e6d476403..816c8ebd1 100644
--- a/crates/ark-starknet/src/client.rs
+++ b/crates/ark-starknet/src/client.rs
@@ -1,245 +1,183 @@
-use super::utils::{get_contract_property_string, get_selector_as_string};
 use anyhow::{anyhow, Result};
-use log::info;
-use reqwest::Client as ReqwestClient;
-use serde_json::{json, Value};
-use starknet::core::types::FieldElement;
+use async_trait::async_trait;
+use regex::Regex;
+use starknet::{
+    core::{types::FieldElement, types::*},
+    providers::{jsonrpc::HttpTransport, AnyProvider, JsonRpcClient, Provider},
+};
 use std::collections::HashMap;
-use std::env;
-use std::time::Instant;
-
-pub async fn fetch_block(
-    client: &ReqwestClient,
-    block_number: u64,
-) -> Result<HashMap<String, Value>> {
-    let rpc_provider = env::var("RPC_PROVIDER").expect("RPC_PROVIDER must be set");
-    let payload = json!({
-        "jsonrpc": "2.0",
-        "id": 1,
-        "method": "starknet_getEvents",
-        "params": {
-            "filter": {
-                "from_block": {
-                    "block_number": block_number
-                },
-                "to_block": {
-                    "block_number": block_number
-                },
-                "chunk_size": 1000,
-            }
-        }
-    });
-
-    let start_time = Instant::now();
-    let resp = client.post(rpc_provider).json(&payload).send().await?;
-    let block: HashMap<String, Value> = resp.json().await?;
-
-    let elapsed_time = start_time.elapsed();
-    let elapsed_time_ms = elapsed_time.as_millis();
-    info!(
-        "RPC starknet_getEvents response time: {} ms",
-        elapsed_time_ms
-    );
-
-    Ok(block)
+use url::Url;
+
+#[async_trait]
+pub trait StarknetClient {
+    ///
+    fn new(rpc_url: &str) -> Result<Self>;
+
+    ///
+    async fn block_id_to_u64(&self, id: &BlockId) -> Result<u64>;
+
+    ///
+    fn parse_block_range(&self, from: &str, to: &str) -> Result<(BlockId, BlockId)>;
+
+    ///
+    fn parse_block_id(&self, id: &str) -> Result<BlockId>;
+
+    ///
+    async fn block_time(&self, block: BlockId) -> Result<u64>;
+
+    ///
+    async fn block_number(&self) -> Result<u64>;
+
+    /// On Starknet, a chunk size limits the maximum number of events
+    /// that can be retrieved with one call.
+    /// To ensure all events are fetched, we must ensure all event pages
+    /// are correctly fetched.
+    ///
+    /// TODO: for now this version is ok, but it can be RAM consuming
+    /// as the events are accumulated before this function returns.
+    /// We can think of another version that returns each page, and let
+    /// the caller process the pages.
+    async fn fetch_events(
+        &self,
+        from_block: BlockId,
+        to_block: BlockId,
+        keys: Option<Vec<Vec<FieldElement>>>,
+    ) -> Result<HashMap<u64, Vec<EmittedEvent>>>;
+
+    /// Call a contract trying all the given selectors.
+    /// All selectors must accept the same arguments.
+    async fn call_contract(
+        &self,
+        contract_address: FieldElement,
+        selector: FieldElement,
+        calldata: Vec<FieldElement>,
+        block: BlockId,
+    ) -> Result<Vec<FieldElement>>;
 }
-pub async fn get_block_with_txs(client: &ReqwestClient, block_number: u64) -> Result<Value> {
-    let rpc_provider = env::var("RPC_PROVIDER").expect("RPC_PROVIDER must be set");
-    let payload = json!({
-        "id": 1,
-        "jsonrpc": "2.0",
-        "method": "starknet_getBlockWithTxs",
-        "params": {
-            "block_id": {
-                "block_number": block_number
-            }
-        }
-    });
-
-    let start_time = Instant::now();
-    let response = client.post(rpc_provider).json(&payload).send().await?;
-    let result: Value = response.json().await?;
-
-    let elapsed_time = start_time.elapsed();
-    let elapsed_time_ms = elapsed_time.as_millis();
-    info!(
-        "RPC starknet_getEvents response time: {} ms",
-        elapsed_time_ms
-    );
-
-    Ok(result.get("result").cloned().unwrap_or(Value::Null))
+#[derive(Debug)]
+pub struct StarknetClientHttp {
+    provider: AnyProvider,
 }
-pub async fn call_contract(
-    client: &ReqwestClient,
-    contract_address: &str,
-    selector_name: &str,
-    calldata: Vec<String>,
-    block_number: u64,
-) -> Result<Value> {
-    let rpc_provider = env::var("RPC_PROVIDER").expect("RPC_PROVIDER must be set");
-    let selector_string = selector_name.to_string();
-    let selector = get_selector_as_string(&selector_string);
-
-    let payload = json!({
-        "jsonrpc": "2.0",
-        "id": 1,
-        "method": "starknet_call",
-        "params": {
-            "request": {
-                "contract_address": contract_address,
-                "entry_point_selector": format!("0x{}", selector),
-                "calldata": calldata,
-                "signature": []
-            },
-            "block_id": {
-                "block_number": block_number
-            }
-        }
-    });
+#[async_trait]
+impl StarknetClient for StarknetClientHttp {
+    ///
+    fn new(rpc_url: &str) -> Result<Self> {
+        let rpc_url = Url::parse(rpc_url)?;
+        let provider = AnyProvider::JsonRpcHttp(JsonRpcClient::new(HttpTransport::new(rpc_url)));
-    info!("RPC Payload: {:?} - Selector: {:?}", payload, selector_name);
+        Ok(StarknetClientHttp { provider })
+    }
-    let start_time = Instant::now();
-    let response = client.post(rpc_provider).json(&payload).send().await?;
-    let result: Value = response.json().await?;
+    ///
+    async fn block_id_to_u64(&self, id: &BlockId) -> Result<u64> {
+        match id {
+            BlockId::Tag(BlockTag::Latest) => Ok(self.provider.block_number().await?),
+            BlockId::Number(n) => Ok(*n),
+            _ => Err(anyhow!("BlockID can't be converted to u64")),
+        }
+    }
-    info!("RPC Result: {:?}", result);
+    ///
+    fn parse_block_range(&self, from: &str, to: &str) -> Result<(BlockId, BlockId)> {
+        let from_block = self.parse_block_id(from)?;
+        let to_block = self.parse_block_id(to)?;
-    let elapsed_time = start_time.elapsed();
-    let elapsed_time_ms = elapsed_time.as_millis();
-    info!(
-        "RPC starknet_getEvents response time: {} ms",
-        elapsed_time_ms
-    );
+        Ok((from_block, to_block))
+    }
-    if let Some(error) = result.get("error") {
-        let error_code = error["code"].as_u64().unwrap_or(0);
-        let error_message = error["message"].as_str().unwrap_or("");
-        if error_code == 21 && error_message == "Invalid message selector" {
-            return Err(anyhow!("Invalid message selector"));
+    ///
+    fn parse_block_id(&self, id: &str) -> Result<BlockId> {
+        let regex_block_number = Regex::new("^[0-9]{1,}$").unwrap();
+
+        if id == "latest" {
+            Ok(BlockId::Tag(BlockTag::Latest))
+        } else if id == "pending" {
+            Ok(BlockId::Tag(BlockTag::Pending))
+        } else if regex_block_number.is_match(id) {
+            Ok(BlockId::Number(id.parse::<u64>()?))
+        } else {
+            Ok(BlockId::Hash(FieldElement::from_hex_be(id)?))
         }
     }
-    Ok(result.get("result").cloned().unwrap_or(Value::Null))
-}
+    ///
+    async fn block_time(&self, block: BlockId) -> Result<u64> {
+        let block = self.provider.get_block_with_tx_hashes(block).await?;
+        let timestamp = match block {
+            MaybePendingBlockWithTxHashes::Block(block) => block.timestamp,
+            MaybePendingBlockWithTxHashes::PendingBlock(block) => block.timestamp,
+        };
-pub async fn get_latest_block(client: &ReqwestClient) -> Result<u64> {
-    let rpc_provider = env::var("RPC_PROVIDER").expect("RPC_PROVIDER must be set");
-    let payload: Value = json!({
-        "jsonrpc": "2.0",
-        "id": 1,
-        "method": "starknet_blockNumber",
-        "params": {}
-    });
-
-    let start_time = Instant::now();
-    let response = client.post(rpc_provider).json(&payload).send().await?;
-    let result: Value = response.json().await?;
-
-    let elapsed_time = start_time.elapsed();
-    let elapsed_time_ms = elapsed_time.as_millis();
-    info!(
-        "RPC starknet_getEvents response time: {} ms",
-        elapsed_time_ms
-    );
-
-    let block_number = result
-        .get("result")
-        .and_then(Value::as_u64)
-        .ok_or(anyhow!("Failed to parse block number"))?;
-    Ok(block_number)
-}
-
-pub async fn get_contract_type(
-    client: &ReqwestClient,
-    contract_address: &str,
-    block_number: u64,
-) -> String {
-    let token_uri_cairo_0 = get_contract_property_string(
-        client,
-        contract_address,
-        "tokenURI",
-        vec!["0".to_string(), "0".to_string()],
-        block_number,
-    )
-    .await;
-
-    let token_uri = get_contract_property_string(
-        client,
-        contract_address,
-        "token_uri",
-        vec!["0".to_string(), "0".to_string()],
-        block_number,
-    )
-    .await;
-
-    // Get uri
-    let uri_result: String =
-        get_contract_property_string(client, contract_address, "uri", [].to_vec(), block_number)
-            .await;
-
-    // Init contract type
-    let mut contract_type = "unknown".to_string();
-    if (token_uri_cairo_0 != "undefined" && !token_uri_cairo_0.is_empty())
-        || (token_uri != "undefined" && !token_uri.is_empty())
-    {
-        contract_type = "erc721".to_string()
-    } else if uri_result != "undefined" {
-        contract_type = "erc1155".to_string()
+        Ok(timestamp)
     }
-    contract_type
-}
+    ///
+    async fn block_number(&self) -> Result<u64> {
+        Ok(self.provider.block_number().await?)
+ } -pub async fn get_token_owner( - client: &ReqwestClient, - token_id_low: FieldElement, - token_id_high: FieldElement, - contract_address: &str, - block_number: u64, -) -> String { - let token_id_low_hex = format!("{:x}", token_id_low); - let token_id_high_hex = format!("{:x}", token_id_high); - - match call_contract( - client, - contract_address, - "ownerOf", - vec![token_id_low_hex.clone(), token_id_high_hex.clone()], - block_number, - ) - .await - { - Ok(result) => { - if let Some(token_owner) = result.get(0) { - token_owner.to_string().replace('\"', "") - } else { - "".to_string() + /// + async fn fetch_events( + &self, + from_block: BlockId, + to_block: BlockId, + keys: Option>>, + ) -> Result>> { + let mut events: HashMap> = HashMap::new(); + + let filter = EventFilter { + from_block: Some(from_block), + to_block: Some(to_block), + address: None, + keys, + }; + + let chunk_size = 1000; + let mut continuation_token: Option = None; + + loop { + let event_page = self + .provider + .get_events(filter.clone(), continuation_token, chunk_size) + .await?; + + event_page.events.iter().for_each(|e| { + events + .entry(e.block_number) + .and_modify(|v| v.push(e.clone())) + .or_insert(vec![e.clone()]); + }); + + continuation_token = event_page.continuation_token; + + if continuation_token.is_none() { + break; } } - Err(_error) => { - match call_contract( - client, - contract_address, - "owner_of", - vec![token_id_low_hex, token_id_high_hex], - block_number, + + Ok(events) + } + + /// + async fn call_contract( + &self, + contract_address: FieldElement, + selector: FieldElement, + calldata: Vec, + block: BlockId, + ) -> Result> { + Ok(self + .provider + .call( + FunctionCall { + contract_address, + entry_point_selector: selector, + calldata, + }, + block, ) - .await - { - Ok(result) => { - info!("owner_of result: {:?}", result); - - if let Some(token_owner) = result.get(0) { - token_owner.to_string().replace('\"', "") - } else { - "".to_string() - } - } - Err(_error) => "".to_string(), - } - } + .await?) 
} } diff --git a/crates/ark-starknet/src/client2.rs b/crates/ark-starknet/src/client2.rs deleted file mode 100644 index 8b712c98f..000000000 --- a/crates/ark-starknet/src/client2.rs +++ /dev/null @@ -1,133 +0,0 @@ -use anyhow::{anyhow, Result}; -use regex::Regex; -use starknet::{ - core::{types::FieldElement, types::*}, - providers::{jsonrpc::HttpTransport, AnyProvider, JsonRpcClient, Provider}, -}; -use std::collections::HashMap; -use url::Url; - -pub struct StarknetClient { - provider: AnyProvider, -} - -impl StarknetClient { - /// - pub fn new(rpc_url: &str) -> Result { - let rpc_url = Url::parse(rpc_url)?; - let provider = AnyProvider::JsonRpcHttp(JsonRpcClient::new(HttpTransport::new(rpc_url))); - - Ok(StarknetClient { provider }) - } - - /// - pub async fn block_id_to_u64(&self, id: &BlockId) -> Result { - match id { - BlockId::Tag(BlockTag::Latest) => Ok(self.provider.block_number().await?), - BlockId::Number(n) => Ok(*n), - _ => Err(anyhow!("BlockID can´t be converted to u64")), - } - } - - /// - pub fn parse_block_id(&self, id: &str) -> Result { - let regex_block_number = Regex::new("^[0-9]{1,}$").unwrap(); - - if id == "latest" { - Ok(BlockId::Tag(BlockTag::Latest)) - } else if id == "pending" { - Ok(BlockId::Tag(BlockTag::Pending)) - } else if regex_block_number.is_match(id) { - Ok(BlockId::Number(id.parse::()?)) - } else { - Ok(BlockId::Hash(FieldElement::from_hex_be(id)?)) - } - } - - /// - pub async fn block_time(&self, block: BlockId) -> Result { - let block = self.provider.get_block_with_tx_hashes(block).await?; - let timestamp = match block { - MaybePendingBlockWithTxHashes::Block(block) => block.timestamp, - MaybePendingBlockWithTxHashes::PendingBlock(block) => block.timestamp, - }; - - Ok(timestamp) - } - - /// - pub async fn block_number(&self) -> Result { - Ok(self.provider.block_number().await?) - } - - /// On Starknet, a chunk size limits the maximum number of events - /// that can be retrieved with one call. - /// To ensure all events are fetched, we must ensure all events pages - /// are correctly fechted. - /// - /// TODO: for now this version is ok, but it can be RAM consuming - /// as the events are accumulated before this function returns. - /// We can think of an other version that returns each page, and let - /// the caller process the pages. - pub async fn fetch_events( - &self, - from_block: BlockId, - to_block: BlockId, - keys: Option>>, - ) -> Result>> { - let mut events: HashMap> = HashMap::new(); - - let filter = EventFilter { - from_block: Some(from_block), - to_block: Some(to_block), - address: None, - keys, - }; - - let chunk_size = 1000; - let mut continuation_token: Option = None; - - loop { - let event_page = self - .provider - .get_events(filter.clone(), continuation_token, chunk_size) - .await?; - - event_page.events.iter().for_each(|e| { - events - .entry(e.block_number) - .and_modify(|v| v.push(e.clone())) - .or_insert(vec![e.clone()]); - }); - - continuation_token = event_page.continuation_token; - - if continuation_token.is_none() { - break; - } - } - - Ok(events) - } - - /// - pub async fn call_contract( - &self, - contract_address: FieldElement, - selector: FieldElement, - calldata: Vec, - block: BlockId, - ) -> Result> { - Ok(self - .provider - .call( - FunctionCall { - contract_address, - entry_point_selector: selector, - calldata, - }, - block, - ) - .await?) 
- } -} diff --git a/crates/ark-starknet/src/lib.rs b/crates/ark-starknet/src/lib.rs index 2b33a6b95..b9babe5bc 100644 --- a/crates/ark-starknet/src/lib.rs +++ b/crates/ark-starknet/src/lib.rs @@ -1,3 +1 @@ pub mod client; -pub mod client2; -pub mod utils; diff --git a/crates/ark-starknet/src/utils.rs b/crates/ark-starknet/src/utils.rs deleted file mode 100644 index a9b2671b0..000000000 --- a/crates/ark-starknet/src/utils.rs +++ /dev/null @@ -1,134 +0,0 @@ -use log::{error, info}; -use reqwest::Client as ReqwestClient; -use serde_json::Value; -use starknet::core::utils::get_selector_from_name; -use starknet::core::{types::FieldElement, utils::parse_cairo_short_string}; -use std::error::Error; - -use super::client::call_contract; - -fn convert_felt_array_to_string(value1: &str, value2: &str) -> String { - // Decode short string with 2 felts - - let felt1: FieldElement = FieldElement::from_hex_be(value1).unwrap(); - info!("Felt1: {:?}", felt1); - let short_string1 = parse_cairo_short_string(&felt1).unwrap(); - info!("Short string1: {:?}", short_string1); - - let felt2: FieldElement = FieldElement::from_hex_be(value2).unwrap(); - info!("Felt2: {:?}", felt2); - let short_string2 = parse_cairo_short_string(&felt2).unwrap(); - info!("Short string2: {:?}", short_string2); - - short_string1 + &short_string2 -} - -pub fn decode_string_array(string_array: &Vec) -> String { - info!("String array: {:?}", string_array); - - match string_array.len() { - 0 => "".to_string(), - 1 => { - let felt: FieldElement = FieldElement::from_hex_be(&string_array[0]).unwrap(); - parse_cairo_short_string(&felt).unwrap() - } - 2 => { - let value1 = &string_array[0]; - let value2 = &string_array[1]; - info!("Values: {:?} - {:?}", value1, value2); - convert_felt_array_to_string(value1, value2) - } - 3 => convert_felt_array_to_string(&string_array[1], &string_array[2]), - _ => { - if let Some((_array_size, new_string_array)) = string_array.split_first() { - info!("New string array: {:?}", new_string_array); - let new_string_array: Vec = new_string_array.to_vec(); - let long_string = decode_long_string(&new_string_array).unwrap(); - info!("Long string: {}", long_string); - long_string - } else { - panic!("String array is empty!"); - } - } - } -} - -pub async fn get_contract_property_string( - client: &ReqwestClient, - contract_address: &str, - selector_name: &str, - calldata: Vec, - block_number: u64, -) -> String { - info!("Getting contract property: {:?}", selector_name); - - match call_contract( - client, - contract_address, - selector_name, - calldata, - block_number, - ) - .await - { - Ok(property) => match &property { - Value::String(string) => string.to_string(), - Value::Null => "undefined".to_string(), - Value::Array(array) => { - info!("Array: {:?}", array); - - let string_array: Vec = array - .clone() - .into_iter() - .map(|v| v.as_str().unwrap().to_string()) - .collect(); - - info!("String array: {:?}", string_array); - - let long_string = decode_string_array(&string_array); - info!("Long string: {}", long_string); - long_string - } - _ => "undefined".to_string(), - }, - Err(_) => "undefined".to_string(), - } -} - -pub fn decode_long_string(array: &Vec) -> Result> { - let mut result = String::new(); - for hex_str in array { - let hex_str_without_prefix = hex_str.strip_prefix("0x").unwrap_or(hex_str); - - // Prepend a zero if the length is odd - let hex_str_fixed_length = if hex_str_without_prefix.len() % 2 != 0 { - format!("0{}", hex_str_without_prefix) - } else { - hex_str_without_prefix.to_string() - }; - - 
info!("Hex string: {}", hex_str_fixed_length); - - let bytes = hex::decode(hex_str_fixed_length)?; - match String::from_utf8(bytes) { - Ok(str) => { - if !str.is_empty() { - info!("result: {}", result); - result.push_str(&str); - } - } - Err(err) => { - error!("UTF-8 parsing error: {:?}", err); - } - } - } - - info!("result: {}", result); - Ok(result) -} - -pub fn get_selector_as_string(selector: &str) -> String { - let selector_field = get_selector_from_name(selector).unwrap(); - let bytes = selector_field.to_bytes_be(); - hex::encode(bytes) -} diff --git a/crates/ark-storage/src/lib.rs b/crates/ark-storage/src/lib.rs index ba9424be9..baef0e41f 100644 --- a/crates/ark-storage/src/lib.rs +++ b/crates/ark-storage/src/lib.rs @@ -1,3 +1,3 @@ pub mod storage_manager; pub mod types; -pub mod utils; \ No newline at end of file +pub mod utils; diff --git a/crates/ark-storage/src/utils.rs b/crates/ark-storage/src/utils.rs index 2cf8d6ec8..954b6ea1f 100644 --- a/crates/ark-storage/src/utils.rs +++ b/crates/ark-storage/src/utils.rs @@ -1,3 +1,3 @@ pub fn format_token_id(token_id: String) -> String { - format!("{:0>width$}", token_id, width = 78) -} \ No newline at end of file + format!("{:0>width$}", token_id, width = 78) +} diff --git a/src/block.rs b/src/block.rs deleted file mode 100644 index 54b4a9ab7..000000000 --- a/src/block.rs +++ /dev/null @@ -1,194 +0,0 @@ -use crate::contract::identify_contract_types_from_transfers; -use crate::managers::{ - collection_manager::CollectionManager, event_manager::EventManager, token_manager::TokenManager, -}; -use anyhow::Result; -use ark_db::indexer::get::{get_block, get_indexer_sk}; -use ark_db::indexer::update::{update_block, update_indexer}; -use ark_storage::storage_manager::StorageManager; -use aws_sdk_dynamodb::Client as DynamoClient; -use chrono::Utc; -use log::{error, info}; -use reqwest::Client as ReqwestClient; -use starknet::core::types::{BlockId, EmittedEvent}; -use starknet::core::utils::get_selector_from_name; -use std::collections::HashMap; -use std::env; -use std::time::{Duration, Instant}; -use tokio::time::sleep; -use tracing::{span, Level}; - -// Helper function to determine the destination block number -async fn get_destination_block_number( - collection_manager: &CollectionManager, -) -> Result { - match env::var("END_BLOCK") { - Ok(val) => Ok(val.parse::().unwrap()), - Err(_) => collection_manager - .client - .block_number() - .await - .map_err(|_| env::VarError::NotPresent), - } -} - -// Helper function to fetch and filter only transfer events from a block -async fn get_transfer_events( - collection_manager: &CollectionManager, - block_number: u64, -) -> Result>> { - let span = span!(Level::TRACE, "get_transfer_events"); - let _enter = span.enter(); - - let events = collection_manager - .client - .fetch_events( - BlockId::Number(block_number), - BlockId::Number(block_number), - None, - ) - .await?; - - if let Some(block_events) = events.get(&block_number) { - Ok(Some( - block_events - .clone() - .into_iter() - .filter(|e| { - e.keys.get(0).map_or(false, |key| { - *key == get_selector_from_name("Transfer").unwrap() - }) - }) - .collect(), - )) - } else { - Ok(None) - } -} - -pub async fn process_blocks_continuously<'a, T: StorageManager>( - collection_manager: &CollectionManager, - reqwest_client: &ReqwestClient, - dynamo_client: &DynamoClient, - ecs_task_id: &str, - is_continous: bool, - token_manager: &mut TokenManager<'a, T>, - event_manager: &mut EventManager<'a, T>, -) -> Result<()> { - let starting_block = 
env::var("START_BLOCK") - .expect("START_BLOCK must be set") - .parse::() - .unwrap(); - - let mut current_block_number: u64 = starting_block; - let mut contract_cache: HashMap> = HashMap::new(); - - let indexer_sk = match get_indexer_sk(dynamo_client, ecs_task_id).await { - Ok(indexer_sk) => indexer_sk, - Err(_) => { - let now = Utc::now(); - let unix_timestamp = now.timestamp(); - format!("TASK#{}#{}", unix_timestamp, ecs_task_id) - } - }; - - loop { - // Start a span for the current block - let span = span!(Level::TRACE, "Block loop ", block = current_block_number); - let _enter = span.enter(); - - let execution_time = Instant::now(); - let dest_block_number = get_destination_block_number(collection_manager).await?; - let indexation_progress = (current_block_number as f64 / dest_block_number as f64) * 100.0; - - info!( - "Dest block: {}, Current block: {}, Indexing progress: {:.2}%", - dest_block_number, current_block_number, indexation_progress - ); - - if !is_continous { - update_indexer( - dynamo_client, - ecs_task_id, - indexer_sk.as_str(), - "running".to_string(), - starting_block, - dest_block_number, - indexation_progress as u64, - ) - .await?; - } - - // If the current block number is less than or equal to the destination block number - if current_block_number <= dest_block_number { - let is_block_fetched = get_block(dynamo_client, current_block_number).await?; - - // Skip already fetched blocks - if is_block_fetched { - info!("Current block {} is already fetched", current_block_number); - current_block_number += 1; - continue; - } - - if let Some(events_only) = - get_transfer_events(collection_manager, current_block_number).await? - { - info!( - "{:?} events to process for block {:?}", - events_only.len(), - current_block_number - ); - - match identify_contract_types_from_transfers( - collection_manager, - reqwest_client, - &events_only, - dynamo_client, - current_block_number, - &mut contract_cache, - token_manager, - event_manager, - ) - .await - { - Ok(_) => { - update_block(dynamo_client, ecs_task_id, current_block_number).await?; - info!( - "Indexing time: {}ms (block {})", - execution_time.elapsed().as_millis(), - current_block_number - ); - current_block_number += 1; - } - Err(_err) => { - error!("Error processing block: {:?}", current_block_number); - break; - } - } - } else { - info!("No event to process for block {:?}", current_block_number); - update_block(dynamo_client, ecs_task_id, current_block_number).await?; - current_block_number += 1; - } - } else if !is_continous { - break; - } else { - sleep(Duration::from_secs(1)).await; - } - } - - if !is_continous { - update_indexer( - dynamo_client, - ecs_task_id, - &indexer_sk, - String::from("stopped"), - starting_block, - current_block_number, - 100, - ) - .await?; - } - - Ok(()) -} diff --git a/src/constants.rs b/src/constants.rs deleted file mode 100644 index 7f742baf4..000000000 --- a/src/constants.rs +++ /dev/null @@ -1,7 +0,0 @@ -pub const BLACKLIST: [&str; 5] = [ - "0x49d36570d4e46f48e99674bd3fcc84644ddd6b96f7c741b1562b82f9e004dc7", - "0xda114221cb83fa859dbdb4c44beeaa0bb37c7537ad5ae66fe5e0efd20e6eb3", - "0x68f5c6a61780768455de69077e07e89787839bf8166decfbf92b645209c0fb8", - "0x53c91253bc9682c04929ca02ed00b3e423f6710d2ee7e0d5ebb06f3ecf368a8", - "0x3fe2b97c1fd336e750087d68b9b867997fd64a2661ff3ca5a7c771641e8e7ac", -]; diff --git a/src/contract.rs b/src/contract.rs deleted file mode 100644 index 93f3a5511..000000000 --- a/src/contract.rs +++ /dev/null @@ -1,146 +0,0 @@ -use crate::constants::BLACKLIST; -use 
crate::managers::{ - collection_manager::CollectionManager, event_manager::EventManager, token_manager::TokenManager, -}; -use crate::transfer::process_transfer; -use ark_db::collection::create::create_collection; -use ark_db::contract::get::get_contract; -use ark_starknet::client::get_contract_type; -use ark_storage::storage_manager::StorageManager; -use aws_sdk_dynamodb::Client as DynamoClient; -use log::{debug, error, info}; -use reqwest::Client as ReqwestClient; -use starknet::core::types::EmittedEvent; -use std::collections::HashMap; -use std::env; -use std::error::Error; -use std::time::Instant; - -// Identifies contract types based on events from ABIs, checks for their presence in a Redis server, and if not found, calls contract methods to determine the type, stores this information back in Redis, and finally prints the contract type. -pub async fn identify_contract_types_from_transfers<'a, T: StorageManager>( - collection_manager: &CollectionManager, - client: &ReqwestClient, - events: &[EmittedEvent], - dynamo_client: &DynamoClient, - block_number: u64, - contract_cache: &mut HashMap>, - token_manager: &mut TokenManager<'a, T>, - event_manager: &mut EventManager<'a, T>, -) -> Result<(), Box> { - let start_time = Instant::now(); - - // Get dynamo table to work with - let collections_table = - env::var("ARK_COLLECTIONS_TABLE_NAME").expect("ARK_COLLECTIONS_TABLE_NAME must be set"); - - // Get block timestamp - let block_id = collection_manager - .client - .parse_block_id(block_number.to_string().as_str()) - .unwrap(); - let timestamp = collection_manager.client.block_time(block_id).await?; - - // // Iterate over events - for event in events { - let contract_address = format!("{:#064x}", &event.from_address); - - // Filter contract with most transactions from identification - if BLACKLIST.contains(&contract_address.as_str()) { - continue; - } - - // Check cache before making a call to get_contract - let contract_status = if let Some(cached_status) = contract_cache.get(&contract_address) { - cached_status.clone() - } else { - let result = get_contract(dynamo_client, &contract_address) - .await - .unwrap_or(None); - - // Cache the result - contract_cache.insert(contract_address.clone(), result.clone()); - result - }; - - if let Some(existing_contract_type) = contract_status { - if existing_contract_type == "unknown" { - continue; // If it's unknown, skip this iteration of the loop - } else if existing_contract_type == "erc721" || existing_contract_type == "erc1155" { - match process_transfer( - &event, - existing_contract_type.as_str(), - timestamp, - token_manager, - event_manager, - ) - .await - { - Ok(_) => {} - Err(e) => { - error!("Failed to process transfer: {:?}", e); - continue; - } - } - continue; - } - } - - info!("CONTRACT NOT IDENTIFIED: {:?}", contract_address); - let contract_type = get_contract_type(client, &contract_address, block_number).await; - debug!("contract_type: {:?}", contract_type); - - match create_collection( - dynamo_client, - &collections_table, - &contract_address, - &contract_type, - ) - .await - { - Ok(success) => { - info!( - "[Success] New collection item added successfully.\n\ - - Item Details: {:?}\n\ - - Table: {}", - success, &collections_table - ); - if contract_type != "unknown" { - match process_transfer( - &event, - contract_type.as_str(), - timestamp, - token_manager, - event_manager, - ) - .await - { - Ok(_) => {} - Err(e) => { - error!("Failed to process transfer: {:?}", e); - continue; - } - } - continue; - } - } - Err(e) => { - error!( - 
"[Error] Failed to add a new item to the collection.\n\ - - Error Details: {:?}\n\ - - Target Table: {}", - e, &collections_table - ); - } - } - } - - let elapsed_time = start_time.elapsed(); - info!( - "Event loop took {}.{:03} seconds for block {}", - elapsed_time.as_secs(), - elapsed_time.subsec_millis(), - block_number - ); - - Ok(()) -} diff --git a/src/main.rs b/src/main.rs index 54e2208ca..fb91a6fe1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,82 +1,19 @@ -mod block; -mod constants; -mod contract; -mod managers; -mod transfer; -mod utils; +mod storage; use anyhow::Result; -use ark_starknet::client2::StarknetClient; -use ark_storage::storage_manager::DefaultStorage; -use aws_config::meta::region::RegionProviderChain; -use aws_sdk_dynamodb::Client as DynamoClient; -use block::process_blocks_continuously; -use dotenv::dotenv; -use log::info; -use managers::{ - collection_manager::CollectionManager, event_manager::EventManager, token_manager::TokenManager, -}; -use reqwest::Client as ReqwestClient; -use std::env; -use tracing::{span, Level}; -use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter, Registry}; -use utils::get_ecs_task_id; +use ark_core; #[tokio::main] async fn main() -> Result<()> { - dotenv().ok(); + // Init AWS. - // Initialize the LogTracer to convert `log` records to `tracing` events - tracing_log::LogTracer::init().expect("Setting log tracer failed."); + // Init desired storage. + let storage = storage::init_default(); - // Create the layers - let env_filter = EnvFilter::from_default_env(); - let fmt_layer = fmt::layer(); + // TODO: add a monitor manager impl here, to be passed to the main loop. - // Combine layers and set as global default - let subscriber = Registry::default().with(env_filter).with(fmt_layer); - - tracing::subscriber::set_global_default(subscriber) - .expect("Setting default subscriber failed."); - - let main_span = span!(Level::TRACE, "main"); - let _main_guard = main_span.enter(); - - let rpc_provider = env::var("RPC_PROVIDER").expect("RPC_PROVIDER must be set"); - let sn_client = StarknetClient::new(&rpc_provider.clone())?; - let collection_manager = CollectionManager::new(sn_client); - - // let rpc_client = JsonRpcClient::new(HttpTransport::new( - // Url::parse(rpc_provider.as_str()).unwrap(), - // )); - - let region_provider = RegionProviderChain::default_provider().or_else("us-east-1"); - let config = aws_config::from_env().region(region_provider).load().await; - let dynamo_client = DynamoClient::new(&config); - let reqwest_client = ReqwestClient::new(); - let ecs_task_id = get_ecs_task_id(); - let is_continous = env::var("END_BLOCK").is_err(); - - let storage_manager = DefaultStorage::new(); - - let mut token_manager = TokenManager::new(&storage_manager); - let mut event_manager = EventManager::new(&storage_manager); - - info!( - "\n=== Indexing started ===\n\necs_task_id: {}\nis_continous: {}", - ecs_task_id, is_continous - ); - - process_blocks_continuously( - &collection_manager, - &reqwest_client, - &dynamo_client, - &ecs_task_id, - is_continous, - &mut token_manager, - &mut event_manager, - ) - .await?; + // Start the loop. 
+ ark_core::indexer_main_loop(storage).await?; Ok(()) } diff --git a/src/managers/collection_manager.rs b/src/managers/collection_manager.rs deleted file mode 100644 index ab1b510fb..000000000 --- a/src/managers/collection_manager.rs +++ /dev/null @@ -1,166 +0,0 @@ -use anyhow::{anyhow, Result}; -use ark_starknet::client2::StarknetClient; -use log::info; -use serde::{Deserialize, Serialize}; -use starknet::core::types::{BlockId, BlockTag, FieldElement}; -use starknet::core::utils::{get_selector_from_name, parse_cairo_short_string}; - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub enum ContractType { - Unknown, - ERC721, - ERC1155, -} - -impl ToString for ContractType { - fn to_string(&self) -> String { - match self { - ContractType::Unknown => "unknown".to_string(), - ContractType::ERC721 => "erc721".to_string(), - ContractType::ERC1155 => "erc1155".to_string(), - } - } -} - -pub struct CollectionManager { - pub client: StarknetClient, -} - -impl CollectionManager { - pub fn new(client: StarknetClient) -> Self { - Self { client } - } - - pub async fn get_token_owner( - &self, - contract_address: FieldElement, - token_id_low: FieldElement, - token_id_high: FieldElement, - block_id: Option, - ) -> Result> { - let effective_block_id = block_id.unwrap_or(BlockId::Tag(BlockTag::Latest)); - - match self - .call_contract_helper( - contract_address, - "ownerOf", - token_id_low, - token_id_high, - effective_block_id, - ) - .await - { - Ok(res) => Ok(res), - Err(_) => self - .call_contract_helper( - contract_address, - "owner_of", - token_id_low, - token_id_high, - effective_block_id, - ) - .await - .map_err(|_| anyhow!("Failed to get token owner")), - } - } - - async fn call_contract_helper( - &self, - contract_address: FieldElement, - selector_name: &str, - token_id_low: FieldElement, - token_id_high: FieldElement, - block_id: BlockId, - ) -> Result> { - self.client - .call_contract( - contract_address, - get_selector_from_name(selector_name)?, - vec![token_id_low, token_id_high], - block_id, - ) - .await - } - - pub async fn get_contract_type( - &self, - contract_address: FieldElement, - block: BlockId, - ) -> Result { - let token_uri_cairo_0 = self - .get_contract_property_string( - contract_address, - "tokenURI", - vec![FieldElement::ONE, FieldElement::ZERO], - block, - ) - .await?; - let token_uri = self - .get_contract_property_string( - contract_address, - "token_uri", - vec![FieldElement::ONE, FieldElement::ZERO], - block, - ) - .await?; - let uri_result = self - .get_contract_property_string(contract_address, "uri", vec![], block) - .await?; - - if (token_uri_cairo_0 != "undefined" && !token_uri_cairo_0.is_empty()) - || (token_uri != "undefined" && !token_uri.is_empty()) - { - Ok(ContractType::ERC721) - } else if uri_result != "undefined" { - Ok(ContractType::ERC1155) - } else { - Ok(ContractType::Unknown) - } - } - - pub async fn get_contract_property_string( - &self, - contract_address: FieldElement, - selector_name: &str, - calldata: Vec, - block: BlockId, - ) -> Result { - info!("Getting contract property: {:?}", selector_name); - - let response = self - .client - .call_contract( - contract_address, - get_selector_from_name(selector_name)?, - calldata, - block, - ) - .await?; - - decode_string_array(&response) - } -} - -pub fn decode_string_array(string_array: &Vec) -> Result { - match string_array.len() { - 0 => Ok("".to_string()), - 1 => Ok(parse_cairo_short_string(&string_array[0])?), - 2 => Ok(format!( - "{}{}", - 
-            parse_cairo_short_string(&string_array[0])?,
-            parse_cairo_short_string(&string_array[1])?,
-        )),
-        _ => {
-            // The first element is the length of the string,
-            // we can skip it as it's implicitely given by the vector itself.
-            let mut result = String::new();
-
-            for s in &string_array[1..] {
-                result.push_str(&parse_cairo_short_string(s)?);
-            }
-
-            Ok(result)
-        }
-    }
-}
diff --git a/src/managers/event_manager.rs b/src/managers/event_manager.rs
deleted file mode 100644
index c24456dcb..000000000
--- a/src/managers/event_manager.rs
+++ /dev/null
@@ -1,74 +0,0 @@
-use anyhow::{anyhow, Result};
-use ark_storage::storage_manager::StorageManager;
-use ark_storage::types::{EventType, TokenEvent, TokenId};
-use log::info;
-use starknet::core::types::{EmittedEvent, FieldElement};
-
-#[derive(Debug)]
-pub struct EventManager<'a, T: StorageManager> {
-    storage: &'a T,
-    token_event: TokenEvent,
-}
-
-impl<'a, T: StorageManager> EventManager<'a, T> {
-    pub fn new(storage: &'a T) -> Self {
-        EventManager {
-            storage,
-            token_event: TokenEvent::default(),
-        }
-    }
-
-    pub fn reset_event(&mut self) {
-        self.token_event = TokenEvent::default();
-    }
-
-    pub fn format_event(
-        &mut self,
-        event: &EmittedEvent,
-        contract_type: &str,
-        timestamp: u64,
-    ) -> Result<()> {
-        if event.data.len() < 4 {
-            return Err(anyhow!("Invalid event data"));
-        }
-
-        self.token_event.from_address_field_element = event.data[0];
-        self.token_event.from_address = format!("{:#064x}", event.data[0]);
-        self.token_event.to_address_field_element = event.data[1];
-        self.token_event.to_address = format!("{:#064x}", event.data[1]);
-        self.token_event.contract_address = format!("{:#064x}", event.from_address);
-        self.token_event.transaction_hash = format!("{:#064x}", event.transaction_hash);
-        self.token_event.token_id = TokenId {
-            low: event.data[2],
-            high: event.data[3],
-        };
-        self.token_event.formated_token_id = self.token_event.token_id.format();
-        self.token_event.block_number = event.block_number;
-        self.token_event.timestamp = timestamp;
-        self.token_event.contract_type = contract_type.to_string();
-        self.token_event.event_type = self.get_event_type();
-        Ok(())
-    }
-
-    pub fn get_event(&self) -> &TokenEvent {
-        &self.token_event
-    }
-
-    pub fn get_event_type(&mut self) -> EventType {
-        if self.token_event.from_address_field_element == FieldElement::ZERO {
-            info!("EVENT MANAGER: Mint detected");
-            EventType::Mint
-        } else if self.token_event.to_address_field_element == FieldElement::ZERO {
-            info!("EVENT MANAGER: Burn detected");
-            EventType::Burn
-        } else {
-            info!("EVENT MANAGER: Transfer detected");
-            EventType::Transfer
-        }
-    }
-
-    pub fn create_event(&self) -> Result<()> {
-        self.storage.create_event(&self.token_event);
-        Ok(())
-    }
-}
diff --git a/src/managers/mod.rs b/src/managers/mod.rs
deleted file mode 100644
index 16431c061..000000000
--- a/src/managers/mod.rs
+++ /dev/null
@@ -1,3 +0,0 @@
-pub mod collection_manager;
-pub mod event_manager;
-pub mod token_manager;
diff --git a/src/managers/token_manager.rs b/src/managers/token_manager.rs
deleted file mode 100644
index a1373c84f..000000000
--- a/src/managers/token_manager.rs
+++ /dev/null
@@ -1,65 +0,0 @@
-use anyhow::Result;
-use ark_storage::storage_manager::StorageManager;
-use ark_storage::types::{EventType, TokenEvent, TokenFromEvent};
-
-#[derive(Debug)]
-pub struct TokenManager<'a, T: StorageManager> {
-    storage: &'a T,
-    token: TokenFromEvent,
-    // client: StarknetClient,
-}
-
-impl<'a, T: StorageManager> TokenManager<'a, T> {
-    pub fn new(
-        storage: &'a T,
-        // client: StarknetClient
-    ) -> Self {
-        Self {
-            storage,
-            token: TokenFromEvent::default(),
-            // client
-        }
-    }
-
-    pub fn format_token_from_event(&mut self, event: &TokenEvent) {
-        self.token.address = event.contract_address.clone();
-        self.token.padded_token_id = event.padded_token_id.clone();
-        self.token.from_address = event.from_address.clone();
-        self.token.to_address = event.to_address.clone();
-        self.token.timestamp = event.timestamp.clone();
-        // TODO update owner here call get_token_owner
-        self.token.owner = event.to_address.clone();
-        self.token.mint_transaction_hash = if event.event_type == EventType::Mint {
-            Some(event.transaction_hash.clone())
-        } else {
-            None
-        };
-        self.token.block_number_minted = if event.event_type == EventType::Mint {
-            Some(event.block_number)
-        } else {
-            None
-        };
-    }
-
-    pub fn create_token(&self) -> Result<()> {
-        self.storage.create_token(&self.token);
-        Ok(())
-    }
-
-    pub fn reset_token(&mut self) {
-        self.token = TokenFromEvent::default();
-    }
-
-    // pub fn get_token_owner(&self) -> TokenFromEvent {
-    //     TODO call contract to get owner
-    // }
-
-    // pub fn update_token_owner(&self) -> Result<()> {
-    //     let token = self
-    //         .storage
-    //         .get_token(event.contract_address.clone(), event.token_id.clone())?;
-    //     self.storage.update_token_owner(&token);
-    //     Ok(())
-    // }
-
-}
diff --git a/src/storage.rs b/src/storage.rs
new file mode 100644
index 000000000..13e1ebcae
--- /dev/null
+++ b/src/storage.rs
@@ -0,0 +1,26 @@
+//! Storage initializers.
+
+use ark_storage::storage_manager::DefaultStorage;
+
+/// New Default storage manager.
+pub fn init_default() -> DefaultStorage {
+    log::info!("Storage backend: default");
+    DefaultStorage::new()
+}
+
+/// New AWS storage manager (TODO: not implemented yet).
+pub fn init_aws() {
+    // TODO: add AWS initialization and return AwsStorage.
+
+    // let region_provider = RegionProviderChain::default_provider().or_else("us-east-1");
+    // let config = aws_config::from_env().region(region_provider).load().await;
+    // let dynamo_client = DynamoClient::new(&config);
+    // let reqwest_client = ReqwestClient::new();
+    // let ecs_task_id = get_ecs_task_id();
+    // let is_continous = env::var("END_BLOCK").is_err();
+
+    // info!(
+    //     "\n=== Indexing started ===\n\necs_task_id: {}\nis_continous: {}",
+    //     ecs_task_id, is_continous
+    // );
+}
diff --git a/src/transfer.rs b/src/transfer.rs
deleted file mode 100644
index 07439b11f..000000000
--- a/src/transfer.rs
+++ /dev/null
@@ -1,31 +0,0 @@
-use crate::managers::event_manager::EventManager;
-use crate::managers::token_manager::TokenManager;
-use anyhow::Result;
-use ark_storage::storage_manager::StorageManager;
-use ark_storage::types::EventType;
-use starknet::core::types::EmittedEvent;
-
-pub async fn process_transfer<'a, T: StorageManager>(
-    event: &EmittedEvent,
-    contract_type: &str,
-    timestamp: u64,
-    token_manager: &mut TokenManager<'a, T>,
-    event_manager: &mut EventManager<'a, T>,
-) -> Result<()> {
-    event_manager.format_event(event, contract_type, timestamp)?;
-    event_manager.create_event()?;
-
-    let token_event = event_manager.get_event();
-    token_manager.format_token_from_event(token_event);
-
-    if token_event.event_type == EventType::Mint {
-        token_manager.create_token()?;
-    } else {
-        // TODO update token owner
-        // token_manager.update_token_owner()?;
-    }
-
-    token_manager.reset_token();
-    event_manager.reset_event();
-    Ok(())
-}
diff --git a/src/utils.rs b/src/utils.rs
deleted file mode 100644
index 9e8d8f662..000000000
--- a/src/utils.rs
+++ /dev/null
@@ -1,49 +0,0 @@
-// use serde_json::Value;
-// use starknet::core::utils::starknet_keccak;
-
-use std::env;
-
-use log::info;
-
-#[allow(dead_code)]
-pub async fn upload_image_to_s3(_url: &str) -> Result> {
-    // let client = reqwest::Client::new();
-    // let res = client.get(url).send().await?;
-
-    // let extension = url.split('.').last().unwrap_or_default();
-    // let key = format!("{}.{}", Uuid::new_v4(), extension);
-
-    // let s3_client = S3Client::new(Region::UsEast1);
-    // let put_request = PutObjectRequest {
-    //     bucket: "ark-nft-images".to_string(),
-    //     key,
-    //     body: Some(ByteStream::new(res.bytes_stream())),
-    //     content_type: Some(
-    //         res.headers()
-    //             .get("content-type")
-    //             .unwrap()
-    //             .to_str()
-    //             .unwrap()
-    //             .into(),
-    //     ),
-    //     ..Default::default()
-    // };
-
-    // let s3_res = s3_client.put_object(put_request).await?;
-    // Ok(s3_res.key.unwrap())
-
-    Ok("".to_string())
-}
-
-pub fn get_ecs_task_id() -> String {
-    let container_metadata_uri = env::var("ECS_CONTAINER_METADATA_URI").unwrap_or("".to_string());
-
-    let pattern = regex::Regex::new(r"/v3/([a-f0-9]{32})-").unwrap();
-    let task_id = pattern
-        .captures(container_metadata_uri.as_str())
-        .and_then(|cap| cap.get(1).map(|m| m.as_str()))
-        .unwrap_or("");
-
-    info!("ECS task ID: {:?}", task_id);
-    task_id.to_string()
-}
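
Note on the `init_aws` stub in src/storage.rs: the commented-out block suggests an AWS-backed storage manager would wrap the DynamoDB and reqwest clients. A minimal sketch of what that initializer could look like, assuming a hypothetical `AwsStorage` type that implements `ark_storage::storage_manager::StorageManager` (the type and its constructor are not part of this change):

use aws_config::meta::region::RegionProviderChain;
use aws_sdk_dynamodb::Client as DynamoClient;

/// New AWS storage manager (sketch only; `AwsStorage` is hypothetical).
pub async fn init_aws() -> AwsStorage {
    log::info!("Storage backend: AWS");

    // Resolve the AWS region from the environment, falling back to us-east-1.
    let region_provider = RegionProviderChain::default_provider().or_else("us-east-1");
    let config = aws_config::from_env().region(region_provider).load().await;

    // Clients the AWS-backed storage manager would need.
    let dynamo_client = DynamoClient::new(&config);
    let reqwest_client = reqwest::Client::new();

    // Hypothetical constructor: AwsStorage would implement StorageManager
    // so it can be passed to ark_core::indexer_main_loop like DefaultStorage.
    AwsStorage::new(dynamo_client, reqwest_client)
}

With such a backend, main() would swap `storage::init_default()` for `storage::init_aws().await` and hand the result to `ark_core::indexer_main_loop` unchanged.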