Skip to content

Commit

Permalink
Merge branch 'master' into show_column_stats
Browse files Browse the repository at this point in the history
  • Loading branch information
msridhar78 authored Dec 27, 2024
2 parents c3e07ca + b6cb442 commit 855f1c2
Show file tree
Hide file tree
Showing 4,202 changed files with 130,678 additions and 112,994 deletions.
The diff you're trying to view is too large. We only load the first 3000 changed files.
Binary file removed aazcp.tar.gz
Binary file not shown.
7 changes: 4 additions & 3 deletions be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,9 @@ Status CloudBaseCompaction::execute_compact() {
<< ", output_version=" << _output_version;
return res;
}
LOG_INFO("finish CloudBaseCompaction, tablet_id={}, cost={}ms", _tablet->tablet_id(),
duration_cast<milliseconds>(steady_clock::now() - start).count())
LOG_INFO("finish CloudBaseCompaction, tablet_id={}, cost={}ms range=[{}-{}]",
_tablet->tablet_id(), duration_cast<milliseconds>(steady_clock::now() - start).count(),
_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version())
.tag("job_id", _uuid)
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
Expand Down Expand Up @@ -343,7 +344,7 @@ Status CloudBaseCompaction::modify_rowsets() {
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("update_bitmap_size", output_rowset_delete_bitmap->delete_bitmap.size());
.tag("num_output_delete_bitmap", output_rowset_delete_bitmap->delete_bitmap.size());
compaction_job->set_delete_bitmap_lock_initiator(initiator);
}

Expand Down
8 changes: 5 additions & 3 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,9 @@ Status CloudCumulativeCompaction::execute_compact() {
<< ", output_version=" << _output_version;
return res;
}
LOG_INFO("finish CloudCumulativeCompaction, tablet_id={}, cost={}ms", _tablet->tablet_id(),
duration_cast<milliseconds>(steady_clock::now() - start).count())
LOG_INFO("finish CloudCumulativeCompaction, tablet_id={}, cost={}ms, range=[{}-{}]",
_tablet->tablet_id(), duration_cast<milliseconds>(steady_clock::now() - start).count(),
_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version())
.tag("job_id", _uuid)
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
Expand Down Expand Up @@ -299,7 +300,8 @@ Status CloudCumulativeCompaction::modify_rowsets() {
.tag("input_rowsets", _input_rowsets.size())
.tag("input_rows", _input_row_num)
.tag("input_segments", _input_segments)
.tag("update_bitmap_size", output_rowset_delete_bitmap->delete_bitmap.size());
.tag("number_output_delete_bitmap",
output_rowset_delete_bitmap->delete_bitmap.size());
compaction_job->set_delete_bitmap_lock_initiator(initiator);
}

