Skip to content

Commit

Permalink
Merge branch 'master' into backup_cooldown_data_master_squash
Browse files Browse the repository at this point in the history
  • Loading branch information
justfortaste authored Dec 19, 2024
2 parents 60b9dc2 + 011d393 commit 065de6a
Show file tree
Hide file tree
Showing 498 changed files with 9,766 additions and 2,496 deletions.
1 change: 0 additions & 1 deletion .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ github:
- COMPILE (DORIS_COMPILE)
- Need_2_Approval
- Cloud UT (Doris Cloud UT)
- performance (Doris Performance)

required_pull_request_reviews:
dismiss_stale_reviews: true
Expand Down
1 change: 1 addition & 0 deletions be/src/agent/workload_group_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "agent/workload_group_listener.h"

#include "runtime/exec_env.h"
#include "runtime/workload_group/workload_group.h"
#include "runtime/workload_group/workload_group_manager.h"
#include "util/mem_info.h"
Expand Down
3 changes: 2 additions & 1 deletion be/src/agent/workload_group_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
#include <glog/logging.h>

#include "agent/topic_listener.h"
#include "runtime/exec_env.h"

namespace doris {

class ExecEnv;

class WorkloadGroupListener : public TopicListener {
public:
~WorkloadGroupListener() {}
Expand Down
4 changes: 4 additions & 0 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ Status CloudCumulativeCompaction::prepare_compact() {
// plus 1 to skip the delete version.
// NOTICE: after that, the cumulative point may be larger than max version of this tablet, but it doesn't matter.
update_cumulative_point();
if (!config::enable_sleep_between_delete_cumu_compaction) {
st = Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>(
"_last_delete_version.first not equal to -1");
}
}
return st;
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/cloud/cloud_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,8 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS
auto st = compaction->prepare_compact();
if (!st.ok()) {
long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
if (st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) {
if (st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>() &&
st.msg() != "_last_delete_version.first not equal to -1") {
// Backoff strategy if no suitable version
tablet->last_cumu_no_suitable_version_ms = now;
}
Expand Down
5 changes: 3 additions & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ DEFINE_mInt32(download_low_speed_limit_kbps, "50");
// download low speed time(seconds)
DEFINE_mInt32(download_low_speed_time, "300");
// whether to download small files in batch
DEFINE_mBool(enable_batch_download, "false");
DEFINE_mBool(enable_batch_download, "true");

DEFINE_String(sys_log_dir, "");
DEFINE_String(user_function_dir, "${DORIS_HOME}/lib/udf");
Expand Down Expand Up @@ -1211,7 +1211,7 @@ DEFINE_Bool(exit_on_exception, "false");
DEFINE_Bool(enable_flush_file_cache_async, "true");

// cgroup
DEFINE_mString(doris_cgroup_cpu_path, "");
DEFINE_String(doris_cgroup_cpu_path, "");

DEFINE_mBool(enable_be_proc_monitor, "false");
DEFINE_mInt32(be_proc_monitor_interval_ms, "10000");
Expand Down Expand Up @@ -1402,6 +1402,7 @@ DEFINE_mBool(enable_delete_bitmap_merge_on_compaction, "false");
// Enable validation to check the correctness of table size.
DEFINE_Bool(enable_table_size_correctness_check, "false");
DEFINE_Bool(force_regenerate_rowsetid_on_start_error, "false");
DEFINE_mBool(enable_sleep_between_delete_cumu_compaction, "false");

// clang-format off
#ifdef BE_TEST
Expand Down
4 changes: 3 additions & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1292,7 +1292,7 @@ DECLARE_mInt32(tablet_schema_cache_capacity);
DECLARE_mBool(exit_on_exception);

// cgroup
DECLARE_mString(doris_cgroup_cpu_path);
DECLARE_String(doris_cgroup_cpu_path);
DECLARE_mBool(enable_be_proc_monitor);
DECLARE_mInt32(be_proc_monitor_interval_ms);
DECLARE_Int32(workload_group_metrics_interval_ms);
Expand Down Expand Up @@ -1487,6 +1487,8 @@ DECLARE_Bool(force_regenerate_rowsetid_on_start_error);
DECLARE_mBool(enable_delete_bitmap_merge_on_compaction);
// Enable validation to check the correctness of table size.
DECLARE_Bool(enable_table_size_correctness_check);
// Enable sleep 5s between delete cumulative compaction.
DECLARE_mBool(enable_sleep_between_delete_cumu_compaction);

#ifdef BE_TEST
// test s3
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,8 @@ void Daemon::calculate_metrics_thread() {
// update lst map
DorisMetrics::instance()->system_metrics()->get_network_traffic(
&lst_net_send_bytes, &lst_net_receive_bytes);

DorisMetrics::instance()->system_metrics()->update_be_avail_cpu_num();
}
update_rowsets_and_segments_num_metrics();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ class [[nodiscard]] Status {
// and another thread is call to_string method, it may core, because the _err_msg is an unique ptr and
// it is deconstructed during copy method.
// And also we could not use lock, because we need get status frequently to check if it is cancelled.
// The defaule value is ok.
// The default value is ok.
class AtomicStatus {
public:
AtomicStatus() : error_st_(Status::OK()) {}
Expand Down
5 changes: 4 additions & 1 deletion be/src/common/version_internal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ int doris_build_version_minor() {
int doris_build_version_patch() {
return DORIS_BUILD_VERSION_PATCH;
}
int doris_build_version_hotfix() {
return DORIS_BUILD_VERSION_HOTFIX;
}
const char* doris_build_version_rc_version() {
return DORIS_BUILD_VERSION_RC_VERSION;
}
Expand All @@ -56,4 +59,4 @@ const char* doris_build_info() {

} // namespace version

} // namespace doris
} // namespace doris
3 changes: 2 additions & 1 deletion be/src/common/version_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ extern const char* doris_build_version_prefix();
extern int doris_build_version_major();
extern int doris_build_version_minor();
extern int doris_build_version_patch();
extern int doris_build_version_hotfix();
extern const char* doris_build_version_rc_version();

extern const char* doris_build_version();
Expand All @@ -34,4 +35,4 @@ extern const char* doris_build_info();

} // namespace version

} // namespace doris
} // namespace doris
24 changes: 12 additions & 12 deletions be/src/exec/decompressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -492,15 +492,15 @@ Status Lz4BlockDecompressor::decompress(uint8_t* input, size_t input_len, size_t
auto* output_ptr = output;

while (input_len > 0) {
//if faild , fall back to large block begin
auto* large_block_input_ptr = input_ptr;
auto* large_block_output_ptr = output_ptr;

if (input_len < sizeof(uint32_t)) {
return Status::InvalidArgument(strings::Substitute(
"fail to do hadoop-lz4 decompress, input_len=$0", input_len));
*more_input_bytes = sizeof(uint32_t) - input_len;
break;
}

//if faild, fall back to large block begin
auto* large_block_input_ptr = input_ptr;
auto* large_block_output_ptr = output_ptr;

uint32_t remaining_decompressed_large_block_len = BigEndian::Load32(input_ptr);

input_ptr += sizeof(uint32_t);
Expand Down Expand Up @@ -609,15 +609,15 @@ Status SnappyBlockDecompressor::decompress(uint8_t* input, size_t input_len,
auto* output_ptr = output;

while (input_len > 0) {
//if faild , fall back to large block begin
auto* large_block_input_ptr = input_ptr;
auto* large_block_output_ptr = output_ptr;

if (input_len < sizeof(uint32_t)) {
return Status::InvalidArgument(strings::Substitute(
"fail to do hadoop-snappy decompress, input_len=$0", input_len));
*more_input_bytes = sizeof(uint32_t) - input_len;
break;
}

//if faild, fall back to large block begin
auto* large_block_input_ptr = input_ptr;
auto* large_block_output_ptr = output_ptr;

uint32_t remaining_decompressed_large_block_len = BigEndian::Load32(input_ptr);

input_ptr += sizeof(uint32_t);
Expand Down
6 changes: 3 additions & 3 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -498,12 +498,12 @@ class RuntimePredicateWrapper {
switch (_filter_type) {
case RuntimeFilterType::IN_FILTER: {
if (!_context->hybrid_set) {
_context->ignored = true;
set_ignored();
return Status::OK();
}
_context->hybrid_set->insert(wrapper->_context->hybrid_set.get());
if (_max_in_num >= 0 && _context->hybrid_set->size() >= _max_in_num) {
_context->ignored = true;
set_ignored();
// release in filter
_context->hybrid_set.reset();
}
Expand Down Expand Up @@ -1337,7 +1337,7 @@ void IRuntimeFilter::set_synced_size(uint64_t global_size) {
}

void IRuntimeFilter::set_ignored() {
_wrapper->_context->ignored = true;
_wrapper->set_ignored();
}

bool IRuntimeFilter::get_ignored() {
Expand Down
15 changes: 13 additions & 2 deletions be/src/http/action/download_binlog_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,19 @@ void handle_get_segment_index_file(StorageEngine& engine, HttpRequest* req,
const auto& rowset_id = get_http_param(req, kRowsetIdParameter);
const auto& segment_index = get_http_param(req, kSegmentIndexParameter);
const auto& segment_index_id = req->param(kSegmentIndexIdParameter);
segment_index_file_path =
tablet->get_segment_index_filepath(rowset_id, segment_index, segment_index_id);
auto segment_file_path = tablet->get_segment_filepath(rowset_id, segment_index);
if (tablet->tablet_schema()->get_inverted_index_storage_format() ==
InvertedIndexStorageFormatPB::V1) {
// now CCR not support for variant + index v1
constexpr std::string_view index_suffix = "";
segment_index_file_path = InvertedIndexDescriptor::get_index_file_path_v1(
InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path),
std::stoll(segment_index_id), index_suffix);
} else {
DCHECK(segment_index_id == "-1");
segment_index_file_path = InvertedIndexDescriptor::get_index_file_path_v2(
InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path));
}
is_acquire_md5 = !req->param(kAcquireMD5Parameter).empty();
} catch (const std::exception& e) {
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what());
Expand Down
5 changes: 5 additions & 0 deletions be/src/io/cache/block_file_cache_profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ struct FileCacheProfile {
struct FileCacheProfileReporter {
RuntimeProfile::Counter* num_local_io_total = nullptr;
RuntimeProfile::Counter* num_remote_io_total = nullptr;
RuntimeProfile::Counter* num_inverted_index_remote_io_total = nullptr;
RuntimeProfile::Counter* local_io_timer = nullptr;
RuntimeProfile::Counter* bytes_scanned_from_cache = nullptr;
RuntimeProfile::Counter* bytes_scanned_from_remote = nullptr;
Expand All @@ -90,6 +91,8 @@ struct FileCacheProfileReporter {
cache_profile, 1);
num_remote_io_total = ADD_CHILD_COUNTER_WITH_LEVEL(profile, "NumRemoteIOTotal", TUnit::UNIT,
cache_profile, 1);
num_inverted_index_remote_io_total = ADD_CHILD_COUNTER_WITH_LEVEL(
profile, "NumInvertedIndexRemoteIOTotal", TUnit::UNIT, cache_profile, 1);
local_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "LocalIOUseTimer", cache_profile, 1);
remote_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "RemoteIOUseTimer", cache_profile, 1);
write_cache_io_timer =
Expand All @@ -107,6 +110,8 @@ struct FileCacheProfileReporter {
void update(const FileCacheStatistics* statistics) const {
COUNTER_UPDATE(num_local_io_total, statistics->num_local_io_total);
COUNTER_UPDATE(num_remote_io_total, statistics->num_remote_io_total);
COUNTER_UPDATE(num_inverted_index_remote_io_total,
statistics->num_inverted_index_remote_io_total);
COUNTER_UPDATE(local_io_timer, statistics->local_io_timer);
COUNTER_UPDATE(remote_io_timer, statistics->remote_io_timer);
COUNTER_UPDATE(write_cache_io_timer, statistics->write_cache_io_timer);
Expand Down
8 changes: 6 additions & 2 deletions be/src/io/cache/cached_remote_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
ReadStatistics stats;
auto defer_func = [&](int*) {
if (io_ctx->file_cache_stats) {
_update_state(stats, io_ctx->file_cache_stats);
_update_state(stats, io_ctx->file_cache_stats, io_ctx->is_inverted_index);
io::FileCacheProfile::instance().update(io_ctx->file_cache_stats);
}
};
Expand Down Expand Up @@ -312,14 +312,18 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
}

void CachedRemoteFileReader::_update_state(const ReadStatistics& read_stats,
FileCacheStatistics* statis) const {
FileCacheStatistics* statis,
bool is_inverted_index) const {
if (statis == nullptr) {
return;
}
if (read_stats.hit_cache) {
statis->num_local_io_total++;
statis->bytes_read_from_local += read_stats.bytes_read;
} else {
if (is_inverted_index) {
statis->num_inverted_index_remote_io_total++;
}
statis->num_remote_io_total++;
statis->bytes_read_from_remote += read_stats.bytes_read;
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/io/cache/cached_remote_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ class CachedRemoteFileReader final : public FileReader {
int64_t local_read_timer = 0;
int64_t local_write_timer = 0;
};
void _update_state(const ReadStatistics& stats, FileCacheStatistics* state) const;
void _update_state(const ReadStatistics& stats, FileCacheStatistics* state,
bool is_inverted_index) const;
};

} // namespace doris::io
10 changes: 5 additions & 5 deletions be/src/io/fs/err_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,13 @@ Status s3fs_error(const Aws::S3::S3Error& err, std::string_view msg) {
using namespace Aws::Http;
switch (err.GetResponseCode()) {
case HttpResponseCode::NOT_FOUND:
return Status::Error<NOT_FOUND, false>("{}: {} {} type={}, request_id={}", msg,
err.GetExceptionName(), err.GetMessage(),
return Status::Error<NOT_FOUND, false>("{}: {} {} code=NOT_FOUND, type={}, request_id={}",
msg, err.GetExceptionName(), err.GetMessage(),
err.GetErrorType(), err.GetRequestId());
case HttpResponseCode::FORBIDDEN:
return Status::Error<PERMISSION_DENIED, false>("{}: {} {} type={}, request_id={}", msg,
err.GetExceptionName(), err.GetMessage(),
err.GetErrorType(), err.GetRequestId());
return Status::Error<PERMISSION_DENIED, false>(
"{}: {} {} code=FORBIDDEN, type={}, request_id={}", msg, err.GetExceptionName(),
err.GetMessage(), err.GetErrorType(), err.GetRequestId());
default:
return Status::Error<ErrorCode::INTERNAL_ERROR, false>(
"{}: {} {} code={} type={}, request_id={}", msg, err.GetExceptionName(),
Expand Down
2 changes: 2 additions & 0 deletions be/src/io/io_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ namespace io {
struct FileCacheStatistics {
int64_t num_local_io_total = 0;
int64_t num_remote_io_total = 0;
int64_t num_inverted_index_remote_io_total = 0;
int64_t local_io_timer = 0;
int64_t bytes_read_from_local = 0;
int64_t bytes_read_from_remote = 0;
Expand All @@ -60,6 +61,7 @@ struct IOContext {
int64_t expiration_time = 0;
const TUniqueId* query_id = nullptr; // Ref
FileCacheStatistics* file_cache_stats = nullptr; // Ref
bool is_inverted_index = false;
};

} // namespace io
Expand Down
Loading

0 comments on commit 065de6a

Please sign in to comment.