Skip to content

Commit

Permalink
[Fix](merge-on-write) Should update pending delete bitmap KVs in MS w…
Browse files Browse the repository at this point in the history
…hen no need to calc delete bitmaps in publish phase (#46039)

consider the following situation:
1. Txn A acquires the lock, obtains version X to publish, calculates the
delete bitmap, writes the pending delete bitmap KVs to the MS, but fails
for some reason before committing the transaction in the MS.
2. Txn B acquires the lock, obtains version X to publish, **cleans up
the pending delete bitmap KV written by Txn A**, calculates the delete
bitmap, **writes its pending delete bitmap KV to the MS**, but also
fails for some reason before committing the transaction in the MS.
3. Txn A then reacquires the lock, obtains version X to publish, and
notices that neither the version nor the compaction counts have changed.
It therefore skips calculating the delete bitmap and writing the
pending delete bitmap KV to the MS (see #39018), and eventually
succeeds in committing the transaction in the MS.

In this case, Txn A will save the wrong delete bitmaps (generated by Txn
B) in the MS, causing a correctness problem.

To solve the problem, we should still update delete bitmap KVs in MS
when we skip the calculation of delete bitmap on BE in publish phase.

Also add a defensive check: record the `lock_id` when writing pending delete
bitmap keys, and check that the `lock_id` is correct when committing the txn in MS.
  • Loading branch information
bobhan1 authored Dec 27, 2024
1 parent c596e81 commit 2240053
Show file tree
Hide file tree
Showing 9 changed files with 319 additions and 11 deletions.
9 changes: 8 additions & 1 deletion be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,15 @@ Status CloudTabletCalcDeleteBitmapTask::_handle_rowset(
// if version or compaction stats can't match, it means that this is a retry and there are
// compaction or other loads finished successfully on the same tablet. So the previous publish
// is stale and we should re-calculate the delete bitmap

// We still need to update the delete bitmap KVs in MS even when we skip calculating delete bitmaps,
// because the pending delete bitmap KVs we wrote to MS before may have been removed and replaced by other txns.
int64_t lock_id = txn_info.is_txn_load ? txn_info.lock_id : -1;
RETURN_IF_ERROR(
tablet->save_delete_bitmap_to_ms(version, transaction_id, delete_bitmap, lock_id));

LOG(INFO) << "tablet=" << _tablet_id << ", " << txn_str
<< ",publish_status=SUCCEED,not need to recalculate and update delete_bitmap.";
<< ", publish_status=SUCCEED, not need to re-calculate delete_bitmaps.";
} else {
if (invisible_rowsets == nullptr) {
status = CloudTablet::update_delete_bitmap(tablet, &txn_info, transaction_id,
Expand Down
38 changes: 29 additions & 9 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,35 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx
RETURN_IF_ERROR(_engine.meta_mgr().update_tmp_rowset(*rowset_meta));
}

RETURN_IF_ERROR(save_delete_bitmap_to_ms(cur_version, txn_id, delete_bitmap, lock_id));

// store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason,
// it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do
// delete bitmap correctness check. If we store the new_delete_bitmap, the delete bitmap correctness check will fail
RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED,
txn_info->publish_info));

DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.enable_sleep", {
auto sleep_sec = dp->param<int>("sleep", 5);
std::this_thread::sleep_for(std::chrono::seconds(sleep_sec));
});

DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.injected_error", {
auto retry = dp->param<bool>("retry", false);
if (retry) { // return DELETE_BITMAP_LOCK_ERROR to let it retry
return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>(
"injected DELETE_BITMAP_LOCK_ERROR");
} else {
return Status::InternalError<false>("injected non-retryable error");
}
});

return Status::OK();
}

Status CloudTablet::save_delete_bitmap_to_ms(int64_t cur_version, int64_t txn_id,
DeleteBitmapPtr delete_bitmap, int64_t lock_id) {
DeleteBitmapPtr new_delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id());
for (auto iter = delete_bitmap->delete_bitmap.begin();
iter != delete_bitmap->delete_bitmap.end(); ++iter) {
Expand All @@ -734,18 +763,9 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx
iter->second);
}
}

