From a1ea993ccd356cd77b94c26a1701ad66b313bc1c Mon Sep 17 00:00:00 2001 From: Xun Li Date: Fri, 22 Nov 2024 10:57:31 -0800 Subject: [PATCH] [indexer-alt] Temporarily remove transaction in sequential pipeline --- .../src/pipeline/sequential/committer.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/crates/sui-indexer-alt/src/pipeline/sequential/committer.rs b/crates/sui-indexer-alt/src/pipeline/sequential/committer.rs index 220efc1237d70..a6765755966b2 100644 --- a/crates/sui-indexer-alt/src/pipeline/sequential/committer.rs +++ b/crates/sui-indexer-alt/src/pipeline/sequential/committer.rs @@ -3,7 +3,7 @@ use std::{cmp::Ordering, collections::BTreeMap, sync::Arc}; -use diesel_async::{scoped_futures::ScopedFutureExt, AsyncConnection}; +//use diesel_async::{scoped_futures::ScopedFutureExt, AsyncConnection}; use mysten_metrics::spawn_monitored_task; use tokio::{ sync::mpsc, @@ -226,6 +226,19 @@ pub(super) fn committer( continue; }; + // TODO: The following code is an experiment to see whether it makes + // a difference to the commit throughput if we do not use a transaction. + // It can lead to inconsistent state for live queries, as well as + // inaccurate affected row counts in case of watermark update failure. + // Remove it later. + let affected = match H::commit(&batch, &mut conn).await { + Ok(affected) => { + watermark.update(&mut conn).await.map(|_| affected).map_err(|e| e.into()) + } + e => e, + }; + + /* // Write all the object updates out along with the watermark update, in a // single transaction. The handler's `commit` implementation is responsible for // chunking up the writes into a manageable size. @@ -235,6 +248,7 @@ pub(super) fn committer( watermark.update(conn).await?; H::commit(&batch, conn).await }.scope_boxed()).await; + */ // Drop the connection eagerly to avoid it holding on to references borrowed by // the transaction closure.