From dbcf5b88fffed3ad868cb2f2e0382625d1d13cd3 Mon Sep 17 00:00:00 2001 From: huanghaibin Date: Thu, 19 Dec 2024 21:51:44 +0800 Subject: [PATCH 1/4] [fix](cloud-mow) FE should release delete bitmap lock when calculating delete bitmap failed --- .../CloudGlobalTransactionMgr.java | 30 ++-- ...cloud_mow_stream_load_with_commit_fail.out | 7 + ...ud_mow_stream_load_with_commit_fail.groovy | 142 ++++++++++++++++++ 3 files changed, 167 insertions(+), 12 deletions(-) create mode 100644 regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.out create mode 100644 regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index 11a3f05ead70c4..7569856a958cd4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -535,6 +535,10 @@ private void commitTransaction(long dbId, List tableList, long transactio private void executeCommitTxnRequest(CommitTxnRequest commitTxnRequest, long transactionId, boolean is2PC, TxnCommitAttachment txnCommitAttachment) throws UserException { + if (DebugPointUtil.isEnable("FE.mow.commit.exception")) { + LOG.info("debug point FE.mow.commit.exception, throw e"); + throw new UserException("debug point FE.mow.commit.exception"); + } boolean txnOperated = false; TransactionState txnState = null; TxnStateChangeCallback cb = null; @@ -679,15 +683,9 @@ private void calcDeleteBitmapForMow(long dbId, List tableList, long t Map> backendToPartitionInfos = getCalcDeleteBitmapInfo( backendToPartitionTablets, partitionVersions, baseCompactionCnts, cumulativeCompactionCnts, cumulativePoints, partitionToSubTxnIds); - try { - sendCalcDeleteBitmaptask(dbId, transactionId, backendToPartitionInfos, - subTransactionStates == null ? Config.calculate_delete_bitmap_task_timeout_seconds - : Config.calculate_delete_bitmap_task_timeout_seconds_for_transaction_load); - } catch (UserException e) { - LOG.warn("failed to sendCalcDeleteBitmaptask for txn=" + transactionId + ",exception=" + e.getMessage()); - removeDeleteBitmapUpdateLock(tableToPartitions, transactionId); - throw e; - } + sendCalcDeleteBitmaptask(dbId, transactionId, backendToPartitionInfos, + subTransactionStates == null ? Config.calculate_delete_bitmap_task_timeout_seconds + : Config.calculate_delete_bitmap_task_timeout_seconds_for_transaction_load); } private Map> getPartitionSubTxnIds(List subTransactionStates, @@ -948,10 +946,10 @@ private void getDeleteBitmapUpdateLock(Map> tableToParttions, lo } } - private void removeDeleteBitmapUpdateLock(Map> tableToParttions, long transactionId) { - for (Map.Entry> entry : tableToParttions.entrySet()) { + private void removeDeleteBitmapUpdateLock(List
tableList, long transactionId) { + for (Table table : tableList) { RemoveDeleteBitmapUpdateLockRequest.Builder builder = RemoveDeleteBitmapUpdateLockRequest.newBuilder(); - builder.setTableId(entry.getKey()) + builder.setTableId(table.getId()) .setLockId(transactionId) .setInitiator(-1); final RemoveDeleteBitmapUpdateLockRequest request = builder.build(); @@ -1102,6 +1100,10 @@ public boolean commitAndPublishTransaction(DatabaseIf db, long transactionId, beforeCommitTransaction(tableList, transactionId, timeoutMillis); try { commitTransactionWithSubTxns(db.getId(), tableList, transactionId, subTransactionStates); + } catch (Exception e) { + LOG.info("release delete bitmap lock,commit txn=" + transactionId + ",catch exception=" + e.getMessage()); + removeDeleteBitmapUpdateLock(tableList, transactionId); + throw e; } finally { afterCommitTransaction(tableList); } @@ -1187,6 +1189,10 @@ public boolean commitAndPublishTransaction(DatabaseIf db, List
tableList, beforeCommitTransaction(tableList, transactionId, timeoutMillis); try { commitTransaction(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment); + } catch (Exception e) { + LOG.info("release delete bitmap lock,commit txn=" + transactionId + ",catch exception=" + e.getMessage()); + removeDeleteBitmapUpdateLock(tableList, transactionId); + throw e; } finally { afterCommitTransaction(tableList); } diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.out new file mode 100644 index 00000000000000..b8b3ea3eccac14 --- /dev/null +++ b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.out @@ -0,0 +1,7 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- + +-- !sql -- +5 e 90 +6 f 100 + diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy new file mode 100644 index 00000000000000..fa71c3644f2027 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_cloud_mow_stream_load_with_commit_fail", "nonConcurrent") { + GetDebugPoint().clearDebugPointsForAllFEs() + + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + def backendId_to_params = [string: [:]] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + def set_be_param = { paramName, paramValue -> + // for eache be node, set paramName=paramValue + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue)) + assertTrue(out.contains("OK")) + } + } + + def reset_be_param = { paramName -> + // for eache be node, reset paramName to default + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + def original_value = backendId_to_params.get(id).get(paramName) + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, original_value)) + assertTrue(out.contains("OK")) + } + } + + def get_be_param = { paramName -> + // for eache be node, get param value by default + def paramValue = "" + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + // get the config value from be + def (code, out, err) = curl("GET", String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort, paramName)) + assertTrue(code == 0) + assertTrue(out.contains(paramName)) + // parsing + def resultList = parseJson(out)[0] + assertTrue(resultList.size() == 4) + // get original value + paramValue = resultList[2] + backendId_to_params.get(id, [:]).put(paramName, paramValue) + } + } + + def customFeConfig = [ + calculate_delete_bitmap_task_timeout_seconds: 2, + meta_service_rpc_retry_times : 5 + ] + + // store the original value + get_be_param("mow_stream_load_commit_retry_times") + // disable retry to make this problem more clear + set_be_param("mow_stream_load_commit_retry_times", "1") + + + def tableName = "tbl_basic" + setFeConfigTemporary(customFeConfig) { + try { + // create table + sql """ drop table if exists ${tableName}; """ + + sql """ + CREATE TABLE `${tableName}` ( + `id` int(11) NOT NULL, + `name` varchar(1100) NULL, + `score` int(11) NULL default "-1" + ) ENGINE=OLAP + UNIQUE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "replication_num" = "1" + ); + """ + // this streamLoad will fail on fe commit phase + GetDebugPoint().enableDebugPointForAllFEs('FE.mow.commit.exception', null) + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("fail", json.Status.toLowerCase()) + assertTrue(json.Message.contains("FE.mow.commit.exception")) + } + } + qt_sql """ select * from ${tableName} order by id""" + + // this streamLoad will success because of removing exception injection + GetDebugPoint().disableDebugPointForAllFEs('FE.mow.commit.exception') + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + } + } + qt_sql """ select * from ${tableName} order by id""" + } finally { + reset_be_param("mow_stream_load_commit_retry_times") + GetDebugPoint().disableDebugPointForAllFEs('FE.mow.commit.exception') + sql "DROP TABLE IF EXISTS ${tableName};" + GetDebugPoint().clearDebugPointsForAllFEs() + } + + } +} From dc4227eac7d0c1bbf350b52ce1b0ae0a579b29e2 Mon Sep 17 00:00:00 2001 From: huanghaibin Date: Tue, 24 Dec 2024 08:31:55 +0800 Subject: [PATCH 2/4] edit --- .../CloudGlobalTransactionMgr.java | 129 ++++++++++-------- 1 file changed, 75 insertions(+), 54 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index 7569856a958cd4..34d6ff3fb126d9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -346,7 +346,24 @@ public void commitTransaction(long dbId, List
tableList, public void commitTransaction(long dbId, List
tableList, long transactionId, List tabletCommitInfos, TxnCommitAttachment txnCommitAttachment) throws UserException { - commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, txnCommitAttachment, false); + List mowTableList = getMowTableList(tableList, tabletCommitInfos); + try { + LOG.info("try to commit transaction, transactionId: {}", transactionId); + Map> backendToPartitionInfos = null; + if (!mowTableList.isEmpty()) { + backendToPartitionInfos = getMowLock(mowTableList, + tabletCommitInfos, transactionId, null); + } + commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, txnCommitAttachment, false, + mowTableList, backendToPartitionInfos); + } catch (Exception e) { + if (!mowTableList.isEmpty()) { + LOG.warn("commit txn {} failed, release delete bitmap lock, catch exception {}", transactionId + , e.getMessage()); + removeDeleteBitmapUpdateLock(mowTableList, transactionId); + } + throw e; + } } /** @@ -465,16 +482,14 @@ private Set getBaseTabletsFromTables(List
tableList, List tableList, long transactionId, - List tabletCommitInfos, TxnCommitAttachment txnCommitAttachment, boolean is2PC) + List tabletCommitInfos, TxnCommitAttachment txnCommitAttachment, boolean is2PC, + List mowTableList, Map> backendToPartitionInfos) throws UserException { - - LOG.info("try to commit transaction, transactionId: {}", transactionId); if (Config.disable_load_job) { throw new TransactionCommitFailedException( "disable_load_job is set to true, all load jobs are not allowed"); } - List mowTableList = getMowTableList(tableList, tabletCommitInfos); if (!mowTableList.isEmpty()) { // may be this txn has been calculated by previously task but commit rpc is timeout, // and be will send another commit request to fe, so need to check txn status first @@ -485,7 +500,8 @@ private void commitTransaction(long dbId, List
tableList, long transactio if (null != transactionState && null != transactionState.getTransactionStatus()) { if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED || transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { - LOG.info("txn={}, status={} not need to calculate delete bitmap again, just return ", transactionId, + LOG.info("txn={}, status={} not need to calculate delete bitmap again, just return ", + transactionId, transactionState.getTransactionStatus().toString()); return; } else { @@ -493,7 +509,8 @@ private void commitTransaction(long dbId, List
tableList, long transactio transactionState.getTransactionStatus().toString()); } } - calcDeleteBitmapForMow(dbId, mowTableList, transactionId, tabletCommitInfos, null); + sendCalcDeleteBitmaptask(dbId, transactionId, backendToPartitionInfos, + Config.calculate_delete_bitmap_task_timeout_seconds); } CommitTxnRequest.Builder builder = CommitTxnRequest.newBuilder(); @@ -513,7 +530,8 @@ private void commitTransaction(long dbId, List
tableList, long transactio builder.setCommitAttachment(TxnUtil .loadJobFinalOperationToPb(loadJobFinalOperation)); } else if (txnCommitAttachment instanceof RLTaskTxnCommitAttachment) { - RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment; + RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment + = (RLTaskTxnCommitAttachment) txnCommitAttachment; TxnStateChangeCallback cb = callbackFactory.getCallback(rlTaskTxnCommitAttachment.getJobId()); if (cb != null) { // use a temporary transaction state to do before commit check, @@ -657,37 +675,6 @@ private List getMowTableList(List
tableList, List tableList, long transactionId, - List tabletCommitInfos, List subTransactionStates) - throws UserException { - Map>> backendToPartitionTablets = Maps.newHashMap(); - Map partitions = Maps.newHashMap(); - Map> tableToPartitions = Maps.newHashMap(); - Map> tableToTabletList = Maps.newHashMap(); - Map tabletToTabletMeta = Maps.newHashMap(); - getPartitionInfo(tableList, tabletCommitInfos, tableToPartitions, partitions, backendToPartitionTablets, - tableToTabletList, tabletToTabletMeta); - if (backendToPartitionTablets.isEmpty()) { - throw new UserException("The partition info is empty, table may be dropped, txnid=" + transactionId); - } - - Map> partitionToSubTxnIds = getPartitionSubTxnIds(subTransactionStates, tableToTabletList, - tabletToTabletMeta); - Map baseCompactionCnts = Maps.newHashMap(); - Map cumulativeCompactionCnts = Maps.newHashMap(); - Map cumulativePoints = Maps.newHashMap(); - getDeleteBitmapUpdateLock(tableToPartitions, transactionId, tableToTabletList, tabletToTabletMeta, - baseCompactionCnts, cumulativeCompactionCnts, cumulativePoints); - Map partitionVersions = getPartitionVersions(partitions); - - Map> backendToPartitionInfos = getCalcDeleteBitmapInfo( - backendToPartitionTablets, partitionVersions, baseCompactionCnts, cumulativeCompactionCnts, - cumulativePoints, partitionToSubTxnIds); - sendCalcDeleteBitmaptask(dbId, transactionId, backendToPartitionInfos, - subTransactionStates == null ? Config.calculate_delete_bitmap_task_timeout_seconds - : Config.calculate_delete_bitmap_task_timeout_seconds_for_transaction_load); - } - private Map> getPartitionSubTxnIds(List subTransactionStates, Map> tableToTabletList, Map tabletToTabletMeta) { if (subTransactionStates == null) { @@ -816,6 +803,32 @@ private Map> getCalcDeleteBitmapInfo( return backendToPartitionInfos; } + private Map> getMowLock(List mowTableList, + List tabletCommitInfos, long transactionId, + List subTransactionStates) throws UserException { + Map baseCompactionCnts = Maps.newHashMap(); + Map cumulativeCompactionCnts = Maps.newHashMap(); + Map cumulativePoints = Maps.newHashMap(); + Map> tableToPartitions = Maps.newHashMap(); + Map partitions = Maps.newHashMap(); + Map>> backendToPartitionTablets = Maps.newHashMap(); + Map> tableToTabletList = Maps.newHashMap(); + Map tabletToTabletMeta = Maps.newHashMap(); + getPartitionInfo(mowTableList, tabletCommitInfos, tableToPartitions, partitions, + backendToPartitionTablets, + tableToTabletList, tabletToTabletMeta); + getDeleteBitmapUpdateLock(tableToPartitions, transactionId, tableToTabletList, tabletToTabletMeta, + baseCompactionCnts, cumulativeCompactionCnts, cumulativePoints); + if (backendToPartitionTablets.isEmpty()) { + throw new UserException("The partition info is empty, table may be dropped, txnid=" + transactionId); + } + Map> partitionToSubTxnIds = getPartitionSubTxnIds(subTransactionStates, tableToTabletList, + tabletToTabletMeta); + Map partitionVersions = getPartitionVersions(partitions); + return getCalcDeleteBitmapInfo( + backendToPartitionTablets, partitionVersions, baseCompactionCnts, cumulativeCompactionCnts, + cumulativePoints, partitionToSubTxnIds); + } private void getDeleteBitmapUpdateLock(Map> tableToParttions, long transactionId, Map> tableToTabletList, Map tabletToTabletMeta, Map baseCompactionCnts, Map cumulativeCompactionCnts, @@ -852,6 +865,7 @@ private void getDeleteBitmapUpdateLock(Map> tableToParttions, lo } StopWatch stopWatch = new StopWatch(); stopWatch.start(); + int totalRetryTime = 0; for (Map.Entry> entry : tableToParttions.entrySet()) { GetDeleteBitmapUpdateLockRequest.Builder builder = GetDeleteBitmapUpdateLockRequest.newBuilder(); @@ -946,8 +960,8 @@ private void getDeleteBitmapUpdateLock(Map> tableToParttions, lo } } - private void removeDeleteBitmapUpdateLock(List
tableList, long transactionId) { - for (Table table : tableList) { + private void removeDeleteBitmapUpdateLock(List tableList, long transactionId) { + for (OlapTable table : tableList) { RemoveDeleteBitmapUpdateLockRequest.Builder builder = RemoveDeleteBitmapUpdateLockRequest.newBuilder(); builder.setTableId(table.getId()) .setLockId(transactionId) @@ -1098,11 +1112,24 @@ public boolean commitAndPublishTransaction(DatabaseIf db, long transactionId, .collect(Collectors.toList()); List
tableList = ((Database) db).getTablesOnIdOrderOrThrowException(tableIdList); beforeCommitTransaction(tableList, transactionId, timeoutMillis); + List tabletCommitInfos = subTransactionStates.stream().map( + SubTransactionState::getTabletCommitInfos).flatMap(Collection::stream) + .map(c -> new TabletCommitInfo(c.getTabletId(), c.getBackendId())).collect(Collectors.toList()); + List mowTableList = getMowTableList(tableList, tabletCommitInfos); try { - commitTransactionWithSubTxns(db.getId(), tableList, transactionId, subTransactionStates); + Map> backendToPartitionInfos = null; + if (!mowTableList.isEmpty()) { + backendToPartitionInfos = getMowLock(mowTableList, + tabletCommitInfos, transactionId, subTransactionStates); + } + commitTransactionWithSubTxns(db.getId(), tableList, transactionId, subTransactionStates, mowTableList, + backendToPartitionInfos); } catch (Exception e) { - LOG.info("release delete bitmap lock,commit txn=" + transactionId + ",catch exception=" + e.getMessage()); - removeDeleteBitmapUpdateLock(tableList, transactionId); + if (!mowTableList.isEmpty()) { + LOG.warn("commit txn {} failed, release delete bitmap lock, catch exception {}", transactionId + , e.getMessage()); + removeDeleteBitmapUpdateLock(mowTableList, transactionId); + } throw e; } finally { afterCommitTransaction(tableList); @@ -1111,13 +1138,11 @@ public boolean commitAndPublishTransaction(DatabaseIf db, long transactionId, } private void commitTransactionWithSubTxns(long dbId, List
tableList, long transactionId, - List subTransactionStates) throws UserException { - List tabletCommitInfos = subTransactionStates.stream().map( - SubTransactionState::getTabletCommitInfos).flatMap(Collection::stream) - .map(c -> new TabletCommitInfo(c.getTabletId(), c.getBackendId())).collect(Collectors.toList()); - List mowTableList = getMowTableList(tableList, tabletCommitInfos); + List subTransactionStates, List mowTableList, + Map> backendToPartitionInfos) throws UserException { if (!mowTableList.isEmpty()) { - calcDeleteBitmapForMow(dbId, mowTableList, transactionId, tabletCommitInfos, subTransactionStates); + sendCalcDeleteBitmaptask(dbId, transactionId, backendToPartitionInfos, + Config.calculate_delete_bitmap_task_timeout_seconds_for_transaction_load); } cleanSubTransactions(transactionId); @@ -1189,10 +1214,6 @@ public boolean commitAndPublishTransaction(DatabaseIf db, List
tableList, beforeCommitTransaction(tableList, transactionId, timeoutMillis); try { commitTransaction(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment); - } catch (Exception e) { - LOG.info("release delete bitmap lock,commit txn=" + transactionId + ",catch exception=" + e.getMessage()); - removeDeleteBitmapUpdateLock(tableList, transactionId); - throw e; } finally { afterCommitTransaction(tableList); } From d8e6dcbc2a7a1e49f6e234dec7b21421c212c19f Mon Sep 17 00:00:00 2001 From: huanghaibin Date: Tue, 24 Dec 2024 09:25:57 +0800 Subject: [PATCH 3/4] edit --- .../transaction/CloudGlobalTransactionMgr.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index 34d6ff3fb126d9..e7b5569f1cd414 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -500,8 +500,7 @@ private void commitTransaction(long dbId, List
tableList, long transactio if (null != transactionState && null != transactionState.getTransactionStatus()) { if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED || transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { - LOG.info("txn={}, status={} not need to calculate delete bitmap again, just return ", - transactionId, + LOG.info("txn={}, status={} not need to calculate delete bitmap again, just return ", transactionId, transactionState.getTransactionStatus().toString()); return; } else { @@ -530,8 +529,7 @@ private void commitTransaction(long dbId, List
tableList, long transactio builder.setCommitAttachment(TxnUtil .loadJobFinalOperationToPb(loadJobFinalOperation)); } else if (txnCommitAttachment instanceof RLTaskTxnCommitAttachment) { - RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment - = (RLTaskTxnCommitAttachment) txnCommitAttachment; + RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment; TxnStateChangeCallback cb = callbackFactory.getCallback(rlTaskTxnCommitAttachment.getJobId()); if (cb != null) { // use a temporary transaction state to do before commit check, @@ -865,7 +863,6 @@ private void getDeleteBitmapUpdateLock(Map> tableToParttions, lo } StopWatch stopWatch = new StopWatch(); stopWatch.start(); - int totalRetryTime = 0; for (Map.Entry> entry : tableToParttions.entrySet()) { GetDeleteBitmapUpdateLockRequest.Builder builder = GetDeleteBitmapUpdateLockRequest.newBuilder(); @@ -990,6 +987,10 @@ private void removeDeleteBitmapUpdateLock(List tableList, long transa private void sendCalcDeleteBitmaptask(long dbId, long transactionId, Map> backendToPartitionInfos, long calculateDeleteBitmapTaskTimeoutSeconds) throws UserException { + if (backendToPartitionInfos == null) { + throw new UserException("failed to send calculate delete bitmap task to be,transactionId=" + transactionId + + ",but backendToPartitionInfos is null"); + } if (backendToPartitionInfos.isEmpty()) { return; } @@ -1223,7 +1224,8 @@ public boolean commitAndPublishTransaction(DatabaseIf db, List
tableList, @Override public void commitTransaction2PC(Database db, List
tableList, long transactionId, long timeoutMillis) throws UserException { - commitTransaction(db.getId(), tableList, transactionId, null, null, true); + List mowTableList = getMowTableList(tableList, null); + commitTransaction(db.getId(), tableList, transactionId, null, null, true, mowTableList, null); } @Override From 573c3f5ab2a9991287189743cc9aac1c6942086c Mon Sep 17 00:00:00 2001 From: huanghaibin Date: Tue, 24 Dec 2024 09:52:36 +0800 Subject: [PATCH 4/4] edit --- .../cloud/transaction/CloudGlobalTransactionMgr.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index e7b5569f1cd414..3b4a4e0b740bd5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -358,8 +358,8 @@ public void commitTransaction(long dbId, List
tableList, long transaction mowTableList, backendToPartitionInfos); } catch (Exception e) { if (!mowTableList.isEmpty()) { - LOG.warn("commit txn {} failed, release delete bitmap lock, catch exception {}", transactionId - , e.getMessage()); + LOG.warn("commit txn {} failed, release delete bitmap lock, catch exception {}", transactionId, + e.getMessage()); removeDeleteBitmapUpdateLock(mowTableList, transactionId); } throw e; @@ -827,6 +827,7 @@ private Map> getMowLock(List> tableToParttions, long transactionId, Map> tableToTabletList, Map tabletToTabletMeta, Map baseCompactionCnts, Map cumulativeCompactionCnts, @@ -1127,8 +1128,8 @@ public boolean commitAndPublishTransaction(DatabaseIf db, long transactionId, backendToPartitionInfos); } catch (Exception e) { if (!mowTableList.isEmpty()) { - LOG.warn("commit txn {} failed, release delete bitmap lock, catch exception {}", transactionId - , e.getMessage()); + LOG.warn("commit txn {} failed, release delete bitmap lock, catch exception {}", transactionId, + e.getMessage()); removeDeleteBitmapUpdateLock(mowTableList, transactionId); } throw e;