Skip to content

Commit

Permalink
[opt](index compaction)Optimize logic of picking columns for index compaction(#42051)
Browse files Browse the repository at this point in the history

bp #42051
  • Loading branch information
qidaye committed Oct 22, 2024
1 parent e9cfbb5 commit 5dcf4b4
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 42 deletions.
61 changes: 23 additions & 38 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "olap/cumulative_compaction_policy.h"
#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/data_dir.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset.h"
Expand Down Expand Up @@ -370,10 +371,11 @@ Status Compaction::do_compaction_impl(int64_t permits) {
// 2. write merged rows to output rowset
// The test results show that merger is low-memory-footprint, there is no need to tracker its mem pool
Merger::Statistics stats;
// if ctx.skip_inverted_index.size() > 0, it means we need to do inverted index compaction.
// if ctx.columns_to_do_index_compaction.size() > 0, it means we need to do inverted index compaction.
// the row ID conversion matrix needs to be used for inverted index compaction.
if (ctx.skip_inverted_index.size() > 0 || (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write())) {
if (!ctx.columns_to_do_index_compaction.empty() ||
(_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write())) {
stats.rowid_conversion = &_rowid_conversion;
}
int64_t way_num = merge_way_num();
Expand Down Expand Up @@ -436,37 +438,9 @@ Status Compaction::do_compaction_impl(int64_t permits) {
RETURN_IF_ERROR(check_correctness(stats));

if (_input_row_num > 0 && stats.rowid_conversion && config::inverted_index_compaction_enable &&
!ctx.skip_inverted_index.empty()) {
!ctx.columns_to_do_index_compaction.empty()) {
OlapStopWatch inverted_watch;

// check rowid_conversion correctness
Version version = _tablet->max_version();
DeleteBitmap output_rowset_delete_bitmap(_tablet->tablet_id());
std::set<RowLocation> missed_rows;
std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>> location_map;
// Convert the delete bitmap of the input rowsets to output rowset.
std::size_t missed_rows_size = 0;
_tablet->calc_compaction_output_rowset_delete_bitmap(
_input_rowsets, _rowid_conversion, 0, version.second + 1, &missed_rows,
&location_map, _tablet->tablet_meta()->delete_bitmap(),
&output_rowset_delete_bitmap);
if (!allow_delete_in_cumu_compaction()) {
missed_rows_size = missed_rows.size();
if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION &&
_tablet->tablet_state() == TABLET_RUNNING &&
stats.merged_rows != missed_rows_size) {
std::string err_msg = fmt::format(
"cumulative compaction: the merged rows({}) is not equal to missed "
"rows({}) in rowid conversion, tablet_id: {}, table_id:{}",
stats.merged_rows, missed_rows_size, _tablet->tablet_id(),
_tablet->table_id());
DCHECK(false) << err_msg;
LOG(WARNING) << err_msg;
}
}

RETURN_IF_ERROR(_tablet->check_rowid_conversion(_output_rowset, location_map));

// translation vec
// <<dest_idx_num, dest_docId>>
// the first level vector: index indicates src segment.
Expand Down Expand Up @@ -649,7 +623,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
};

Status status = Status::OK();
for (auto&& column_uniq_id : ctx.skip_inverted_index) {
for (auto&& column_uniq_id : ctx.columns_to_do_index_compaction) {
auto col = _cur_tablet_schema->column_by_uid(column_uniq_id);
const auto* index_meta = _cur_tablet_schema->get_inverted_index(col);

Expand Down Expand Up @@ -808,7 +782,19 @@ Status Compaction::construct_output_rowset_writer(RowsetWriterContext& ctx, bool
InvertedIndexStorageFormatPB::V1) {
for (const auto& index : _cur_tablet_schema->indexes()) {
if (index.index_type() == IndexType::INVERTED) {
auto col_unique_id = index.col_unique_ids()[0];
auto col_unique_ids = index.col_unique_ids();
// check if column unique ids is empty to avoid crash
if (col_unique_ids.empty()) {
LOG(WARNING) << "tablet[" << _tablet->tablet_id() << "] index[" << index.index_id()
<< "] has no column unique id, will skip index compaction."
<< " tablet_schema=" << _cur_tablet_schema->dump_full_schema();
continue;
}
auto col_unique_id = col_unique_ids[0];
// Avoid doing inverted index compaction on non-slice type columns
if (!field_is_slice_type(_cur_tablet_schema->column_by_uid(col_unique_id).type())) {
continue;
}
//NOTE: here src_rs may be in building index progress, so it would not contain inverted index info.
bool all_have_inverted_index = std::all_of(
_input_rowsets.begin(), _input_rowsets.end(), [&](const auto& src_rs) {
Expand Down Expand Up @@ -892,7 +878,7 @@ Status Compaction::construct_output_rowset_writer(RowsetWriterContext& ctx, bool
reader->close();

// why is 3?
// bkd index will write at least 3 files
// slice type index file at least has 3 files: null_bitmap, segments_N, segments.gen
if (files.size() < 3) {
LOG(WARNING) << "tablet[" << _tablet->tablet_id()
<< "] column_unique_id[" << col_unique_id << "],"
Expand All @@ -905,9 +891,8 @@ Status Compaction::construct_output_rowset_writer(RowsetWriterContext& ctx, bool
return true;
return true;
});
if (all_have_inverted_index &&
field_is_slice_type(_cur_tablet_schema->column_by_uid(col_unique_id).type())) {
ctx.skip_inverted_index.insert(col_unique_id);
if (all_have_inverted_index) {
ctx.columns_to_do_index_compaction.insert(col_unique_id);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/rowset_writer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ struct RowsetWriterContext {

int64_t newest_write_timestamp = -1;
bool enable_unique_key_merge_on_write = false;
// store column_unique_id to skip write inverted index
std::set<int32_t> skip_inverted_index;
// store column_unique_id to do index compaction
std::set<int32_t> columns_to_do_index_compaction;
DataWriteType write_type = DataWriteType::TYPE_DEFAULT;
BaseTabletSPtr tablet = nullptr;

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ Status SegmentWriter::init(const std::vector<uint32_t>& col_ids, bool has_key) {
if (_opts.rowset_ctx != nullptr) {
// skip write inverted index for index compaction
skip_inverted_index =
_opts.rowset_ctx->skip_inverted_index.count(column.unique_id()) > 0;
_opts.rowset_ctx->columns_to_do_index_compaction.count(column.unique_id()) > 0;
}
// skip write inverted index on load if skip_write_index_on_load is true
if (_opts.write_type == DataWriteType::TYPE_DIRECT &&
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletCo
bool skip_inverted_index = false;
if (_opts.rowset_ctx != nullptr) {
// skip write inverted index for index compaction
skip_inverted_index = _opts.rowset_ctx->skip_inverted_index.count(column.unique_id()) > 0;
skip_inverted_index = _opts.rowset_ctx->columns_to_do_index_compaction.count(column.unique_id()) > 0;
}
// skip write inverted index on load if skip_write_index_on_load is true
if (_opts.write_type == DataWriteType::TYPE_DIRECT &&
Expand Down

0 comments on commit 5dcf4b4

Please sign in to comment.