Skip to content

Commit

Permalink
fix(core): indexer sk
Browse files Browse the repository at this point in the history
  • Loading branch information
remiroyc committed Sep 10, 2023
1 parent e989408 commit ac3c662
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 10 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ version = "0.1.0"
edition = "2021"

[dependencies]
chrono = "0.4.19"
regex = "1.5.4"
anyhow = "1.0"
tokio = { version = "1", features = ["full"] }
Expand Down
30 changes: 30 additions & 0 deletions crates/ark-db/src/indexer/get.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use anyhow::{anyhow, Result};
use aws_sdk_dynamodb::types::AttributeValue;
use aws_sdk_dynamodb::{Client, Error};
use std::env;
Expand Down Expand Up @@ -37,3 +38,32 @@ pub async fn get_block(dynamo_client: &Client, block_number: u64) -> Result<bool

Ok(false)
}

pub async fn get_indexer_sk(dynamo_client: &Client, task_id: &str) -> Result<String> {
let indexer_table_name =
env::var("ARK_INDEXER_TABLE_NAME").expect("ARK_INDEXER_TABLE_NAME must be set");

let result = dynamo_client
.query()
.table_name(indexer_table_name)
.index_name("task_id_index")
.key_condition_expression("#task_id = :task_id, #PK = :PK")
.expression_attribute_names("#task_id", "task_id")
.expression_attribute_names("#PK", "PK")
.expression_attribute_values(":task_id", AttributeValue::S(task_id.to_string()))
.expression_attribute_values(":PK", AttributeValue::S("INDEXER".to_string()))
.limit(1)
.send()
.await?;

if let Some(items) = result.items() {
if let Some(item) = items.first() {
if let Some(sk) = item.get("SK") {
let res = sk.as_s().unwrap();
return Ok(res.to_string());
}
}
}

Err(anyhow!("Indexer SK not found"))
}
12 changes: 4 additions & 8 deletions crates/ark-db/src/indexer/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use chrono::Utc;
pub async fn update_indexer(
dynamo_client: &Client,
task_id: &str,
indexer_sk: &str,
status: String,
from: u64,
to: u64,
Expand All @@ -16,17 +17,15 @@ pub async fn update_indexer(
let indexer_table_name =
env::var("ARK_INDEXER_TABLE_NAME").expect("ARK_INDEXER_TABLE_NAME must be set");
let indexer_version = env::var("ARK_INDEXER_VERSION").unwrap_or(String::from("undefined"));

let now = Utc::now();
let unix_timestamp = now.timestamp();

dynamo_client
.put_item()
.table_name(indexer_table_name)
.item("PK", AttributeValue::S(String::from("INDEXER")))
.item(
"SK",
AttributeValue::S(format!("TASK#{}#{}", unix_timestamp, task_id)),
)
.item("SK", AttributeValue::S(indexer_sk.to_string()))
.item("status", AttributeValue::S(status))
.item("last_update", AttributeValue::N(unix_timestamp.to_string()))
.item("version", AttributeValue::S(indexer_version))
Expand Down Expand Up @@ -54,10 +53,7 @@ pub async fn update_block(dynamo_client: &Client, task_id: &str, block_number: u
.put_item()
.table_name(indexer_table_name)
.item("PK", AttributeValue::S(format!("BLOCK#{}", block_number)))
.item(
"SK",
AttributeValue::S(format!("TASK#{}#{}", unix_timestamp, task_id)),
)
.item("SK", AttributeValue::S(format!("TASK#{}", task_id)))
.item("is_fetched", AttributeValue::Bool(true))
.item("last_update", AttributeValue::N(unix_timestamp.to_string()))
.item("task_id", AttributeValue::S(task_id.to_string()))
Expand Down
16 changes: 14 additions & 2 deletions src/core/block.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use super::contract::identify_contract_types_from_transfers;
use anyhow::Result;
use ark_db::indexer::get::get_block;
use ark_db::indexer::get::{get_block, get_indexer_sk};
use ark_db::indexer::update::{update_block, update_indexer};
use ark_starknet::collection_manager::CollectionManager;
use aws_sdk_dynamodb::Client as DynamoClient;
use aws_sdk_kinesis::Client as KinesisClient;
use chrono::Utc;
use log::{error, info};
use reqwest::Client as ReqwestClient;
use starknet::core::types::{BlockId, EmittedEvent};
Expand Down Expand Up @@ -77,6 +78,15 @@ pub async fn process_blocks_continuously(
info!("Starting block: {}", starting_block);
let mut current_block_number: u64 = starting_block;

let indexer_sk = match get_indexer_sk(dynamo_client, ecs_task_id).await {
Ok(indexer_sk) => indexer_sk,
Err(_) => {
let now = Utc::now();
let unix_timestamp = now.timestamp();
format!("TASK#{}#{}", unix_timestamp, ecs_task_id)
}
};

loop {
let execution_time = Instant::now();
let dest_block_number = get_destination_block_number(collection_manager).await?;
Expand All @@ -91,7 +101,8 @@ pub async fn process_blocks_continuously(
update_indexer(
dynamo_client,
ecs_task_id,
String::from("running"),
indexer_sk.as_str(),
"running".to_string(),
starting_block,
dest_block_number,
indexation_progress as u64,
Expand Down Expand Up @@ -159,6 +170,7 @@ pub async fn process_blocks_continuously(
update_indexer(
dynamo_client,
ecs_task_id,
&indexer_sk,
String::from("stopped"),
starting_block,
current_block_number,
Expand Down

0 comments on commit ac3c662

Please sign in to comment.