[indexer-alt] Add benchmark command (#20352)
## Description 

This PR adds a `benchmark` command to indexer-alt.
It starts the indexer with the provided parameters, derives the first
and last checkpoints from the local ingestion data, and stops once that
data has been fully processed.
It then reports the elapsed time and TPS.
It also makes a few structural changes to the indexer-alt code:
1. Moved the lag and pruner options into a dedicated config struct
(`ConsistencyConfig`), since these parameters only make sense in a
sequential pipeline setting.
2. Moved all of the indexer start-up logic into a dedicated function
(`start_indexer`) so that the benchmark can reuse it (see the sketch
below).
3. Added an option to skip bootstrapping from genesis when it is not
needed.
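
To show how the refactor fits together, here is a minimal sketch of a
call site. It assumes a top-level `Args` struct, the `Command::Indexer`
variant name, and that `DbConfig` derives `clap::Args`; none of these
appear in the excerpt below, since the updated `main.rs` is not shown.
Only the `start_indexer` and `run_benchmark` calls match signatures
visible in this diff:

```rust
use clap::Parser;
use sui_indexer_alt::{args::Command, db::DbConfig, start_indexer};

/// Hypothetical top-level CLI; the real `main.rs` is not shown in this diff.
#[derive(Parser)]
struct Args {
    #[command(flatten)]
    db_config: DbConfig,

    #[command(subcommand)]
    command: Command,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let args = Args::parse();
    match args.command {
        // Normal operation bootstraps from genesis (`with_genesis = true`),
        // so the genesis-dependent pipelines (feature flags, protocol
        // configs) are included.
        Command::Indexer { indexer, consistency_config } => {
            start_indexer(indexer, args.db_config, consistency_config, true).await
        }
        // Benchmark mode skips the genesis bootstrap internally
        // (`run_benchmark` drives `start_indexer` with `with_genesis = false`).
        #[cfg(feature = "benchmark")]
        Command::Benchmark { config } => {
            sui_indexer_alt::benchmark::run_benchmark(config, args.db_config).await
        }
        // `Reset` handling omitted from this sketch.
        _ => Ok(()),
    }
}
```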

## Test plan 

Run locally

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
lxfind authored Nov 21, 2024
1 parent fbe16db commit 66e0221
Showing 9 changed files with 290 additions and 115 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions crates/sui-indexer-alt/Cargo.toml
@@ -42,8 +42,13 @@ sui-pg-temp-db.workspace = true
sui-protocol-config.workspace = true
sui-storage.workspace = true
sui-types.workspace = true
sui-synthetic-ingestion = { workspace = true, optional = true }

[dev-dependencies]
rand.workspace = true
tempfile.workspace = true
wiremock.workspace = true

[features]
default = []
benchmark = ["sui-synthetic-ingestion"] # This will be used to enable benchmark mode
81 changes: 59 additions & 22 deletions crates/sui-indexer-alt/src/args.rs
@@ -3,6 +3,8 @@

use std::time::Duration;

#[cfg(feature = "benchmark")]
use crate::benchmark::BenchmarkConfig;
use crate::db::DbConfig;
use crate::IndexerConfig;
use clap::Subcommand;
@@ -24,28 +26,8 @@ pub enum Command {
#[command(flatten)]
indexer: IndexerConfig,

/// How often to check whether write-ahead logs related to the consistent range can be
/// pruned.
#[arg(
long,
default_value = "300",
value_name = "SECONDS",
value_parser = |s: &str| s.parse().map(Duration::from_secs),
)]
consistent_pruning_interval: Duration,

/// How long to wait before honouring reader low watermarks.
#[arg(
long,
default_value = "120",
value_name = "SECONDS",
value_parser = |s: &str| s.parse().map(Duration::from_secs),
)]
pruner_delay: Duration,

/// Number of checkpoints to delay indexing summary tables for.
#[clap(long)]
consistent_range: Option<u64>,
#[command(flatten)]
consistency_config: ConsistencyConfig,
},

/// Wipe the database of its contents
@@ -55,4 +37,59 @@
#[clap(long, default_value_t = false)]
skip_migrations: bool,
},

/// Run the benchmark. It will load ingestion data from the given path and run the pipelines.
/// The first and last checkpoint will be determined automatically based on the ingestion data.
/// Note that the indexer will not be bootstrapped from genesis, and hence will
/// skip any pipelines that rely on genesis data.
#[cfg(feature = "benchmark")]
Benchmark {
#[command(flatten)]
config: BenchmarkConfig,
},
}

#[derive(clap::Args, Debug, Clone)]
pub struct ConsistencyConfig {
/// How often to check whether write-ahead logs related to the consistent range can be
/// pruned.
#[arg(
long,
default_value = "300",
value_name = "SECONDS",
value_parser = |s: &str| s.parse().map(Duration::from_secs),
)]
pub consistent_pruning_interval: Duration,

/// How long to wait before honouring reader low watermarks.
#[arg(
long,
default_value = "120",
value_name = "SECONDS",
value_parser = |s: &str| s.parse().map(Duration::from_secs),
)]
pub pruner_delay: Duration,

/// Number of checkpoints to delay indexing summary tables for.
#[clap(long)]
pub consistent_range: Option<u64>,
}

impl ConsistencyConfig {
const DEFAULT_CONSISTENT_PRUNING_INTERVAL: &'static str = "300";
const DEFAULT_PRUNER_DELAY: &'static str = "120";

pub fn default_consistent_pruning_interval() -> Duration {
Self::DEFAULT_CONSISTENT_PRUNING_INTERVAL
.parse()
.map(Duration::from_secs)
.unwrap()
}

pub fn default_pruner_delay() -> Duration {
Self::DEFAULT_PRUNER_DELAY
.parse()
.map(Duration::from_secs)
.unwrap()
}
}
70 changes: 70 additions & 0 deletions crates/sui-indexer-alt/src/benchmark.rs
@@ -0,0 +1,70 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{path::PathBuf, time::Instant};

use crate::{
args::ConsistencyConfig,
db::{reset_database, DbConfig},
ingestion::IngestionConfig,
pipeline::PipelineConfig,
start_indexer, IndexerConfig,
};
use sui_synthetic_ingestion::synthetic_ingestion::read_ingestion_data;

#[derive(clap::Args, Debug, Clone)]
pub struct BenchmarkConfig {
/// Path to the local ingestion directory to read checkpoints data from.
#[arg(long)]
ingestion_path: PathBuf,

#[command(flatten)]
pipeline_config: PipelineConfig,

/// Only run the following pipelines. If not provided, all pipelines will be run.
#[arg(long, action = clap::ArgAction::Append)]
pipeline: Vec<String>,

#[command(flatten)]
consistency_config: ConsistencyConfig,
}

pub async fn run_benchmark(
benchmark_config: BenchmarkConfig,
db_config: DbConfig,
) -> anyhow::Result<()> {
let BenchmarkConfig {
ingestion_path,
pipeline_config,
pipeline,
consistency_config,
} = benchmark_config;

let ingestion_data = read_ingestion_data(&ingestion_path).await?;
let first_checkpoint = *ingestion_data.keys().next().unwrap();
let last_checkpoint = *ingestion_data.keys().last().unwrap();
let num_transactions: usize = ingestion_data.values().map(|c| c.transactions.len()).sum();

reset_database(db_config.clone(), false /* do not skip migrations */).await?;

let indexer_config = IndexerConfig {
ingestion_config: IngestionConfig {
remote_store_url: None,
local_ingestion_path: Some(ingestion_path),
checkpoint_buffer_size: IngestionConfig::DEFAULT_CHECKPOINT_BUFFER_SIZE,
ingest_concurrency: IngestionConfig::DEFAULT_INGEST_CONCURRENCY,
retry_interval: IngestionConfig::default_retry_interval(),
},
pipeline_config,
first_checkpoint: Some(first_checkpoint),
last_checkpoint: Some(last_checkpoint),
pipeline,
metrics_address: IndexerConfig::default_metrics_address(),
};
let cur_time = Instant::now();
start_indexer(indexer_config, db_config, consistency_config, false).await?;
let elapsed = Instant::now().duration_since(cur_time);
println!("Indexed {} transactions in {:?}", num_transactions, elapsed);
println!("TPS: {}", num_transactions as f64 / elapsed.as_secs_f64());
Ok(())
}
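
As a hedged, purely illustrative example of the final report: if the
ingestion directory held 1,000,000 transactions and `start_indexer`
returned after 200 seconds, the command would print
`Indexed 1000000 transactions in 200s` followed by `TPS: 5000`.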
19 changes: 16 additions & 3 deletions crates/sui-indexer-alt/src/ingestion/mod.rs
@@ -49,23 +49,36 @@ pub struct IngestionConfig {
pub local_ingestion_path: Option<PathBuf>,

/// Maximum size of checkpoint backlog across all workers downstream of the ingestion service.
#[arg(long, default_value_t = 5000)]
#[arg(long, default_value_t = Self::DEFAULT_CHECKPOINT_BUFFER_SIZE)]
pub checkpoint_buffer_size: usize,

/// Maximum number of checkpoints to attempt to fetch concurrently.
#[arg(long, default_value_t = 200)]
#[arg(long, default_value_t = Self::DEFAULT_INGEST_CONCURRENCY)]
pub ingest_concurrency: usize,

/// Polling interval to retry fetching checkpoints that do not exist.
#[arg(
long,
default_value = "200",
default_value = Self::DEFAULT_RETRY_INTERVAL_MS,
value_name = "MILLISECONDS",
value_parser = |s: &str| s.parse().map(Duration::from_millis)
)]
pub retry_interval: Duration,
}

impl IngestionConfig {
pub const DEFAULT_CHECKPOINT_BUFFER_SIZE: usize = 5000;
pub const DEFAULT_INGEST_CONCURRENCY: usize = 200;
const DEFAULT_RETRY_INTERVAL_MS: &'static str = "200";

pub fn default_retry_interval() -> Duration {
Self::DEFAULT_RETRY_INTERVAL_MS
.parse()
.map(Duration::from_millis)
.unwrap()
}
}

impl IngestionService {
pub fn new(
config: IngestionConfig,
113 changes: 109 additions & 4 deletions crates/sui-indexer-alt/src/lib.rs
@@ -4,7 +4,19 @@
use std::{collections::BTreeSet, net::SocketAddr, sync::Arc};

use anyhow::{ensure, Context, Result};
use args::ConsistencyConfig;
use bootstrap::bootstrap;
use db::{Db, DbConfig};
use handlers::{
ev_emit_mod::EvEmitMod, ev_struct_inst::EvStructInst, kv_checkpoints::KvCheckpoints,
kv_epoch_ends::KvEpochEnds, kv_epoch_starts::KvEpochStarts, kv_feature_flags::KvFeatureFlags,
kv_objects::KvObjects, kv_protocol_configs::KvProtocolConfigs, kv_transactions::KvTransactions,
obj_versions::ObjVersions, sum_coin_balances::SumCoinBalances, sum_displays::SumDisplays,
sum_obj_types::SumObjTypes, sum_packages::SumPackages,
tx_affected_addresses::TxAffectedAddress, tx_affected_objects::TxAffectedObjects,
tx_balance_changes::TxBalanceChanges, tx_calls::TxCalls, tx_digests::TxDigests,
tx_kinds::TxKinds, wal_coin_balances::WalCoinBalances, wal_obj_types::WalObjTypes,
};
use ingestion::{client::IngestionClient, IngestionConfig, IngestionService};
use metrics::{IndexerMetrics, MetricsService};
use models::watermarks::CommitterWatermark;
@@ -28,6 +40,9 @@ pub mod pipeline;
pub mod schema;
pub mod task;

#[cfg(feature = "benchmark")]
pub mod benchmark;

pub struct Indexer {
/// Connection pool to the database.
db: Db,
@@ -77,23 +92,31 @@ pub struct IndexerConfig {
/// ingestion will start just after the lowest checkpoint watermark across all active
/// pipelines.
#[arg(long)]
first_checkpoint: Option<u64>,
pub first_checkpoint: Option<u64>,

/// Override for the checkpoint to end ingestion at (inclusive) -- useful for backfills. By
/// default, ingestion will not stop, and will continue to poll for new checkpoints.
#[arg(long)]
last_checkpoint: Option<u64>,
pub last_checkpoint: Option<u64>,

/// Only run the following pipelines -- useful for backfills. If not provided, all pipelines
/// will be run.
#[arg(long, action = clap::ArgAction::Append)]
pipeline: Vec<String>,
pub pipeline: Vec<String>,

/// Address to serve Prometheus Metrics from.
#[arg(long, default_value = "0.0.0.0:9184")]
#[arg(long, default_value = Self::DEFAULT_METRICS_ADDRESS)]
pub metrics_address: SocketAddr,
}

impl IndexerConfig {
const DEFAULT_METRICS_ADDRESS: &'static str = "0.0.0.0:9184";

pub fn default_metrics_address() -> SocketAddr {
Self::DEFAULT_METRICS_ADDRESS.parse().unwrap()
}
}

impl Indexer {
pub async fn new(
db_config: DbConfig,
@@ -318,3 +341,85 @@ impl Indexer {
Ok(Some(watermark))
}
}

pub async fn start_indexer(
indexer_config: IndexerConfig,
db_config: DbConfig,
consistency_config: ConsistencyConfig,
// If true, the indexer will bootstrap from genesis.
// Otherwise it will skip the pipelines that rely on genesis data.
// TODO: There is probably a better way to handle this.
// For instance, we could also pass in dummy genesis data in the benchmark mode.
with_genesis: bool,
) -> anyhow::Result<()> {
let cancel = CancellationToken::new();
let retry_interval = indexer_config.ingestion_config.retry_interval;
let mut indexer = Indexer::new(db_config, indexer_config, cancel.clone()).await?;

if with_genesis {
let genesis = bootstrap(&indexer, retry_interval, cancel.clone()).await?;

// Pipelines that rely on genesis information
indexer
.concurrent_pipeline(KvFeatureFlags(genesis.clone()), None)
.await?;

indexer
.concurrent_pipeline(KvProtocolConfigs(genesis.clone()), None)
.await?;
}

let ConsistencyConfig {
consistent_pruning_interval,
pruner_delay,
consistent_range: lag,
} = consistency_config;

// Pipelines that are split up into a summary table, and a write-ahead log, where the
// write-ahead log needs to be pruned.
let pruner_config = lag.map(|l| PrunerConfig {
interval: consistent_pruning_interval,
delay: pruner_delay,
// Retain at least twice as much data as the lag, to guarantee overlap between the
// summary table and the write-ahead log.
retention: l * 2,
// Prune roughly five minutes of data in one go.
max_chunk_size: 5 * 300,
});

indexer.sequential_pipeline(SumCoinBalances, lag).await?;
indexer
.concurrent_pipeline(WalCoinBalances, pruner_config.clone())
.await?;

indexer.sequential_pipeline(SumObjTypes, lag).await?;
indexer
.concurrent_pipeline(WalObjTypes, pruner_config)
.await?;

// Other summary tables (without write-ahead log)
indexer.sequential_pipeline(SumDisplays, None).await?;
indexer.sequential_pipeline(SumPackages, None).await?;

// Unpruned concurrent pipelines
indexer.concurrent_pipeline(EvEmitMod, None).await?;
indexer.concurrent_pipeline(EvStructInst, None).await?;
indexer.concurrent_pipeline(KvCheckpoints, None).await?;
indexer.concurrent_pipeline(KvEpochEnds, None).await?;
indexer.concurrent_pipeline(KvEpochStarts, None).await?;
indexer.concurrent_pipeline(KvObjects, None).await?;
indexer.concurrent_pipeline(KvTransactions, None).await?;
indexer.concurrent_pipeline(ObjVersions, None).await?;
indexer.concurrent_pipeline(TxAffectedAddress, None).await?;
indexer.concurrent_pipeline(TxAffectedObjects, None).await?;
indexer.concurrent_pipeline(TxBalanceChanges, None).await?;
indexer.concurrent_pipeline(TxCalls, None).await?;
indexer.concurrent_pipeline(TxDigests, None).await?;
indexer.concurrent_pipeline(TxKinds, None).await?;

let h_indexer = indexer.run().await.context("Failed to start indexer")?;

cancel.cancelled().await;
let _ = h_indexer.await;
Ok(())
}
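
To make the pruner arithmetic concrete with a hypothetical value:
`--consistent-range 1000` yields a retention of `1000 * 2 = 2000`
checkpoints, guaranteeing that the summary table and the write-ahead
log overlap, while each pruning pass removes at most `5 * 300 = 1500`
checkpoints, roughly five minutes of data at the rate of about 300
checkpoints per minute that the constant appears to assume.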