auto ms_lock_id = lock_id == -1 ? txn_id : lock_id;
RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(*this, ms_lock_id, LOAD_INITIATOR_ID,
new_delete_bitmap.get()));

// store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason,
// it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do
// delete bitmap correctness check. If we store the new_delete_bitmap, the delete bitmap correctness check will fail
RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED,
txn_info->publish_info));

return Status::OK();
}

Expand Down
3 changes: 3 additions & 0 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ class CloudTablet final : public BaseTablet {
const RowsetIdUnorderedSet& cur_rowset_ids,
int64_t lock_id = -1) override;

// Builds a new DeleteBitmap from the entries of `delete_bitmap` and writes it to the
// meta service (MS) through meta_mgr().update_delete_bitmap().
// `lock_id` is the delete bitmap update lock id sent to MS; when it is -1, `txn_id` is used instead.
// Returns the error from the MS update on failure, Status::OK() otherwise.
Status save_delete_bitmap_to_ms(int64_t cur_version, int64_t txn_id,
DeleteBitmapPtr delete_bitmap, int64_t lock_id);

Status calc_delete_bitmap_for_compaction(const std::vector<RowsetSharedPtr>& input_rowsets,
const RowsetSharedPtr& output_rowset,
const RowIdConversion& rowid_conversion,
Expand Down
1 change: 1 addition & 0 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1813,6 +1813,7 @@ void MetaServiceImpl::update_delete_bitmap(google::protobuf::RpcController* cont

// 3. store all pending delete bitmap for this txn
PendingDeleteBitmapPB delete_bitmap_keys;
delete_bitmap_keys.set_lock_id(request->lock_id());
for (size_t i = 0; i < request->rowset_ids_size(); ++i) {
MetaDeleteBitmapInfo key_info {instance_id, tablet_id, request->rowset_ids(i),
request->versions(i), request->segment_ids(i)};
Expand Down
31 changes: 31 additions & 0 deletions cloud/src/meta-service/meta_service_txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -972,8 +972,39 @@ void process_mow_when_commit_txn(
LOG(INFO) << "xxx remove delete bitmap lock, lock_key=" << hex(lock_keys[i])
<< " txn_id=" << txn_id;

int64_t lock_id = lock_info.lock_id();
for (auto tablet_id : table_id_tablet_ids[table_id]) {
std::string pending_key = meta_pending_delete_bitmap_key({instance_id, tablet_id});

// check whether the lock_id recorded in the pending info matches lock_info's lock_id
std::string pending_val;
err = txn->get(pending_key, &pending_val);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
ss << "failed to get delete bitmap pending info, instance_id=" << instance_id
<< " tablet_id=" << tablet_id << " key=" << hex(pending_key) << " err=" << err;
msg = ss.str();
code = cast_as<ErrCategory::READ>(err);
return;
}

if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) continue;

PendingDeleteBitmapPB pending_info;
if (!pending_info.ParseFromString(pending_val)) [[unlikely]] {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "failed to parse PendingDeleteBitmapPB";
return;
}

if (pending_info.has_lock_id() && pending_info.lock_id() != lock_id) {
code = MetaServiceCode::PENDING_DELETE_BITMAP_WRONG;
msg = fmt::format(
"wrong lock_id in pending delete bitmap infos, expect lock_id={}, but "
"found {} tablet_id={} instance_id={}",
lock_id, pending_info.lock_id(), tablet_id, instance_id);
return;
}

txn->remove(pending_key);
LOG(INFO) << "xxx remove delete bitmap pending key, pending_key=" << hex(pending_key)
<< " txn_id=" << txn_id;
Expand Down
139 changes: 138 additions & 1 deletion cloud/test/meta_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5351,7 +5351,7 @@ TEST(MetaServiceTest, DeleteBimapCommitTxnTest) {

// case: first version of rowset
{
int64_t txn_id = -1;
int64_t txn_id = 98765;
int64_t table_id = 123456; // same as table_id of tmp rowset
int64_t db_id = 222;
int64_t tablet_id_base = 8113;
Expand Down Expand Up @@ -5430,11 +5430,16 @@ TEST(MetaServiceTest, DeleteBimapCommitTxnTest) {
std::string lock_val;
auto ret = txn->get(lock_key, &lock_val);
ASSERT_EQ(ret, TxnErrorCode::TXN_OK);
DeleteBitmapUpdateLockPB lock_info;
ASSERT_TRUE(lock_info.ParseFromString(lock_val));

std::string pending_key = meta_pending_delete_bitmap_key({instance_id, tablet_id_base});
std::string pending_val;
ret = txn->get(pending_key, &pending_val);
ASSERT_EQ(ret, TxnErrorCode::TXN_OK);
PendingDeleteBitmapPB pending_info;
ASSERT_TRUE(pending_info.ParseFromString(pending_val));
ASSERT_EQ(pending_info.lock_id(), lock_info.lock_id());
}

// commit txn
Expand Down Expand Up @@ -5468,6 +5473,138 @@ TEST(MetaServiceTest, DeleteBimapCommitTxnTest) {
}
}

// Verifies that commit_txn is rejected with PENDING_DELETE_BITMAP_WRONG when the lock_id
// stored in a tablet's pending delete bitmap info no longer matches the lock_id that was
// recorded when this txn updated the delete bitmap.
TEST(MetaServiceTest, WrongPendingBitmapTest) {
auto meta_service = get_meta_service();
extern std::string get_instance_id(const std::shared_ptr<ResourceManager>& rc_mgr,
const std::string& cloud_unique_id);
auto instance_id = get_instance_id(meta_service->resource_mgr(), "test_cloud_unique_id");

// case: first version of rowset
{
// placeholder value; overwritten below with the txn id returned by begin_txn
int64_t txn_id = 56789;
int64_t table_id = 123456; // same as table_id of tmp rowset
int64_t db_id = 222;
int64_t tablet_id_base = 8113;
int64_t partition_id = 1234;
// begin txn
{
brpc::Controller cntl;
BeginTxnRequest req;
req.set_cloud_unique_id("test_cloud_unique_id");
TxnInfoPB txn_info_pb;
txn_info_pb.set_db_id(db_id);
txn_info_pb.set_label("test_label");
txn_info_pb.add_table_ids(table_id);
txn_info_pb.set_timeout_ms(36000);
req.mutable_txn_info()->CopyFrom(txn_info_pb);
BeginTxnResponse res;
meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
// use the txn id assigned by the meta service for all following requests
txn_id = res.txn_id();
}

// mock rowset and tablet: 5 tablets (tablet_id_base .. tablet_id_base + 4), each with a
// tmp rowset of this txn submitted through commit_rowset
for (int i = 0; i < 5; ++i) {
create_tablet(meta_service.get(), table_id, 1235, partition_id, tablet_id_base + i);
auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i);
tmp_rowset.set_partition_id(partition_id);
CreateRowsetResponse res;
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}

// update delete bitmap
{
// get delete bitmap update lock (lock_id = txn_id, initiator = -1)
brpc::Controller cntl;
GetDeleteBitmapUpdateLockRequest get_lock_req;
GetDeleteBitmapUpdateLockResponse get_lock_res;
get_lock_req.set_cloud_unique_id("test_cloud_unique_id");
get_lock_req.set_table_id(table_id);
get_lock_req.add_partition_ids(partition_id);
get_lock_req.set_expiration(5);
get_lock_req.set_lock_id(txn_id);
get_lock_req.set_initiator(-1);
meta_service->get_delete_bitmap_update_lock(
reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &get_lock_req,
&get_lock_res, nullptr);
ASSERT_EQ(get_lock_res.status().code(), MetaServiceCode::OK);

// first update delete bitmap; only tablet_id_base gets a delete bitmap here,
// sent with the same lock_id/initiator as the lock request above
UpdateDeleteBitmapRequest update_delete_bitmap_req;
UpdateDeleteBitmapResponse update_delete_bitmap_res;
update_delete_bitmap_req.set_cloud_unique_id("test_cloud_unique_id");
update_delete_bitmap_req.set_table_id(table_id);
update_delete_bitmap_req.set_partition_id(partition_id);
update_delete_bitmap_req.set_lock_id(txn_id);
update_delete_bitmap_req.set_initiator(-1);
update_delete_bitmap_req.set_tablet_id(tablet_id_base);

update_delete_bitmap_req.add_rowset_ids("123");
update_delete_bitmap_req.add_segment_ids(1);
update_delete_bitmap_req.add_versions(2);
update_delete_bitmap_req.add_segment_delete_bitmaps("abc0");

meta_service->update_delete_bitmap(
reinterpret_cast<google::protobuf::RpcController*>(&cntl),
&update_delete_bitmap_req, &update_delete_bitmap_res, nullptr);
ASSERT_EQ(update_delete_bitmap_res.status().code(), MetaServiceCode::OK);
}

// check delete bitmap update lock and pending delete bitmap:
// both KVs must exist and the pending info must carry the lock's lock_id
{
std::unique_ptr<Transaction> txn;
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, table_id, -1});
std::string lock_val;
auto ret = txn->get(lock_key, &lock_val);
ASSERT_EQ(ret, TxnErrorCode::TXN_OK);
DeleteBitmapUpdateLockPB lock_info;
ASSERT_TRUE(lock_info.ParseFromString(lock_val));

std::string pending_key = meta_pending_delete_bitmap_key({instance_id, tablet_id_base});
std::string pending_val;
ret = txn->get(pending_key, &pending_val);
ASSERT_EQ(ret, TxnErrorCode::TXN_OK);
PendingDeleteBitmapPB pending_info;
ASSERT_TRUE(pending_info.ParseFromString(pending_val));
ASSERT_EQ(pending_info.lock_id(), lock_info.lock_id());
}

{
std::unique_ptr<Transaction> txn;
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
// simulate the pending delete bitmap info having been overwritten by another txn
std::string pending_key = meta_pending_delete_bitmap_key({instance_id, tablet_id_base});
std::string pending_val;
auto ret = txn->get(pending_key, &pending_val);
ASSERT_EQ(ret, TxnErrorCode::TXN_OK);
PendingDeleteBitmapPB pending_info;
ASSERT_TRUE(pending_info.ParseFromString(pending_val));
// change pending bitmap's lock_id so it no longer equals the lock_id used by this txn
pending_info.set_lock_id(pending_info.lock_id() + 1);
ASSERT_TRUE(pending_info.SerializeToString(&pending_val));
txn->put(pending_key, pending_val);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
}

// commit txn: must fail because the pending info's lock_id was tampered with above
{
brpc::Controller cntl;
CommitTxnRequest req;
req.set_cloud_unique_id("test_cloud_unique_id");
req.set_db_id(db_id);
req.set_txn_id(txn_id);
req.add_mow_table_ids(table_id);
CommitTxnResponse res;
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::PENDING_DELETE_BITMAP_WRONG);
}
}
}

