diff --git a/crates/sui-data-ingestion/src/lib.rs b/crates/sui-data-ingestion/src/lib.rs index 3ef48d9ec9713..1853b8f0af884 100644 --- a/crates/sui-data-ingestion/src/lib.rs +++ b/crates/sui-data-ingestion/src/lib.rs @@ -4,7 +4,7 @@ mod progress_store; mod workers; -pub use progress_store::DynamoDBProgressStore; +pub use progress_store::IngestionWorkflowsProgressStore; pub use workers::{ ArchivalConfig, ArchivalReducer, ArchivalWorker, BlobTaskConfig, BlobWorker, KVStoreTaskConfig, KVStoreWorker, diff --git a/crates/sui-data-ingestion/src/main.rs b/crates/sui-data-ingestion/src/main.rs index cd2c00694a07d..f403a01c2d447 100644 --- a/crates/sui-data-ingestion/src/main.rs +++ b/crates/sui-data-ingestion/src/main.rs @@ -9,7 +9,7 @@ use std::path::PathBuf; use std::time::Duration; use sui_data_ingestion::{ ArchivalConfig, ArchivalReducer, ArchivalWorker, BlobTaskConfig, BlobWorker, - DynamoDBProgressStore, KVStoreTaskConfig, KVStoreWorker, + IngestionWorkflowsProgressStore, KVStoreTaskConfig, KVStoreWorker, }; use sui_data_ingestion_core::{DataIngestionMetrics, ReaderOptions}; use sui_data_ingestion_core::{IndexerExecutor, WorkerPool}; @@ -119,12 +119,27 @@ async fn main() -> Result<()> { mysten_metrics::init_metrics(&registry); let metrics = DataIngestionMetrics::new(&registry); - let progress_store = DynamoDBProgressStore::new( + let mut bigtable_client = None; + for task in &config.tasks { + if let Task::BigTableKV(kv_config) = &task.task { + bigtable_client = Some( + BigTableClient::new_remote( + kv_config.instance_id.clone(), + false, + Some(Duration::from_secs(kv_config.timeout_secs as u64)), + ) + .await?, + ); + } + } + + let progress_store = IngestionWorkflowsProgressStore::new( &config.progress_store.aws_access_key_id, &config.progress_store.aws_secret_access_key, config.progress_store.aws_region, config.progress_store.table_name, config.is_backfill, + bigtable_client, ) .await; let mut executor = IndexerExecutor::new(progress_store, config.tasks.len(), metrics); 
diff --git a/crates/sui-data-ingestion/src/progress_store.rs b/crates/sui-data-ingestion/src/progress_store.rs index 02857becfc626..550f164e4ddf7 100644 --- a/crates/sui-data-ingestion/src/progress_store.rs +++ b/crates/sui-data-ingestion/src/progress_store.rs @@ -11,21 +11,24 @@ use aws_sdk_s3::config::{Credentials, Region}; use std::str::FromStr; use std::time::Duration; use sui_data_ingestion_core::ProgressStore; +use sui_kvstore::{BigTableClient, KeyValueStoreWriter}; use sui_types::messages_checkpoint::CheckpointSequenceNumber; -pub struct DynamoDBProgressStore { +pub struct IngestionWorkflowsProgressStore { client: Client, table_name: String, is_backfill: bool, + bigtable_client: Option<BigTableClient>, } -impl DynamoDBProgressStore { +impl IngestionWorkflowsProgressStore { pub async fn new( aws_access_key_id: &str, aws_secret_access_key: &str, aws_region: String, table_name: String, is_backfill: bool, + bigtable_client: Option<BigTableClient>, ) -> Self { let credentials = Credentials::new( aws_access_key_id, @@ -50,12 +53,13 @@ impl DynamoDBProgressStore { client, table_name, is_backfill, + bigtable_client, } } } #[async_trait] -impl ProgressStore for DynamoDBProgressStore { +impl ProgressStore for IngestionWorkflowsProgressStore { async fn load(&mut self, task_name: String) -> Result<CheckpointSequenceNumber> { let item = self .client @@ -79,6 +83,11 @@ impl ProgressStore for DynamoDBProgressStore { if self.is_backfill && checkpoint_number % 1000 != 0 { return Ok(()); } + if let Some(ref mut bigtable_client) = self.bigtable_client { + bigtable_client + .save_watermark(&task_name, checkpoint_number) + .await?; + } let backoff = backoff::ExponentialBackoff::default(); backoff::future::retry(backoff, || async { let result = self diff --git a/crates/sui-kvstore/src/bigtable/client.rs b/crates/sui-kvstore/src/bigtable/client.rs index 5c85447622827..553a97c6e9e92 100644 --- a/crates/sui-kvstore/src/bigtable/client.rs +++ b/crates/sui-kvstore/src/bigtable/client.rs @@ -35,9 +35,11 @@ const OBJECTS_TABLE: &str = 
"objects"; const TRANSACTIONS_TABLE: &str = "transactions"; const CHECKPOINTS_TABLE: &str = "checkpoints"; const CHECKPOINTS_BY_DIGEST_TABLE: &str = "checkpoints_by_digest"; +const WATERMARK_TABLE: &str = "watermark"; const COLUMN_FAMILY_NAME: &str = "sui"; const DEFAULT_COLUMN_QUALIFIER: &str = ""; +const AGGREGATED_WATERMARK_NAME: &str = "bigtable"; const CHECKPOINT_SUMMARY_COLUMN_QUALIFIER: &str = "s"; const CHECKPOINT_SIGNATURES_COLUMN_QUALIFIER: &str = "sg"; const CHECKPOINT_CONTENTS_COLUMN_QUALIFIER: &str = "c"; @@ -131,6 +133,21 @@ impl KeyValueStoreWriter for BigTableClient { ) .await } + + async fn save_watermark( + &mut self, + name: &str, + watermark: CheckpointSequenceNumber, + ) -> Result<()> { + let key = name.as_bytes().to_vec(); + let value = watermark.to_be_bytes().to_vec(); + self.multi_set_with_timestamp( + WATERMARK_TABLE, + [(key, vec![(DEFAULT_COLUMN_QUALIFIER, value)])], + watermark as i64, + ) + .await + } } #[async_trait] @@ -237,15 +254,7 @@ impl KeyValueStoreReader for BigTableClient { } async fn get_latest_checkpoint(&mut self) -> Result { - let upper_limit = u64::MAX.to_be_bytes().to_vec(); - match self - .reversed_scan(CHECKPOINTS_TABLE, upper_limit) - .await? 
- .pop() - { - Some((key_bytes, _)) => Ok(u64::from_be_bytes(key_bytes.as_slice().try_into()?)), - None => Ok(0), - } + self.get_watermark(AGGREGATED_WATERMARK_NAME).await } async fn get_latest_object(&mut self, object_id: &ObjectID) -> Result> { @@ -257,6 +266,17 @@ impl KeyValueStoreReader for BigTableClient { } Ok(None) } + + async fn get_watermark(&mut self, watermark_name: &str) -> Result { + let key = watermark_name.as_bytes().to_vec(); + let mut response = self.multi_get(WATERMARK_TABLE, vec![key]).await?; + if let Some(row) = response.pop() { + if let Some((_, value)) = row.into_iter().next() { + return Ok(u64::from_be_bytes(value.as_slice().try_into()?)); + } + } + Ok(0) + } } impl BigTableClient { @@ -382,6 +402,15 @@ impl BigTableClient { &mut self, table_name: &str, values: impl IntoIterator)> + std::marker::Send, + ) -> Result<()> { + self.multi_set_with_timestamp(table_name, values, -1).await + } + + async fn multi_set_with_timestamp( + &mut self, + table_name: &str, + values: impl IntoIterator)> + std::marker::Send, + timestamp: i64, ) -> Result<()> { let mut entries = vec![]; for (row_key, cells) in values { @@ -393,7 +422,7 @@ impl BigTableClient { column_qualifier: column_name.to_owned().into_bytes(), // The timestamp of the cell into which new data should be written. // Use -1 for current Bigtable server time. 
- timestamp_micros: -1, + timestamp_micros: timestamp, value, })), }) diff --git a/crates/sui-kvstore/src/bigtable/init.sh b/crates/sui-kvstore/src/bigtable/init.sh index f96ac5c1e9827..b41ce0ec1fe09 100755 --- a/crates/sui-kvstore/src/bigtable/init.sh +++ b/crates/sui-kvstore/src/bigtable/init.sh @@ -10,7 +10,7 @@ if [[ -n $BIGTABLE_EMULATOR_HOST ]]; then command+=(-project emulator) fi -for table in objects transactions checkpoints checkpoints_by_digest; do +for table in objects transactions checkpoints checkpoints_by_digest watermark; do ( set -x "${command[@]}" createtable $table diff --git a/crates/sui-kvstore/src/lib.rs b/crates/sui-kvstore/src/lib.rs index 5d3ab55a3f64a..d8fa9117de8c1 100644 --- a/crates/sui-kvstore/src/lib.rs +++ b/crates/sui-kvstore/src/lib.rs @@ -34,6 +34,7 @@ pub trait KeyValueStoreReader { ) -> Result>; async fn get_latest_checkpoint(&mut self) -> Result; async fn get_latest_object(&mut self, object_id: &ObjectID) -> Result>; + async fn get_watermark(&mut self, task_name: &str) -> Result; } #[async_trait] @@ -41,6 +42,11 @@ pub trait KeyValueStoreWriter { async fn save_objects(&mut self, objects: &[&Object]) -> Result<()>; async fn save_transactions(&mut self, transactions: &[TransactionData]) -> Result<()>; async fn save_checkpoint(&mut self, checkpoint: &CheckpointData) -> Result<()>; + async fn save_watermark( + &mut self, + name: &str, + watermark: CheckpointSequenceNumber, + ) -> Result<()>; } #[derive(Clone, Debug)]