Expand Down
9 changes: 8 additions & 1 deletion be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,15 @@ Status CloudTabletCalcDeleteBitmapTask::_handle_rowset(
// if version or compaction stats can't match, it means that this is a retry and there are
// compaction or other loads finished successfully on the same tablet. So the previous publish
// is stale and we should re-calculate the delete bitmap

// we still need to update delete bitmap KVs to MS when we skip to calcalate delete bitmaps,
// because the pending delete bitmap KVs in MS we wrote before may have been removed and replaced by other txns
int64_t lock_id = txn_info.is_txn_load ? txn_info.lock_id : -1;
RETURN_IF_ERROR(
tablet->save_delete_bitmap_to_ms(version, transaction_id, delete_bitmap, lock_id));

LOG(INFO) << "tablet=" << _tablet_id << ", " << txn_str
<< ",publish_status=SUCCEED,not need to recalculate and update delete_bitmap.";
<< ", publish_status=SUCCEED, not need to re-calculate delete_bitmaps.";
} else {
if (invisible_rowsets == nullptr) {
status = CloudTablet::update_delete_bitmap(tablet, &txn_info, transaction_id,
Expand Down
17 changes: 7 additions & 10 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ class MetaServiceProxy {
}

private:
static bool is_meta_service_endpoint_list() {
return config::meta_service_endpoint.find(',') != std::string::npos;
}

static Status get_pooled_client(std::shared_ptr<MetaService_Stub>* stub) {
static std::once_flag proxies_flag;
static size_t num_proxies = 1;
Expand All @@ -154,9 +158,6 @@ class MetaServiceProxy {
if (config::meta_service_connection_pooled) {
num_proxies = config::meta_service_connection_pool_size;
}
if (config::meta_service_endpoint.find(',') != std::string::npos) {
is_meta_service_endpoint_list = true;
}
proxies = std::make_unique<MetaServiceProxy[]>(num_proxies);
});

Expand All @@ -175,7 +176,7 @@ class MetaServiceProxy {

const char* load_balancer_name = nullptr;
std::string endpoint;
if (is_meta_service_endpoint_list) {
if (is_meta_service_endpoint_list()) {
endpoint = fmt::format("list://{}", config::meta_service_endpoint);
load_balancer_name = "random";
} else {
Expand Down Expand Up @@ -215,7 +216,7 @@ class MetaServiceProxy {
bool is_idle_timeout(long now) {
auto idle_timeout_ms = config::meta_service_idle_connection_timeout_ms;
// idle timeout only works without list endpoint.
return !is_meta_service_endpoint_list && idle_timeout_ms > 0 &&
return !is_meta_service_endpoint_list() && idle_timeout_ms > 0 &&
_last_access_at_ms.load(std::memory_order_relaxed) + idle_timeout_ms < now;
}

Expand Down Expand Up @@ -243,7 +244,7 @@ class MetaServiceProxy {

long deadline = now;
// connection age only works without list endpoint.
if (!is_meta_service_endpoint_list &&
if (!is_meta_service_endpoint_list() &&
config::meta_service_connection_age_base_seconds > 0) {
std::default_random_engine rng(static_cast<uint32_t>(now));
std::uniform_int_distribution<> uni(
Expand All @@ -262,16 +263,12 @@ class MetaServiceProxy {
return Status::OK();
}

static std::atomic_bool is_meta_service_endpoint_list;

std::shared_mutex _mutex;
std::atomic<long> _last_access_at_ms {0};
long _deadline_ms {0};
std::shared_ptr<MetaService_Stub> _stub;
};

std::atomic_bool MetaServiceProxy::is_meta_service_endpoint_list = false;

template <typename T, typename... Ts>
struct is_any : std::disjunction<std::is_same<T, Ts>...> {};

Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class CloudStorageEngine final : public BaseStorageEngine {
void _check_file_cache_ttl_block_valid();

std::optional<StorageResource> get_storage_resource(const std::string& vault_id) {
LOG(INFO) << "Getting storage resource for vault_id: " << vault_id;
VLOG_DEBUG << "Getting storage resource for vault_id: " << vault_id;

bool synced = false;
do {
Expand Down
59 changes: 43 additions & 16 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet_mgr.h"
#include "common/logging.h"
#include "io/cache/block_file_cache_downloader.h"
#include "io/cache/block_file_cache_factory.h"
#include "olap/cumulative_compaction_time_series_policy.h"
Expand Down Expand Up @@ -408,6 +409,9 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() {
auto rs_it = _stale_rs_version_map.find(v_ts->version());
if (rs_it != _stale_rs_version_map.end()) {
expired_rowsets.push_back(rs_it->second);
LOG(INFO) << "erase stale rowset, tablet_id=" << tablet_id()
<< " rowset_id=" << rs_it->second->rowset_id().to_string()
<< " version=" << rs_it->first.to_string();
_stale_rs_version_map.erase(rs_it);
} else {
LOG(WARNING) << "cannot find stale rowset " << v_ts->version() << " in tablet "
Expand Down Expand Up @@ -657,11 +661,14 @@ void CloudTablet::get_compaction_status(std::string* json_result) {
}

void CloudTablet::set_cumulative_layer_point(int64_t new_point) {
if (new_point == Tablet::K_INVALID_CUMULATIVE_POINT || new_point >= _cumulative_point) {
_cumulative_point = new_point;
return;
}
// cumulative point should only be reset to -1, or be increased
CHECK(new_point == Tablet::K_INVALID_CUMULATIVE_POINT || new_point >= _cumulative_point)
<< "Unexpected cumulative point: " << new_point
<< ", origin: " << _cumulative_point.load();
_cumulative_point = new_point;
// FIXME: could happen in currently unresolved race conditions
LOG(WARNING) << "Unexpected cumulative point: " << new_point
<< ", origin: " << _cumulative_point.load();
}

std::vector<RowsetSharedPtr> CloudTablet::pick_candidate_rowsets_to_base_compaction() {
Expand Down Expand Up @@ -710,10 +717,42 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx

if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update() &&
rowset_writer->num_rows() > 0) {
DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.update_tmp_rowset.error", {
return Status::InternalError<false>("injected update_tmp_rowset error.");
});
const auto& rowset_meta = rowset->rowset_meta();
RETURN_IF_ERROR(_engine.meta_mgr().update_tmp_rowset(*rowset_meta));
}

RETURN_IF_ERROR(save_delete_bitmap_to_ms(cur_version, txn_id, delete_bitmap, lock_id));

// store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason,
// it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do
// delete bitmap correctness check. If we store the new_delete_bitmap, the delete bitmap correctness check will fail
RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED,
txn_info->publish_info));

DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.enable_sleep", {
auto sleep_sec = dp->param<int>("sleep", 5);
std::this_thread::sleep_for(std::chrono::seconds(sleep_sec));
});

DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.injected_error", {
auto retry = dp->param<bool>("retry", false);
if (retry) { // return DELETE_BITMAP_LOCK_ERROR to let it retry
return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>(
"injected DELETE_BITMAP_LOCK_ERROR");
} else {
return Status::InternalError<false>("injected non-retryable error");
}
});

return Status::OK();
}

Status CloudTablet::save_delete_bitmap_to_ms(int64_t cur_version, int64_t txn_id,
DeleteBitmapPtr delete_bitmap, int64_t lock_id) {
DeleteBitmapPtr new_delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id());
for (auto iter = delete_bitmap->delete_bitmap.begin();
iter != delete_bitmap->delete_bitmap.end(); ++iter) {
Expand All @@ -724,18 +763,9 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx
iter->second);
}
}

auto ms_lock_id = lock_id == -1 ? txn_id : lock_id;
RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(*this, ms_lock_id, LOAD_INITIATOR_ID,
new_delete_bitmap.get()));

// store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason,
// it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do
// delete bitmap correctness check. If we store the new_delete_bitmap, the delete bitmap correctness check will fail
RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED,
txn_info->publish_info));

return Status::OK();
}

