Commit
Merge branch 'master' into alter_catalog_rename
msridhar78 authored Dec 27, 2024
2 parents e93859b + b6cb442 commit 7c3320d
Showing 759 changed files with 123,515 additions and 8,852 deletions.
Binary file removed aazcp.tar.gz
9 changes: 8 additions & 1 deletion be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
@@ -283,8 +283,15 @@ Status CloudTabletCalcDeleteBitmapTask::_handle_rowset(
// if version or compaction stats can't match, it means that this is a retry and there are
// compaction or other loads finished successfully on the same tablet. So the previous publish
// is stale and we should re-calculate the delete bitmap

// we still need to update delete bitmap KVs to MS when we skip calculating delete bitmaps,
// because the pending delete bitmap KVs in MS we wrote before may have been removed and replaced by other txns
int64_t lock_id = txn_info.is_txn_load ? txn_info.lock_id : -1;
RETURN_IF_ERROR(
tablet->save_delete_bitmap_to_ms(version, transaction_id, delete_bitmap, lock_id));

LOG(INFO) << "tablet=" << _tablet_id << ", " << txn_str
<< ",publish_status=SUCCEED,not need to recalculate and update delete_bitmap.";
<< ", publish_status=SUCCEED, not need to re-calculate delete_bitmaps.";
} else {
if (invisible_rowsets == nullptr) {
status = CloudTablet::update_delete_bitmap(tablet, &txn_info, transaction_id,
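
A note on the lock id convention used in this hunk and resolved again in cloud_tablet.cpp below: transactional loads pass their explicit lock id, ordinary loads pass -1, and -1 is later mapped back to the transaction id when writing to the meta service. A self-contained sketch of that mapping (the helper name is hypothetical; the two expressions are taken from the diff):

#include <cstdint>

// Hypothetical helper combining the call-site choice and the fallback inside
// CloudTablet::save_delete_bitmap_to_ms().
int64_t resolve_ms_lock_id(bool is_txn_load, int64_t explicit_lock_id, int64_t txn_id) {
    int64_t lock_id = is_txn_load ? explicit_lock_id : -1; // chosen at the publish call site
    return lock_id == -1 ? txn_id : lock_id;               // -1 means "use the txn id as the MS lock id"
}
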
17 changes: 7 additions & 10 deletions be/src/cloud/cloud_meta_mgr.cpp
@@ -143,6 +143,10 @@ class MetaServiceProxy {
}

private:
static bool is_meta_service_endpoint_list() {
return config::meta_service_endpoint.find(',') != std::string::npos;
}

static Status get_pooled_client(std::shared_ptr<MetaService_Stub>* stub) {
static std::once_flag proxies_flag;
static size_t num_proxies = 1;
@@ -154,9 +158,6 @@ class MetaServiceProxy {
if (config::meta_service_connection_pooled) {
num_proxies = config::meta_service_connection_pool_size;
}
if (config::meta_service_endpoint.find(',') != std::string::npos) {
is_meta_service_endpoint_list = true;
}
proxies = std::make_unique<MetaServiceProxy[]>(num_proxies);
});

Expand All @@ -175,7 +176,7 @@ class MetaServiceProxy {

const char* load_balancer_name = nullptr;
std::string endpoint;
if (is_meta_service_endpoint_list) {
if (is_meta_service_endpoint_list()) {
endpoint = fmt::format("list://{}", config::meta_service_endpoint);
load_balancer_name = "random";
} else {
@@ -215,7 +216,7 @@ class MetaServiceProxy {
bool is_idle_timeout(long now) {
auto idle_timeout_ms = config::meta_service_idle_connection_timeout_ms;
// idle timeout only works without list endpoint.
return !is_meta_service_endpoint_list && idle_timeout_ms > 0 &&
return !is_meta_service_endpoint_list() && idle_timeout_ms > 0 &&
_last_access_at_ms.load(std::memory_order_relaxed) + idle_timeout_ms < now;
}

@@ -243,7 +244,7 @@

long deadline = now;
// connection age only works without list endpoint.
if (!is_meta_service_endpoint_list &&
if (!is_meta_service_endpoint_list() &&
config::meta_service_connection_age_base_seconds > 0) {
std::default_random_engine rng(static_cast<uint32_t>(now));
std::uniform_int_distribution<> uni(
@@ -262,16 +263,12 @@
return Status::OK();
}

static std::atomic_bool is_meta_service_endpoint_list;

std::shared_mutex _mutex;
std::atomic<long> _last_access_at_ms {0};
long _deadline_ms {0};
std::shared_ptr<MetaService_Stub> _stub;
};

std::atomic_bool MetaServiceProxy::is_meta_service_endpoint_list = false;

template <typename T, typename... Ts>
struct is_any : std::disjunction<std::is_same<T, Ts>...> {};

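
The new is_meta_service_endpoint_list() helper replaces a static std::atomic_bool that was latched once under std::call_once, so a meta_service_endpoint value that changes after startup is observed the next time a channel is initialized. A sketch of how the helper feeds the brpc address, assuming the single-endpoint branch simply passes the configured string through unchanged (that branch sits outside the hunks shown):

#include <string>
#include <fmt/format.h>
#include "common/config.h" // config::meta_service_endpoint

static bool is_meta_service_endpoint_list() {
    return config::meta_service_endpoint.find(',') != std::string::npos;
}

static std::string meta_service_brpc_address() {
    if (is_meta_service_endpoint_list()) {
        // Comma-separated endpoints use brpc "list://" naming with the "random" load balancer;
        // per the hunks above, idle-timeout and connection-age recycling are skipped in this mode.
        return fmt::format("list://{}", config::meta_service_endpoint);
    }
    return config::meta_service_endpoint; // assumption: a single endpoint is used as-is
}
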
44 changes: 32 additions & 12 deletions be/src/cloud/cloud_tablet.cpp
@@ -717,10 +717,42 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx

if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update() &&
rowset_writer->num_rows() > 0) {
DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.update_tmp_rowset.error", {
return Status::InternalError<false>("injected update_tmp_rowset error.");
});
const auto& rowset_meta = rowset->rowset_meta();
RETURN_IF_ERROR(_engine.meta_mgr().update_tmp_rowset(*rowset_meta));
}

RETURN_IF_ERROR(save_delete_bitmap_to_ms(cur_version, txn_id, delete_bitmap, lock_id));

// store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason,
// it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do
// delete bitmap correctness check. If we store the new_delete_bitmap, the delete bitmap correctness check will fail
RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED,
txn_info->publish_info));

DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.enable_sleep", {
auto sleep_sec = dp->param<int>("sleep", 5);
std::this_thread::sleep_for(std::chrono::seconds(sleep_sec));
});

DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.injected_error", {
auto retry = dp->param<bool>("retry", false);
if (retry) { // return DELETE_BITMAP_LOCK_ERROR to let it retry
return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>(
"injected DELETE_BITMAP_LOCK_ERROR");
} else {
return Status::InternalError<false>("injected non-retryable error");
}
});

return Status::OK();
}

Status CloudTablet::save_delete_bitmap_to_ms(int64_t cur_version, int64_t txn_id,
DeleteBitmapPtr delete_bitmap, int64_t lock_id) {
DeleteBitmapPtr new_delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id());
for (auto iter = delete_bitmap->delete_bitmap.begin();
iter != delete_bitmap->delete_bitmap.end(); ++iter) {
@@ -731,18 +763,9 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx
iter->second);
}
}

auto ms_lock_id = lock_id == -1 ? txn_id : lock_id;
RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(*this, ms_lock_id, LOAD_INITIATOR_ID,
new_delete_bitmap.get()));

// store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason,
// it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do
// delete bitmap correctness check. If we store the new_delete_bitmap, the delete bitmap correctness check will fail
RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED,
txn_info->publish_info));

return Status::OK();
}

@@ -870,9 +893,6 @@ Status CloudTablet::sync_meta() {
}
return st;
}
if (tablet_meta->tablet_state() != TABLET_RUNNING) { // impossible
return Status::InternalError("invalid tablet state. tablet_id={}", tablet_id());
}

auto new_ttl_seconds = tablet_meta->ttl_seconds();
if (_tablet_meta->ttl_seconds() != new_ttl_seconds) {
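
The injected DELETE_BITMAP_LOCK_ERROR pairs with the retry handling in cloud_engine_calc_delete_bitmap_task.cpp above: a retried publish finds PublishStatus::SUCCEED in txn_delete_bitmap_cache, skips recalculation, and still re-saves the delete bitmap KVs through save_delete_bitmap_to_ms(). An illustrative retry shape only; publish_one_version() is a hypothetical wrapper, not the actual task code:

#include "common/status.h" // doris::Status, doris::ErrorCode

doris::Status publish_one_version(); // hypothetical: runs one publish attempt for a tablet/version

doris::Status publish_with_retry(int max_attempts) {
    doris::Status st;
    for (int attempt = 0; attempt < max_attempts; ++attempt) {
        st = publish_one_version();                          // may hit the injected error
        if (!st.is<doris::ErrorCode::DELETE_BITMAP_LOCK_ERROR>()) {
            break;                                           // success or a non-retryable failure
        }
        // On the next attempt the cached SUCCEED status short-circuits recalculation,
        // but the delete bitmap KVs are written to the meta service again.
    }
    return st;
}
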
3 changes: 3 additions & 0 deletions be/src/cloud/cloud_tablet.h
@@ -173,6 +173,9 @@ class CloudTablet final : public BaseTablet {
const RowsetIdUnorderedSet& cur_rowset_ids,
int64_t lock_id = -1) override;

Status save_delete_bitmap_to_ms(int64_t cur_version, int64_t txn_id,
DeleteBitmapPtr delete_bitmap, int64_t lock_id);

Status calc_delete_bitmap_for_compaction(const std::vector<RowsetSharedPtr>& input_rowsets,
const RowsetSharedPtr& output_rowset,
const RowIdConversion& rowid_conversion,
3 changes: 0 additions & 3 deletions be/src/cloud/cloud_tablet_mgr.cpp
@@ -261,9 +261,6 @@ void CloudTabletMgr::sync_tablets(const CountDownLatch& stop_latch) {

for (auto& weak_tablet : weak_tablets) {
if (auto tablet = weak_tablet.lock()) {
if (tablet->tablet_state() != TABLET_RUNNING) {
continue;
}
int64_t last_sync_time = tablet->last_sync_time_s;
if (last_sync_time <= last_sync_time_bound) {
sync_time_tablet_set.emplace(last_sync_time, weak_tablet);
2 changes: 1 addition & 1 deletion be/src/common/config.cpp
@@ -990,7 +990,7 @@ DEFINE_Bool(hide_webserver_config_page, "false");
DEFINE_Bool(enable_segcompaction, "true");

// Max number of segments allowed in a single segcompaction task.
DEFINE_Int32(segcompaction_batch_size, "10");
DEFINE_mInt32(segcompaction_batch_size, "10");

// Max row count allowed in a single source segment, bigger segments will be skipped.
DEFINE_Int32(segcompaction_candidate_max_rows, "1048576");
2 changes: 1 addition & 1 deletion be/src/common/config.h
@@ -1040,7 +1040,7 @@ DECLARE_Bool(hide_webserver_config_page);
DECLARE_Bool(enable_segcompaction);

// Max number of segments allowed in a single segcompaction task.
DECLARE_Int32(segcompaction_batch_size);
DECLARE_mInt32(segcompaction_batch_size);

// Max row count allowed in a single source segment, bigger segments will be skipped.
DECLARE_Int32(segcompaction_candidate_max_rows);
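
Changing DEFINE_Int32/DECLARE_Int32 to the m-prefixed forms marks segcompaction_batch_size as a runtime-mutable config, so the value should be read where it is used instead of being cached at startup; mutable BE configs can generally be adjusted on a running backend without a restart. A minimal sketch of the read-at-use pattern (the function and its arguments are hypothetical):

#include <cstddef>
#include "common/config.h" // config::segcompaction_batch_size, now mutable

void plan_segcompaction_batches(std::size_t candidate_segments) {
    const int batch_size = config::segcompaction_batch_size; // re-read per task, picks up runtime updates
    if (batch_size <= 0) {
        return; // defensive guard for the sketch
    }
    for (std::size_t i = 0; i < candidate_segments; i += batch_size) {
        // ... build one segcompaction task covering at most batch_size segments ...
    }
}
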
6 changes: 4 additions & 2 deletions be/src/exec/schema_scanner/schema_active_queries_scanner.cpp
@@ -26,6 +26,8 @@
#include "vec/data_types/data_type_factory.hpp"

namespace doris {
#include "common/compile_check_begin.h"

std::vector<SchemaScanner::ColumnDesc> SchemaActiveQueriesScanner::_s_tbls_columns = {
// name, type, size
{"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), true},
@@ -92,7 +94,7 @@ Status SchemaActiveQueriesScanner::_get_active_queries_block_from_fe() {
_active_query_block->reserve(_block_rows_limit);

if (result_data.size() > 0) {
int col_size = result_data[0].column_value.size();
auto col_size = result_data[0].column_value.size();
if (col_size != _s_tbls_columns.size()) {
return Status::InternalError<false>("active queries schema is not match for FE and BE");
}
@@ -119,7 +121,7 @@ Status SchemaActiveQueriesScanner::get_next_block_internal(vectorized::Block* bl

if (_active_query_block == nullptr) {
RETURN_IF_ERROR(_get_active_queries_block_from_fe());
_total_rows = _active_query_block->rows();
_total_rows = (int)_active_query_block->rows();
}

if (_row_idx == _total_rows) {
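
The schema scanner changes in this commit all follow the same pattern, repeated in the files below: the translation unit now includes common/compile_check_begin.h, and size_t values coming from Block::rows() or column_value.size() are either kept in an auto variable or narrowed with an explicit cast instead of silently truncating into int. A self-contained sketch of the two idioms, with hypothetical helper names:

#include <cstddef>
#include <vector>

int narrow_row_count(std::size_t rows) {
    // Mirrors `_total_rows = (int)_active_query_block->rows();`: the narrowing is written out
    // explicitly because the row count of a single block is known to fit in an int.
    return (int)rows;
}

bool column_count_matches(const std::vector<int>& row_cells, const std::vector<int>& schema) {
    // Mirrors `auto col_size = result_data[0].column_value.size();`: keep the size_t type
    // and compare it against another size_t, so nothing narrows at all.
    auto col_size = row_cells.size();
    return col_size == schema.size();
}
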
4 changes: 3 additions & 1 deletion be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
@@ -25,6 +25,8 @@
#include "vec/data_types/data_type_factory.hpp"

namespace doris {
#include "common/compile_check_begin.h"

std::vector<SchemaScanner::ColumnDesc> SchemaBackendActiveTasksScanner::_s_tbls_columns = {
// name, type, size
{"BE_ID", TYPE_BIGINT, sizeof(int64_t), false},
@@ -76,7 +78,7 @@ Status SchemaBackendActiveTasksScanner::get_next_block_internal(vectorized::Bloc

ExecEnv::GetInstance()->runtime_query_statistics_mgr()->get_active_be_tasks_block(
_task_stats_block.get());
_total_rows = _task_stats_block->rows();
_total_rows = (int)_task_stats_block->rows();
}

if (_row_idx == _total_rows) {
be/src/exec/schema_scanner/schema_catalog_meta_cache_stats_scanner.cpp
@@ -27,6 +27,8 @@
#include "vec/data_types/data_type_factory.hpp"

namespace doris {
#include "common/compile_check_begin.h"

std::vector<SchemaScanner::ColumnDesc> SchemaCatalogMetaCacheStatsScanner::_s_tbls_columns = {
{"CATALOG_NAME", TYPE_STRING, sizeof(StringRef), true},
{"CACHE_NAME", TYPE_STRING, sizeof(StringRef), true},
@@ -86,7 +88,7 @@ Status SchemaCatalogMetaCacheStatsScanner::_get_meta_cache_from_fe() {
_block->reserve(_block_rows_limit);

if (result_data.size() > 0) {
int col_size = result_data[0].column_value.size();
auto col_size = result_data[0].column_value.size();
if (col_size != _s_tbls_columns.size()) {
return Status::InternalError<false>(
"catalog meta cache stats schema is not match for FE and BE");
@@ -115,7 +117,7 @@ Status SchemaCatalogMetaCacheStatsScanner::get_next_block_internal(vectorized::B

if (_block == nullptr) {
RETURN_IF_ERROR(_get_meta_cache_from_fe());
_total_rows = _block->rows();
_total_rows = (int)_block->rows();
}

if (_row_idx == _total_rows) {
12 changes: 7 additions & 5 deletions be/src/exec/schema_scanner/schema_columns_scanner.cpp
@@ -30,6 +30,8 @@
#include "vec/common/string_ref.h"

namespace doris {
#include "common/compile_check_begin.h"

class RuntimeState;

namespace vectorized {
@@ -411,7 +413,7 @@ Status SchemaColumnsScanner::_fill_block_impl(vectorized::Block* block) {
{
std::vector<StringRef> strs(columns_num);
int offset_index = 0;
int cur_table_index = _table_index - _desc_result.tables_offset.size();
int cur_table_index = int(_table_index - _desc_result.tables_offset.size());

for (int i = 0; i < columns_num; ++i) {
while (_desc_result.tables_offset[offset_index] <= i) {
@@ -609,14 +611,14 @@ Status SchemaColumnsScanner::_fill_block_impl(vectorized::Block* block) {
// EXTRA
{
StringRef str = StringRef("", 0);
std::vector<void*> datas(columns_num, &str);
RETURN_IF_ERROR(fill_dest_column_for_range(block, 17, datas));
std::vector<void*> filled_values(columns_num, &str);
RETURN_IF_ERROR(fill_dest_column_for_range(block, 17, filled_values));
}
// PRIVILEGES
{
StringRef str = StringRef("", 0);
std::vector<void*> datas(columns_num, &str);
RETURN_IF_ERROR(fill_dest_column_for_range(block, 18, datas));
std::vector<void*> filled_values(columns_num, &str);
RETURN_IF_ERROR(fill_dest_column_for_range(block, 18, filled_values));
}
// COLUMN_COMMENT
{
3 changes: 2 additions & 1 deletion be/src/exec/schema_scanner/schema_file_cache_statistics.cpp
@@ -25,6 +25,7 @@
#include "vec/data_types/data_type_factory.hpp"

namespace doris {
#include "common/compile_check_begin.h"

std::vector<SchemaScanner::ColumnDesc> SchemaFileCacheStatisticsScanner::_s_tbls_columns = {
// name, type, size
@@ -68,7 +69,7 @@ Status SchemaFileCacheStatisticsScanner::get_next_block_internal(vectorized::Blo
_stats_block->reserve(_block_rows_limit);

ExecEnv::GetInstance()->file_cache_factory()->get_cache_stats_block(_stats_block.get());
_total_rows = _stats_block->rows();
_total_rows = (int)_stats_block->rows();
}

if (_row_idx == _total_rows) {
6 changes: 4 additions & 2 deletions be/src/exec/schema_scanner/schema_partitions_scanner.cpp
@@ -31,6 +31,8 @@
#include "vec/data_types/data_type_factory.hpp"

namespace doris {
#include "common/compile_check_begin.h"

class RuntimeState;
namespace vectorized {
class Block;
@@ -138,7 +140,7 @@ Status SchemaPartitionsScanner::get_onedb_info_from_fe(int64_t dbId) {
}
_partitions_block->reserve(_block_rows_limit);
if (result_data.size() > 0) {
int col_size = result_data[0].column_value.size();
auto col_size = result_data[0].column_value.size();
if (col_size != _s_tbls_columns.size()) {
return Status::InternalError<false>("table options schema is not match for FE and BE");
}
@@ -178,7 +180,7 @@ Status SchemaPartitionsScanner::get_next_block_internal(vectorized::Block* block
if (_db_index < _db_result.db_ids.size()) {
RETURN_IF_ERROR(get_onedb_info_from_fe(_db_result.db_ids[_db_index]));
_row_idx = 0; // reset row index so that it start filling for next block.
_total_rows = _partitions_block->rows();
_total_rows = (int)_partitions_block->rows();
_db_index++;
}
}
3 changes: 2 additions & 1 deletion be/src/exec/schema_scanner/schema_processlist_scanner.cpp
@@ -30,6 +30,7 @@
#include "vec/data_types/data_type_factory.hpp"

namespace doris {
#include "common/compile_check_begin.h"

std::vector<SchemaScanner::ColumnDesc> SchemaProcessListScanner::_s_processlist_columns = {
{"CURRENT_CONNECTED", TYPE_VARCHAR, sizeof(StringRef), false},
@@ -126,7 +127,7 @@ Status SchemaProcessListScanner::_fill_block_impl(vectorized::Block* block) {
datas[row_idx] = &int_vals[row_idx];
} else if (_s_processlist_columns[col_idx].type == TYPE_DATETIMEV2) {
auto* dv = reinterpret_cast<DateV2Value<DateTimeV2ValueType>*>(&int_vals[row_idx]);
if (!dv->from_date_str(column_value.data(), column_value.size(), -1,
if (!dv->from_date_str(column_value.data(), (int)column_value.size(), -1,
config::allow_zero_date)) {
return Status::InternalError(
"process list meet invalid data, column={}, data={}, reason={}",
6 changes: 4 additions & 2 deletions be/src/exec/schema_scanner/schema_routine_scanner.cpp
@@ -26,6 +26,8 @@
#include "vec/data_types/data_type_factory.hpp"

namespace doris {
#include "common/compile_check_begin.h"

std::vector<SchemaScanner::ColumnDesc> SchemaRoutinesScanner::_s_tbls_columns = {
{"SPECIFIC_NAME", TYPE_VARCHAR, sizeof(StringRef), true},
{"ROUTINE_CATALOG", TYPE_VARCHAR, sizeof(StringRef), true},
@@ -94,7 +96,7 @@ Status SchemaRoutinesScanner::get_block_from_fe() {
}
_routines_block->reserve(_block_rows_limit);
if (result_data.size() > 0) {
int col_size = result_data[0].column_value.size();
auto col_size = result_data[0].column_value.size();
if (col_size != _s_tbls_columns.size()) {
return Status::InternalError<false>("routine table schema is not match for FE and BE");
}
@@ -121,7 +123,7 @@ Status SchemaRoutinesScanner::get_next_block_internal(vectorized::Block* block,

if (_routines_block == nullptr) {
RETURN_IF_ERROR(get_block_from_fe());
_total_rows = _routines_block->rows();
_total_rows = (int)_routines_block->rows();
}

if (_row_idx == _total_rows) {