Skip to content

Commit

Permalink
[kv store] add timeout to bigtable ingestion + backfill mode (#20193)
Browse files Browse the repository at this point in the history
## Description 

Adds timeouts to the Bigtable ingestion pipeline and introduces a
backfill mode that skips most progress updates to DynamoDB during
backfilling (only checkpoint numbers divisible by 1000 are saved).

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
phoenix-o authored Nov 12, 2024
1 parent a24d43d commit 4ba4b29
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 2 deletions.
12 changes: 11 additions & 1 deletion crates/sui-data-ingestion/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use prometheus::Registry;
use serde::{Deserialize, Serialize};
use std::env;
use std::path::PathBuf;
use std::time::Duration;
use sui_data_ingestion::{
ArchivalConfig, ArchivalReducer, ArchivalWorker, BlobTaskConfig, BlobWorker,
DynamoDBProgressStore, KVStoreTaskConfig, KVStoreWorker,
Expand Down Expand Up @@ -45,6 +46,7 @@ struct ProgressStoreConfig {
/// Settings for a BigTable-backed ingestion task, loaded through serde.
#[derive(Serialize, Deserialize, Clone, Debug)]
struct BigTableTaskConfig {
/// BigTable instance id; passed as the first argument to `BigTableClient::new_remote`.
instance_id: String,
/// Timeout in seconds. It is wrapped with `Duration::from_secs` and handed to
/// `BigTableClient::new_remote` when the client is created.
/// NOTE(review): there is no `#[serde(default)]` here, so configs presumably must
/// set this field explicitly — confirm existing deployments do.
timeout_secs: usize,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand All @@ -62,6 +64,8 @@ struct IndexerConfig {
metrics_host: String,
#[serde(default = "default_metrics_port")]
metrics_port: u16,
#[serde(default)]
is_backfill: bool,
}

fn default_metrics_host() -> String {
Expand Down Expand Up @@ -119,6 +123,7 @@ async fn main() -> Result<()> {
&config.progress_store.aws_secret_access_key,
config.progress_store.aws_region,
config.progress_store.table_name,
config.is_backfill,
)
.await;
let mut executor = IndexerExecutor::new(progress_store, config.tasks.len(), metrics);
Expand Down Expand Up @@ -154,7 +159,12 @@ async fn main() -> Result<()> {
executor.register(worker_pool).await?;
}
Task::BigTableKV(kv_config) => {
let client = BigTableClient::new_remote(kv_config.instance_id, false, None).await?;
let client = BigTableClient::new_remote(
kv_config.instance_id,
false,
Some(Duration::from_secs(kv_config.timeout_secs as u64)),
)
.await?;
let worker_pool = WorkerPool::new(
KvWorker { client },
task_config.name,
Expand Down
11 changes: 10 additions & 1 deletion crates/sui-data-ingestion/src/progress_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use sui_types::messages_checkpoint::CheckpointSequenceNumber;
/// Progress store that keeps ingestion progress in a DynamoDB table.
pub struct DynamoDBProgressStore {
/// SDK client built in the constructor from the loaded AWS configuration.
client: Client,
/// Table name supplied by the caller; presumably the DynamoDB table that holds
/// per-task progress — verify against the read/write paths.
table_name: String,
/// Backfill switch: when `true`, a progress save returns early without writing
/// unless the checkpoint number is a multiple of 1000.
is_backfill: bool,
}

impl DynamoDBProgressStore {
Expand All @@ -24,6 +25,7 @@ impl DynamoDBProgressStore {
aws_secret_access_key: &str,
aws_region: String,
table_name: String,
is_backfill: bool,
) -> Self {
let credentials = Credentials::new(
aws_access_key_id,
Expand All @@ -44,7 +46,11 @@ impl DynamoDBProgressStore {
.load()
.await;
let client = Client::new(&aws_config);
Self { client, table_name }
Self {
client,
table_name,
is_backfill,
}
}
}

Expand All @@ -70,6 +76,9 @@ impl ProgressStore for DynamoDBProgressStore {
task_name: String,
checkpoint_number: CheckpointSequenceNumber,
) -> Result<()> {
if self.is_backfill && checkpoint_number % 1000 != 0 {
return Ok(());
}
let backoff = backoff::ExponentialBackoff::default();
backoff::future::retry(backoff, || async {
let result = self
Expand Down

0 comments on commit 4ba4b29

Please sign in to comment.