[indexer-alt] Skip commit if batch is empty (#20124)
## Description 

If nothing was collected into the batch, we should not commit to the DB,
and we should also skip reporting any batch metrics.
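
A minimal, self-contained sketch of the intended behaviour follows (this is not the real committer loop; `commit_batch` and the sample batches are invented for illustration):

```rust
fn main() {
    // Hypothetical batches gathered on successive ticks; two of them are empty.
    let batches: Vec<Vec<&str>> = vec![vec![], vec!["row-a", "row-b"], vec![]];

    for batch in batches {
        if batch.is_empty() {
            // Nothing was gathered this tick: skip the DB commit and skip
            // reporting batch metrics, mirroring the intent of this change.
            continue;
        }
        commit_batch(&batch);
    }
}

// Stand-in for the real DB write plus metrics reporting in the committer.
fn commit_batch(batch: &[&str]) {
    println!("committing {} rows", batch.len());
}
```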

## Test plan 

CI

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:

---------

Co-authored-by: Ashok Menon <[email protected]>
lxfind and amnn authored Nov 13, 2024
1 parent f04e4cd commit 43d7c3d
Showing 1 changed file with 62 additions and 81 deletions.
crates/sui-indexer-alt/src/pipeline/sequential/committer.rs: 143 changes (62 additions & 81 deletions)
@@ -56,6 +56,9 @@ pub(super) fn committer<H: Handler + 'static>(
let mut poll = interval(config.collect_interval);
poll.set_missed_tick_behavior(MissedTickBehavior::Delay);

// If no checkpoint lag is specified, we default it to `0` (no lag).
let checkpoint_lag = checkpoint_lag.unwrap_or_default();

// Buffer to gather the next batch to write. A checkpoint's data is only added to the batch
// when it is known to come from the next checkpoint after `watermark` (the current tip of
// the batch), and data from previous checkpoints will be discarded to avoid double writes.
@@ -71,7 +74,6 @@ pub(super) fn committer<H: Handler + 'static>(
// The task keeps track of the highest (inclusive) checkpoint it has added to the batch,
// and whether that batch needs to be written out. By extension it also knows the next
// checkpoint to expect and add to the batch.
let mut watermark_needs_update = false;
let (mut watermark, mut next_checkpoint) = if let Some(watermark) = watermark {
let next = watermark.checkpoint_hi_inclusive as u64 + 1;
(watermark, next)
@@ -99,6 +101,15 @@ pub(super) fn committer<H: Handler + 'static>(
}

_ = poll.tick() => {
if batch_checkpoints == 0
&& rx.is_closed()
&& rx.is_empty()
&& !can_process_pending(next_checkpoint, checkpoint_lag, &pending)
{
info!(pipeline = H::NAME, "Process closed channel and no more data to commit");
break;
}

if pending.len() > WARN_PENDING_WATERMARKS {
warn!(
pipeline = H::NAME,
Expand All @@ -107,40 +118,6 @@ pub(super) fn committer<H: Handler + 'static>(
);
}

let Ok(mut conn) = db.connect().await else {
warn!(pipeline = H::NAME, "Failed to get connection for DB");
continue;
};

// Determine whether we need to hold back checkpoints from being committed
// because of checkpoint lag.
//
// TODO(amnn): Test this (depends on migrations and tempdb)
let commit_hi_inclusive = match (checkpoint_lag, pending.last_key_value()) {
(Some(lag), None) => {
debug!(pipeline = H::NAME, lag, "No pending checkpoints");
if rx.is_closed() && rx.is_empty() {
info!(pipeline = H::NAME, "Processor closed channel before priming");
break;
} else {
continue;
}
}

(Some(lag), Some((pending_hi, _))) if *pending_hi < lag => {
debug!(pipeline = H::NAME, lag, pending_hi, "Priming pipeline");
if rx.is_closed() && rx.is_empty() {
info!(pipeline = H::NAME, "Processor closed channel while priming");
break;
} else {
continue;
}
}

(Some(lag), Some((pending_hi, _))) => Some(*pending_hi - lag),
(None, _) => None,
};

let guard = metrics
.collector_gather_latency
.with_label_values(&[H::NAME])
@@ -155,13 +132,13 @@ pub(super) fn committer<H: Handler + 'static>(
// and batch together as a way to impose some limit on the size of the batch
// (and therefore the length of the write transaction).
while batch_checkpoints < H::MAX_BATCH_CHECKPOINTS {
let Some(entry) = pending.first_entry() else {
if !can_process_pending(next_checkpoint, checkpoint_lag, &pending) {
break;
};
}

if matches!(commit_hi_inclusive, Some(hi) if hi < *entry.key()) {
let Some(entry) = pending.first_entry() else {
break;
}
};

match next_checkpoint.cmp(entry.key()) {
// Next pending checkpoint is from the future.
@@ -174,7 +151,6 @@ pub(super) fn committer<H: Handler + 'static>(
batch_checkpoints += 1;
H::batch(&mut batch, indexed.values);
watermark = indexed.watermark;
watermark_needs_update = true;
next_checkpoint += 1;
}

@@ -185,9 +161,9 @@ pub(super) fn committer<H: Handler + 'static>(
.total_watermarks_out_of_order
.with_label_values(&[H::NAME])
.inc();

let indexed = entry.remove();
pending_rows -= indexed.len();
continue;
}
}
}
@@ -201,6 +177,15 @@ pub(super) fn committer<H: Handler + 'static>(
"Gathered batch",
);

// If there is no new data to commit, we can skip the rest of the process. Note
// that we cannot use batch_rows for the check, since it is possible that there
// are empty checkpoints with no new rows added, but the watermark still needs
// to be updated.
if batch_checkpoints == 0 {
assert_eq!(batch_rows, 0);
continue;
}

metrics
.collector_batch_size
.with_label_values(&[H::NAME])
@@ -236,6 +221,11 @@ pub(super) fn committer<H: Handler + 'static>(
.with_label_values(&[H::NAME])
.start_timer();

let Ok(mut conn) = db.connect().await else {
warn!(pipeline = H::NAME, "Failed to get connection for DB");
continue;
};

// Write all the object updates out along with the watermark update, in a
// single transaction. The handler's `commit` implementation is responsible for
// chunking up the writes into a manageable size.
@@ -337,37 +327,21 @@ pub(super) fn committer<H: Handler + 'static>(
);
}

if watermark_needs_update {
// Ignore the result -- the ingestion service will close this channel
// once it is done, but there may still be checkpoints buffered that need
// processing.
let _ = tx.send((H::NAME, watermark.checkpoint_hi_inclusive as u64));
}
// Ignore the result -- the ingestion service will close this channel
// once it is done, but there may still be checkpoints buffered that need
// processing.
let _ = tx.send((H::NAME, watermark.checkpoint_hi_inclusive as u64));

let _ = std::mem::take(&mut batch);
watermark_needs_update = false;
pending_rows -= batch_rows;
batch_checkpoints = 0;
batch_rows = 0;
attempt = 0;

// If there is a pending checkpoint, no greater than the expected next
// checkpoint, and less than or equal to the inclusive upperbound due to
// checkpoint lag, then the pipeline can do more work immediately (without
// waiting).
//
// Otherwise, if its channels have been closed, we know that it is guaranteed
// not to make any more progress, and we can stop the task.
if pending
.first_key_value()
.is_some_and(|(next, _)| {
*next <= next_checkpoint && commit_hi_inclusive.map_or(true, |hi| *next <= hi)
})
{
// If we could make more progress immediately, then schedule more work without
// waiting.
if can_process_pending(next_checkpoint, checkpoint_lag, &pending) {
poll.reset_immediately();
} else if rx.is_closed() && rx.is_empty() {
info!(pipeline = H::NAME, "Processor closed channel, pending rows empty");
break;
}
}

@@ -379,28 +353,14 @@ pub(super) fn committer<H: Handler + 'static>(
// next polling interval. This is appropriate if there are a minimum number of
// rows to write, and they are already in the batch, or we can process the next
// checkpoint to extract them.

if pending_rows < H::MIN_EAGER_ROWS {
continue;
}

if batch_rows > 0 {
if batch_checkpoints > 0
|| can_process_pending(next_checkpoint, checkpoint_lag, &pending)
{
poll.reset_immediately();
continue;
}

let Some((next, _)) = pending.first_key_value() else {
continue;
};

match (checkpoint_lag, pending.last_key_value()) {
(Some(_), None) => continue,
(Some(lag), Some((last, _))) if last.saturating_sub(lag) <= *next => {
continue;
}
_ => if *next <= next_checkpoint {
poll.reset_immediately();
}
}
}
}
@@ -409,3 +369,24 @@ pub(super) fn committer<H: Handler + 'static>(
info!(pipeline = H::NAME, ?watermark, "Stopping committer");
})
}

// Tests whether the first checkpoint in the `pending` buffer can be processed immediately, which
// is subject to the following conditions:
//
// - It is at or before the `next_checkpoint` expected by the committer.
// - It is at least `checkpoint_lag` checkpoints before the last checkpoint in the buffer.
fn can_process_pending<T>(
next_checkpoint: u64,
checkpoint_lag: u64,
pending: &BTreeMap<u64, T>,
) -> bool {
let Some((&first, _)) = pending.first_key_value() else {
return false;
};

let Some((&last, _)) = pending.last_key_value() else {
return false;
};

first <= next_checkpoint && first + checkpoint_lag <= last
}
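
For reference, here is a small standalone sketch of how the new `can_process_pending` helper behaves. The helper body is copied from the diff above; the checkpoint numbers and lag values are invented for illustration.

```rust
use std::collections::BTreeMap;

// Copied from the diff above: the first pending checkpoint can be processed
// only if it is at or before `next_checkpoint` and at least `checkpoint_lag`
// checkpoints behind the last pending checkpoint.
fn can_process_pending<T>(
    next_checkpoint: u64,
    checkpoint_lag: u64,
    pending: &BTreeMap<u64, T>,
) -> bool {
    let Some((&first, _)) = pending.first_key_value() else {
        return false;
    };

    let Some((&last, _)) = pending.last_key_value() else {
        return false;
    };

    first <= next_checkpoint && first + checkpoint_lag <= last
}

fn main() {
    // Hypothetical buffer: checkpoints 10 through 15 are pending (values elided).
    let pending: BTreeMap<u64, ()> = (10..=15).map(|cp| (cp, ())).collect();

    // The committer expects checkpoint 10 next; with a lag of 3, the buffer's
    // tip (15) is far enough ahead of 10 (10 + 3 <= 15), so work can proceed.
    assert!(can_process_pending(10, 3, &pending));

    // With a lag of 6, checkpoint 10 is still inside the lag window
    // (10 + 6 > 15), so the committer holds back.
    assert!(!can_process_pending(10, 6, &pending));

    // An empty buffer never yields work.
    assert!(!can_process_pending(10, 0, &BTreeMap::<u64, ()>::new()));
}
```

Folding the lag check into this helper replaces the inline `commit_hi_inclusive` computation that the old code performed on every poll tick, so the gating logic now lives in one place.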
