Skip to content

Commit

Permalink
Merge branch 'master' into enhancement_nereids_show-catalog-support
Browse files Browse the repository at this point in the history
  • Loading branch information
Yao-MR authored Nov 28, 2024
2 parents a022d11 + b7d985d commit b4b035b
Show file tree
Hide file tree
Showing 97 changed files with 2,978 additions and 463 deletions.
26 changes: 21 additions & 5 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -662,15 +662,28 @@ Status Compaction::do_inverted_index_compaction() {
try {
std::vector<std::unique_ptr<DorisCompoundReader>> src_idx_dirs(src_segment_num);
for (int src_segment_id = 0; src_segment_id < src_segment_num; src_segment_id++) {
src_idx_dirs[src_segment_id] =
DORIS_TRY(inverted_index_file_readers[src_segment_id]->open(index_meta));
auto res = inverted_index_file_readers[src_segment_id]->open(index_meta);
DBUG_EXECUTE_IF("Compaction::open_inverted_index_file_reader", {
res = ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"debug point: Compaction::open_index_file_reader error"));
})
if (!res.has_value()) {
throw Exception(ErrorCode::INVERTED_INDEX_COMPACTION_ERROR, res.error().msg());
}
src_idx_dirs[src_segment_id] = std::move(res.value());
}
for (int dest_segment_id = 0; dest_segment_id < dest_segment_num; dest_segment_id++) {
auto dest_dir =
DORIS_TRY(inverted_index_file_writers[dest_segment_id]->open(index_meta));
auto res = inverted_index_file_writers[dest_segment_id]->open(index_meta);
DBUG_EXECUTE_IF("Compaction::open_inverted_index_file_writer", {
res = ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"debug point: Compaction::open_inverted_index_file_writer error"));
})
if (!res.has_value()) {
throw Exception(ErrorCode::INVERTED_INDEX_COMPACTION_ERROR, res.error().msg());
}
// Destination directories in dest_index_dirs do not need to be deconstructed,
// but their lifecycle must be managed by inverted_index_file_writers.
dest_index_dirs[dest_segment_id] = dest_dir.get();
dest_index_dirs[dest_segment_id] = res.value().get();
}
auto st = compact_column(index_meta->index_id(), src_idx_dirs, dest_index_dirs,
index_tmp_path.native(), trans_vec, dest_segment_num_rows);
Expand All @@ -681,6 +694,9 @@ Status Compaction::do_inverted_index_compaction() {
} catch (CLuceneError& e) {
error_handler(index_meta->index_id(), column_uniq_id);
status = Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(e.what());
} catch (const Exception& e) {
error_handler(index_meta->index_id(), column_uniq_id);
status = Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(e.what());
}
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ void Merger::vertical_split_columns(const TabletSchema& tablet_schema,
<< ", delete_sign_idx=" << delete_sign_idx;
// for duplicate no keys
if (!key_columns.empty()) {
column_groups->emplace_back(std::move(key_columns));
column_groups->emplace_back(key_columns);
}

std::vector<uint32_t> value_columns;
Expand Down
140 changes: 135 additions & 5 deletions be/src/olap/metadata_adder.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,16 @@
#include <bvar/bvar.h>
#include <stdint.h>

#include "util/runtime_profile.h"

namespace doris {

inline bvar::Adder<int64_t> g_rowset_meta_mem_bytes("doris_rowset_meta_mem_bytes");
inline bvar::Adder<int64_t> g_rowset_meta_num("doris_rowset_meta_num");

inline bvar::Adder<int64_t> g_all_rowsets_mem_bytes("doris_all_rowsets_mem_bytes");
inline bvar::Adder<int64_t> g_all_rowsets_num("doris_all_rowsets_num");

inline bvar::Adder<int64_t> g_tablet_meta_mem_bytes("doris_tablet_meta_mem_bytes");
inline bvar::Adder<int64_t> g_tablet_meta_num("doris_tablet_meta_num");

Expand All @@ -37,8 +42,8 @@ inline bvar::Adder<int64_t> g_tablet_index_num("doris_tablet_index_num");
inline bvar::Adder<int64_t> g_tablet_schema_mem_bytes("doris_tablet_schema_mem_bytes");
inline bvar::Adder<int64_t> g_tablet_schema_num("doris_tablet_schema_num");

inline bvar::Adder<int64_t> g_segment_mem_bytes("doris_segment_mem_bytes");
inline bvar::Adder<int64_t> g_segment_num("doris_segment_num");
inline bvar::Adder<int64_t> g_all_segments_mem_bytes("doris_all_segments_mem_bytes");
inline bvar::Adder<int64_t> g_all_segments_num("doris_all_segments_num");

inline bvar::Adder<int64_t> g_column_reader_mem_bytes("doris_column_reader_mem_bytes");
inline bvar::Adder<int64_t> g_column_reader_num("doris_column_reader_num");
Expand Down Expand Up @@ -104,6 +109,8 @@ class MetadataAdder {
public:
MetadataAdder();

static void dump_metadata_object(RuntimeProfile* object_heap_dump_snapshot);

protected:
MetadataAdder(const MetadataAdder& other);

Expand Down Expand Up @@ -159,6 +166,8 @@ void MetadataAdder<T>::add_mem_size(int64_t val) {
}
if constexpr (std::is_same_v<T, RowsetMeta>) {
g_rowset_meta_mem_bytes << val;
} else if constexpr (std::is_same_v<T, Rowset>) {
g_all_rowsets_mem_bytes << val;
} else if constexpr (std::is_same_v<T, TabletMeta>) {
g_tablet_meta_mem_bytes << val;
} else if constexpr (std::is_same_v<T, TabletColumn>) {
Expand All @@ -168,7 +177,7 @@ void MetadataAdder<T>::add_mem_size(int64_t val) {
} else if constexpr (std::is_same_v<T, TabletSchema>) {
g_tablet_schema_mem_bytes << val;
} else if constexpr (std::is_same_v<T, segment_v2::Segment>) {
g_segment_mem_bytes << val;
g_all_segments_mem_bytes << val;
} else if constexpr (std::is_same_v<T, segment_v2::ColumnReader>) {
g_column_reader_mem_bytes << val;
} else if constexpr (std::is_same_v<T, segment_v2::BitmapIndexReader>) {
Expand All @@ -185,6 +194,9 @@ void MetadataAdder<T>::add_mem_size(int64_t val) {
g_ordinal_index_reader_mem_bytes << val;
} else if constexpr (std::is_same_v<T, segment_v2::ZoneMapIndexReader>) {
g_zone_map_index_reader_mem_bytes << val;
} else {
LOG(FATAL) << "add_mem_size not match class type: " << typeid(T).name() << ", " << val;
__builtin_unreachable();
}
}

Expand All @@ -195,6 +207,8 @@ void MetadataAdder<T>::add_num(int64_t val) {
}
if constexpr (std::is_same_v<T, RowsetMeta>) {
g_rowset_meta_num << val;
} else if constexpr (std::is_same_v<T, Rowset>) {
g_all_rowsets_num << val;
} else if constexpr (std::is_same_v<T, TabletMeta>) {
g_tablet_meta_num << val;
} else if constexpr (std::is_same_v<T, TabletColumn>) {
Expand All @@ -204,7 +218,7 @@ void MetadataAdder<T>::add_num(int64_t val) {
} else if constexpr (std::is_same_v<T, TabletSchema>) {
g_tablet_schema_num << val;
} else if constexpr (std::is_same_v<T, segment_v2::Segment>) {
g_segment_num << val;
g_all_segments_num << val;
} else if constexpr (std::is_same_v<T, segment_v2::ColumnReader>) {
g_column_reader_num << val;
} else if constexpr (std::is_same_v<T, segment_v2::BitmapIndexReader>) {
Expand All @@ -221,7 +235,123 @@ void MetadataAdder<T>::add_num(int64_t val) {
g_ordinal_index_reader_num << val;
} else if constexpr (std::is_same_v<T, segment_v2::ZoneMapIndexReader>) {
g_zone_map_index_reader_num << val;
} else {
LOG(FATAL) << "add_num not match class type: " << typeid(T).name() << ", " << val;
__builtin_unreachable();
}
}

}; // namespace doris
template <typename T>
void MetadataAdder<T>::dump_metadata_object(RuntimeProfile* object_heap_dump_snapshot) {
RuntimeProfile::Counter* rowset_meta_mem_bytes_counter =
ADD_COUNTER(object_heap_dump_snapshot, "RowsetMetaMemBytes", TUnit::BYTES);
RuntimeProfile::Counter* rowset_meta_num_counter =
ADD_COUNTER(object_heap_dump_snapshot, "RowsetMetaNum", TUnit::UNIT);
COUNTER_SET(rowset_meta_mem_bytes_counter, g_rowset_meta_mem_bytes.get_value());
COUNTER_SET(rowset_meta_num_counter, g_rowset_meta_num.get_value());

RuntimeProfile::Counter* all_rowsets_mem_bytes_counter =
ADD_COUNTER(object_heap_dump_snapshot, "AllRowsetsMemBytes", TUnit::BYTES);
RuntimeProfile::Counter* all_rowsets_num_counter =
ADD_COUNTER(object_heap_dump_snapshot, "AllRowsetsNum", TUnit::UNIT);
COUNTER_SET(all_rowsets_mem_bytes_counter, g_all_rowsets_mem_bytes.get_value());
COUNTER_SET(all_rowsets_num_counter, g_all_rowsets_num.get_value());

RuntimeProfile::Counter* tablet_meta_mem_bytes_counter =
ADD_COUNTER(object_heap_dump_snapshot, "TabletMetaMemBytes", TUnit::BYTES);
RuntimeProfile::Counter* tablet_meta_num_counter =
ADD_COUNTER(object_heap_dump_snapshot, "TabletMetaNum", TUnit::UNIT);
COUNTER_SET(tablet_meta_mem_bytes_counter, g_tablet_meta_mem_bytes.get_value());
COUNTER_SET(tablet_meta_num_counter, g_tablet_meta_num.get_value());

RuntimeProfile::Counter* tablet_column_mem_bytes_counter =
ADD_COUNTER(object_heap_dump_snapshot, "TabletColumnMemBytes", TUnit::BYTES);
RuntimeProfile::Counter* tablet_column_num_counter =
ADD_COUNTER(object_heap_dump_snapshot, "TabletColumnNum", TUnit::UNIT);
COUNTER_SET(tablet_column_mem_bytes_counter, g_tablet_column_mem_bytes.get_value());
COUNTER_SET(tablet_column_num_counter, g_tablet_column_num.get_value());

RuntimeProfile::Counter* tablet_index_mem_bytes_counter =
ADD_COUNTER(object_heap_dump_snapshot, "TabletIndexMemBytes", TUnit::BYTES);
RuntimeProfile::Counter* tablet_index_num_counter =
ADD_COUNTER(object_heap_dump_snapshot, "TabletIndexNum", TUnit::UNIT);
COUNTER_SET(tablet_index_mem_bytes_counter, g_tablet_index_mem_bytes.get_value());
COUNTER_SET(tablet_index_num_counter, g_tablet_index_num.get_value());

RuntimeProfile::Counter* tablet_schema_mem_bytes_counter =
ADD_COUNTER(object_heap_dump_snapshot, "TabletSchemaMemBytes", TUnit::BYTES);
RuntimeProfile::Counter* tablet_schema_num_counter =
ADD_COUNTER(object_heap_dump_snapshot, "TabletSchemaNum", TUnit::UNIT);
COUNTER_SET(tablet_schema_mem_bytes_counter, g_tablet_schema_mem_bytes.get_value());
COUNTER_SET(tablet_schema_num_counter, g_tablet_schema_num.get_value());

RuntimeProfile::Counter* all_segments_mem_bytes_counter =
ADD_COUNTER(object_heap_dump_snapshot, "AllSegmentsMemBytes", TUnit::BYTES);
RuntimeProfile::Counter* all_segments_num_counter =
ADD_COUNTER(object_heap_dump_snapshot, "AllSegmentsNum", TUnit::UNIT);
COUNTER_SET(all_segments_mem_bytes_counter, g_all_segments_mem_bytes.get_value());
COUNTER_SET(all_segments_num_counter, g_all_segments_num.get_value());

RuntimeProfile::Counter* column_reader_mem_bytes_counter =
ADD_COUNTER(object_heap_dump_snapshot, "ColumnReaderMemBytes", TUnit::BYTES);
RuntimeProfile::Counter* column_reader_num_counter =
ADD_COUNTER(object_heap_dump_snapshot, "ColumnReaderNum", TUnit::UNIT);
COUNTER_SET(column_reader_mem_bytes_counter, g_column_reader_mem_bytes.get_value());
COUNTER_SET(column_reader_num_counter, g_column_reader_num.get_value());

RuntimeProfile::Counter* bitmap_index_reader_mem_bytes_counter =
ADD_COUNTER(object_heap_dump_snapshot, "BitmapIndexReaderMemBytes", TUnit::BYTES);
RuntimeProfile::Counter* bitmap_index_reader_num_counter =
ADD_COUNTER(object_heap_dump_snapshot, "BitmapIndexReaderNum", TUnit::UNIT);
COUNTER_SET(bitmap_index_reader_mem_bytes_counter, g_bitmap_index_reader_mem_bytes.get_value());
COUNTER_SET(bitmap_index_reader_num_counter, g_bitmap_index_reader_num.get_value());

RuntimeProfile::Counter* bloom_filter_index_reader_mem_bytes_counter =
ADD_COUNTER(object_heap_dump_snapshot, "BloomFilterIndexReaderMemBytes", TUnit::BYTES);
RuntimeProfile::Counter* filter_index_reader_num_counter =
ADD_COUNTER(object_heap_dump_snapshot, "BloomFilterIndexReaderNum", TUnit::UNIT);
COUNTER_SET(bloom_filter_index_reader_mem_bytes_counter,
g_bloom_filter_index_reader_mem_bytes.get_value());
COUNTER_SET(filter_index_reader_num_counter, g_bloom_filter_index_reader_num.get_value());

RuntimeProfile::Counter* index_page_reader_mem_bytes_counter =
ADD_COUNTER(object_heap_dump_snapshot, "IndexPageReaderMemBytes", TUnit::BYTES);
RuntimeProfile::Counter* index_page_reader_num_counter =
ADD_COUNTER(object_heap_dump_snapshot, "IndexPageReaderNum", TUnit::UNIT);
COUNTER_SET(index_page_reader_mem_bytes_counter, g_index_page_reader_mem_bytes.get_value());
COUNTER_SET(index_page_reader_num_counter, g_index_page_reader_num.get_value());

RuntimeProfile::Counter* indexed_column_reader_mem_bytes_counter =
ADD_COUNTER(object_heap_dump_snapshot, "IndexedColumnReaderMemBytes", TUnit::BYTES);
RuntimeProfile::Counter* indexed_column_reader_num_counter =
ADD_COUNTER(object_heap_dump_snapshot, "IndexedColumnReaderNum", TUnit::UNIT);
COUNTER_SET(indexed_column_reader_mem_bytes_counter,
g_indexed_column_reader_mem_bytes.get_value());
COUNTER_SET(indexed_column_reader_num_counter, g_indexed_column_reader_num.get_value());

RuntimeProfile::Counter* inverted_index_reader_mem_bytes_counter =
ADD_COUNTER(object_heap_dump_snapshot, "InvertedIndexReaderMemBytes", TUnit::BYTES);
RuntimeProfile::Counter* inverted_index_reader_num_counter =
ADD_COUNTER(object_heap_dump_snapshot, "InvertedIndexReaderNum", TUnit::UNIT);
COUNTER_SET(inverted_index_reader_mem_bytes_counter,
g_inverted_index_reader_mem_bytes.get_value());
COUNTER_SET(inverted_index_reader_num_counter, g_inverted_index_reader_num.get_value());

RuntimeProfile::Counter* ordinal_index_reader_mem_bytes_counter =
ADD_COUNTER(object_heap_dump_snapshot, "OrdinalIndexReaderMemBytes", TUnit::BYTES);
RuntimeProfile::Counter* ordinal_index_reader_num_counter =
ADD_COUNTER(object_heap_dump_snapshot, "OrdinalIndexReaderNum", TUnit::UNIT);
COUNTER_SET(ordinal_index_reader_mem_bytes_counter,
g_ordinal_index_reader_mem_bytes.get_value());
COUNTER_SET(ordinal_index_reader_num_counter, g_ordinal_index_reader_num.get_value());

RuntimeProfile::Counter* zone_map_index_reader_mem_bytes_counter =
ADD_COUNTER(object_heap_dump_snapshot, "ZoneMapIndexReaderMemBytes", TUnit::BYTES);
RuntimeProfile::Counter* zone_map_index_reader_num_counter =
ADD_COUNTER(object_heap_dump_snapshot, "ZoneMapIndexReaderNum", TUnit::UNIT);
COUNTER_SET(zone_map_index_reader_mem_bytes_counter,
g_zone_map_index_reader_mem_bytes.get_value());
COUNTER_SET(zone_map_index_reader_num_counter, g_zone_map_index_reader_num.get_value());
}

}; // namespace doris
7 changes: 0 additions & 7 deletions be/src/olap/rowset/rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@

namespace doris {

static bvar::Adder<size_t> g_total_rowset_num("doris_total_rowset_num");

Rowset::Rowset(const TabletSchemaSPtr& schema, RowsetMetaSharedPtr rowset_meta,
std::string tablet_path)
: _rowset_meta(std::move(rowset_meta)),
Expand Down Expand Up @@ -56,11 +54,6 @@ Rowset::Rowset(const TabletSchemaSPtr& schema, RowsetMetaSharedPtr rowset_meta,
}
// build schema from RowsetMeta.tablet_schema or Tablet.tablet_schema
_schema = _rowset_meta->tablet_schema() ? _rowset_meta->tablet_schema() : schema;
g_total_rowset_num << 1;
}

Rowset::~Rowset() {
g_total_rowset_num << -1;
}

Status Rowset::load(bool use_cache) {
Expand Down
5 changes: 2 additions & 3 deletions be/src/olap/rowset/rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

#include "common/logging.h"
#include "common/status.h"
#include "olap/metadata_adder.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/tablet_schema.h"
Expand Down Expand Up @@ -116,10 +117,8 @@ class RowsetStateMachine {
RowsetState _rowset_state;
};

class Rowset : public std::enable_shared_from_this<Rowset> {
class Rowset : public std::enable_shared_from_this<Rowset>, public MetadataAdder<Rowset> {
public:
virtual ~Rowset();

// Open all segment files in this rowset and load necessary metadata.
// - `use_cache` : whether to use fd cache, only applicable to alpha rowset now
//
Expand Down
10 changes: 9 additions & 1 deletion be/src/runtime/process_profile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <memory>

#include "olap/metadata_adder.h"
#include "runtime/memory/memory_profile.h"

namespace doris {
Expand All @@ -37,8 +38,15 @@ void ProcessProfile::refresh_profile() {
std::unique_ptr<RuntimeProfile> process_profile =
std::make_unique<RuntimeProfile>("ProcessProfile");
_memory_profile->make_memory_profile(process_profile.get());
_process_profile.set(std::move(process_profile));
// TODO make other profile

// 3. dump object heap
RuntimeProfile* object_heap_dump_snapshot =
process_profile->create_child("ObjectHeapDump", true, false);
MetadataAdder<ProcessProfile>::dump_metadata_object(object_heap_dump_snapshot);
// TODO dump other object (block, column, etc.)

_process_profile.set(std::move(process_profile));
}

} // namespace doris
2 changes: 1 addition & 1 deletion be/src/runtime/workload_management/workload_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ void WorkloadActionCancelQuery::exec(WorkloadQueryInfo* query_info) {
std::stringstream msg;
msg << "query " << query_info->query_id
<< " cancelled by workload policy: " << query_info->policy_name
<< ", id:" << query_info->policy_id;
<< ", id:" << query_info->policy_id << ", " << query_info->cond_eval_msg;
std::string msg_str = msg.str();
LOG(INFO) << "[workload_schedule]" << msg_str;
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_info->tquery_id,
Expand Down
20 changes: 20 additions & 0 deletions be/src/runtime/workload_management/workload_condition.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ class WorkloadCondition {
virtual bool eval(std::string str_val) = 0;

virtual WorkloadMetricType get_workload_metric_type() = 0;

virtual std::string get_metric_string() = 0;

virtual std::string get_metric_value_string() = 0;
};

class WorkloadConditionQueryTime : public WorkloadCondition {
Expand All @@ -45,6 +49,10 @@ class WorkloadConditionQueryTime : public WorkloadCondition {
return WorkloadMetricType::QUERY_TIME;
}

std::string get_metric_string() override { return "query_time"; }

std::string get_metric_value_string() override { return std::to_string(_query_time); }

private:
int64_t _query_time;
WorkloadCompareOperator _op;
Expand All @@ -56,6 +64,10 @@ class WorkloadConditionScanRows : public WorkloadCondition {
bool eval(std::string str_val) override;
WorkloadMetricType get_workload_metric_type() override { return WorkloadMetricType::SCAN_ROWS; }

std::string get_metric_string() override { return "scan_rows"; }

std::string get_metric_value_string() override { return std::to_string(_scan_rows); }

private:
int64_t _scan_rows;
WorkloadCompareOperator _op;
Expand All @@ -69,6 +81,10 @@ class WorkloadConditionScanBytes : public WorkloadCondition {
return WorkloadMetricType::SCAN_BYTES;
}

std::string get_metric_string() override { return "scan_bytes"; }

std::string get_metric_value_string() override { return std::to_string(_scan_bytes); }

private:
int64_t _scan_bytes;
WorkloadCompareOperator _op;
Expand All @@ -82,6 +98,10 @@ class WorkloadConditionQueryMemory : public WorkloadCondition {
return WorkloadMetricType::QUERY_MEMORY_BYTES;
}

std::string get_metric_string() override { return "query_memory"; }

std::string get_metric_value_string() override { return std::to_string(_query_memory_bytes); }

private:
int64_t _query_memory_bytes;
WorkloadCompareOperator _op;
Expand Down
Loading

0 comments on commit b4b035b

Please sign in to comment.