Skip to content

Commit

Permalink
indexer-alt: ensure --pipeline flags are only known pipelines
Browse files Browse the repository at this point in the history
## Description

Produce an error if the a value is passed to the `--pipeline` flag that
is not supported at all. This should catch pipeline name typos more
easily.

## Test Plan

Test various configurations of the indexer (no pipeline filtering, valid
filtering, and invalid filtering):

```
sui$ cargo run -p sui-indexer-alt --release --                                 \
  --database-url postgres://postgres:postgrespw@localhost:5432/sui_indexer_alt \
  indexer --remote-store-url https://checkpoints.mainnet.sui.io                \
  --last-checkpoint 100000 --consistent-range 1000

sui$ cargo run -p sui-indexer-alt --release --                                 \
  --database-url postgres://postgres:postgrespw@localhost:5432/sui_indexer_alt \
  indexer --remote-store-url https://checkpoints.mainnet.sui.io                \
  --last-checkpoint 100000 --consistent-range 1000                             \
  --pipeline sum_obj_types --pipeline wal_obj_types

sui$ cargo run -p sui-indexer-alt --release --                                 \
  --database-url postgres://postgres:postgrespw@localhost:5432/sui_indexer_alt \
  indexer --remote-store-url https://checkpoints.mainnet.sui.io                \
  --last-checkpoint 100000 --consistent-range 1000                             \
  --pipeline i_dont_exist
[...]
Error: Failed to start indexer

Caused by:
    Tried to enable pipelines that this indexer does not know about: {
        "i_dont_exist",
    }
```
  • Loading branch information
amnn committed Nov 22, 2024
1 parent fa252b2 commit d4876de
Showing 1 changed file with 20 additions and 5 deletions.
25 changes: 20 additions & 5 deletions crates/sui-indexer-alt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub struct Indexer {
last_checkpoint: Option<u64>,

/// Optional override of enabled pipelines.
enabled_pipelines: BTreeSet<String>,
enabled_pipelines: Option<BTreeSet<String>>,

/// Pipelines that have already been registered with the indexer. Used to make sure a pipeline
/// with the same name isn't added twice.
Expand Down Expand Up @@ -150,6 +150,8 @@ impl Indexer {
let ingestion_service =
IngestionService::new(ingestion_config, metrics.clone(), cancel.clone())?;

let enabled_pipelines: BTreeSet<_> = pipeline.into_iter().collect();

Ok(Self {
db,
metrics,
Expand All @@ -158,7 +160,11 @@ impl Indexer {
pipeline_config,
first_checkpoint,
last_checkpoint,
enabled_pipelines: pipeline.into_iter().collect(),
enabled_pipelines: if enabled_pipelines.is_empty() {
None
} else {
Some(enabled_pipelines)
},
added_pipelines: BTreeSet::new(),
cancel,
first_checkpoint_from_watermark: u64::MAX,
Expand Down Expand Up @@ -278,6 +284,13 @@ impl Indexer {
/// Ingestion will stop after consuming the configured `last_checkpoint`, if one is provided,
/// or will continue until it tracks the tip of the network.
pub async fn run(mut self) -> Result<JoinHandle<()>> {
if let Some(enabled_pipelines) = &self.enabled_pipelines {
ensure!(
enabled_pipelines.is_empty(),
"Tried to enable pipelines that this indexer does not know about: {enabled_pipelines:#?}",
);
}

let metrics_handle = self
.metrics_service
.run()
Expand Down Expand Up @@ -332,9 +345,11 @@ impl Indexer {
P::NAME,
);

if !self.enabled_pipelines.is_empty() && !self.enabled_pipelines.contains(P::NAME) {
info!("Skipping pipeline {}", P::NAME);
return Ok(None);
if let Some(enabled_pipelines) = &mut self.enabled_pipelines {
if !enabled_pipelines.remove(P::NAME) {
info!("Skipping pipeline {}", P::NAME);
return Ok(None);
}
}

let mut conn = self.db.connect().await.context("Failed DB connection")?;
Expand Down

0 comments on commit d4876de

Please sign in to comment.