diff --git a/Cargo.lock b/Cargo.lock index 5ac472f05..f70e0461f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -148,6 +148,7 @@ dependencies = [ "anyhow", "ark-metadata", "aws-sdk-dynamodb", + "chrono", "dotenv", "log", "serde", @@ -217,6 +218,7 @@ dependencies = [ "log", "num-bigint", "openssl", + "regex", "reqwest", "serde", "serde_json", @@ -428,7 +430,7 @@ dependencies = [ "http", "hyper", "ring", - "time", + "time 0.3.28", "tokio", "tower", "tracing", @@ -601,7 +603,7 @@ dependencies = [ "percent-encoding", "regex", "sha2", - "time", + "time 0.3.28", "tracing", ] @@ -746,7 +748,7 @@ dependencies = [ "num-integer", "ryu", "serde", - "time", + "time 0.3.28", ] [[package]] @@ -893,7 +895,7 @@ dependencies = [ "serde", "serde_bytes", "serde_json", - "time", + "time 0.3.28", "uuid 1.4.1", ] @@ -957,8 +959,11 @@ checksum = "ec837a71355b28f6556dbd569b37b3f363091c0bd4b2e735674521b4c5fd9bc5" dependencies = [ "android-tzdata", "iana-time-zone", + "js-sys", "num-traits", "serde", + "time 0.1.45", + "wasm-bindgen", "winapi", ] @@ -1531,7 +1536,7 @@ dependencies = [ "cfg-if", "js-sys", "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "wasm-bindgen", ] @@ -2034,7 +2039,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" dependencies = [ "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys 0.48.0", ] @@ -2895,7 +2900,7 @@ dependencies = [ "serde", "serde_json", "serde_with_macros 2.3.3", - "time", + "time 0.3.28", ] [[package]] @@ -2912,7 +2917,7 @@ dependencies = [ "serde", "serde_json", "serde_with_macros 3.3.0", - "time", + "time 0.3.28", ] [[package]] @@ -3000,7 +3005,7 @@ checksum = "2230cd5c29b815c9b699fb610b49a5ed65588f3509d9f0108be3a885da629333" dependencies = [ "colored", "log", - "time", + "time 0.3.28", "windows-sys 0.42.0", ] @@ -3345,6 +3350,17 @@ dependencies = [ "syn 2.0.29", ] +[[package]] +name = "time" +version = "0.1.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b797afad3f312d1c66a56d11d0316f916356d11bd158fbc6ca6389ff6bf805a" +dependencies = [ + "libc", + "wasi 0.10.0+wasi-snapshot-preview1", + "winapi", +] + [[package]] name = "time" version = "0.3.28" @@ -3753,6 +3769,12 @@ dependencies = [ "try-lock", ] +[[package]] +name = "wasi" +version = "0.10.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" diff --git a/Cargo.toml b/Cargo.toml index 267323b4a..dbe52ec25 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ version = "0.1.0" edition = "2021" [dependencies] +regex = "1.5.4" anyhow = "1.0" tokio = { version = "1", features = ["full"] } reqwest = { version = "0.11", features = ["json"] } diff --git a/crates/ark-db/Cargo.toml b/crates/ark-db/Cargo.toml index b59a9c6ab..6f90623f1 100644 --- a/crates/ark-db/Cargo.toml +++ b/crates/ark-db/Cargo.toml @@ -4,9 +4,10 @@ version = "0.1.0" edition = "2021" [dependencies] +chrono = "0.4.19" anyhow = "1.0" serde = "1.0.164" log = "0.4" ark-metadata = { path = "../ark-metadata" } aws-sdk-dynamodb = "0.29.0" -dotenv = "0.15.0" \ No newline at end of file +dotenv = "0.15.0" diff --git a/crates/ark-db/src/block/create.rs b/crates/ark-db/src/block/create.rs deleted file mode 100644 index 6b19fce3e..000000000 --- a/crates/ark-db/src/block/create.rs +++ /dev/null @@ -1,38 +0,0 @@ -use aws_sdk_dynamodb::types::AttributeValue; -use aws_sdk_dynamodb::{Client, Error}; -use std::env; - -// This function adds a block number to the list of fetched blocks. -pub async fn create_block( - dynamo_client: &Client, - block_number: u64, - status: bool, -) -> Result<(), Error> { - let table = env::var("ARK_BLOCKS_TABLE_NAME").expect("ARK_BLOCKS_TABLE_NAME must be set"); - let block_number_av = AttributeValue::N(block_number.to_string()); - let is_fetched_av = AttributeValue::Bool(status); - let request = dynamo_client - .put_item() - .table_name(table) - .item("block_number", block_number_av) - .item("is_fetched", is_fetched_av); - - let result = request.send().await; - - match result { - Ok(_) => { - println!( - "Successfully added block number {} to the list of fetched blocks", - block_number - ); - } - Err(e) => { - println!( - "Error adding block number {} to the list of fetched blocks: {:?}", - block_number, e - ); - } - } - - Ok(()) -} diff --git a/crates/ark-db/src/block/get.rs b/crates/ark-db/src/block/get.rs deleted file mode 100644 index e6f597cfd..000000000 --- a/crates/ark-db/src/block/get.rs +++ /dev/null @@ -1,41 +0,0 @@ -use aws_sdk_dynamodb::types::AttributeValue; -use aws_sdk_dynamodb::{Client, Error}; -use log::info; -use std::env; - -pub async fn get_block(dynamo_client: &Client, block_number: u64) -> Result { - let table = env::var("ARK_BLOCKS_TABLE_NAME").expect("ARK_BLOCKS_TABLE_NAME must be set"); - - info!("var ARK_BLOCKS_TABLE_NAME: {:?}", table); - - let block_number_av = AttributeValue::N(block_number.to_string()); - let request = dynamo_client - .get_item() - .table_name(table) - .key("block_number", block_number_av); - - let result = request.send().await; - - match result { - Ok(value) => { - info!("get_block: {:?}", value.item); - - if let Some(item) = value.item { - if let Some(is_fetched) = item.get("is_fetched") { - match is_fetched.as_bool() { - Ok(is_fetched_bool) => return Ok(*is_fetched_bool), - Err(_) => return Ok(false), - } - } - } - } - Err(e) => { - println!( - "Error requesting block number {} to the list of fetched blocks: {:?}", - block_number, e - ); - } - } - - Ok(false) -} diff --git a/crates/ark-db/src/block/update.rs b/crates/ark-db/src/block/update.rs deleted file mode 100644 index 99dec5084..000000000 --- a/crates/ark-db/src/block/update.rs +++ /dev/null @@ -1,20 +0,0 @@ -use anyhow::Result; -use aws_sdk_dynamodb::types::AttributeValue; -use aws_sdk_dynamodb::Client; -use std::env; - -// This function adds a block number to the list of fetched blocks. -pub async fn update_block(dynamo_client: &Client, block_number: u64, status: bool) -> Result<()> { - let table = env::var("ARK_BLOCKS_TABLE_NAME").expect("ARK_BLOCKS_TABLE_NAME must be set"); - let block_number_av = AttributeValue::N(block_number.to_string()); - let is_fetched_av = AttributeValue::Bool(status); - let request = dynamo_client - .update_item() - .table_name(table) - .key("block_number", block_number_av) - .update_expression("SET is_fetched = :is_fetched") - .expression_attribute_values(":is_fetched", is_fetched_av); - - request.send().await?; - Ok(()) -} diff --git a/crates/ark-db/src/indexer/get.rs b/crates/ark-db/src/indexer/get.rs new file mode 100644 index 000000000..cfe152d2a --- /dev/null +++ b/crates/ark-db/src/indexer/get.rs @@ -0,0 +1,39 @@ +use aws_sdk_dynamodb::types::AttributeValue; +use aws_sdk_dynamodb::{Client, Error}; +use std::env; + +pub async fn get_block(dynamo_client: &Client, block_number: u64) -> Result { + let indexer_table_name = + env::var("ARK_INDEXER_TABLE_NAME").expect("ARK_INDEXER_TABLE_NAME must be set"); + let partition_key = format!("BLOCK#{}", block_number); + + let request = dynamo_client + .query() + .table_name(indexer_table_name) + .key_condition_expression("#PK = :PK") + .expression_attribute_names("#PK", "PK") + .expression_attribute_values(":PK", AttributeValue::S(partition_key)); + + match request.send().await { + Ok(value) => { + if let Some(items) = value.items() { + if let Some(item) = items.first() { + if let Some(is_fetched) = item.get("is_fetched") { + match is_fetched.as_bool() { + Ok(is_fetched_bool) => return Ok(*is_fetched_bool), + Err(_) => return Ok(false), + } + } + } + } + } + Err(e) => { + println!( + "Error requesting block number {} to the list of fetched blocks: {:?}", + block_number, e + ); + } + } + + Ok(false) +} diff --git a/crates/ark-db/src/block/mod.rs b/crates/ark-db/src/indexer/mod.rs similarity index 64% rename from crates/ark-db/src/block/mod.rs rename to crates/ark-db/src/indexer/mod.rs index 5a20acaf7..0e93baa52 100644 --- a/crates/ark-db/src/block/mod.rs +++ b/crates/ark-db/src/indexer/mod.rs @@ -1,3 +1,2 @@ -pub mod create; pub mod get; pub mod update; diff --git a/crates/ark-db/src/indexer/update.rs b/crates/ark-db/src/indexer/update.rs new file mode 100644 index 000000000..e5aac0d6f --- /dev/null +++ b/crates/ark-db/src/indexer/update.rs @@ -0,0 +1,69 @@ +use std::env; + +use anyhow::Result; +use aws_sdk_dynamodb::types::AttributeValue; +use aws_sdk_dynamodb::Client; +use chrono::Utc; + +pub async fn update_indexer( + dynamo_client: &Client, + task_id: &str, + status: String, + from: Option, + to: Option, + indexation_progress: Option, +) -> Result<()> { + let indexer_table_name = + env::var("ARK_INDEXER_TABLE_NAME").expect("ARK_INDEXER_TABLE_NAME must be set"); + let indexer_version = env::var("ARK_INDEXER_VERSION").unwrap_or(String::from("undefined")); + let now = Utc::now(); + let unix_timestamp = now.timestamp(); + + let mut request = dynamo_client + .put_item() + .table_name(indexer_table_name) + .item("PK", AttributeValue::S(String::from("INDEXER"))) + .item("SK", AttributeValue::S(format!("TASK#{}", task_id))) + .item("status", AttributeValue::S(status)) + .item("last_update", AttributeValue::N(unix_timestamp.to_string())) + .item("version", AttributeValue::S(indexer_version)) + .item("task_id", AttributeValue::S(task_id.to_string())); + + if let Some(value) = from { + request = request.item("from", AttributeValue::N(value.to_string())); + } + + if let Some(value) = to { + request = request.item("to", AttributeValue::N(value.to_string())); + } + + if let Some(value) = indexation_progress { + request = request.item("indexation_progress", AttributeValue::N(value.to_string())); + } + + request.send().await?; + + Ok(()) +} + +pub async fn update_block(dynamo_client: &Client, task_id: &str, block_number: u64) -> Result<()> { + let indexer_table_name = + env::var("ARK_INDEXER_TABLE_NAME").expect("ARK_INDEXER_TABLE_NAME must be set"); + let indexer_version = env::var("ARK_INDEXER_VERSION").unwrap_or(String::from("undefined")); + let now = Utc::now(); + let unix_timestamp = now.timestamp(); + + dynamo_client + .put_item() + .table_name(indexer_table_name) + .item("PK", AttributeValue::S(format!("BLOCK#{}", block_number))) + .item("SK", AttributeValue::S(format!("TASK#{}", task_id))) + .item("is_fetched", AttributeValue::Bool(true)) + .item("last_update", AttributeValue::N(unix_timestamp.to_string())) + .item("task_id", AttributeValue::S(task_id.to_string())) + .item("version", AttributeValue::S(indexer_version)) + .send() + .await?; + + Ok(()) +} diff --git a/crates/ark-db/src/lib.rs b/crates/ark-db/src/lib.rs index 6deb54067..2c68e03ef 100644 --- a/crates/ark-db/src/lib.rs +++ b/crates/ark-db/src/lib.rs @@ -1,6 +1,6 @@ -pub mod block; pub mod collection; pub mod contract; +pub mod indexer; pub mod owners; pub mod token; pub mod token_event; diff --git a/crates/ark-starknet/src/collection_manager.rs b/crates/ark-starknet/src/collection_manager.rs index ec919999e..76feeb160 100644 --- a/crates/ark-starknet/src/collection_manager.rs +++ b/crates/ark-starknet/src/collection_manager.rs @@ -61,7 +61,7 @@ impl CollectionManager { effective_block_id, ) .await - .or_else(|_| Err(anyhow!("Failed to get token owner"))), + .map_err(|_| anyhow!("Failed to get token owner")), } } diff --git a/src/core/block.rs b/src/core/block.rs index 391204958..c3ce8ac21 100644 --- a/src/core/block.rs +++ b/src/core/block.rs @@ -1,8 +1,7 @@ use super::contract::identify_contract_types_from_transfers; use anyhow::Result; -use ark_db::block::create::create_block; -use ark_db::block::get::get_block; -use ark_db::block::update::update_block; +use ark_db::indexer::get::get_block; +use ark_db::indexer::update::{update_block, update_indexer}; use ark_starknet::collection_manager::CollectionManager; use aws_sdk_dynamodb::Client as DynamoClient; use aws_sdk_kinesis::Client as KinesisClient; @@ -63,6 +62,8 @@ pub async fn process_blocks_continuously( reqwest_client: &ReqwestClient, dynamo_client: &DynamoClient, kinesis_client: &KinesisClient, + ecs_task_id: &str, + is_continous: bool, ) -> Result<()> { let starting_block = env::var("START_BLOCK") .expect("START_BLOCK must be set") @@ -75,14 +76,25 @@ pub async fn process_blocks_continuously( loop { let execution_time = Instant::now(); let dest_block_number = get_destination_block_number(collection_manager).await?; + let indexation_progress = (current_block_number as f64 / dest_block_number as f64) * 100.0; info!( "Dest block: {}, Current block: {}, Indexing progress: {:.2}%", - dest_block_number, - current_block_number, - (current_block_number as f64 / dest_block_number as f64) * 100.0 + dest_block_number, current_block_number, indexation_progress ); + if !is_continous { + update_indexer( + dynamo_client, + ecs_task_id, + String::from("running"), + Some(starting_block), + Some(dest_block_number), + Some(format!("{:.2}", indexation_progress)), + ) + .await?; + } + // If the current block number is less than or equal to the destination block number if current_block_number <= dest_block_number { let is_block_fetched = get_block(dynamo_client, current_block_number).await?; @@ -94,8 +106,6 @@ pub async fn process_blocks_continuously( continue; } - create_block(dynamo_client, current_block_number, false).await?; - if let Some(events_only) = get_transfer_events(collection_manager, current_block_number).await? { @@ -116,7 +126,7 @@ pub async fn process_blocks_continuously( .await { Ok(_) => { - update_block(dynamo_client, current_block_number, true).await?; + update_block(dynamo_client, ecs_task_id, current_block_number).await?; info!( "Indexing time: {}ms (block {})", execution_time.elapsed().as_millis(), @@ -126,20 +136,31 @@ pub async fn process_blocks_continuously( } Err(_err) => { error!("Error processing block: {:?}", current_block_number); + break; } } } else { info!("No event to process for block {:?}", current_block_number); - update_block(dynamo_client, current_block_number, true).await?; + update_block(dynamo_client, ecs_task_id, current_block_number).await?; current_block_number += 1; } + } else if !is_continous { + break; + } else { + sleep(Duration::from_secs(5)).await; } + } - // If END_BLOCK is set, exit the loop, otherwise wait for more blocks - match env::var("END_BLOCK") { - Ok(_) => break, - Err(_) => sleep(Duration::from_secs(5)).await, - } + if !is_continous { + update_indexer( + dynamo_client, + ecs_task_id, + String::from("stopped"), + None, + None, + None, + ) + .await?; } Ok(()) diff --git a/src/main.rs b/src/main.rs index a313740f9..61e3e90cc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,6 @@ mod constants; mod core; mod utils; -use std::env; - use crate::core::block::process_blocks_continuously; use anyhow::Result; use ark_starknet::{client2::StarknetClient, collection_manager::CollectionManager}; @@ -13,6 +11,8 @@ use dotenv::dotenv; use reqwest::{Client as ReqwestClient, Url}; use simple_logger::SimpleLogger; use starknet::providers::{jsonrpc::HttpTransport, JsonRpcClient}; +use std::env; +use utils::get_ecs_task_id; #[tokio::main] async fn main() -> Result<()> { @@ -32,6 +32,8 @@ async fn main() -> Result<()> { let kinesis_client = KinesisClient::new(&config); let dynamo_client = DynamoClient::new(&config); let reqwest_client = ReqwestClient::new(); + let ecs_task_id = get_ecs_task_id(); + let is_continous = env::var("END_BLOCK").is_ok(); process_blocks_continuously( &collection_manager, @@ -39,6 +41,10 @@ async fn main() -> Result<()> { &reqwest_client, &dynamo_client, &kinesis_client, + &ecs_task_id, + is_continous, ) - .await + .await?; + + Ok(()) } diff --git a/src/utils.rs b/src/utils.rs index c6789f348..9e8d8f662 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,6 +1,10 @@ // use serde_json::Value; // use starknet::core::utils::starknet_keccak; +use std::env; + +use log::info; + #[allow(dead_code)] pub async fn upload_image_to_s3(_url: &str) -> Result> { // let client = reqwest::Client::new(); @@ -30,3 +34,16 @@ pub async fn upload_image_to_s3(_url: &str) -> Result String { + let container_metadata_uri = env::var("ECS_CONTAINER_METADATA_URI").unwrap_or("".to_string()); + + let pattern = regex::Regex::new(r"/v3/([a-f0-9]{32})-").unwrap(); + let task_id = pattern + .captures(container_metadata_uri.as_str()) + .and_then(|cap| cap.get(1).map(|m| m.as_str())) + .unwrap_or(""); + + info!("ECS task ID: {:?}", task_id); + task_id.to_string() +}