Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[kv store] add watermark table to bigtable #20390

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/sui-data-ingestion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
mod progress_store;
mod workers;

pub use progress_store::DynamoDBProgressStore;
pub use progress_store::IngestionWorkflowsProgressStore;
pub use workers::{
ArchivalConfig, ArchivalReducer, ArchivalWorker, BlobTaskConfig, BlobWorker, KVStoreTaskConfig,
KVStoreWorker,
Expand Down
19 changes: 17 additions & 2 deletions crates/sui-data-ingestion/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::path::PathBuf;
use std::time::Duration;
use sui_data_ingestion::{
ArchivalConfig, ArchivalReducer, ArchivalWorker, BlobTaskConfig, BlobWorker,
DynamoDBProgressStore, KVStoreTaskConfig, KVStoreWorker,
IngestionWorkflowsProgressStore, KVStoreTaskConfig, KVStoreWorker,
};
use sui_data_ingestion_core::{DataIngestionMetrics, ReaderOptions};
use sui_data_ingestion_core::{IndexerExecutor, WorkerPool};
Expand Down Expand Up @@ -119,12 +119,27 @@ async fn main() -> Result<()> {
mysten_metrics::init_metrics(&registry);
let metrics = DataIngestionMetrics::new(&registry);

let progress_store = DynamoDBProgressStore::new(
let mut bigtable_client = None;
for task in &config.tasks {
if let Task::BigTableKV(kv_config) = &task.task {
bigtable_client = Some(
BigTableClient::new_remote(
kv_config.instance_id.clone(),
false,
Some(Duration::from_secs(kv_config.timeout_secs as u64)),
)
.await?,
);
}
}

let progress_store = IngestionWorkflowsProgressStore::new(
&config.progress_store.aws_access_key_id,
&config.progress_store.aws_secret_access_key,
config.progress_store.aws_region,
config.progress_store.table_name,
config.is_backfill,
bigtable_client,
)
.await;
let mut executor = IndexerExecutor::new(progress_store, config.tasks.len(), metrics);
Expand Down
15 changes: 12 additions & 3 deletions crates/sui-data-ingestion/src/progress_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,24 @@ use aws_sdk_s3::config::{Credentials, Region};
use std::str::FromStr;
use std::time::Duration;
use sui_data_ingestion_core::ProgressStore;
use sui_kvstore::{BigTableClient, KeyValueStoreWriter};
use sui_types::messages_checkpoint::CheckpointSequenceNumber;

pub struct DynamoDBProgressStore {
pub struct IngestionWorkflowsProgressStore {
client: Client,
table_name: String,
is_backfill: bool,
bigtable_client: Option<BigTableClient>,
}

impl DynamoDBProgressStore {
impl IngestionWorkflowsProgressStore {
pub async fn new(
aws_access_key_id: &str,
aws_secret_access_key: &str,
aws_region: String,
table_name: String,
is_backfill: bool,
bigtable_client: Option<BigTableClient>,
) -> Self {
let credentials = Credentials::new(
aws_access_key_id,
Expand All @@ -50,12 +53,13 @@ impl DynamoDBProgressStore {
client,
table_name,
is_backfill,
bigtable_client,
}
}
}

#[async_trait]
impl ProgressStore for DynamoDBProgressStore {
impl ProgressStore for IngestionWorkflowsProgressStore {
async fn load(&mut self, task_name: String) -> Result<CheckpointSequenceNumber> {
let item = self
.client
Expand All @@ -79,6 +83,11 @@ impl ProgressStore for DynamoDBProgressStore {
if self.is_backfill && checkpoint_number % 1000 != 0 {
return Ok(());
}
if let Some(ref mut bigtable_client) = self.bigtable_client {
bigtable_client
.save_watermark(&task_name, checkpoint_number)
.await?;
}
let backoff = backoff::ExponentialBackoff::default();
backoff::future::retry(backoff, || async {
let result = self
Expand Down
49 changes: 39 additions & 10 deletions crates/sui-kvstore/src/bigtable/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ const OBJECTS_TABLE: &str = "objects";
const TRANSACTIONS_TABLE: &str = "transactions";
const CHECKPOINTS_TABLE: &str = "checkpoints";
const CHECKPOINTS_BY_DIGEST_TABLE: &str = "checkpoints_by_digest";
const WATERMARK_TABLE: &str = "watermark";

const COLUMN_FAMILY_NAME: &str = "sui";
const DEFAULT_COLUMN_QUALIFIER: &str = "";
const AGGREGATED_WATERMARK_NAME: &str = "bigtable";
const CHECKPOINT_SUMMARY_COLUMN_QUALIFIER: &str = "s";
const CHECKPOINT_SIGNATURES_COLUMN_QUALIFIER: &str = "sg";
const CHECKPOINT_CONTENTS_COLUMN_QUALIFIER: &str = "c";
Expand Down Expand Up @@ -131,6 +133,21 @@ impl KeyValueStoreWriter for BigTableClient {
)
.await
}

async fn save_watermark(
&mut self,
name: &str,
watermark: CheckpointSequenceNumber,
) -> Result<()> {
let key = name.as_bytes().to_vec();
let value = watermark.to_be_bytes().to_vec();
self.multi_set_with_timestamp(
WATERMARK_TABLE,
[(key, vec![(DEFAULT_COLUMN_QUALIFIER, value)])],
watermark as i64,
)
.await
}
}

#[async_trait]
Expand Down Expand Up @@ -237,15 +254,7 @@ impl KeyValueStoreReader for BigTableClient {
}

async fn get_latest_checkpoint(&mut self) -> Result<CheckpointSequenceNumber> {
let upper_limit = u64::MAX.to_be_bytes().to_vec();
match self
.reversed_scan(CHECKPOINTS_TABLE, upper_limit)
.await?
.pop()
{
Some((key_bytes, _)) => Ok(u64::from_be_bytes(key_bytes.as_slice().try_into()?)),
None => Ok(0),
}
self.get_watermark(AGGREGATED_WATERMARK_NAME).await
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain what "AGGREGATED_WATERMARK_NAME" is and why we wouldn't use the checkpoint task name? Or do we just have 1 "task" that is responsible for all the bigtable writes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, we have a single task for all BigTable writes.
On the sui-data-ingestion side, DDB will be eventually replaced with BigTable, so our instance will store multiple entries for different workflows (e.g., archival, blob storage, etc.).
For community deployments, however, there will be only a single entry

}

async fn get_latest_object(&mut self, object_id: &ObjectID) -> Result<Option<Object>> {
Expand All @@ -257,6 +266,17 @@ impl KeyValueStoreReader for BigTableClient {
}
Ok(None)
}

async fn get_watermark(&mut self, watermark_name: &str) -> Result<CheckpointSequenceNumber> {
let key = watermark_name.as_bytes().to_vec();
let mut response = self.multi_get(WATERMARK_TABLE, vec![key]).await?;
if let Some(row) = response.pop() {
if let Some((_, value)) = row.into_iter().next() {
return Ok(u64::from_be_bytes(value.as_slice().try_into()?));
}
}
Ok(0)
}
}

impl BigTableClient {
Expand Down Expand Up @@ -382,6 +402,15 @@ impl BigTableClient {
&mut self,
table_name: &str,
values: impl IntoIterator<Item = (Bytes, Vec<(&str, Bytes)>)> + std::marker::Send,
) -> Result<()> {
self.multi_set_with_timestamp(table_name, values, -1).await
}

async fn multi_set_with_timestamp(
&mut self,
table_name: &str,
values: impl IntoIterator<Item = (Bytes, Vec<(&str, Bytes)>)> + std::marker::Send,
timestamp: i64,
) -> Result<()> {
let mut entries = vec![];
for (row_key, cells) in values {
Expand All @@ -393,7 +422,7 @@ impl BigTableClient {
column_qualifier: column_name.to_owned().into_bytes(),
// The timestamp of the cell into which new data should be written.
// Use -1 for current Bigtable server time.
timestamp_micros: -1,
timestamp_micros: timestamp,
value,
})),
})
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-kvstore/src/bigtable/init.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ if [[ -n $BIGTABLE_EMULATOR_HOST ]]; then
command+=(-project emulator)
fi

for table in objects transactions checkpoints checkpoints_by_digest; do
for table in objects transactions checkpoints checkpoints_by_digest watermark; do
(
set -x
"${command[@]}" createtable $table
Expand Down
6 changes: 6 additions & 0 deletions crates/sui-kvstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,19 @@ pub trait KeyValueStoreReader {
) -> Result<Option<Checkpoint>>;
async fn get_latest_checkpoint(&mut self) -> Result<CheckpointSequenceNumber>;
async fn get_latest_object(&mut self, object_id: &ObjectID) -> Result<Option<Object>>;
async fn get_watermark(&mut self, task_name: &str) -> Result<u64>;
}

#[async_trait]
pub trait KeyValueStoreWriter {
async fn save_objects(&mut self, objects: &[&Object]) -> Result<()>;
async fn save_transactions(&mut self, transactions: &[TransactionData]) -> Result<()>;
async fn save_checkpoint(&mut self, checkpoint: &CheckpointData) -> Result<()>;
async fn save_watermark(
&mut self,
name: &str,
watermark: CheckpointSequenceNumber,
) -> Result<()>;
}

#[derive(Clone, Debug)]
Expand Down
Loading