Skip to content

Commit

Permalink
indexer-alt: try_for_each_spawned (#20379)
Browse files Browse the repository at this point in the history
## Description

Create a `try_for_each_spawned` which works similarly to
`try_for_each_concurrent` but spawns a new tokio task for each unit of
work that it schedules based on values being pulled from the stream (but
no more than the `limit` set on the max concurrency.

This does introduce some slight overhead if we run it with a limit of
`1` (i.e. no concurrency) but otherwise will give the tokio runtime an
opportunity to schedule each unit of work on a different thread
(assuming the runtime is multi-threaded).

The interface is almost a drop-in replacement for
`try_for_each_concurrent` but with the following slight changes:

- The input stream does not need to be a `Result` itself.
- The future and returned error type must have `'static` lifetimes
because they are being passed to the tokio runtime and we can't easily
determine that they will not outlive the caller.

This addresses one of the main remaining differences between the
existing indexer and `sui-indexer-alt` (`sui-data-ingestion-core`'s
worker pool works roughly like this new abstraction, but without the
support for streams etc).

## Test plan

New unit tests:

```
sui$ cargo nextest run -p sui-indexer-alt -- task::tests
```

Running the indexer locally also makes sure the system is still operable
with the change.

Finally, I created some benchmarks in a repository to test that
`try_for_each_concurrent` does not take advantage of parallelism and
`try_for_each_spawned` does:

https://gist.github.com/amnn/6c6f198693d46d1f6d30bd7ef7be001d

Benchmarking results show that on a job counting the primes up to i for
2 <= i < 50,000, the concurrent and sequential implementations complete
in 2.7s.

`try_for_each_spawned` without parallelism does the same work in 3.2s
(slightly slower because of the overhead creating new tasks), and
`try_for_each_spawned` with parallelism of `16` completes in about 400ms
(an 8x improvement).

The most expensive individual task costs about 3ms to run on its own
(counting the primes up to 50,000), which is roughly in line with the
cost of processing.

The benchmarks also include an implementation that does the same thing
as `try_for_each_spawned` but built out of existing stream extension
libraries (`StreamExt::map`, and `StreamExt::buffered`) which have
similar performance characteristics but don't have the same behaviour
w.r.t. propagating panics and cancellations.

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
amnn authored Nov 22, 2024
1 parent 7510d6a commit 9fa49dc
Show file tree
Hide file tree
Showing 4 changed files with 300 additions and 19 deletions.
9 changes: 4 additions & 5 deletions crates/sui-indexer-alt/src/ingestion/broadcaster.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use futures::{future::try_join_all, TryStreamExt};
use futures::future::try_join_all;
use mysten_metrics::spawn_monitored_task;
use std::sync::Arc;
use sui_types::full_checkpoint_content::CheckpointData;
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

use crate::ingestion::error::Error;
use crate::{ingestion::error::Error, task::TrySpawnStreamExt};

use super::{client::IngestionClient, IngestionConfig};

Expand All @@ -31,8 +31,7 @@ pub(super) fn broadcaster(
info!("Starting ingestion broadcaster");

match ReceiverStream::new(checkpoint_rx)
.map(Ok)
.try_for_each_concurrent(/* limit */ config.ingest_concurrency, |cp| {
.try_for_each_spawned(/* limit */ config.ingest_concurrency, |cp| {
let client = client.clone();
let subscribers = subscribers.clone();

Expand Down
7 changes: 3 additions & 4 deletions crates/sui-indexer-alt/src/pipeline/concurrent/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@
use std::{sync::Arc, time::Duration};

use backoff::ExponentialBackoff;
use futures::TryStreamExt;
use mysten_metrics::spawn_monitored_task;
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};

use crate::{
db::Db,
metrics::IndexerMetrics,
pipeline::{Break, PipelineConfig, WatermarkPart},
task::TrySpawnStreamExt,
};

use super::{Batched, Handler};
Expand Down Expand Up @@ -48,8 +48,7 @@ pub(super) fn committer<H: Handler + 'static>(
let write_concurrency = H::WRITE_CONCURRENCY_OVERRIDE.unwrap_or(config.write_concurrency);

match ReceiverStream::new(rx)
.map(Ok)
.try_for_each_concurrent(write_concurrency, |Batched { values, watermark }| {
.try_for_each_spawned(write_concurrency, |Batched { values, watermark }| {
let values = Arc::new(values);
let tx = tx.clone();
let db = db.clone();
Expand Down
11 changes: 5 additions & 6 deletions crates/sui-indexer-alt/src/pipeline/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@
use std::sync::atomic::AtomicU64;
use std::sync::Arc;

use futures::TryStreamExt;
use mysten_metrics::spawn_monitored_task;
use sui_types::full_checkpoint_content::CheckpointData;
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info};

use crate::{metrics::IndexerMetrics, pipeline::Break};
use crate::{metrics::IndexerMetrics, pipeline::Break, task::TrySpawnStreamExt};

use super::Indexed;

Expand Down Expand Up @@ -52,15 +51,15 @@ pub(super) fn processor<P: Processor + Send + Sync + 'static>(
spawn_monitored_task!(async move {
info!(pipeline = P::NAME, "Starting processor");
let latest_processed_checkpoint = Arc::new(AtomicU64::new(0));
let processor = Arc::new(processor);

match ReceiverStream::new(rx)
.map(Ok)
.try_for_each_concurrent(P::FANOUT, |checkpoint| {
.try_for_each_spawned(P::FANOUT, |checkpoint| {
let tx = tx.clone();
let metrics = metrics.clone();
let cancel = cancel.clone();
let latest_processed_checkpoint = latest_processed_checkpoint.clone();
let processor = &processor;
let processor = processor.clone();

async move {
if cancel.is_cancelled() {
Expand Down
Loading

0 comments on commit 9fa49dc

Please sign in to comment.