diff --git a/crates/sui-indexer-alt/src/lib.rs b/crates/sui-indexer-alt/src/lib.rs index 3a7f284952527..124197a2eae02 100644 --- a/crates/sui-indexer-alt/src/lib.rs +++ b/crates/sui-indexer-alt/src/lib.rs @@ -66,7 +66,7 @@ pub struct Indexer { last_checkpoint: Option, /// Optional override of enabled pipelines. - enabled_pipelines: BTreeSet, + enabled_pipelines: Option>, /// Pipelines that have already been registered with the indexer. Used to make sure a pipeline /// with the same name isn't added twice. @@ -150,6 +150,8 @@ impl Indexer { let ingestion_service = IngestionService::new(ingestion_config, metrics.clone(), cancel.clone())?; + let enabled_pipelines: BTreeSet<_> = pipeline.into_iter().collect(); + Ok(Self { db, metrics, @@ -158,7 +160,11 @@ impl Indexer { pipeline_config, first_checkpoint, last_checkpoint, - enabled_pipelines: pipeline.into_iter().collect(), + enabled_pipelines: if enabled_pipelines.is_empty() { + None + } else { + Some(enabled_pipelines) + }, added_pipelines: BTreeSet::new(), cancel, first_checkpoint_from_watermark: u64::MAX, @@ -278,6 +284,13 @@ impl Indexer { /// Ingestion will stop after consuming the configured `last_checkpoint`, if one is provided, /// or will continue until it tracks the tip of the network. pub async fn run(mut self) -> Result> { + if let Some(enabled_pipelines) = &self.enabled_pipelines { + ensure!( + enabled_pipelines.is_empty(), + "Tried to enable pipelines that this indexer does not know about: {enabled_pipelines:#?}", + ); + } + let metrics_handle = self .metrics_service .run() @@ -332,9 +345,11 @@ impl Indexer { P::NAME, ); - if !self.enabled_pipelines.is_empty() && !self.enabled_pipelines.contains(P::NAME) { - info!("Skipping pipeline {}", P::NAME); - return Ok(None); + if let Some(enabled_pipelines) = &mut self.enabled_pipelines { + if !enabled_pipelines.remove(P::NAME) { + info!("Skipping pipeline {}", P::NAME); + return Ok(None); + } } let mut conn = self.db.connect().await.context("Failed DB connection")?;