Skip to content

Commit

Permalink
[fix](cloud-mow) Fix sending commiting rpc to FE twice problem (apach…
Browse files Browse the repository at this point in the history
…e#41395)

Here is an expample while commit rpc will send twice:
1. first commit request try to get delete bitmap lock, there is 2
lock(fe and ms), which take over rpc timeout(60s default) but not send
DELETE_BITMAP_LOCK_ERR to be, and fe will continue to send calculate
delete bitmap task to be
2. be calculate delete bitmap success and remove delete bitmap cache
3. because step 1 take over 60s, be will resend commit rpc to fe
4. after first commit request done, the second commit request from step
3 will do the same thing, but delete bitmap cache has been delete by
first commit, so it will fail on be
5. client will see commit fail

this pr check transaction status before sending delete bitmap task to
be, if transaction status is committed or visible, it no need to
recalculate delete bitmap again, just retrun rpc success to be.
  • Loading branch information
hust-hhb authored Nov 13, 2024
1 parent 5695609 commit 7b2547c
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,23 @@ private void commitTransaction(long dbId, List<Table> tableList, long transactio

List<OlapTable> mowTableList = getMowTableList(tableList, tabletCommitInfos);
if (!mowTableList.isEmpty()) {
// may be this txn has been calculated by previously task but commit rpc is timeout,
// and be will send another commit request to fe, so need to check txn status first
// before sending delete bitmap task to be, if txn is committed or visible, no need to
// calculate delete bitmap again, just return ok to be to finish this commit.
TransactionState transactionState = Env.getCurrentGlobalTransactionMgr()
.getTransactionState(dbId, transactionId);
if (null != transactionState && null != transactionState.getTransactionStatus()) {
if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED
|| transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
LOG.info("txn={}, status={} not need to calculate delete bitmap again, just return ", transactionId,
transactionState.getTransactionStatus().toString());
return;
} else {
LOG.info("txn={}, status={} need to calculate delete bitmap", transactionId,
transactionState.getTransactionStatus().toString());
}
}
calcDeleteBitmapForMow(dbId, mowTableList, transactionId, tabletCommitInfos);
}

Expand Down Expand Up @@ -519,6 +536,10 @@ private void commitTransaction(long dbId, List<Table> tableList, long transactio
try {
txnState = commitTxn(commitTxnRequest, transactionId, is2PC, dbId, tableList);
txnOperated = true;
if (DebugPointUtil.isEnable("CloudGlobalTransactionMgr.commitTransaction.timeout")) {
throw new UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR,
"test delete bitmap update lock timeout, transactionId:" + transactionId);
}
} finally {
if (txnCommitAttachment != null && txnCommitAttachment instanceof RLTaskTxnCommitAttachment) {
RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@
5 e 90
6 f 100

-- !sql --
5 e 90
6 f 100

Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,13 @@ suite("test_cloud_mow_stream_load_with_timeout", "nonConcurrent") {

// store the original value
get_be_param("mow_stream_load_commit_retry_times")
// disable retry to make this problem more clear
set_be_param("mow_stream_load_commit_retry_times", "1")


def tableName = "tbl_basic"
// test fe release lock when calculating delete bitmap timeout
setFeConfigTemporary(customFeConfig) {
try {
// disable retry to make this problem more clear
set_be_param("mow_stream_load_commit_retry_times", "1")
// create table
sql """ drop table if exists ${tableName}; """

Expand Down Expand Up @@ -143,4 +143,47 @@ suite("test_cloud_mow_stream_load_with_timeout", "nonConcurrent") {
}

}

//test fe don't send calculating delete bitmap task to be twice when txn is committed or visible
GetDebugPoint().clearDebugPointsForAllFEs()
GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.commitTransaction.timeout")
try {
// create table
sql """ drop table if exists ${tableName}; """

sql """
CREATE TABLE `${tableName}` (
`id` int(11) NOT NULL,
`name` varchar(1100) NULL,
`score` int(11) NULL default "-1"
) ENGINE=OLAP
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"enable_unique_key_merge_on_write" = "true",
"replication_num" = "1"
);
"""
streamLoad {
table "${tableName}"

set 'column_separator', ','
set 'columns', 'id, name, score'
file "test_stream_load.csv"

time 10000 // limit inflight 10s

check { result, exception, startTime, endTime ->
log.info("Stream load result: ${result}")
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
}
}
qt_sql """ select * from ${tableName} order by id"""
} finally {
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.commitTransaction.timeout")
sql "DROP TABLE IF EXISTS ${tableName};"
GetDebugPoint().clearDebugPointsForAllFEs()
}

}

0 comments on commit 7b2547c

Please sign in to comment.