Skip to content

Commit

Permalink
Merge branch 'master' into 20240730_fix_arrow_connect
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz authored Aug 13, 2024
2 parents 2f4a800 + 66f6243 commit 59931e2
Show file tree
Hide file tree
Showing 540 changed files with 18,008 additions and 3,974 deletions.
2 changes: 1 addition & 1 deletion be/src/agent/be_exec_version_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class BeExecVersionManager {
* d. change some agg function nullable property: PR #37215
* e. change variant serde to fix PR #38413
*/
constexpr inline int BeExecVersionManager::max_be_exec_version = 6;
constexpr inline int BeExecVersionManager::max_be_exec_version = 7;
constexpr inline int BeExecVersionManager::min_be_exec_version = 0;

/// functional
Expand Down
1 change: 1 addition & 0 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2052,6 +2052,7 @@ void calc_delete_bitmap_callback(CloudStorageEngine& engine, const TAgentTaskReq
finish_task_request.__set_signature(req.signature);
finish_task_request.__set_report_version(s_report_version);
finish_task_request.__set_error_tablet_ids(error_tablet_ids);
finish_task_request.__set_resp_partitions(calc_delete_bitmap_req.partitions);

finish_task(finish_task_request);
remove_task_info(req.task_type, req.signature);
Expand Down
16 changes: 14 additions & 2 deletions be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,10 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
std::shared_ptr<PartialUpdateInfo> partial_update_info;
std::shared_ptr<PublishStatus> publish_status;
int64_t txn_expiration;
TxnPublishInfo previous_publish_info;
Status status = _engine.txn_delete_bitmap_cache().get_tablet_txn_info(
_transaction_id, _tablet_id, &rowset, &delete_bitmap, &rowset_ids, &txn_expiration,
&partial_update_info, &publish_status);
&partial_update_info, &publish_status, &previous_publish_info);
if (status != Status::OK()) {
LOG(WARNING) << "failed to get tablet txn info. tablet_id=" << _tablet_id
<< ", txn_id=" << _transaction_id << ", status=" << status;
Expand All @@ -204,8 +205,19 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
txn_info.rowset_ids = rowset_ids;
txn_info.partial_update_info = partial_update_info;
txn_info.publish_status = publish_status;
txn_info.publish_info = {.publish_version = _version,
.base_compaction_cnt = _ms_base_compaction_cnt,
.cumulative_compaction_cnt = _ms_cumulative_compaction_cnt,
.cumulative_point = _ms_cumulative_point};
auto update_delete_bitmap_time_us = 0;
if (txn_info.publish_status && (*(txn_info.publish_status) == PublishStatus::SUCCEED)) {
if (txn_info.publish_status && (*(txn_info.publish_status) == PublishStatus::SUCCEED) &&
_version == previous_publish_info.publish_version &&
_ms_base_compaction_cnt == previous_publish_info.base_compaction_cnt &&
_ms_cumulative_compaction_cnt == previous_publish_info.cumulative_compaction_cnt &&
_ms_cumulative_point == previous_publish_info.cumulative_point) {
// if version or compaction stats can't match, it means that this is a retry and there are
// compaction or other loads finished successfully on the same tablet. So the previous publish
// is stale and we should re-calculate the delete bitmap
LOG(INFO) << "tablet=" << _tablet_id << ",txn=" << _transaction_id
<< ",publish_status=SUCCEED,not need to recalculate and update delete_bitmap.";
} else {
Expand Down
23 changes: 19 additions & 4 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -559,14 +559,29 @@ bool CloudMetaMgr::sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, int64
}
txn_processed.insert(txn_id);
DeleteBitmapPtr tmp_delete_bitmap;
RowsetIdUnorderedSet tmp_rowset_ids;
std::shared_ptr<PublishStatus> publish_status =
std::make_shared<PublishStatus>(PublishStatus::INIT);
CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud();
Status status = engine.txn_delete_bitmap_cache().get_delete_bitmap(
txn_id, tablet->tablet_id(), &tmp_delete_bitmap, &tmp_rowset_ids, &publish_status);
if (status.ok() && *(publish_status.get()) == PublishStatus::SUCCEED) {
delete_bitmap->merge(*tmp_delete_bitmap);
txn_id, tablet->tablet_id(), &tmp_delete_bitmap, nullptr, &publish_status);
// CloudMetaMgr::sync_tablet_delete_bitmap_by_cache() is called after we sync rowsets from meta services.
// If the control flows reaches here, it's gauranteed that the rowsets is commited in meta services, so we can
// use the delete bitmap from cache directly if *publish_status == PublishStatus::SUCCEED without checking other
// stats(version or compaction stats)
if (status.ok() && *publish_status == PublishStatus::SUCCEED) {
// tmp_delete_bitmap contains sentinel marks, we should remove it before merge it to delete bitmap.
// Also, the version of delete bitmap key in tmp_delete_bitmap is DeleteBitmap::TEMP_VERSION_COMMON,
// we should replace it with the rowset's real version
DCHECK(rs_meta.start_version() == rs_meta.end_version());
int64_t rowset_version = rs_meta.start_version();
for (const auto& [delete_bitmap_key, bitmap_value] : tmp_delete_bitmap->delete_bitmap) {
// skip sentinel mark, which is used for delete bitmap correctness check
if (std::get<1>(delete_bitmap_key) != DeleteBitmap::INVALID_SEGMENT_ID) {
delete_bitmap->merge({std::get<0>(delete_bitmap_key),
std::get<1>(delete_bitmap_key), rowset_version},
bitmap_value);
}
}
engine.txn_delete_bitmap_cache().remove_unused_tablet_txn_info(txn_id,
tablet->tablet_id());
} else {
Expand Down
41 changes: 39 additions & 2 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,38 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
},
.download_done {},
});

auto download_idx_file = [&](const io::Path& idx_path) {
io::DownloadFileMeta meta {
.path = idx_path,
.file_size = -1,
.file_system = storage_resource.value()->fs,
.ctx =
{
.expiration_time = expiration_time,
},
.download_done {},
};
_engine.file_cache_block_downloader().submit_download_task(std::move(meta));
};
auto schema_ptr = rowset_meta->tablet_schema();
auto idx_version = schema_ptr->get_inverted_index_storage_format();
if (idx_version == InvertedIndexStorageFormatPB::V1) {
for (const auto& index : schema_ptr->indexes()) {
if (index.index_type() == IndexType::INVERTED) {
auto idx_path = storage_resource.value()->remote_idx_v1_path(
*rowset_meta, seg_id, index.index_id(),
index.get_index_suffix());
download_idx_file(idx_path);
}
}
} else if (idx_version == InvertedIndexStorageFormatPB::V2) {
if (schema_ptr->has_inverted_index()) {
auto idx_path = storage_resource.value()->remote_idx_v2_path(
*rowset_meta, seg_id);
download_idx_file(idx_path);
}
}
}
#endif
}
Expand Down Expand Up @@ -648,8 +680,13 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx

RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(
*this, txn_id, COMPACTION_DELETE_BITMAP_LOCK_ID, new_delete_bitmap.get()));
_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
txn_id, tablet_id(), new_delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED);

// store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason,
// it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do
// delete bitmap correctness check. If we store the new_delete_bitmap, the delete bitmap correctness check will fail
_engine.txn_delete_bitmap_cache().update_tablet_txn_info(txn_id, tablet_id(), delete_bitmap,
cur_rowset_ids, PublishStatus::SUCCEED,
txn_info->publish_info);

return Status::OK();
}
Expand Down
19 changes: 14 additions & 5 deletions be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "cpp/sync_point.h"
#include "olap/olap_common.h"
#include "olap/tablet_meta.h"
#include "olap/txn_manager.h"

namespace doris {

Expand Down Expand Up @@ -54,7 +55,7 @@ Status CloudTxnDeleteBitmapCache::get_tablet_txn_info(
TTransactionId transaction_id, int64_t tablet_id, RowsetSharedPtr* rowset,
DeleteBitmapPtr* delete_bitmap, RowsetIdUnorderedSet* rowset_ids, int64_t* txn_expiration,
std::shared_ptr<PartialUpdateInfo>* partial_update_info,
std::shared_ptr<PublishStatus>* publish_status) {
std::shared_ptr<PublishStatus>* publish_status, TxnPublishInfo* previous_publish_info) {
{
std::shared_lock<std::shared_mutex> rlock(_rwlock);
TxnKey key(transaction_id, tablet_id);
Expand All @@ -68,6 +69,7 @@ Status CloudTxnDeleteBitmapCache::get_tablet_txn_info(
*txn_expiration = iter->second.txn_expiration;
*partial_update_info = iter->second.partial_update_info;
*publish_status = iter->second.publish_status;
*previous_publish_info = iter->second.publish_info;
}
RETURN_IF_ERROR(
get_delete_bitmap(transaction_id, tablet_id, delete_bitmap, rowset_ids, nullptr));
Expand Down Expand Up @@ -96,7 +98,9 @@ Status CloudTxnDeleteBitmapCache::get_delete_bitmap(
handle == nullptr ? nullptr : reinterpret_cast<DeleteBitmapCacheValue*>(value(handle));
if (val) {
*delete_bitmap = val->delete_bitmap;
*rowset_ids = val->rowset_ids;
if (rowset_ids) {
*rowset_ids = val->rowset_ids;
}
// must call release handle to reduce the reference count,
// otherwise there will be memory leak
release(handle);
Expand Down Expand Up @@ -153,12 +157,17 @@ void CloudTxnDeleteBitmapCache::update_tablet_txn_info(TTransactionId transactio
int64_t tablet_id,
DeleteBitmapPtr delete_bitmap,
const RowsetIdUnorderedSet& rowset_ids,
PublishStatus publish_status) {
PublishStatus publish_status,
TxnPublishInfo publish_info) {
{
std::unique_lock<std::shared_mutex> wlock(_rwlock);
TxnKey txn_key(transaction_id, tablet_id);
CHECK(_txn_map.count(txn_key) > 0);
*(_txn_map[txn_key].publish_status.get()) = publish_status;
CHECK(_txn_map.contains(txn_key));
TxnVal& txn_val = _txn_map[txn_key];
*(txn_val.publish_status) = publish_status;
if (publish_status == PublishStatus::SUCCEED) {
txn_val.publish_info = publish_info;
}
}
std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id);
CacheKey key(key_str);
Expand Down
11 changes: 9 additions & 2 deletions be/src/cloud/cloud_txn_delete_bitmap_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicyTrackingManual {
RowsetSharedPtr* rowset, DeleteBitmapPtr* delete_bitmap,
RowsetIdUnorderedSet* rowset_ids, int64_t* txn_expiration,
std::shared_ptr<PartialUpdateInfo>* partial_update_info,
std::shared_ptr<PublishStatus>* publish_status);
std::shared_ptr<PublishStatus>* publish_status,
TxnPublishInfo* previous_publish_info);

void set_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id,
DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& rowset_ids,
Expand All @@ -52,12 +53,16 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicyTrackingManual {
void update_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id,
DeleteBitmapPtr delete_bitmap,
const RowsetIdUnorderedSet& rowset_ids,
PublishStatus publish_status);
PublishStatus publish_status, TxnPublishInfo publish_info = {});

void remove_expired_tablet_txn_info();

void remove_unused_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id);

// !!!ATTENTION!!!: the delete bitmap stored in CloudTxnDeleteBitmapCache contains sentinel marks,
// and the version in BitmapKey is DeleteBitmap::TEMP_VERSION_COMMON.
// when using delete bitmap from this cache, the caller should manually remove these marks if don't need it
// and should replace versions in BitmapKey by the correct version
Status get_delete_bitmap(TTransactionId transaction_id, int64_t tablet_id,
DeleteBitmapPtr* delete_bitmap, RowsetIdUnorderedSet* rowset_ids,
std::shared_ptr<PublishStatus>* publish_status);
Expand Down Expand Up @@ -88,6 +93,8 @@ class CloudTxnDeleteBitmapCache : public LRUCachePolicyTrackingManual {
int64_t txn_expiration;
std::shared_ptr<PartialUpdateInfo> partial_update_info;
std::shared_ptr<PublishStatus> publish_status = nullptr;
// used to determine if the retry needs to re-calculate the delete bitmap
TxnPublishInfo publish_info;
TxnVal() : txn_expiration(0) {};
TxnVal(RowsetSharedPtr rowset_, int64_t txn_expiration_,
std::shared_ptr<PartialUpdateInfo> partial_update_info_,
Expand Down
39 changes: 39 additions & 0 deletions be/src/cloud/cloud_warm_up_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,45 @@ void CloudWarmUpManager::handle_jobs() {
wait->signal();
},
});

auto download_idx_file = [&](const io::Path& idx_path) {
io::DownloadFileMeta meta {
.path = idx_path,
.file_size = -1,
.file_system = storage_resource.value()->fs,
.ctx =
{
.expiration_time = expiration_time,
},
.download_done =
[wait](Status st) {
if (!st) {
LOG_WARNING("Warm up error ").error(st);
}
wait->signal();
},
};
_engine.file_cache_block_downloader().submit_download_task(std::move(meta));
};
auto schema_ptr = rs->tablet_schema();
auto idx_version = schema_ptr->get_inverted_index_storage_format();
if (idx_version == InvertedIndexStorageFormatPB::V1) {
for (const auto& index : schema_ptr->indexes()) {
if (index.index_type() == IndexType::INVERTED) {
wait->add_count();
auto idx_path = storage_resource.value()->remote_idx_v1_path(
*rs, seg_id, index.index_id(), index.get_index_suffix());
download_idx_file(idx_path);
}
}
} else if (idx_version == InvertedIndexStorageFormatPB::V2) {
if (schema_ptr->has_inverted_index()) {
wait->add_count();
auto idx_path =
storage_resource.value()->remote_idx_v2_path(*rs, seg_id);
download_idx_file(idx_path);
}
}
}
}
timespec time;
Expand Down
2 changes: 0 additions & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,6 @@ DEFINE_mInt32(doris_scan_range_row_count, "524288");
DEFINE_mInt32(doris_scan_range_max_mb, "1024");
// max bytes number for single scan block, used in segmentv2
DEFINE_mInt32(doris_scan_block_max_mb, "67108864");
// size of scanner queue between scanner thread and compute thread
DEFINE_mInt32(doris_scanner_queue_size, "1024");
// single read execute fragment row number
DEFINE_mInt32(doris_scanner_row_num, "16384");
// single read execute fragment row bytes
Expand Down
2 changes: 0 additions & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,6 @@ DECLARE_mInt32(doris_scan_range_row_count);
DECLARE_mInt32(doris_scan_range_max_mb);
// max bytes number for single scan block, used in segmentv2
DECLARE_mInt32(doris_scan_block_max_mb);
// size of scanner queue between scanner thread and compute thread
DECLARE_mInt32(doris_scanner_queue_size);
// single read execute fragment row number
DECLARE_mInt32(doris_scanner_row_num);
// single read execute fragment row bytes
Expand Down
Loading

0 comments on commit 59931e2

Please sign in to comment.