Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](cloud-mow) FE should release delete bitmap lock when calculating delete bitmap failed #45673

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,24 @@ public void commitTransaction(long dbId, List<Table> tableList,
public void commitTransaction(long dbId, List<Table> tableList, long transactionId,
List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment txnCommitAttachment)
throws UserException {
commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, txnCommitAttachment, false);
List<OlapTable> mowTableList = getMowTableList(tableList, tabletCommitInfos);
try {
LOG.info("try to commit transaction, transactionId: {}", transactionId);
Map<Long, List<TCalcDeleteBitmapPartitionInfo>> backendToPartitionInfos = null;
if (!mowTableList.isEmpty()) {
backendToPartitionInfos = getMowLock(mowTableList,
tabletCommitInfos, transactionId, null);
}
commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, txnCommitAttachment, false,
mowTableList, backendToPartitionInfos);
} catch (Exception e) {
if (!mowTableList.isEmpty()) {
LOG.warn("commit txn {} failed, release delete bitmap lock, catch exception {}", transactionId,
e.getMessage());
removeDeleteBitmapUpdateLock(mowTableList, transactionId);
}
throw e;
}
}

/**
Expand Down Expand Up @@ -465,16 +482,14 @@ private Set<Long> getBaseTabletsFromTables(List<Table> tableList, List<TabletCom
}

private void commitTransaction(long dbId, List<Table> tableList, long transactionId,
List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment txnCommitAttachment, boolean is2PC)
List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment txnCommitAttachment, boolean is2PC,
List<OlapTable> mowTableList, Map<Long, List<TCalcDeleteBitmapPartitionInfo>> backendToPartitionInfos)
throws UserException {

LOG.info("try to commit transaction, transactionId: {}", transactionId);
if (Config.disable_load_job) {
throw new TransactionCommitFailedException(
"disable_load_job is set to true, all load jobs are not allowed");
}

List<OlapTable> mowTableList = getMowTableList(tableList, tabletCommitInfos);
if (!mowTableList.isEmpty()) {
// may be this txn has been calculated by previously task but commit rpc is timeout,
// and be will send another commit request to fe, so need to check txn status first
Expand All @@ -493,7 +508,8 @@ private void commitTransaction(long dbId, List<Table> tableList, long transactio
transactionState.getTransactionStatus().toString());
}
}
calcDeleteBitmapForMow(dbId, mowTableList, transactionId, tabletCommitInfos, null);
sendCalcDeleteBitmaptask(dbId, transactionId, backendToPartitionInfos,
Config.calculate_delete_bitmap_task_timeout_seconds);
}

CommitTxnRequest.Builder builder = CommitTxnRequest.newBuilder();
Expand Down Expand Up @@ -535,6 +551,10 @@ private void commitTransaction(long dbId, List<Table> tableList, long transactio

private void executeCommitTxnRequest(CommitTxnRequest commitTxnRequest, long transactionId, boolean is2PC,
TxnCommitAttachment txnCommitAttachment) throws UserException {
if (DebugPointUtil.isEnable("FE.mow.commit.exception")) {
LOG.info("debug point FE.mow.commit.exception, throw e");
throw new UserException("debug point FE.mow.commit.exception");
}
boolean txnOperated = false;
TransactionState txnState = null;
TxnStateChangeCallback cb = null;
Expand Down Expand Up @@ -653,43 +673,6 @@ private List<OlapTable> getMowTableList(List<Table> tableList, List<TabletCommit
return mowTableList;
}

private void calcDeleteBitmapForMow(long dbId, List<OlapTable> tableList, long transactionId,
List<TabletCommitInfo> tabletCommitInfos, List<SubTransactionState> subTransactionStates)
throws UserException {
Map<Long, Map<Long, Set<Long>>> backendToPartitionTablets = Maps.newHashMap();
Map<Long, Partition> partitions = Maps.newHashMap();
Map<Long, Set<Long>> tableToPartitions = Maps.newHashMap();
Map<Long, List<Long>> tableToTabletList = Maps.newHashMap();
Map<Long, TabletMeta> tabletToTabletMeta = Maps.newHashMap();
getPartitionInfo(tableList, tabletCommitInfos, tableToPartitions, partitions, backendToPartitionTablets,
tableToTabletList, tabletToTabletMeta);
if (backendToPartitionTablets.isEmpty()) {
throw new UserException("The partition info is empty, table may be dropped, txnid=" + transactionId);
}

Map<Long, List<Long>> partitionToSubTxnIds = getPartitionSubTxnIds(subTransactionStates, tableToTabletList,
tabletToTabletMeta);
Map<Long, Long> baseCompactionCnts = Maps.newHashMap();
Map<Long, Long> cumulativeCompactionCnts = Maps.newHashMap();
Map<Long, Long> cumulativePoints = Maps.newHashMap();
getDeleteBitmapUpdateLock(tableToPartitions, transactionId, tableToTabletList, tabletToTabletMeta,
baseCompactionCnts, cumulativeCompactionCnts, cumulativePoints);
Map<Long, Long> partitionVersions = getPartitionVersions(partitions);

Map<Long, List<TCalcDeleteBitmapPartitionInfo>> backendToPartitionInfos = getCalcDeleteBitmapInfo(
backendToPartitionTablets, partitionVersions, baseCompactionCnts, cumulativeCompactionCnts,
cumulativePoints, partitionToSubTxnIds);
try {
sendCalcDeleteBitmaptask(dbId, transactionId, backendToPartitionInfos,
subTransactionStates == null ? Config.calculate_delete_bitmap_task_timeout_seconds
: Config.calculate_delete_bitmap_task_timeout_seconds_for_transaction_load);
} catch (UserException e) {
LOG.warn("failed to sendCalcDeleteBitmaptask for txn=" + transactionId + ",exception=" + e.getMessage());
removeDeleteBitmapUpdateLock(tableToPartitions, transactionId);
throw e;
}
}

private Map<Long, List<Long>> getPartitionSubTxnIds(List<SubTransactionState> subTransactionStates,
Map<Long, List<Long>> tableToTabletList, Map<Long, TabletMeta> tabletToTabletMeta) {
if (subTransactionStates == null) {
Expand Down Expand Up @@ -818,6 +801,33 @@ private Map<Long, List<TCalcDeleteBitmapPartitionInfo>> getCalcDeleteBitmapInfo(
return backendToPartitionInfos;
}

private Map<Long, List<TCalcDeleteBitmapPartitionInfo>> getMowLock(List<OlapTable> mowTableList,
List<TabletCommitInfo> tabletCommitInfos, long transactionId,
List<SubTransactionState> subTransactionStates) throws UserException {
Map<Long, Long> baseCompactionCnts = Maps.newHashMap();
Map<Long, Long> cumulativeCompactionCnts = Maps.newHashMap();
Map<Long, Long> cumulativePoints = Maps.newHashMap();
Map<Long, Set<Long>> tableToPartitions = Maps.newHashMap();
Map<Long, Partition> partitions = Maps.newHashMap();
Map<Long, Map<Long, Set<Long>>> backendToPartitionTablets = Maps.newHashMap();
Map<Long, List<Long>> tableToTabletList = Maps.newHashMap();
Map<Long, TabletMeta> tabletToTabletMeta = Maps.newHashMap();
getPartitionInfo(mowTableList, tabletCommitInfos, tableToPartitions, partitions,
backendToPartitionTablets,
tableToTabletList, tabletToTabletMeta);
getDeleteBitmapUpdateLock(tableToPartitions, transactionId, tableToTabletList, tabletToTabletMeta,
baseCompactionCnts, cumulativeCompactionCnts, cumulativePoints);
if (backendToPartitionTablets.isEmpty()) {
throw new UserException("The partition info is empty, table may be dropped, txnid=" + transactionId);
}
Map<Long, List<Long>> partitionToSubTxnIds = getPartitionSubTxnIds(subTransactionStates, tableToTabletList,
tabletToTabletMeta);
Map<Long, Long> partitionVersions = getPartitionVersions(partitions);
return getCalcDeleteBitmapInfo(
backendToPartitionTablets, partitionVersions, baseCompactionCnts, cumulativeCompactionCnts,
cumulativePoints, partitionToSubTxnIds);
}

private void getDeleteBitmapUpdateLock(Map<Long, Set<Long>> tableToParttions, long transactionId,
Map<Long, List<Long>> tableToTabletList, Map<Long, TabletMeta> tabletToTabletMeta,
Map<Long, Long> baseCompactionCnts, Map<Long, Long> cumulativeCompactionCnts,
Expand Down Expand Up @@ -948,10 +958,10 @@ private void getDeleteBitmapUpdateLock(Map<Long, Set<Long>> tableToParttions, lo
}
}

private void removeDeleteBitmapUpdateLock(Map<Long, Set<Long>> tableToParttions, long transactionId) {
for (Map.Entry<Long, Set<Long>> entry : tableToParttions.entrySet()) {
private void removeDeleteBitmapUpdateLock(List<OlapTable> tableList, long transactionId) {
for (OlapTable table : tableList) {
RemoveDeleteBitmapUpdateLockRequest.Builder builder = RemoveDeleteBitmapUpdateLockRequest.newBuilder();
builder.setTableId(entry.getKey())
builder.setTableId(table.getId())
.setLockId(transactionId)
.setInitiator(-1);
final RemoveDeleteBitmapUpdateLockRequest request = builder.build();
Expand All @@ -978,6 +988,10 @@ private void removeDeleteBitmapUpdateLock(Map<Long, Set<Long>> tableToParttions,
private void sendCalcDeleteBitmaptask(long dbId, long transactionId,
Map<Long, List<TCalcDeleteBitmapPartitionInfo>> backendToPartitionInfos,
long calculateDeleteBitmapTaskTimeoutSeconds) throws UserException {
if (backendToPartitionInfos == null) {
throw new UserException("failed to send calculate delete bitmap task to be,transactionId=" + transactionId
+ ",but backendToPartitionInfos is null");
}
if (backendToPartitionInfos.isEmpty()) {
return;
}
Expand Down Expand Up @@ -1100,22 +1114,37 @@ public boolean commitAndPublishTransaction(DatabaseIf db, long transactionId,
.collect(Collectors.toList());
List<Table> tableList = ((Database) db).getTablesOnIdOrderOrThrowException(tableIdList);
beforeCommitTransaction(tableList, transactionId, timeoutMillis);
List<TabletCommitInfo> tabletCommitInfos = subTransactionStates.stream().map(
SubTransactionState::getTabletCommitInfos).flatMap(Collection::stream)
.map(c -> new TabletCommitInfo(c.getTabletId(), c.getBackendId())).collect(Collectors.toList());
List<OlapTable> mowTableList = getMowTableList(tableList, tabletCommitInfos);
try {
commitTransactionWithSubTxns(db.getId(), tableList, transactionId, subTransactionStates);
Map<Long, List<TCalcDeleteBitmapPartitionInfo>> backendToPartitionInfos = null;
if (!mowTableList.isEmpty()) {
backendToPartitionInfos = getMowLock(mowTableList,
tabletCommitInfos, transactionId, subTransactionStates);
}
commitTransactionWithSubTxns(db.getId(), tableList, transactionId, subTransactionStates, mowTableList,
backendToPartitionInfos);
} catch (Exception e) {
if (!mowTableList.isEmpty()) {
LOG.warn("commit txn {} failed, release delete bitmap lock, catch exception {}", transactionId,
e.getMessage());
removeDeleteBitmapUpdateLock(mowTableList, transactionId);
}
throw e;
} finally {
afterCommitTransaction(tableList);
}
return true;
}

private void commitTransactionWithSubTxns(long dbId, List<Table> tableList, long transactionId,
List<SubTransactionState> subTransactionStates) throws UserException {
List<TabletCommitInfo> tabletCommitInfos = subTransactionStates.stream().map(
SubTransactionState::getTabletCommitInfos).flatMap(Collection::stream)
.map(c -> new TabletCommitInfo(c.getTabletId(), c.getBackendId())).collect(Collectors.toList());
List<OlapTable> mowTableList = getMowTableList(tableList, tabletCommitInfos);
List<SubTransactionState> subTransactionStates, List<OlapTable> mowTableList,
Map<Long, List<TCalcDeleteBitmapPartitionInfo>> backendToPartitionInfos) throws UserException {
if (!mowTableList.isEmpty()) {
calcDeleteBitmapForMow(dbId, mowTableList, transactionId, tabletCommitInfos, subTransactionStates);
sendCalcDeleteBitmaptask(dbId, transactionId, backendToPartitionInfos,
Config.calculate_delete_bitmap_task_timeout_seconds_for_transaction_load);
}

cleanSubTransactions(transactionId);
Expand Down Expand Up @@ -1196,7 +1225,8 @@ public boolean commitAndPublishTransaction(DatabaseIf db, List<Table> tableList,
@Override
public void commitTransaction2PC(Database db, List<Table> tableList, long transactionId, long timeoutMillis)
throws UserException {
commitTransaction(db.getId(), tableList, transactionId, null, null, true);
List<OlapTable> mowTableList = getMowTableList(tableList, null);
commitTransaction(db.getId(), tableList, transactionId, null, null, true, mowTableList, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --

-- !sql --
5 e 90
6 f 100

Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_cloud_mow_stream_load_with_commit_fail", "nonConcurrent") {
GetDebugPoint().clearDebugPointsForAllFEs()

def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
def backendId_to_params = [string: [:]]
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);

def set_be_param = { paramName, paramValue ->
// for eache be node, set paramName=paramValue
for (String id in backendId_to_backendIP.keySet()) {
def beIp = backendId_to_backendIP.get(id)
def bePort = backendId_to_backendHttpPort.get(id)
def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue))
assertTrue(out.contains("OK"))
}
}

def reset_be_param = { paramName ->
// for eache be node, reset paramName to default
for (String id in backendId_to_backendIP.keySet()) {
def beIp = backendId_to_backendIP.get(id)
def bePort = backendId_to_backendHttpPort.get(id)
def original_value = backendId_to_params.get(id).get(paramName)
def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, original_value))
assertTrue(out.contains("OK"))
}
}

def get_be_param = { paramName ->
// for eache be node, get param value by default
def paramValue = ""
for (String id in backendId_to_backendIP.keySet()) {
def beIp = backendId_to_backendIP.get(id)
def bePort = backendId_to_backendHttpPort.get(id)
// get the config value from be
def (code, out, err) = curl("GET", String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort, paramName))
assertTrue(code == 0)
assertTrue(out.contains(paramName))
// parsing
def resultList = parseJson(out)[0]
assertTrue(resultList.size() == 4)
// get original value
paramValue = resultList[2]
backendId_to_params.get(id, [:]).put(paramName, paramValue)
}
}

def customFeConfig = [
calculate_delete_bitmap_task_timeout_seconds: 2,
meta_service_rpc_retry_times : 5
]

// store the original value
get_be_param("mow_stream_load_commit_retry_times")
// disable retry to make this problem more clear
set_be_param("mow_stream_load_commit_retry_times", "1")


def tableName = "tbl_basic"
setFeConfigTemporary(customFeConfig) {
try {
// create table
sql """ drop table if exists ${tableName}; """

sql """
CREATE TABLE `${tableName}` (
`id` int(11) NOT NULL,
`name` varchar(1100) NULL,
`score` int(11) NULL default "-1"
) ENGINE=OLAP
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"enable_unique_key_merge_on_write" = "true",
"replication_num" = "1"
);
"""
// this streamLoad will fail on fe commit phase
GetDebugPoint().enableDebugPointForAllFEs('FE.mow.commit.exception', null)
streamLoad {
table "${tableName}"

set 'column_separator', ','
set 'columns', 'id, name, score'
file "test_stream_load.csv"

time 10000 // limit inflight 10s

check { result, exception, startTime, endTime ->
log.info("Stream load result: ${result}")
def json = parseJson(result)
assertEquals("fail", json.Status.toLowerCase())
assertTrue(json.Message.contains("FE.mow.commit.exception"))
}
}
qt_sql """ select * from ${tableName} order by id"""

// this streamLoad will success because of removing exception injection
GetDebugPoint().disableDebugPointForAllFEs('FE.mow.commit.exception')
streamLoad {
table "${tableName}"

set 'column_separator', ','
set 'columns', 'id, name, score'
file "test_stream_load.csv"

time 10000 // limit inflight 10s

check { result, exception, startTime, endTime ->
log.info("Stream load result: ${result}")
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
}
}
qt_sql """ select * from ${tableName} order by id"""
} finally {
reset_be_param("mow_stream_load_commit_retry_times")
GetDebugPoint().disableDebugPointForAllFEs('FE.mow.commit.exception')
sql "DROP TABLE IF EXISTS ${tableName};"
GetDebugPoint().clearDebugPointsForAllFEs()
}

}
}
Loading