TEST(MetaServiceTest, GetDeleteBitmapWithRetryTest1) {
auto meta_service = get_meta_service();
SyncPoint::get_instance()->enable_processing();
Expand Down
2 changes: 2 additions & 0 deletions gensrc/proto/cloud.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1381,6 +1381,7 @@ enum MetaServiceCode {
LOCK_EXPIRED = 8001;
LOCK_CONFLICT = 8002;
ROWSETS_EXPIRED = 8003;
PENDING_DELETE_BITMAP_WRONG = 8004;

// partial update
ROWSET_META_NOT_FOUND = 9001;
Expand Down Expand Up @@ -1446,6 +1447,7 @@ message RemoveDeleteBitmapResponse {

// Value stored under a tablet's pending delete bitmap key in the meta service.
message PendingDeleteBitmapPB {
// keys of the pending delete bitmaps written by update_delete_bitmap
repeated bytes delete_bitmap_keys = 1;
// lock_id of the update_delete_bitmap request that wrote this info; commit_txn compares it
// with the delete bitmap update lock's lock_id and fails with PENDING_DELETE_BITMAP_WRONG on
// mismatch. Optional: the check is skipped for values that do not have this field set.
optional int64 lock_id = 2;
}

message DeleteBitmapUpdateLockPB {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1 1 1
2 2 2
3 3 3

-- !sql --
1 999 999
2 2 2
3 3 3

Loading

0 comments on commit 2240053

Please sign in to comment.