Expand Down Expand Up @@ -863,9 +893,6 @@ Status CloudTablet::sync_meta() {
}
return st;
}
if (tablet_meta->tablet_state() != TABLET_RUNNING) { // impossible
return Status::InternalError("invalid tablet state. tablet_id={}", tablet_id());
}

auto new_ttl_seconds = tablet_meta->ttl_seconds();
if (_tablet_meta->ttl_seconds() != new_ttl_seconds) {
Expand Down
3 changes: 3 additions & 0 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ class CloudTablet final : public BaseTablet {
const RowsetIdUnorderedSet& cur_rowset_ids,
int64_t lock_id = -1) override;

Status save_delete_bitmap_to_ms(int64_t cur_version, int64_t txn_id,
DeleteBitmapPtr delete_bitmap, int64_t lock_id);

Status calc_delete_bitmap_for_compaction(const std::vector<RowsetSharedPtr>& input_rowsets,
const RowsetSharedPtr& output_rowset,
const RowIdConversion& rowid_conversion,
Expand Down
3 changes: 0 additions & 3 deletions be/src/cloud/cloud_tablet_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,6 @@ void CloudTabletMgr::sync_tablets(const CountDownLatch& stop_latch) {

for (auto& weak_tablet : weak_tablets) {
if (auto tablet = weak_tablet.lock()) {
if (tablet->tablet_state() != TABLET_RUNNING) {
continue;
}
int64_t last_sync_time = tablet->last_sync_time_s;
if (last_sync_time <= last_sync_time_bound) {
sync_time_tablet_set.emplace(last_sync_time, weak_tablet);
Expand Down
2 changes: 1 addition & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,7 @@ DEFINE_Bool(hide_webserver_config_page, "false");
DEFINE_Bool(enable_segcompaction, "true");

// Max number of segments allowed in a single segcompaction task.
DEFINE_Int32(segcompaction_batch_size, "10");
DEFINE_mInt32(segcompaction_batch_size, "10");

// Max row count allowed in a single source segment, bigger segments will be skipped.
DEFINE_Int32(segcompaction_candidate_max_rows, "1048576");
Expand Down
2 changes: 1 addition & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1040,7 +1040,7 @@ DECLARE_Bool(hide_webserver_config_page);
DECLARE_Bool(enable_segcompaction);

// Max number of segments allowed in a single segcompaction task.
DECLARE_Int32(segcompaction_batch_size);
DECLARE_mInt32(segcompaction_batch_size);

// Max row count allowed in a single source segment, bigger segments will be skipped.
DECLARE_Int32(segcompaction_candidate_max_rows);
Expand Down
6 changes: 4 additions & 2 deletions be/src/exec/schema_scanner/schema_active_queries_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
#include "vec/data_types/data_type_factory.hpp"

namespace doris {
#include "common/compile_check_begin.h"

std::vector<SchemaScanner::ColumnDesc> SchemaActiveQueriesScanner::_s_tbls_columns = {
// name, type, size
{"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), true},
Expand Down Expand Up @@ -92,7 +94,7 @@ Status SchemaActiveQueriesScanner::_get_active_queries_block_from_fe() {
_active_query_block->reserve(_block_rows_limit);

if (result_data.size() > 0) {
int col_size = result_data[0].column_value.size();
auto col_size = result_data[0].column_value.size();
if (col_size != _s_tbls_columns.size()) {
return Status::InternalError<false>("active queries schema is not match for FE and BE");
}
Expand All @@ -119,7 +121,7 @@ Status SchemaActiveQueriesScanner::get_next_block_internal(vectorized::Block* bl

if (_active_query_block == nullptr) {
RETURN_IF_ERROR(_get_active_queries_block_from_fe());
_total_rows = _active_query_block->rows();
_total_rows = (int)_active_query_block->rows();
}

if (_row_idx == _total_rows) {
Expand Down
4 changes: 3 additions & 1 deletion be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include "vec/data_types/data_type_factory.hpp"

namespace doris {
#include "common/compile_check_begin.h"

std::vector<SchemaScanner::ColumnDesc> SchemaBackendActiveTasksScanner::_s_tbls_columns = {
// name, type, size
{"BE_ID", TYPE_BIGINT, sizeof(int64_t), false},
Expand Down Expand Up @@ -76,7 +78,7 @@ Status SchemaBackendActiveTasksScanner::get_next_block_internal(vectorized::Bloc

ExecEnv::GetInstance()->runtime_query_statistics_mgr()->get_active_be_tasks_block(
_task_stats_block.get());
_total_rows = _task_stats_block->rows();
_total_rows = (int)_task_stats_block->rows();
}

if (_row_idx == _total_rows) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
#include "vec/data_types/data_type_factory.hpp"

namespace doris {
#include "common/compile_check_begin.h"

std::vector<SchemaScanner::ColumnDesc> SchemaCatalogMetaCacheStatsScanner::_s_tbls_columns = {
{"CATALOG_NAME", TYPE_STRING, sizeof(StringRef), true},
{"CACHE_NAME", TYPE_STRING, sizeof(StringRef), true},
Expand Down Expand Up @@ -86,7 +88,7 @@ Status SchemaCatalogMetaCacheStatsScanner::_get_meta_cache_from_fe() {
_block->reserve(_block_rows_limit);

if (result_data.size() > 0) {
int col_size = result_data[0].column_value.size();
auto col_size = result_data[0].column_value.size();
if (col_size != _s_tbls_columns.size()) {
return Status::InternalError<false>(
"catalog meta cache stats schema is not match for FE and BE");
Expand Down Expand Up @@ -115,7 +117,7 @@ Status SchemaCatalogMetaCacheStatsScanner::get_next_block_internal(vectorized::B

if (_block == nullptr) {
RETURN_IF_ERROR(_get_meta_cache_from_fe());
_total_rows = _block->rows();
_total_rows = (int)_block->rows();
}

if (_row_idx == _total_rows) {
Expand Down
12 changes: 7 additions & 5 deletions be/src/exec/schema_scanner/schema_columns_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
#include "vec/common/string_ref.h"

namespace doris {
#include "common/compile_check_begin.h"

class RuntimeState;

namespace vectorized {
Expand Down Expand Up @@ -411,7 +413,7 @@ Status SchemaColumnsScanner::_fill_block_impl(vectorized::Block* block) {
{
std::vector<StringRef> strs(columns_num);
int offset_index = 0;
int cur_table_index = _table_index - _desc_result.tables_offset.size();
int cur_table_index = int(_table_index - _desc_result.tables_offset.size());

for (int i = 0; i < columns_num; ++i) {
while (_desc_result.tables_offset[offset_index] <= i) {
Expand Down Expand Up @@ -609,14 +611,14 @@ Status SchemaColumnsScanner::_fill_block_impl(vectorized::Block* block) {
// EXTRA
{
StringRef str = StringRef("", 0);
std::vector<void*> datas(columns_num, &str);
RETURN_IF_ERROR(fill_dest_column_for_range(block, 17, datas));
std::vector<void*> filled_values(columns_num, &str);
RETURN_IF_ERROR(fill_dest_column_for_range(block, 17, filled_values));
}
// PRIVILEGES
{
StringRef str = StringRef("", 0);
std::vector<void*> datas(columns_num, &str);
RETURN_IF_ERROR(fill_dest_column_for_range(block, 18, datas));
std::vector<void*> filled_values(columns_num, &str);
RETURN_IF_ERROR(fill_dest_column_for_range(block, 18, filled_values));
}
// COLUMN_COMMENT
{
Expand Down
Loading

0 comments on commit 855f1c2

Please sign in to comment.