From 22400530cc07670ba647f31728bdbf0f932cddee Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Fri, 27 Dec 2024 19:32:51 +0800 Subject: [PATCH] [Fix](merge-on-write) Should update pending delete bitmap KVs in MS when no need to calc delete bitmaps in publish phase (#46039) consider the following situation: 1. Txn A acquires the lock, obtains version X to publish, calculates the delete bitmap, writes the pending delete bitmap KVs to the MS, but fails for some reason before committing the transaction in the MS. 2. Txn B acquires the lock, obtains version X to publish, **cleans up the pending delete bitmap KV written by Txn A**, calculates the delete bitmap, **writes its pending delete bitmap KV to the MS**, but also fails for some reason before committing the transaction in the MS. 3. Txn A then reacquires the lock, obtains version X to publish, and notices that neither the version nor the compaction counts have changed. It will skip the process of calculating the delete bitmap and writing the pending delete bitmap KV to the MS https://github.com/apache/doris/pull/39018 and eventually succeeds in committing the transaction in the MS. In this case, Txn A will save the wrong delete bitmaps(generated by Txn B) in MS and causing correctness problem. To solve the problem, we should still update delete bitmap KVs in MS when we skip the calculation of delete bitmap on BE in publish phase. Also add a defensive check: record `lock_id` when writing pending delete bitmap keys and check if the `lock_id` is correct when commit txn in MS. --- .../cloud_engine_calc_delete_bitmap_task.cpp | 9 +- be/src/cloud/cloud_tablet.cpp | 38 +++-- be/src/cloud/cloud_tablet.h | 3 + cloud/src/meta-service/meta_service.cpp | 1 + cloud/src/meta-service/meta_service_txn.cpp | 31 ++++ cloud/test/meta_service_test.cpp | 139 +++++++++++++++++- gensrc/proto/cloud.proto | 2 + ...ng_delete_bitmaps_removed_by_other_txn.out | 11 ++ ...delete_bitmaps_removed_by_other_txn.groovy | 96 ++++++++++++ 9 files changed, 319 insertions(+), 11 deletions(-) create mode 100644 regression-test/data/fault_injection_p0/cloud/test_cloud_pending_delete_bitmaps_removed_by_other_txn.out create mode 100644 regression-test/suites/fault_injection_p0/cloud/test_cloud_pending_delete_bitmaps_removed_by_other_txn.groovy diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp index fbf4b9cf303570..f203891423f120 100644 --- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp +++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp @@ -283,8 +283,15 @@ Status CloudTabletCalcDeleteBitmapTask::_handle_rowset( // if version or compaction stats can't match, it means that this is a retry and there are // compaction or other loads finished successfully on the same tablet. So the previous publish // is stale and we should re-calculate the delete bitmap + + // we still need to update delete bitmap KVs to MS when we skip to calcalate delete bitmaps, + // because the pending delete bitmap KVs in MS we wrote before may have been removed and replaced by other txns + int64_t lock_id = txn_info.is_txn_load ? txn_info.lock_id : -1; + RETURN_IF_ERROR( + tablet->save_delete_bitmap_to_ms(version, transaction_id, delete_bitmap, lock_id)); + LOG(INFO) << "tablet=" << _tablet_id << ", " << txn_str - << ",publish_status=SUCCEED,not need to recalculate and update delete_bitmap."; + << ", publish_status=SUCCEED, not need to re-calculate delete_bitmaps."; } else { if (invisible_rowsets == nullptr) { status = CloudTablet::update_delete_bitmap(tablet, &txn_info, transaction_id, diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index f0f6f92e9d0c36..f2f21162bf05b5 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -724,6 +724,35 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx RETURN_IF_ERROR(_engine.meta_mgr().update_tmp_rowset(*rowset_meta)); } + RETURN_IF_ERROR(save_delete_bitmap_to_ms(cur_version, txn_id, delete_bitmap, lock_id)); + + // store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason, + // it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do + // delete bitmap correctness check. If we store the new_delete_bitmap, the delete bitmap correctness check will fail + RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info( + txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED, + txn_info->publish_info)); + + DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.enable_sleep", { + auto sleep_sec = dp->param("sleep", 5); + std::this_thread::sleep_for(std::chrono::seconds(sleep_sec)); + }); + + DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.injected_error", { + auto retry = dp->param("retry", false); + if (retry) { // return DELETE_BITMAP_LOCK_ERROR to let it retry + return Status::Error( + "injected DELETE_BITMAP_LOCK_ERROR"); + } else { + return Status::InternalError("injected non-retryable error"); + } + }); + + return Status::OK(); +} + +Status CloudTablet::save_delete_bitmap_to_ms(int64_t cur_version, int64_t txn_id, + DeleteBitmapPtr delete_bitmap, int64_t lock_id) { DeleteBitmapPtr new_delete_bitmap = std::make_shared(tablet_id()); for (auto iter = delete_bitmap->delete_bitmap.begin(); iter != delete_bitmap->delete_bitmap.end(); ++iter) { @@ -734,18 +763,9 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx iter->second); } } - auto ms_lock_id = lock_id == -1 ? txn_id : lock_id; RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(*this, ms_lock_id, LOAD_INITIATOR_ID, new_delete_bitmap.get())); - - // store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason, - // it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do - // delete bitmap correctness check. If we store the new_delete_bitmap, the delete bitmap correctness check will fail - RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info( - txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED, - txn_info->publish_info)); - return Status::OK(); } diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h index fc0d64a493d316..c876518d868a49 100644 --- a/be/src/cloud/cloud_tablet.h +++ b/be/src/cloud/cloud_tablet.h @@ -173,6 +173,9 @@ class CloudTablet final : public BaseTablet { const RowsetIdUnorderedSet& cur_rowset_ids, int64_t lock_id = -1) override; + Status save_delete_bitmap_to_ms(int64_t cur_version, int64_t txn_id, + DeleteBitmapPtr delete_bitmap, int64_t lock_id); + Status calc_delete_bitmap_for_compaction(const std::vector& input_rowsets, const RowsetSharedPtr& output_rowset, const RowIdConversion& rowid_conversion, diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index 4967750762da48..17154a24777905 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -1813,6 +1813,7 @@ void MetaServiceImpl::update_delete_bitmap(google::protobuf::RpcController* cont // 3. store all pending delete bitmap for this txn PendingDeleteBitmapPB delete_bitmap_keys; + delete_bitmap_keys.set_lock_id(request->lock_id()); for (size_t i = 0; i < request->rowset_ids_size(); ++i) { MetaDeleteBitmapInfo key_info {instance_id, tablet_id, request->rowset_ids(i), request->versions(i), request->segment_ids(i)}; diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp index 5d696220b72dab..34657cbdb821ce 100644 --- a/cloud/src/meta-service/meta_service_txn.cpp +++ b/cloud/src/meta-service/meta_service_txn.cpp @@ -972,8 +972,39 @@ void process_mow_when_commit_txn( LOG(INFO) << "xxx remove delete bitmap lock, lock_key=" << hex(lock_keys[i]) << " txn_id=" << txn_id; + int64_t lock_id = lock_info.lock_id(); for (auto tablet_id : table_id_tablet_ids[table_id]) { std::string pending_key = meta_pending_delete_bitmap_key({instance_id, tablet_id}); + + // check that if the pending info's lock_id is correct + std::string pending_val; + err = txn->get(pending_key, &pending_val); + if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { + ss << "failed to get delete bitmap pending info, instance_id=" << instance_id + << " tablet_id=" << tablet_id << " key=" << hex(pending_key) << " err=" << err; + msg = ss.str(); + code = cast_as(err); + return; + } + + if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) continue; + + PendingDeleteBitmapPB pending_info; + if (!pending_info.ParseFromString(pending_val)) [[unlikely]] { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + msg = "failed to parse PendingDeleteBitmapPB"; + return; + } + + if (pending_info.has_lock_id() && pending_info.lock_id() != lock_id) { + code = MetaServiceCode::PENDING_DELETE_BITMAP_WRONG; + msg = fmt::format( + "wrong lock_id in pending delete bitmap infos, expect lock_id={}, but " + "found {} tablet_id={} instance_id={}", + lock_id, pending_info.lock_id(), tablet_id, instance_id); + return; + } + txn->remove(pending_key); LOG(INFO) << "xxx remove delete bitmap pending key, pending_key=" << hex(pending_key) << " txn_id=" << txn_id; diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp index 777c7419b701ad..49b1587228c912 100644 --- a/cloud/test/meta_service_test.cpp +++ b/cloud/test/meta_service_test.cpp @@ -5351,7 +5351,7 @@ TEST(MetaServiceTest, DeleteBimapCommitTxnTest) { // case: first version of rowset { - int64_t txn_id = -1; + int64_t txn_id = 98765; int64_t table_id = 123456; // same as table_id of tmp rowset int64_t db_id = 222; int64_t tablet_id_base = 8113; @@ -5430,11 +5430,16 @@ TEST(MetaServiceTest, DeleteBimapCommitTxnTest) { std::string lock_val; auto ret = txn->get(lock_key, &lock_val); ASSERT_EQ(ret, TxnErrorCode::TXN_OK); + DeleteBitmapUpdateLockPB lock_info; + ASSERT_TRUE(lock_info.ParseFromString(lock_val)); std::string pending_key = meta_pending_delete_bitmap_key({instance_id, tablet_id_base}); std::string pending_val; ret = txn->get(pending_key, &pending_val); ASSERT_EQ(ret, TxnErrorCode::TXN_OK); + PendingDeleteBitmapPB pending_info; + ASSERT_TRUE(pending_info.ParseFromString(pending_val)); + ASSERT_EQ(pending_info.lock_id(), lock_info.lock_id()); } // commit txn @@ -5468,6 +5473,138 @@ TEST(MetaServiceTest, DeleteBimapCommitTxnTest) { } } +TEST(MetaServiceTest, WrongPendingBitmapTest) { + auto meta_service = get_meta_service(); + extern std::string get_instance_id(const std::shared_ptr& rc_mgr, + const std::string& cloud_unique_id); + auto instance_id = get_instance_id(meta_service->resource_mgr(), "test_cloud_unique_id"); + + // case: first version of rowset + { + int64_t txn_id = 56789; + int64_t table_id = 123456; // same as table_id of tmp rowset + int64_t db_id = 222; + int64_t tablet_id_base = 8113; + int64_t partition_id = 1234; + // begin txn + { + brpc::Controller cntl; + BeginTxnRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + TxnInfoPB txn_info_pb; + txn_info_pb.set_db_id(db_id); + txn_info_pb.set_label("test_label"); + txn_info_pb.add_table_ids(table_id); + txn_info_pb.set_timeout_ms(36000); + req.mutable_txn_info()->CopyFrom(txn_info_pb); + BeginTxnResponse res; + meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), + &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + txn_id = res.txn_id(); + } + + // mock rowset and tablet + for (int i = 0; i < 5; ++i) { + create_tablet(meta_service.get(), table_id, 1235, partition_id, tablet_id_base + i); + auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i); + tmp_rowset.set_partition_id(partition_id); + CreateRowsetResponse res; + commit_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + } + + // update delete bitmap + { + // get delete bitmap update lock + brpc::Controller cntl; + GetDeleteBitmapUpdateLockRequest get_lock_req; + GetDeleteBitmapUpdateLockResponse get_lock_res; + get_lock_req.set_cloud_unique_id("test_cloud_unique_id"); + get_lock_req.set_table_id(table_id); + get_lock_req.add_partition_ids(partition_id); + get_lock_req.set_expiration(5); + get_lock_req.set_lock_id(txn_id); + get_lock_req.set_initiator(-1); + meta_service->get_delete_bitmap_update_lock( + reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &get_lock_req, + &get_lock_res, nullptr); + ASSERT_EQ(get_lock_res.status().code(), MetaServiceCode::OK); + + // first update delete bitmap + UpdateDeleteBitmapRequest update_delete_bitmap_req; + UpdateDeleteBitmapResponse update_delete_bitmap_res; + update_delete_bitmap_req.set_cloud_unique_id("test_cloud_unique_id"); + update_delete_bitmap_req.set_table_id(table_id); + update_delete_bitmap_req.set_partition_id(partition_id); + update_delete_bitmap_req.set_lock_id(txn_id); + update_delete_bitmap_req.set_initiator(-1); + update_delete_bitmap_req.set_tablet_id(tablet_id_base); + + update_delete_bitmap_req.add_rowset_ids("123"); + update_delete_bitmap_req.add_segment_ids(1); + update_delete_bitmap_req.add_versions(2); + update_delete_bitmap_req.add_segment_delete_bitmaps("abc0"); + + meta_service->update_delete_bitmap( + reinterpret_cast(&cntl), + &update_delete_bitmap_req, &update_delete_bitmap_res, nullptr); + ASSERT_EQ(update_delete_bitmap_res.status().code(), MetaServiceCode::OK); + } + + // check delete bitmap update lock and pending delete bitmap + { + std::unique_ptr txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, table_id, -1}); + std::string lock_val; + auto ret = txn->get(lock_key, &lock_val); + ASSERT_EQ(ret, TxnErrorCode::TXN_OK); + DeleteBitmapUpdateLockPB lock_info; + ASSERT_TRUE(lock_info.ParseFromString(lock_val)); + + std::string pending_key = meta_pending_delete_bitmap_key({instance_id, tablet_id_base}); + std::string pending_val; + ret = txn->get(pending_key, &pending_val); + ASSERT_EQ(ret, TxnErrorCode::TXN_OK); + PendingDeleteBitmapPB pending_info; + ASSERT_TRUE(pending_info.ParseFromString(pending_val)); + ASSERT_EQ(pending_info.lock_id(), lock_info.lock_id()); + } + + { + std::unique_ptr txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + // pending bitmap have been modified by other txn + std::string pending_key = meta_pending_delete_bitmap_key({instance_id, tablet_id_base}); + std::string pending_val; + auto ret = txn->get(pending_key, &pending_val); + ASSERT_EQ(ret, TxnErrorCode::TXN_OK); + PendingDeleteBitmapPB pending_info; + ASSERT_TRUE(pending_info.ParseFromString(pending_val)); + // change pending bitmap's lock_id + pending_info.set_lock_id(pending_info.lock_id() + 1); + ASSERT_TRUE(pending_info.SerializeToString(&pending_val)); + txn->put(pending_key, pending_val); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + } + + // commit txn + { + brpc::Controller cntl; + CommitTxnRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + req.set_db_id(db_id); + req.set_txn_id(txn_id); + req.add_mow_table_ids(table_id); + CommitTxnResponse res; + meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), + &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::PENDING_DELETE_BITMAP_WRONG); + } + } +} + TEST(MetaServiceTest, GetDeleteBitmapWithRetryTest1) { auto meta_service = get_meta_service(); SyncPoint::get_instance()->enable_processing(); diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index c113868a2c3286..4e00faa0c6f4eb 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -1381,6 +1381,7 @@ enum MetaServiceCode { LOCK_EXPIRED = 8001; LOCK_CONFLICT = 8002; ROWSETS_EXPIRED = 8003; + PENDING_DELETE_BITMAP_WRONG = 8004; // partial update ROWSET_META_NOT_FOUND = 9001; @@ -1446,6 +1447,7 @@ message RemoveDeleteBitmapResponse { message PendingDeleteBitmapPB { repeated bytes delete_bitmap_keys = 1; + optional int64 lock_id = 2; } message DeleteBitmapUpdateLockPB { diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_pending_delete_bitmaps_removed_by_other_txn.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_pending_delete_bitmaps_removed_by_other_txn.out new file mode 100644 index 00000000000000..8b8e97822cf766 --- /dev/null +++ b/regression-test/data/fault_injection_p0/cloud/test_cloud_pending_delete_bitmaps_removed_by_other_txn.out @@ -0,0 +1,11 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 +2 2 2 +3 3 3 + +-- !sql -- +1 999 999 +2 2 2 +3 3 3 + diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_pending_delete_bitmaps_removed_by_other_txn.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_pending_delete_bitmaps_removed_by_other_txn.groovy new file mode 100644 index 00000000000000..2b34d2bbb49d63 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_pending_delete_bitmaps_removed_by_other_txn.groovy @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_cloud_pending_delete_bitmaps_removed_by_other_txn", "nonConcurrent") { + if (!isCloudMode()) { + return + } + + def customFeConfig = [ + delete_bitmap_lock_expiration_seconds : 5, + calculate_delete_bitmap_task_timeout_seconds : 20, + ] + + setFeConfigTemporary(customFeConfig) { + + def table1 = "test_cloud_pending_delete_bitmaps_removed_by_other_txn" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1"); """ + + sql "insert into ${table1} values(1,1,1);" + sql "insert into ${table1} values(2,2,2);" + sql "insert into ${table1} values(3,3,3);" + sql "sync;" + order_qt_sql "select * from ${table1};" + + try { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + // let the first load fail and retry after it writes pending delete bitmaps in MS and before + // it commit txn in MS + GetDebugPoint().enableDebugPointForAllBEs("CloudTablet::save_delete_bitmap.enable_sleep", [sleep: 5]) + GetDebugPoint().enableDebugPointForAllBEs("CloudTablet::save_delete_bitmap.injected_error", [retry: true]) + + // the first load + def t1 = Thread.start { + sql "insert into ${table1} values(1,999,999);" + } + + Thread.sleep(1000) + + def t2 = Thread.start { + try { + // this should fail + sql "insert into ${table1} values(2,888,888);" + } catch(Exception e) { + logger.info(e.getMessage()) + } + } + + // let the second load fail after it remove the pending delete bitmaps in MS written by load 1 + Thread.sleep(5000) + GetDebugPoint().enableDebugPointForAllBEs("CloudTablet::save_delete_bitmap.injected_error", [retry: false]) + + + Thread.sleep(5000) + GetDebugPoint().disableDebugPointForAllBEs("CloudTablet::save_delete_bitmap.injected_error") + + t1.join() + t2.join() + + Thread.sleep(300) + // force it read delete bitmaps from MS rather than BE's cache + GetDebugPoint().enableDebugPointForAllBEs("CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss") + qt_sql "select * from ${table1} order by k1,c1,c2;" + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + } + } +}