diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index 5bdb2a44c111af..1a80058759bba9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -475,6 +475,23 @@ private void commitTransaction(long dbId, List tableList, long transactio List mowTableList = getMowTableList(tableList, tabletCommitInfos); if (!mowTableList.isEmpty()) { + // may be this txn has been calculated by previously task but commit rpc is timeout, + // and be will send another commit request to fe, so need to check txn status first + // before sending delete bitmap task to be, if txn is committed or visible, no need to + // calculate delete bitmap again, just return ok to be to finish this commit. + TransactionState transactionState = Env.getCurrentGlobalTransactionMgr() + .getTransactionState(dbId, transactionId); + if (null != transactionState && null != transactionState.getTransactionStatus()) { + if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED + || transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { + LOG.info("txn={}, status={} not need to calculate delete bitmap again, just return ", transactionId, + transactionState.getTransactionStatus().toString()); + return; + } else { + LOG.info("txn={}, status={} need to calculate delete bitmap", transactionId, + transactionState.getTransactionStatus().toString()); + } + } calcDeleteBitmapForMow(dbId, mowTableList, transactionId, tabletCommitInfos); } @@ -519,6 +536,10 @@ private void commitTransaction(long dbId, List
tableList, long transactio try { txnState = commitTxn(commitTxnRequest, transactionId, is2PC, dbId, tableList); txnOperated = true; + if (DebugPointUtil.isEnable("CloudGlobalTransactionMgr.commitTransaction.timeout")) { + throw new UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR, + "test delete bitmap update lock timeout, transactionId:" + transactionId); + } } finally { if (txnCommitAttachment != null && txnCommitAttachment instanceof RLTaskTxnCommitAttachment) { RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment; diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.out index b8b3ea3eccac14..72273f8955720f 100644 --- a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.out +++ b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.out @@ -5,3 +5,7 @@ 5 e 90 6 f 100 +-- !sql -- +5 e 90 +6 f 100 + diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.groovy index 122503b1611f11..7176aec702f411 100644 --- a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.groovy +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_timeout.groovy @@ -74,13 +74,13 @@ suite("test_cloud_mow_stream_load_with_timeout", "nonConcurrent") { // store the original value get_be_param("mow_stream_load_commit_retry_times") - // disable retry to make this problem more clear - set_be_param("mow_stream_load_commit_retry_times", "1") - def tableName = "tbl_basic" + // test fe release lock when calculating delete bitmap timeout setFeConfigTemporary(customFeConfig) { try { + // disable retry to make this problem more clear + set_be_param("mow_stream_load_commit_retry_times", "1") // create table sql """ drop table if exists ${tableName}; """ @@ -143,4 +143,47 @@ suite("test_cloud_mow_stream_load_with_timeout", "nonConcurrent") { } } + + //test fe don't send calculating delete bitmap task to be twice when txn is committed or visible + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.commitTransaction.timeout") + try { + // create table + sql """ drop table if exists ${tableName}; """ + + sql """ + CREATE TABLE `${tableName}` ( + `id` int(11) NOT NULL, + `name` varchar(1100) NULL, + `score` int(11) NULL default "-1" + ) ENGINE=OLAP + UNIQUE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "replication_num" = "1" + ); + """ + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + } + } + qt_sql """ select * from ${tableName} order by id""" + } finally { + GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.commitTransaction.timeout") + sql "DROP TABLE IF EXISTS ${tableName};" + GetDebugPoint().clearDebugPointsForAllFEs() + } + } \ No newline at end of file