Skip to content

Commit

Permalink
Merge branch 'master' into show_data_types
Browse files Browse the repository at this point in the history
  • Loading branch information
msridhar78 authored Dec 18, 2024
2 parents fa8221d + 011d393 commit 5417fcc
Show file tree
Hide file tree
Showing 293 changed files with 6,756 additions and 1,622 deletions.
1 change: 0 additions & 1 deletion .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ github:
- COMPILE (DORIS_COMPILE)
- Need_2_Approval
- Cloud UT (Doris Cloud UT)
- performance (Doris Performance)

required_pull_request_reviews:
dismiss_stale_reviews: true
Expand Down
1 change: 1 addition & 0 deletions be/src/agent/workload_group_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "agent/workload_group_listener.h"

#include "runtime/exec_env.h"
#include "runtime/workload_group/workload_group.h"
#include "runtime/workload_group/workload_group_manager.h"
#include "util/mem_info.h"
Expand Down
3 changes: 2 additions & 1 deletion be/src/agent/workload_group_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
#include <glog/logging.h>

#include "agent/topic_listener.h"
#include "runtime/exec_env.h"

namespace doris {

class ExecEnv;

class WorkloadGroupListener : public TopicListener {
public:
~WorkloadGroupListener() {}
Expand Down
2 changes: 1 addition & 1 deletion be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ class [[nodiscard]] Status {
// and another thread is call to_string method, it may core, because the _err_msg is an unique ptr and
// it is deconstructed during copy method.
// And also we could not use lock, because we need get status frequently to check if it is cancelled.
// The defaule value is ok.
// The default value is ok.
class AtomicStatus {
public:
AtomicStatus() : error_st_(Status::OK()) {}
Expand Down
5 changes: 4 additions & 1 deletion be/src/common/version_internal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ int doris_build_version_minor() {
int doris_build_version_patch() {
return DORIS_BUILD_VERSION_PATCH;
}
int doris_build_version_hotfix() {
return DORIS_BUILD_VERSION_HOTFIX;
}
const char* doris_build_version_rc_version() {
return DORIS_BUILD_VERSION_RC_VERSION;
}
Expand All @@ -56,4 +59,4 @@ const char* doris_build_info() {

} // namespace version

} // namespace doris
} // namespace doris
3 changes: 2 additions & 1 deletion be/src/common/version_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ extern const char* doris_build_version_prefix();
extern int doris_build_version_major();
extern int doris_build_version_minor();
extern int doris_build_version_patch();
extern int doris_build_version_hotfix();
extern const char* doris_build_version_rc_version();

extern const char* doris_build_version();
Expand All @@ -34,4 +35,4 @@ extern const char* doris_build_info();

} // namespace version

} // namespace doris
} // namespace doris
6 changes: 3 additions & 3 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -498,12 +498,12 @@ class RuntimePredicateWrapper {
switch (_filter_type) {
case RuntimeFilterType::IN_FILTER: {
if (!_context->hybrid_set) {
_context->ignored = true;
set_ignored();
return Status::OK();
}
_context->hybrid_set->insert(wrapper->_context->hybrid_set.get());
if (_max_in_num >= 0 && _context->hybrid_set->size() >= _max_in_num) {
_context->ignored = true;
set_ignored();
// release in filter
_context->hybrid_set.reset();
}
Expand Down Expand Up @@ -1337,7 +1337,7 @@ void IRuntimeFilter::set_synced_size(uint64_t global_size) {
}

void IRuntimeFilter::set_ignored() {
_wrapper->_context->ignored = true;
_wrapper->set_ignored();
}

bool IRuntimeFilter::get_ignored() {
Expand Down
15 changes: 13 additions & 2 deletions be/src/http/action/download_binlog_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,19 @@ void handle_get_segment_index_file(StorageEngine& engine, HttpRequest* req,
const auto& rowset_id = get_http_param(req, kRowsetIdParameter);
const auto& segment_index = get_http_param(req, kSegmentIndexParameter);
const auto& segment_index_id = req->param(kSegmentIndexIdParameter);
segment_index_file_path =
tablet->get_segment_index_filepath(rowset_id, segment_index, segment_index_id);
auto segment_file_path = tablet->get_segment_filepath(rowset_id, segment_index);
if (tablet->tablet_schema()->get_inverted_index_storage_format() ==
InvertedIndexStorageFormatPB::V1) {
// now CCR not support for variant + index v1
constexpr std::string_view index_suffix = "";
segment_index_file_path = InvertedIndexDescriptor::get_index_file_path_v1(
InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path),
std::stoll(segment_index_id), index_suffix);
} else {
DCHECK(segment_index_id == "-1");
segment_index_file_path = InvertedIndexDescriptor::get_index_file_path_v2(
InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path));
}
is_acquire_md5 = !req->param(kAcquireMD5Parameter).empty();
} catch (const std::exception& e) {
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what());
Expand Down
2 changes: 1 addition & 1 deletion be/src/http/default_path_handlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ void memory_info_handler(std::stringstream* output) {
auto* _opaque = static_cast<std::string*>(opaque);
_opaque->append(buf);
};
malloc_stats_print(write_cb, &tmp, "a");
jemalloc_stats_print(write_cb, &tmp, "a");
boost::replace_all(tmp, "\n", "<br>");
(*output) << tmp;
#else
Expand Down
10 changes: 5 additions & 5 deletions be/src/io/fs/err_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,13 @@ Status s3fs_error(const Aws::S3::S3Error& err, std::string_view msg) {
using namespace Aws::Http;
switch (err.GetResponseCode()) {
case HttpResponseCode::NOT_FOUND:
return Status::Error<NOT_FOUND, false>("{}: {} {} type={}, request_id={}", msg,
err.GetExceptionName(), err.GetMessage(),
return Status::Error<NOT_FOUND, false>("{}: {} {} code=NOT_FOUND, type={}, request_id={}",
msg, err.GetExceptionName(), err.GetMessage(),
err.GetErrorType(), err.GetRequestId());
case HttpResponseCode::FORBIDDEN:
return Status::Error<PERMISSION_DENIED, false>("{}: {} {} type={}, request_id={}", msg,
err.GetExceptionName(), err.GetMessage(),
err.GetErrorType(), err.GetRequestId());
return Status::Error<PERMISSION_DENIED, false>(
"{}: {} {} code=FORBIDDEN, type={}, request_id={}", msg, err.GetExceptionName(),
err.GetMessage(), err.GetErrorType(), err.GetRequestId());
default:
return Status::Error<ErrorCode::INTERNAL_ERROR, false>(
"{}: {} {} code={} type={}, request_id={}", msg, err.GetExceptionName(),
Expand Down
68 changes: 67 additions & 1 deletion be/src/olap/cumulative_compaction_time_series_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@ namespace doris {

uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) {
uint32_t score = 0;
uint32_t level0_score = 0;
bool base_rowset_exist = false;
const int64_t point = tablet->cumulative_layer_point();

int64_t level0_total_size = 0;
RowsetMetaSharedPtr first_meta;
int64_t first_version = INT64_MAX;
std::list<RowsetMetaSharedPtr> checked_rs_metas;
// NOTE: tablet._meta_lock is hold
auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
// check the base rowset and collect the rowsets of cumulative part
Expand All @@ -50,6 +53,12 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
} else {
// collect the rowsets of cumulative part
score += rs_meta->get_compaction_score();
if (rs_meta->compaction_level() == 0) {
level0_total_size += rs_meta->total_disk_size();
level0_score += rs_meta->get_compaction_score();
} else {
checked_rs_metas.push_back(rs_meta);
}
}
}

Expand All @@ -64,7 +73,64 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
return 0;
}

return score;
// Condition 1: the size of input files for compaction meets the requirement of parameter compaction_goal_size
int64_t compaction_goal_size_mbytes =
tablet->tablet_meta()->time_series_compaction_goal_size_mbytes();
if (level0_total_size >= compaction_goal_size_mbytes * 1024 * 1024) {
return score;
}

// Condition 2: the number of input files reaches the threshold specified by parameter compaction_file_count_threshold
if (level0_score >= tablet->tablet_meta()->time_series_compaction_file_count_threshold()) {
return score;
}

// Condition 3: level1 achieve compaction_goal_size
if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
checked_rs_metas.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) {
return a->version().first < b->version().first;
});
int32_t rs_meta_count = 0;
int64_t continuous_size = 0;
for (const auto& rs_meta : checked_rs_metas) {
rs_meta_count++;
continuous_size += rs_meta->total_disk_size();
if (rs_meta_count >= 2) {
if (continuous_size >= compaction_goal_size_mbytes * 1024 * 1024) {
return score;
}
}
}
}

int64_t now = UnixMillis();
int64_t last_cumu = tablet->last_cumu_compaction_success_time();
if (last_cumu != 0) {
int64_t cumu_interval = now - last_cumu;

// Condition 4: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_second
if (cumu_interval >
(tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() * 1000)) {
return score;
}
} else if (score > 0) {
// If the compaction process has not been successfully executed,
// the condition for triggering compaction based on the last successful compaction time (condition 3) will never be met
tablet->set_last_cumu_compaction_success_time(now);
}

// Condition 5: If there is a continuous set of empty rowsets, prioritize merging.
std::vector<RowsetSharedPtr> input_rowsets;
std::vector<RowsetSharedPtr> candidate_rowsets =
tablet->pick_candidate_rowsets_to_cumulative_compaction();
tablet->calc_consecutive_empty_rowsets(
&input_rowsets, candidate_rowsets,
tablet->tablet_meta()->time_series_compaction_empty_rowsets_threshold());
if (!input_rowsets.empty()) {
return score;
}

return 0;
}

void TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point(
Expand Down
33 changes: 24 additions & 9 deletions be/src/olap/rowset/beta_rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "olap/rowset/beta_rowset.h"

#include <ctype.h>
#include <errno.h>
#include <fmt/format.h>

#include <algorithm>
Expand Down Expand Up @@ -557,10 +558,6 @@ Status BetaRowset::add_to_binlog() {
}

const auto& fs = io::global_local_filesystem();

// all segments are in the same directory, so cache binlog_dir without multi times check
std::string binlog_dir;

auto segments_num = num_segments();
VLOG_DEBUG << fmt::format("add rowset to binlog. rowset_id={}, segments_num={}",
rowset_id().to_string(), segments_num);
Expand All @@ -569,17 +566,25 @@ Status BetaRowset::add_to_binlog() {
std::vector<string> linked_success_files;
Defer remove_linked_files {[&]() { // clear linked files if errors happen
if (!status.ok()) {
LOG(WARNING) << "will delete linked success files due to error " << status;
LOG(WARNING) << "will delete linked success files due to error "
<< status.to_string_no_stack();
std::vector<io::Path> paths;
for (auto& file : linked_success_files) {
paths.emplace_back(file);
LOG(WARNING) << "will delete linked success file " << file << " due to error";
}
static_cast<void>(fs->batch_delete(paths));
LOG(WARNING) << "done delete linked success files due to error " << status;
LOG(WARNING) << "done delete linked success files due to error "
<< status.to_string_no_stack();
}
}};

// The publish_txn might fail even if the add_to_binlog success, so we need to check
// whether a file already exists before linking.
auto errno_is_file_exists = []() { return Errno::no() == EEXIST; };

// all segments are in the same directory, so cache binlog_dir without multi times check
std::string binlog_dir;
for (int i = 0; i < segments_num; ++i) {
auto seg_file = local_segment_path(_tablet_path, rowset_id().to_string(), i);

Expand All @@ -597,7 +602,7 @@ Status BetaRowset::add_to_binlog() {
(std::filesystem::path(binlog_dir) / std::filesystem::path(seg_file).filename())
.string();
VLOG_DEBUG << "link " << seg_file << " to " << binlog_file;
if (!fs->link_file(seg_file, binlog_file).ok()) {
if (!fs->link_file(seg_file, binlog_file).ok() && !errno_is_file_exists()) {
status = Status::Error<OS_ERROR>("fail to create hard link. from={}, to={}, errno={}",
seg_file, binlog_file, Errno::no());
return status;
Expand All @@ -614,7 +619,12 @@ Status BetaRowset::add_to_binlog() {
std::filesystem::path(index_file).filename())
.string();
VLOG_DEBUG << "link " << index_file << " to " << binlog_index_file;
RETURN_IF_ERROR(fs->link_file(index_file, binlog_index_file));
if (!fs->link_file(index_file, binlog_index_file).ok() && !errno_is_file_exists()) {
status = Status::Error<OS_ERROR>(
"fail to create hard link. from={}, to={}, errno={}", index_file,
binlog_index_file, Errno::no());
return status;
}
linked_success_files.push_back(binlog_index_file);
}
} else {
Expand All @@ -625,7 +635,12 @@ Status BetaRowset::add_to_binlog() {
std::filesystem::path(index_file).filename())
.string();
VLOG_DEBUG << "link " << index_file << " to " << binlog_index_file;
RETURN_IF_ERROR(fs->link_file(index_file, binlog_index_file));
if (!fs->link_file(index_file, binlog_index_file).ok() && !errno_is_file_exists()) {
status = Status::Error<OS_ERROR>(
"fail to create hard link. from={}, to={}, errno={}", index_file,
binlog_index_file, Errno::no());
return status;
}
linked_success_files.push_back(binlog_index_file);
}
}
Expand Down
42 changes: 36 additions & 6 deletions be/src/olap/rowset/segment_v2/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -871,8 +871,18 @@ Status MapFileColumnIterator::next_batch(size_t* n, vectorized::MutableColumnPtr
size_t num_read = *n;
auto null_map_ptr =
static_cast<vectorized::ColumnNullable&>(*dst).get_null_map_column_ptr();
bool null_signs_has_null = false;
RETURN_IF_ERROR(_null_iterator->next_batch(&num_read, null_map_ptr, &null_signs_has_null));
// in not-null to null linked-schemachange mode,
// actually we do not change dat data include meta in footer,
// so may dst from changed meta which is nullable but old data is not nullable,
// if so, we should set null_map to all null by default
if (_null_iterator) {
bool null_signs_has_null = false;
RETURN_IF_ERROR(
_null_iterator->next_batch(&num_read, null_map_ptr, &null_signs_has_null));
} else {
auto& null_map = assert_cast<vectorized::ColumnUInt8&>(*null_map_ptr);
null_map.insert_many_vals(0, num_read);
}
DCHECK(num_read == *n);
}
return Status::OK();
Expand Down Expand Up @@ -932,8 +942,18 @@ Status StructFileColumnIterator::next_batch(size_t* n, vectorized::MutableColumn
size_t num_read = *n;
auto null_map_ptr =
static_cast<vectorized::ColumnNullable&>(*dst).get_null_map_column_ptr();
bool null_signs_has_null = false;
RETURN_IF_ERROR(_null_iterator->next_batch(&num_read, null_map_ptr, &null_signs_has_null));
// in not-null to null linked-schemachange mode,
// actually we do not change dat data include meta in footer,
// so may dst from changed meta which is nullable but old data is not nullable,
// if so, we should set null_map to all null by default
if (_null_iterator) {
bool null_signs_has_null = false;
RETURN_IF_ERROR(
_null_iterator->next_batch(&num_read, null_map_ptr, &null_signs_has_null));
} else {
auto& null_map = assert_cast<vectorized::ColumnUInt8&>(*null_map_ptr);
null_map.insert_many_vals(0, num_read);
}
DCHECK(num_read == *n);
}

Expand Down Expand Up @@ -1086,8 +1106,18 @@ Status ArrayFileColumnIterator::next_batch(size_t* n, vectorized::MutableColumnP
auto null_map_ptr =
static_cast<vectorized::ColumnNullable&>(*dst).get_null_map_column_ptr();
size_t num_read = *n;
bool null_signs_has_null = false;
RETURN_IF_ERROR(_null_iterator->next_batch(&num_read, null_map_ptr, &null_signs_has_null));
// in not-null to null linked-schemachange mode,
// actually we do not change dat data include meta in footer,
// so may dst from changed meta which is nullable but old data is not nullable,
// if so, we should set null_map to all null by default
if (_null_iterator) {
bool null_signs_has_null = false;
RETURN_IF_ERROR(
_null_iterator->next_batch(&num_read, null_map_ptr, &null_signs_has_null));
} else {
auto& null_map = assert_cast<vectorized::ColumnUInt8&>(*null_map_ptr);
null_map.insert_many_vals(0, num_read);
}
DCHECK(num_read == *n);
}

Expand Down
Loading

0 comments on commit 5417fcc

Please sign in to comment.