From 521ab1349526cc9abaceae51bd43f5b9ce7cfdae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9mi=20Roycourt?= Date: Thu, 7 Sep 2023 06:08:46 +0200 Subject: [PATCH 01/12] feat(core): insert into indexer table --- Cargo.lock | 39 ++++++++++++++++++++++++++++++--------- Cargo.toml | 1 + src/main.rs | 18 +++++++++++++++++- 3 files changed, 48 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5ac472f05..d72b3330e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -211,6 +211,7 @@ dependencies = [ "aws-sdk-dynamodb", "aws-sdk-kinesis", "aws-types", + "chrono", "clap 4.4.1", "dotenv", "hex", @@ -428,7 +429,7 @@ dependencies = [ "http", "hyper", "ring", - "time", + "time 0.3.28", "tokio", "tower", "tracing", @@ -601,7 +602,7 @@ dependencies = [ "percent-encoding", "regex", "sha2", - "time", + "time 0.3.28", "tracing", ] @@ -746,7 +747,7 @@ dependencies = [ "num-integer", "ryu", "serde", - "time", + "time 0.3.28", ] [[package]] @@ -893,7 +894,7 @@ dependencies = [ "serde", "serde_bytes", "serde_json", - "time", + "time 0.3.28", "uuid 1.4.1", ] @@ -957,8 +958,11 @@ checksum = "ec837a71355b28f6556dbd569b37b3f363091c0bd4b2e735674521b4c5fd9bc5" dependencies = [ "android-tzdata", "iana-time-zone", + "js-sys", "num-traits", "serde", + "time 0.1.45", + "wasm-bindgen", "winapi", ] @@ -1531,7 +1535,7 @@ dependencies = [ "cfg-if", "js-sys", "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "wasm-bindgen", ] @@ -2034,7 +2038,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" dependencies = [ "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys 0.48.0", ] @@ -2895,7 +2899,7 @@ dependencies = [ "serde", "serde_json", "serde_with_macros 2.3.3", - "time", + "time 0.3.28", ] [[package]] @@ -2912,7 +2916,7 @@ dependencies = [ "serde", "serde_json", "serde_with_macros 3.3.0", - "time", + "time 0.3.28", ] [[package]] @@ -3000,7 +3004,7 @@ checksum = "2230cd5c29b815c9b699fb610b49a5ed65588f3509d9f0108be3a885da629333" dependencies = [ "colored", "log", - "time", + "time 0.3.28", "windows-sys 0.42.0", ] @@ -3345,6 +3349,17 @@ dependencies = [ "syn 2.0.29", ] +[[package]] +name = "time" +version = "0.1.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b797afad3f312d1c66a56d11d0316f916356d11bd158fbc6ca6389ff6bf805a" +dependencies = [ + "libc", + "wasi 0.10.0+wasi-snapshot-preview1", + "winapi", +] + [[package]] name = "time" version = "0.3.28" @@ -3753,6 +3768,12 @@ dependencies = [ "try-lock", ] +[[package]] +name = "wasi" +version = "0.10.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" diff --git a/Cargo.toml b/Cargo.toml index 267323b4a..ac7872f5a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ version = "0.1.0" edition = "2021" [dependencies] +chrono = "0.4.19" anyhow = "1.0" tokio = { version = "1", features = ["full"] } reqwest = { version = "0.11", features = ["json"] } diff --git a/src/main.rs b/src/main.rs index a313740f9..c6d3ad304 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,8 +7,9 @@ use crate::core::block::process_blocks_continuously; use anyhow::Result; use ark_starknet::{client2::StarknetClient, collection_manager::CollectionManager}; use aws_config::meta::region::RegionProviderChain; -use aws_sdk_dynamodb::Client as DynamoClient; +use aws_sdk_dynamodb::{types::AttributeValue, Client as DynamoClient}; use aws_sdk_kinesis::Client as KinesisClient; +use chrono::Utc; use dotenv::dotenv; use reqwest::{Client as ReqwestClient, Url}; use simple_logger::SimpleLogger; @@ -33,6 +34,21 @@ async fn main() -> Result<()> { let dynamo_client = DynamoClient::new(&config); let reqwest_client = ReqwestClient::new(); + let indexer_table_name = + env::var("ARK_INDEXER_TABLE_NAME").expect("ARK_INDEXER_TABLE_NAME must be set"); + + let indexer_version = env::var("ARK_INDEXER_VERSION").unwrap_or(String::from("undefined")); + let now = Utc::now(); + let unix_timestamp = now.timestamp(); + + dynamo_client + .put_item() + .table_name(indexer_table_name) + .item("PK", AttributeValue::S(String::from("ECS_"))) + .item("SK", AttributeValue::S(unix_timestamp.to_string())) + .item("status", AttributeValue::S(String::from("running"))) + .item("version", AttributeValue::S(String::from(indexer_version))); + process_blocks_continuously( &collection_manager, &rpc_client, From 914f30a3650601557967056fb7af2c1f55b0c832 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9mi=20Roycourt?= Date: Thu, 7 Sep 2023 06:33:00 +0200 Subject: [PATCH 02/12] feat(core): update indexer status --- Cargo.lock | 3 ++- Cargo.toml | 2 +- crates/ark-db/Cargo.toml | 3 ++- crates/ark-db/src/indexer/create.rs | 26 +++++++++++++++++++++++ crates/ark-db/src/indexer/mod.rs | 1 + crates/ark-db/src/lib.rs | 1 + src/main.rs | 32 ++++++++++++++--------------- 7 files changed, 48 insertions(+), 20 deletions(-) create mode 100644 crates/ark-db/src/indexer/create.rs create mode 100644 crates/ark-db/src/indexer/mod.rs diff --git a/Cargo.lock b/Cargo.lock index d72b3330e..f70e0461f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -148,6 +148,7 @@ dependencies = [ "anyhow", "ark-metadata", "aws-sdk-dynamodb", + "chrono", "dotenv", "log", "serde", @@ -211,13 +212,13 @@ dependencies = [ "aws-sdk-dynamodb", "aws-sdk-kinesis", "aws-types", - "chrono", "clap 4.4.1", "dotenv", "hex", "log", "num-bigint", "openssl", + "regex", "reqwest", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index ac7872f5a..dbe52ec25 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ version = "0.1.0" edition = "2021" [dependencies] -chrono = "0.4.19" +regex = "1.5.4" anyhow = "1.0" tokio = { version = "1", features = ["full"] } reqwest = { version = "0.11", features = ["json"] } diff --git a/crates/ark-db/Cargo.toml b/crates/ark-db/Cargo.toml index b59a9c6ab..6f90623f1 100644 --- a/crates/ark-db/Cargo.toml +++ b/crates/ark-db/Cargo.toml @@ -4,9 +4,10 @@ version = "0.1.0" edition = "2021" [dependencies] +chrono = "0.4.19" anyhow = "1.0" serde = "1.0.164" log = "0.4" ark-metadata = { path = "../ark-metadata" } aws-sdk-dynamodb = "0.29.0" -dotenv = "0.15.0" \ No newline at end of file +dotenv = "0.15.0" diff --git a/crates/ark-db/src/indexer/create.rs b/crates/ark-db/src/indexer/create.rs new file mode 100644 index 000000000..27ca2f906 --- /dev/null +++ b/crates/ark-db/src/indexer/create.rs @@ -0,0 +1,26 @@ +use std::env; + +use anyhow::Result; +use aws_sdk_dynamodb::types::AttributeValue; +use aws_sdk_dynamodb::Client; +use chrono::Utc; + +pub async fn create_indexer(dynamo_client: &Client, task_id: &str) -> Result<()> { + let indexer_table_name = + env::var("ARK_INDEXER_TABLE_NAME").expect("ARK_INDEXER_TABLE_NAME must be set"); + let indexer_version = env::var("ARK_INDEXER_VERSION").unwrap_or(String::from("undefined")); + let now = Utc::now(); + let unix_timestamp = now.timestamp(); + + dynamo_client + .put_item() + .table_name(indexer_table_name) + .item("PK", AttributeValue::S(format!("ECS_{}", task_id))) + .item("SK", AttributeValue::S(unix_timestamp.to_string())) + .item("status", AttributeValue::S(String::from("running"))) + .item("version", AttributeValue::S(String::from(indexer_version))) + .send() + .await?; + + Ok(()) +} diff --git a/crates/ark-db/src/indexer/mod.rs b/crates/ark-db/src/indexer/mod.rs new file mode 100644 index 000000000..c5fb369c1 --- /dev/null +++ b/crates/ark-db/src/indexer/mod.rs @@ -0,0 +1 @@ +pub mod create; diff --git a/crates/ark-db/src/lib.rs b/crates/ark-db/src/lib.rs index 6deb54067..61a853b2b 100644 --- a/crates/ark-db/src/lib.rs +++ b/crates/ark-db/src/lib.rs @@ -1,6 +1,7 @@ pub mod block; pub mod collection; pub mod contract; +pub mod indexer; pub mod owners; pub mod token; pub mod token_event; diff --git a/src/main.rs b/src/main.rs index c6d3ad304..37f73c9f3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,19 +1,25 @@ mod constants; mod core; mod utils; -use std::env; - use crate::core::block::process_blocks_continuously; use anyhow::Result; +use ark_db::indexer::create::create_indexer; use ark_starknet::{client2::StarknetClient, collection_manager::CollectionManager}; use aws_config::meta::region::RegionProviderChain; -use aws_sdk_dynamodb::{types::AttributeValue, Client as DynamoClient}; +use aws_sdk_dynamodb::Client as DynamoClient; use aws_sdk_kinesis::Client as KinesisClient; -use chrono::Utc; use dotenv::dotenv; use reqwest::{Client as ReqwestClient, Url}; use simple_logger::SimpleLogger; use starknet::providers::{jsonrpc::HttpTransport, JsonRpcClient}; +use std::env; + +fn extract_ecs_task_id(text: &str) -> Option<&str> { + let pattern = regex::Regex::new(r"/v3/([a-f0-9]{32})-").unwrap(); + pattern + .captures(text) + .and_then(|cap| cap.get(1).map(|m| m.as_str())) +} #[tokio::main] async fn main() -> Result<()> { @@ -34,20 +40,12 @@ async fn main() -> Result<()> { let dynamo_client = DynamoClient::new(&config); let reqwest_client = ReqwestClient::new(); - let indexer_table_name = - env::var("ARK_INDEXER_TABLE_NAME").expect("ARK_INDEXER_TABLE_NAME must be set"); - - let indexer_version = env::var("ARK_INDEXER_VERSION").unwrap_or(String::from("undefined")); - let now = Utc::now(); - let unix_timestamp = now.timestamp(); + let container_metadata_uri = env::var("ECS_CONTAINER_METADATA_URI").unwrap_or("".to_string()); + let task_id = extract_ecs_task_id(container_metadata_uri.as_str()); - dynamo_client - .put_item() - .table_name(indexer_table_name) - .item("PK", AttributeValue::S(String::from("ECS_"))) - .item("SK", AttributeValue::S(unix_timestamp.to_string())) - .item("status", AttributeValue::S(String::from("running"))) - .item("version", AttributeValue::S(String::from(indexer_version))); + if task_id.is_some() { + create_indexer(&dynamo_client, task_id.unwrap()).await?; + } process_blocks_continuously( &collection_manager, From a3eaa457a1f5f7375863917b45d03e7b39ebd00c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9mi=20Roycourt?= Date: Thu, 7 Sep 2023 09:38:41 +0200 Subject: [PATCH 03/12] feat(core): indexer table --- crates/ark-db/src/block/create.rs | 38 --------------- crates/ark-db/src/block/get.rs | 41 ---------------- crates/ark-db/src/block/mod.rs | 3 -- crates/ark-db/src/block/update.rs | 20 -------- crates/ark-db/src/indexer/create.rs | 26 ----------- crates/ark-db/src/indexer/get.rs | 39 ++++++++++++++++ crates/ark-db/src/indexer/mod.rs | 3 +- crates/ark-db/src/indexer/update.rs | 72 +++++++++++++++++++++++++++++ crates/ark-db/src/lib.rs | 1 - src/core/block.rs | 53 +++++++++++++++------ src/main.rs | 24 ++++------ src/utils.rs | 17 +++++++ 12 files changed, 176 insertions(+), 161 deletions(-) delete mode 100644 crates/ark-db/src/block/create.rs delete mode 100644 crates/ark-db/src/block/get.rs delete mode 100644 crates/ark-db/src/block/mod.rs delete mode 100644 crates/ark-db/src/block/update.rs delete mode 100644 crates/ark-db/src/indexer/create.rs create mode 100644 crates/ark-db/src/indexer/get.rs create mode 100644 crates/ark-db/src/indexer/update.rs diff --git a/crates/ark-db/src/block/create.rs b/crates/ark-db/src/block/create.rs deleted file mode 100644 index 6b19fce3e..000000000 --- a/crates/ark-db/src/block/create.rs +++ /dev/null @@ -1,38 +0,0 @@ -use aws_sdk_dynamodb::types::AttributeValue; -use aws_sdk_dynamodb::{Client, Error}; -use std::env; - -// This function adds a block number to the list of fetched blocks. -pub async fn create_block( - dynamo_client: &Client, - block_number: u64, - status: bool, -) -> Result<(), Error> { - let table = env::var("ARK_BLOCKS_TABLE_NAME").expect("ARK_BLOCKS_TABLE_NAME must be set"); - let block_number_av = AttributeValue::N(block_number.to_string()); - let is_fetched_av = AttributeValue::Bool(status); - let request = dynamo_client - .put_item() - .table_name(table) - .item("block_number", block_number_av) - .item("is_fetched", is_fetched_av); - - let result = request.send().await; - - match result { - Ok(_) => { - println!( - "Successfully added block number {} to the list of fetched blocks", - block_number - ); - } - Err(e) => { - println!( - "Error adding block number {} to the list of fetched blocks: {:?}", - block_number, e - ); - } - } - - Ok(()) -} diff --git a/crates/ark-db/src/block/get.rs b/crates/ark-db/src/block/get.rs deleted file mode 100644 index e6f597cfd..000000000 --- a/crates/ark-db/src/block/get.rs +++ /dev/null @@ -1,41 +0,0 @@ -use aws_sdk_dynamodb::types::AttributeValue; -use aws_sdk_dynamodb::{Client, Error}; -use log::info; -use std::env; - -pub async fn get_block(dynamo_client: &Client, block_number: u64) -> Result { - let table = env::var("ARK_BLOCKS_TABLE_NAME").expect("ARK_BLOCKS_TABLE_NAME must be set"); - - info!("var ARK_BLOCKS_TABLE_NAME: {:?}", table); - - let block_number_av = AttributeValue::N(block_number.to_string()); - let request = dynamo_client - .get_item() - .table_name(table) - .key("block_number", block_number_av); - - let result = request.send().await; - - match result { - Ok(value) => { - info!("get_block: {:?}", value.item); - - if let Some(item) = value.item { - if let Some(is_fetched) = item.get("is_fetched") { - match is_fetched.as_bool() { - Ok(is_fetched_bool) => return Ok(*is_fetched_bool), - Err(_) => return Ok(false), - } - } - } - } - Err(e) => { - println!( - "Error requesting block number {} to the list of fetched blocks: {:?}", - block_number, e - ); - } - } - - Ok(false) -} diff --git a/crates/ark-db/src/block/mod.rs b/crates/ark-db/src/block/mod.rs deleted file mode 100644 index 5a20acaf7..000000000 --- a/crates/ark-db/src/block/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod create; -pub mod get; -pub mod update; diff --git a/crates/ark-db/src/block/update.rs b/crates/ark-db/src/block/update.rs deleted file mode 100644 index 99dec5084..000000000 --- a/crates/ark-db/src/block/update.rs +++ /dev/null @@ -1,20 +0,0 @@ -use anyhow::Result; -use aws_sdk_dynamodb::types::AttributeValue; -use aws_sdk_dynamodb::Client; -use std::env; - -// This function adds a block number to the list of fetched blocks. -pub async fn update_block(dynamo_client: &Client, block_number: u64, status: bool) -> Result<()> { - let table = env::var("ARK_BLOCKS_TABLE_NAME").expect("ARK_BLOCKS_TABLE_NAME must be set"); - let block_number_av = AttributeValue::N(block_number.to_string()); - let is_fetched_av = AttributeValue::Bool(status); - let request = dynamo_client - .update_item() - .table_name(table) - .key("block_number", block_number_av) - .update_expression("SET is_fetched = :is_fetched") - .expression_attribute_values(":is_fetched", is_fetched_av); - - request.send().await?; - Ok(()) -} diff --git a/crates/ark-db/src/indexer/create.rs b/crates/ark-db/src/indexer/create.rs deleted file mode 100644 index 27ca2f906..000000000 --- a/crates/ark-db/src/indexer/create.rs +++ /dev/null @@ -1,26 +0,0 @@ -use std::env; - -use anyhow::Result; -use aws_sdk_dynamodb::types::AttributeValue; -use aws_sdk_dynamodb::Client; -use chrono::Utc; - -pub async fn create_indexer(dynamo_client: &Client, task_id: &str) -> Result<()> { - let indexer_table_name = - env::var("ARK_INDEXER_TABLE_NAME").expect("ARK_INDEXER_TABLE_NAME must be set"); - let indexer_version = env::var("ARK_INDEXER_VERSION").unwrap_or(String::from("undefined")); - let now = Utc::now(); - let unix_timestamp = now.timestamp(); - - dynamo_client - .put_item() - .table_name(indexer_table_name) - .item("PK", AttributeValue::S(format!("ECS_{}", task_id))) - .item("SK", AttributeValue::S(unix_timestamp.to_string())) - .item("status", AttributeValue::S(String::from("running"))) - .item("version", AttributeValue::S(String::from(indexer_version))) - .send() - .await?; - - Ok(()) -} diff --git a/crates/ark-db/src/indexer/get.rs b/crates/ark-db/src/indexer/get.rs new file mode 100644 index 000000000..67cdf9266 --- /dev/null +++ b/crates/ark-db/src/indexer/get.rs @@ -0,0 +1,39 @@ +use aws_sdk_dynamodb::types::AttributeValue; +use aws_sdk_dynamodb::{Client, Error}; +use std::env; + +pub async fn get_block(dynamo_client: &Client, block_number: u64) -> Result { + let indexer_table_name = + env::var("ARK_INDEXER_TABLE_NAME").expect("ARK_INDEXER_TABLE_NAME must be set"); + let partition_key = format!("Block_{}", block_number.to_string()); + + let request = dynamo_client + .query() + .table_name(indexer_table_name) + .key_condition_expression("#PK = :PK") + .expression_attribute_names("#PK", "PK") + .expression_attribute_values(":PK", AttributeValue::S(partition_key)); + + match request.send().await { + Ok(value) => { + if let Some(items) = value.items() { + if let Some(item) = items.first() { + if let Some(is_fetched) = item.get("is_fetched") { + match is_fetched.as_bool() { + Ok(is_fetched_bool) => return Ok(*is_fetched_bool), + Err(_) => return Ok(false), + } + } + } + } + } + Err(e) => { + println!( + "Error requesting block number {} to the list of fetched blocks: {:?}", + block_number, e + ); + } + } + + Ok(false) +} diff --git a/crates/ark-db/src/indexer/mod.rs b/crates/ark-db/src/indexer/mod.rs index c5fb369c1..0e93baa52 100644 --- a/crates/ark-db/src/indexer/mod.rs +++ b/crates/ark-db/src/indexer/mod.rs @@ -1 +1,2 @@ -pub mod create; +pub mod get; +pub mod update; diff --git a/crates/ark-db/src/indexer/update.rs b/crates/ark-db/src/indexer/update.rs new file mode 100644 index 000000000..6b9a76111 --- /dev/null +++ b/crates/ark-db/src/indexer/update.rs @@ -0,0 +1,72 @@ +use std::env; + +use anyhow::Result; +use aws_sdk_dynamodb::types::AttributeValue; +use aws_sdk_dynamodb::Client; +use chrono::Utc; + +pub async fn update_indexer( + dynamo_client: &Client, + task_id: &str, + status: String, + from: Option, + to: Option, + indexation_progress: Option, +) -> Result<()> { + let indexer_table_name = + env::var("ARK_INDEXER_TABLE_NAME").expect("ARK_INDEXER_TABLE_NAME must be set"); + let indexer_version = env::var("ARK_INDEXER_VERSION").unwrap_or(String::from("undefined")); + let now = Utc::now(); + let unix_timestamp = now.timestamp(); + + let mut request = dynamo_client + .put_item() + .table_name(indexer_table_name) + .item( + "PK", + AttributeValue::S(format!("Indexer_{}", task_id.to_string())), + ) + .item("SK", AttributeValue::S(String::from(indexer_version))) + .item("status", AttributeValue::S(status)) + .item("last_update", AttributeValue::N(unix_timestamp.to_string())); + + if from.is_some() { + request = request.item("from", AttributeValue::N(from.unwrap().to_string())); + } + + if to.is_some() { + request = request.item("to", AttributeValue::N(to.unwrap().to_string())); + } + + if indexation_progress.is_some() { + request = request.item( + "indexation_progress", + AttributeValue::N(indexation_progress.unwrap().to_string()), + ); + } + + request.send().await?; + + Ok(()) +} + +pub async fn update_block(dynamo_client: &Client, task_id: &str, block_number: u64) -> Result<()> { + let indexer_table_name = + env::var("ARK_INDEXER_TABLE_NAME").expect("ARK_INDEXER_TABLE_NAME must be set"); + let indexer_version = env::var("ARK_INDEXER_VERSION").unwrap_or(String::from("undefined")); + let now = Utc::now(); + let unix_timestamp = now.timestamp(); + + dynamo_client + .put_item() + .table_name(indexer_table_name) + .item("PK", AttributeValue::S(format!("Block_{}", block_number))) + .item("SK", AttributeValue::S(indexer_version)) + .item("is_fetched", AttributeValue::Bool(true)) + .item("last_update", AttributeValue::N(unix_timestamp.to_string())) + .item("indexer", AttributeValue::S(task_id.to_string())) + .send() + .await?; + + Ok(()) +} diff --git a/crates/ark-db/src/lib.rs b/crates/ark-db/src/lib.rs index 61a853b2b..2c68e03ef 100644 --- a/crates/ark-db/src/lib.rs +++ b/crates/ark-db/src/lib.rs @@ -1,4 +1,3 @@ -pub mod block; pub mod collection; pub mod contract; pub mod indexer; diff --git a/src/core/block.rs b/src/core/block.rs index 391204958..c7a671ec7 100644 --- a/src/core/block.rs +++ b/src/core/block.rs @@ -1,8 +1,7 @@ use super::contract::identify_contract_types_from_transfers; use anyhow::Result; -use ark_db::block::create::create_block; -use ark_db::block::get::get_block; -use ark_db::block::update::update_block; +use ark_db::indexer::get::get_block; +use ark_db::indexer::update::{update_block, update_indexer}; use ark_starknet::collection_manager::CollectionManager; use aws_sdk_dynamodb::Client as DynamoClient; use aws_sdk_kinesis::Client as KinesisClient; @@ -63,6 +62,8 @@ pub async fn process_blocks_continuously( reqwest_client: &ReqwestClient, dynamo_client: &DynamoClient, kinesis_client: &KinesisClient, + ecs_task_id: &str, + is_continous: bool, ) -> Result<()> { let starting_block = env::var("START_BLOCK") .expect("START_BLOCK must be set") @@ -75,14 +76,25 @@ pub async fn process_blocks_continuously( loop { let execution_time = Instant::now(); let dest_block_number = get_destination_block_number(collection_manager).await?; + let indexation_progress = (current_block_number as f64 / dest_block_number as f64) * 100.0; info!( "Dest block: {}, Current block: {}, Indexing progress: {:.2}%", - dest_block_number, - current_block_number, - (current_block_number as f64 / dest_block_number as f64) * 100.0 + dest_block_number, current_block_number, indexation_progress ); + if !is_continous { + update_indexer( + &dynamo_client, + ecs_task_id, + String::from("running"), + Some(starting_block), + Some(dest_block_number), + Some(format!("{:.2}", indexation_progress)), + ) + .await?; + } + // If the current block number is less than or equal to the destination block number if current_block_number <= dest_block_number { let is_block_fetched = get_block(dynamo_client, current_block_number).await?; @@ -94,8 +106,6 @@ pub async fn process_blocks_continuously( continue; } - create_block(dynamo_client, current_block_number, false).await?; - if let Some(events_only) = get_transfer_events(collection_manager, current_block_number).await? { @@ -116,7 +126,7 @@ pub async fn process_blocks_continuously( .await { Ok(_) => { - update_block(dynamo_client, current_block_number, true).await?; + update_block(dynamo_client, ecs_task_id, current_block_number).await?; info!( "Indexing time: {}ms (block {})", execution_time.elapsed().as_millis(), @@ -126,20 +136,33 @@ pub async fn process_blocks_continuously( } Err(_err) => { error!("Error processing block: {:?}", current_block_number); + break; } } } else { info!("No event to process for block {:?}", current_block_number); - update_block(dynamo_client, current_block_number, true).await?; + update_block(dynamo_client, ecs_task_id, current_block_number).await?; current_block_number += 1; } + } else { + if !is_continous { + break; + } else { + sleep(Duration::from_secs(5)).await; + } } + } - // If END_BLOCK is set, exit the loop, otherwise wait for more blocks - match env::var("END_BLOCK") { - Ok(_) => break, - Err(_) => sleep(Duration::from_secs(5)).await, - } + if !is_continous { + update_indexer( + &dynamo_client, + &ecs_task_id, + String::from("stopped"), + None, + None, + None, + ) + .await?; } Ok(()) diff --git a/src/main.rs b/src/main.rs index 37f73c9f3..61e3e90cc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,7 +3,6 @@ mod core; mod utils; use crate::core::block::process_blocks_continuously; use anyhow::Result; -use ark_db::indexer::create::create_indexer; use ark_starknet::{client2::StarknetClient, collection_manager::CollectionManager}; use aws_config::meta::region::RegionProviderChain; use aws_sdk_dynamodb::Client as DynamoClient; @@ -13,13 +12,7 @@ use reqwest::{Client as ReqwestClient, Url}; use simple_logger::SimpleLogger; use starknet::providers::{jsonrpc::HttpTransport, JsonRpcClient}; use std::env; - -fn extract_ecs_task_id(text: &str) -> Option<&str> { - let pattern = regex::Regex::new(r"/v3/([a-f0-9]{32})-").unwrap(); - pattern - .captures(text) - .and_then(|cap| cap.get(1).map(|m| m.as_str())) -} +use utils::get_ecs_task_id; #[tokio::main] async fn main() -> Result<()> { @@ -39,13 +32,8 @@ async fn main() -> Result<()> { let kinesis_client = KinesisClient::new(&config); let dynamo_client = DynamoClient::new(&config); let reqwest_client = ReqwestClient::new(); - - let container_metadata_uri = env::var("ECS_CONTAINER_METADATA_URI").unwrap_or("".to_string()); - let task_id = extract_ecs_task_id(container_metadata_uri.as_str()); - - if task_id.is_some() { - create_indexer(&dynamo_client, task_id.unwrap()).await?; - } + let ecs_task_id = get_ecs_task_id(); + let is_continous = env::var("END_BLOCK").is_ok(); process_blocks_continuously( &collection_manager, @@ -53,6 +41,10 @@ async fn main() -> Result<()> { &reqwest_client, &dynamo_client, &kinesis_client, + &ecs_task_id, + is_continous, ) - .await + .await?; + + Ok(()) } diff --git a/src/utils.rs b/src/utils.rs index c6789f348..01297c3f5 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,6 +1,10 @@ // use serde_json::Value; // use starknet::core::utils::starknet_keccak; +use std::env; + +use log::info; + #[allow(dead_code)] pub async fn upload_image_to_s3(_url: &str) -> Result> { // let client = reqwest::Client::new(); @@ -30,3 +34,16 @@ pub async fn upload_image_to_s3(_url: &str) -> Result String { + let container_metadata_uri = env::var("ECS_CONTAINER_METADATA_URI").unwrap_or("".to_string()); + + let pattern = regex::Regex::new(r"/v3/([a-f0-9]{32})-").unwrap(); + let task_id = pattern + .captures(container_metadata_uri.as_str()) + .and_then(|cap| cap.get(1).map(|m| m.as_str())) + .unwrap_or(&""); + + info!("ECS task ID: {:?}", task_id); + task_id.to_string() +} From 3ef267146feb4284ec1aa75883dcea7357b576f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9mi=20Roycourt?= Date: Thu, 7 Sep 2023 09:59:19 +0200 Subject: [PATCH 04/12] feat(core): update indexer --- crates/ark-db/src/indexer/update.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/crates/ark-db/src/indexer/update.rs b/crates/ark-db/src/indexer/update.rs index 6b9a76111..d69b36346 100644 --- a/crates/ark-db/src/indexer/update.rs +++ b/crates/ark-db/src/indexer/update.rs @@ -22,13 +22,15 @@ pub async fn update_indexer( let mut request = dynamo_client .put_item() .table_name(indexer_table_name) + .item("PK", AttributeValue::S(String::from("Indexer"))) .item( - "PK", - AttributeValue::S(format!("Indexer_{}", task_id.to_string())), + "SK", + AttributeValue::S(format!("{}_{}", indexer_version, task_id.to_string())), ) - .item("SK", AttributeValue::S(String::from(indexer_version))) .item("status", AttributeValue::S(status)) - .item("last_update", AttributeValue::N(unix_timestamp.to_string())); + .item("last_update", AttributeValue::N(unix_timestamp.to_string())) + .item("version", AttributeValue::S(indexer_version)) + .item("indexer", AttributeValue::S(task_id.to_string())); if from.is_some() { request = request.item("from", AttributeValue::N(from.unwrap().to_string())); From 273e44e93bdaab79157b79136e6398036feb412a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9mi=20Roycourt?= Date: Thu, 7 Sep 2023 11:04:30 +0200 Subject: [PATCH 05/12] fix(cors): update indexer --- crates/ark-db/src/indexer/update.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/crates/ark-db/src/indexer/update.rs b/crates/ark-db/src/indexer/update.rs index d69b36346..94ac2b32c 100644 --- a/crates/ark-db/src/indexer/update.rs +++ b/crates/ark-db/src/indexer/update.rs @@ -23,10 +23,7 @@ pub async fn update_indexer( .put_item() .table_name(indexer_table_name) .item("PK", AttributeValue::S(String::from("Indexer"))) - .item( - "SK", - AttributeValue::S(format!("{}_{}", indexer_version, task_id.to_string())), - ) + .item("SK", AttributeValue::S(task_id.to_string())) .item("status", AttributeValue::S(status)) .item("last_update", AttributeValue::N(unix_timestamp.to_string())) .item("version", AttributeValue::S(indexer_version)) @@ -63,10 +60,11 @@ pub async fn update_block(dynamo_client: &Client, task_id: &str, block_number: u .put_item() .table_name(indexer_table_name) .item("PK", AttributeValue::S(format!("Block_{}", block_number))) - .item("SK", AttributeValue::S(indexer_version)) + .item("SK", AttributeValue::S(task_id.to_string())) .item("is_fetched", AttributeValue::Bool(true)) .item("last_update", AttributeValue::N(unix_timestamp.to_string())) .item("indexer", AttributeValue::S(task_id.to_string())) + .item("version", AttributeValue::S(indexer_version)) .send() .await?; From 2377c2d523cf6194c862922bc5546115ae3b232b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9mi=20Roycourt?= Date: Thu, 7 Sep 2023 11:13:38 +0200 Subject: [PATCH 06/12] fix: clippy issues --- crates/ark-db/src/indexer/get.rs | 2 +- crates/ark-starknet/src/collection_manager.rs | 2 +- src/core/block.rs | 14 ++++++-------- src/utils.rs | 2 +- 4 files changed, 9 insertions(+), 11 deletions(-) diff --git a/crates/ark-db/src/indexer/get.rs b/crates/ark-db/src/indexer/get.rs index 67cdf9266..53df1d8a0 100644 --- a/crates/ark-db/src/indexer/get.rs +++ b/crates/ark-db/src/indexer/get.rs @@ -5,7 +5,7 @@ use std::env; pub async fn get_block(dynamo_client: &Client, block_number: u64) -> Result { let indexer_table_name = env::var("ARK_INDEXER_TABLE_NAME").expect("ARK_INDEXER_TABLE_NAME must be set"); - let partition_key = format!("Block_{}", block_number.to_string()); + let partition_key = format!("Block_{}", block_number); let request = dynamo_client .query() diff --git a/crates/ark-starknet/src/collection_manager.rs b/crates/ark-starknet/src/collection_manager.rs index ec919999e..76feeb160 100644 --- a/crates/ark-starknet/src/collection_manager.rs +++ b/crates/ark-starknet/src/collection_manager.rs @@ -61,7 +61,7 @@ impl CollectionManager { effective_block_id, ) .await - .or_else(|_| Err(anyhow!("Failed to get token owner"))), + .map_err(|_| anyhow!("Failed to get token owner")), } } diff --git a/src/core/block.rs b/src/core/block.rs index c7a671ec7..c3ce8ac21 100644 --- a/src/core/block.rs +++ b/src/core/block.rs @@ -85,7 +85,7 @@ pub async fn process_blocks_continuously( if !is_continous { update_indexer( - &dynamo_client, + dynamo_client, ecs_task_id, String::from("running"), Some(starting_block), @@ -144,19 +144,17 @@ pub async fn process_blocks_continuously( update_block(dynamo_client, ecs_task_id, current_block_number).await?; current_block_number += 1; } + } else if !is_continous { + break; } else { - if !is_continous { - break; - } else { - sleep(Duration::from_secs(5)).await; - } + sleep(Duration::from_secs(5)).await; } } if !is_continous { update_indexer( - &dynamo_client, - &ecs_task_id, + dynamo_client, + ecs_task_id, String::from("stopped"), None, None, diff --git a/src/utils.rs b/src/utils.rs index 01297c3f5..9e8d8f662 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -42,7 +42,7 @@ pub fn get_ecs_task_id() -> String { let task_id = pattern .captures(container_metadata_uri.as_str()) .and_then(|cap| cap.get(1).map(|m| m.as_str())) - .unwrap_or(&""); + .unwrap_or(""); info!("ECS task ID: {:?}", task_id); task_id.to_string() From 5d6970b83471e7db28fe94bfebd2513fe4cbdbcb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9mi=20Roycourt?= Date: Thu, 7 Sep 2023 16:52:29 +0200 Subject: [PATCH 07/12] fix(ark-db): some issue --- crates/ark-db/src/indexer/update.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/crates/ark-db/src/indexer/update.rs b/crates/ark-db/src/indexer/update.rs index 94ac2b32c..3b5f46a46 100644 --- a/crates/ark-db/src/indexer/update.rs +++ b/crates/ark-db/src/indexer/update.rs @@ -33,15 +33,12 @@ pub async fn update_indexer( request = request.item("from", AttributeValue::N(from.unwrap().to_string())); } - if to.is_some() { - request = request.item("to", AttributeValue::N(to.unwrap().to_string())); + if let Some(value) = to { + request = request.item("to", AttributeValue::N(value.to_string())); } - if indexation_progress.is_some() { - request = request.item( - "indexation_progress", - AttributeValue::N(indexation_progress.unwrap().to_string()), - ); + if let Some(value) = indexation_progress { + request = request.item("indexation_progress", AttributeValue::N(value.to_string())); } request.send().await?; From 9c6472817457d02ceab6d30334a0db936c88961f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9mi=20Roycourt?= Date: Thu, 7 Sep 2023 17:20:21 +0200 Subject: [PATCH 08/12] fix(ark-db): some issue --- crates/ark-db/src/indexer/update.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/ark-db/src/indexer/update.rs b/crates/ark-db/src/indexer/update.rs index 3b5f46a46..a237cd2c5 100644 --- a/crates/ark-db/src/indexer/update.rs +++ b/crates/ark-db/src/indexer/update.rs @@ -29,7 +29,7 @@ pub async fn update_indexer( .item("version", AttributeValue::S(indexer_version)) .item("indexer", AttributeValue::S(task_id.to_string())); - if from.is_some() { + if let Some(value) = from { request = request.item("from", AttributeValue::N(from.unwrap().to_string())); } From 2093a422f13153d9e01a86fb049dcf0e6aa052aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9mi=20Roycourt?= Date: Thu, 7 Sep 2023 17:27:14 +0200 Subject: [PATCH 09/12] fix(ark-db): some issue --- crates/ark-db/src/indexer/update.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/ark-db/src/indexer/update.rs b/crates/ark-db/src/indexer/update.rs index a237cd2c5..ac4075eed 100644 --- a/crates/ark-db/src/indexer/update.rs +++ b/crates/ark-db/src/indexer/update.rs @@ -30,7 +30,7 @@ pub async fn update_indexer( .item("indexer", AttributeValue::S(task_id.to_string())); if let Some(value) = from { - request = request.item("from", AttributeValue::N(from.unwrap().to_string())); + request = request.item("from", AttributeValue::N(value.to_string())); } if let Some(value) = to { From 6a4f77762f3d388dd3d056100d9d14662549eb65 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9mi=20Roycourt?= Date: Thu, 7 Sep 2023 18:44:07 +0200 Subject: [PATCH 10/12] refactor(ark-db): indexer table --- crates/ark-db/src/indexer/get.rs | 2 +- crates/ark-db/src/indexer/update.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/ark-db/src/indexer/get.rs b/crates/ark-db/src/indexer/get.rs index 53df1d8a0..cfe152d2a 100644 --- a/crates/ark-db/src/indexer/get.rs +++ b/crates/ark-db/src/indexer/get.rs @@ -5,7 +5,7 @@ use std::env; pub async fn get_block(dynamo_client: &Client, block_number: u64) -> Result { let indexer_table_name = env::var("ARK_INDEXER_TABLE_NAME").expect("ARK_INDEXER_TABLE_NAME must be set"); - let partition_key = format!("Block_{}", block_number); + let partition_key = format!("BLOCK#{}", block_number); let request = dynamo_client .query() diff --git a/crates/ark-db/src/indexer/update.rs b/crates/ark-db/src/indexer/update.rs index ac4075eed..089b52af9 100644 --- a/crates/ark-db/src/indexer/update.rs +++ b/crates/ark-db/src/indexer/update.rs @@ -22,7 +22,7 @@ pub async fn update_indexer( let mut request = dynamo_client .put_item() .table_name(indexer_table_name) - .item("PK", AttributeValue::S(String::from("Indexer"))) + .item("PK", AttributeValue::S(String::from("INDEXER"))) .item("SK", AttributeValue::S(task_id.to_string())) .item("status", AttributeValue::S(status)) .item("last_update", AttributeValue::N(unix_timestamp.to_string())) @@ -56,7 +56,7 @@ pub async fn update_block(dynamo_client: &Client, task_id: &str, block_number: u dynamo_client .put_item() .table_name(indexer_table_name) - .item("PK", AttributeValue::S(format!("Block_{}", block_number))) + .item("PK", AttributeValue::S(format!("BLOCK#{}", block_number))) .item("SK", AttributeValue::S(task_id.to_string())) .item("is_fetched", AttributeValue::Bool(true)) .item("last_update", AttributeValue::N(unix_timestamp.to_string())) From 2db9d26be00179b041a2c159b3f3fe6c27fc8ed1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9mi=20Roycourt?= Date: Thu, 7 Sep 2023 18:48:50 +0200 Subject: [PATCH 11/12] refactor(ark-db): indexer table --- crates/ark-db/src/indexer/update.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/ark-db/src/indexer/update.rs b/crates/ark-db/src/indexer/update.rs index 089b52af9..c69805586 100644 --- a/crates/ark-db/src/indexer/update.rs +++ b/crates/ark-db/src/indexer/update.rs @@ -23,7 +23,7 @@ pub async fn update_indexer( .put_item() .table_name(indexer_table_name) .item("PK", AttributeValue::S(String::from("INDEXER"))) - .item("SK", AttributeValue::S(task_id.to_string())) + .item("SK", AttributeValue::S(format!("TASK#{}", task_id))) .item("status", AttributeValue::S(status)) .item("last_update", AttributeValue::N(unix_timestamp.to_string())) .item("version", AttributeValue::S(indexer_version)) @@ -57,7 +57,7 @@ pub async fn update_block(dynamo_client: &Client, task_id: &str, block_number: u .put_item() .table_name(indexer_table_name) .item("PK", AttributeValue::S(format!("BLOCK#{}", block_number))) - .item("SK", AttributeValue::S(task_id.to_string())) + .item("SK", AttributeValue::S(format!("TASK#{}", task_id))) .item("is_fetched", AttributeValue::Bool(true)) .item("last_update", AttributeValue::N(unix_timestamp.to_string())) .item("indexer", AttributeValue::S(task_id.to_string())) From 887983e90c2bc6225c326587a26a99139b659c5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9mi=20Roycourt?= Date: Fri, 8 Sep 2023 12:48:49 +0200 Subject: [PATCH 12/12] refactor(ark-db): rename task_id --- crates/ark-db/src/indexer/update.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/ark-db/src/indexer/update.rs b/crates/ark-db/src/indexer/update.rs index c69805586..e5aac0d6f 100644 --- a/crates/ark-db/src/indexer/update.rs +++ b/crates/ark-db/src/indexer/update.rs @@ -27,7 +27,7 @@ pub async fn update_indexer( .item("status", AttributeValue::S(status)) .item("last_update", AttributeValue::N(unix_timestamp.to_string())) .item("version", AttributeValue::S(indexer_version)) - .item("indexer", AttributeValue::S(task_id.to_string())); + .item("task_id", AttributeValue::S(task_id.to_string())); if let Some(value) = from { request = request.item("from", AttributeValue::N(value.to_string())); @@ -60,7 +60,7 @@ pub async fn update_block(dynamo_client: &Client, task_id: &str, block_number: u .item("SK", AttributeValue::S(format!("TASK#{}", task_id))) .item("is_fetched", AttributeValue::Bool(true)) .item("last_update", AttributeValue::N(unix_timestamp.to_string())) - .item("indexer", AttributeValue::S(task_id.to_string())) + .item("task_id", AttributeValue::S(task_id.to_string())) .item("version", AttributeValue::S(indexer_version)) .send() .await?;