Skip to content

Commit

Permalink
[opt](index compaction)Optimize logic of picking columns for index co…
Browse files Browse the repository at this point in the history
…mpaction (apache#42051)

## Proposed changes

1. Rename `skip_inverted_index` to `columns_to_do_index_compaction` to
make it more clear.
2. Remove redundant rowid_conversion check in index compaction
3. Rename function `construct_skip_inverted_index` to
`construct_index_compaction_columns`
4. Check `index.col_unique_ids` empty before using to avoid crash and
log tablet schema for debug.
5. Avoid unnecessary bkd and variant index checks by advancing column
type checking before `has_inverted_index`
  • Loading branch information
qidaye authored Oct 18, 2024
1 parent 8c34574 commit 0512983
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 54 deletions.
73 changes: 26 additions & 47 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include "olap/cumulative_compaction_policy.h"
#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/data_dir.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/beta_rowset_writer.h"
Expand Down Expand Up @@ -174,10 +175,11 @@ Status Compaction::merge_input_rowsets() {

// write merged rows to output rowset
// The test results show that merger is low-memory-footprint, there is no need to tracker its mem pool
// if ctx.skip_inverted_index.size() > 0, it means we need to do inverted index compaction.
// if ctx.columns_to_do_index_compaction.size() > 0, it means we need to do inverted index compaction.
// the row ID conversion matrix needs to be used for inverted index compaction.
if (!ctx.skip_inverted_index.empty() || (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write())) {
if (!ctx.columns_to_do_index_compaction.empty() ||
(_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write())) {
_stats.rowid_conversion = _rowid_conversion.get();
}

Expand Down Expand Up @@ -482,46 +484,12 @@ Status CompactionMixin::execute_compact_impl(int64_t permits) {
Status Compaction::do_inverted_index_compaction() {
const auto& ctx = _output_rs_writer->context();
if (!config::inverted_index_compaction_enable || _input_row_num <= 0 ||
!_stats.rowid_conversion || ctx.skip_inverted_index.empty()) {
!_stats.rowid_conversion || ctx.columns_to_do_index_compaction.empty()) {
return Status::OK();
}

OlapStopWatch inverted_watch;

int64_t cur_max_version = 0;
{
std::shared_lock rlock(_tablet->get_header_lock());
cur_max_version = _tablet->max_version_unlocked();
}

DeleteBitmap output_rowset_delete_bitmap(_tablet->tablet_id());
std::set<RowLocation> missed_rows;
std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>> location_map;
// Convert the delete bitmap of the input rowsets to output rowset.
_tablet->calc_compaction_output_rowset_delete_bitmap(
_input_rowsets, *_rowid_conversion, 0, cur_max_version + 1, &missed_rows, &location_map,
_tablet->tablet_meta()->delete_bitmap(), &output_rowset_delete_bitmap);

if (!_allow_delete_in_cumu_compaction) {
if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION &&
_stats.merged_rows != missed_rows.size() && _tablet->tablet_state() == TABLET_RUNNING) {
std::string err_msg = fmt::format(
"cumulative compaction: the merged rows({}) is not equal to missed "
"rows({}) in rowid conversion, tablet_id: {}, table_id:{}",
_stats.merged_rows, missed_rows.size(), _tablet->tablet_id(),
_tablet->table_id());
if (config::enable_mow_compaction_correctness_check_core) {
CHECK(false) << err_msg;
} else {
DCHECK(false) << err_msg;
}
// log here just for debugging, do not return error
LOG(WARNING) << err_msg;
}
}

RETURN_IF_ERROR(_tablet->check_rowid_conversion(_output_rowset, location_map));

// translation vec
// <<dest_idx_num, dest_docId>>
// the first level vector: index indicates src segment.
Expand Down Expand Up @@ -718,7 +686,7 @@ Status Compaction::do_inverted_index_compaction() {
};

Status status = Status::OK();
for (auto&& column_uniq_id : ctx.skip_inverted_index) {
for (auto&& column_uniq_id : ctx.columns_to_do_index_compaction) {
auto col = _cur_tablet_schema->column_by_uid(column_uniq_id);
const auto* index_meta = _cur_tablet_schema->get_inverted_index(col);

Expand Down Expand Up @@ -809,13 +777,25 @@ Status Compaction::do_inverted_index_compaction() {
return Status::OK();
}

void Compaction::construct_skip_inverted_index(RowsetWriterContext& ctx) {
void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) {
for (const auto& index : _cur_tablet_schema->indexes()) {
if (index.index_type() != IndexType::INVERTED) {
continue;
}

auto col_unique_id = index.col_unique_ids()[0];
auto col_unique_ids = index.col_unique_ids();
// check if column unique ids is empty to avoid crash
if (col_unique_ids.empty()) {
LOG(WARNING) << "tablet[" << _tablet->tablet_id() << "] index[" << index.index_id()
<< "] has no column unique id, will skip index compaction."
<< " tablet_schema=" << _cur_tablet_schema->dump_full_schema();
continue;
}
auto col_unique_id = col_unique_ids[0];
// Avoid doing inverted index compaction on non-slice type columns
if (!field_is_slice_type(_cur_tablet_schema->column_by_uid(col_unique_id).type())) {
continue;
}
auto has_inverted_index = [&](const RowsetSharedPtr& src_rs) {
auto* rowset = static_cast<BetaRowset*>(src_rs.get());
if (rowset->is_skip_index_compaction(col_unique_id)) {
Expand Down Expand Up @@ -877,7 +857,7 @@ void Compaction::construct_skip_inverted_index(RowsetWriterContext& ctx) {
reader->close();

// why is 3?
// bkd index will write at least 3 files
// slice type index file at least has 3 files: null_bitmap, segments_N, segments.gen
if (files.size() < 3) {
LOG(WARNING) << "tablet[" << _tablet->tablet_id() << "] column_unique_id["
<< col_unique_id << "]," << index_file_path
Expand All @@ -897,9 +877,8 @@ void Compaction::construct_skip_inverted_index(RowsetWriterContext& ctx) {
bool all_have_inverted_index = std::all_of(_input_rowsets.begin(), _input_rowsets.end(),
std::move(has_inverted_index));

if (all_have_inverted_index &&
field_is_slice_type(_cur_tablet_schema->column_by_uid(col_unique_id).type())) {
ctx.skip_inverted_index.insert(col_unique_id);
if (all_have_inverted_index) {
ctx.columns_to_do_index_compaction.insert(col_unique_id);
}
}
}
Expand All @@ -912,7 +891,7 @@ Status CompactionMixin::construct_output_rowset_writer(RowsetWriterContext& ctx)
_tablet->keys_type() == KeysType::DUP_KEYS)) &&
_cur_tablet_schema->get_inverted_index_storage_format() ==
InvertedIndexStorageFormatPB::V1) {
construct_skip_inverted_index(ctx);
construct_index_compaction_columns(ctx);
}
ctx.version = _output_version;
ctx.rowset_state = VISIBLE;
Expand Down Expand Up @@ -1209,7 +1188,7 @@ Status CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext&
_tablet->keys_type() == KeysType::DUP_KEYS)) &&
_cur_tablet_schema->get_inverted_index_storage_format() ==
InvertedIndexStorageFormatPB::V1) {
construct_skip_inverted_index(ctx);
construct_index_compaction_columns(ctx);
}

// Use the storage resource of the previous rowset
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class Compaction {

Status do_inverted_index_compaction();

void construct_skip_inverted_index(RowsetWriterContext& ctx);
void construct_index_compaction_columns(RowsetWriterContext& ctx);

virtual Status construct_output_rowset_writer(RowsetWriterContext& ctx) = 0;

Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/rowset_writer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ struct RowsetWriterContext {

int64_t newest_write_timestamp = -1;
bool enable_unique_key_merge_on_write = false;
// store column_unique_id to skip write inverted index
std::set<int32_t> skip_inverted_index;
// store column_unique_id to do index compaction
std::set<int32_t> columns_to_do_index_compaction;
DataWriteType write_type = DataWriteType::TYPE_DEFAULT;
BaseTabletSPtr tablet = nullptr;

Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,9 @@ Status SegmentWriter::_create_column_writer(uint32_t cid, const TabletColumn& co
opts.need_bitmap_index = column.has_bitmap_index();
bool skip_inverted_index = false;
if (_opts.rowset_ctx != nullptr) {
// skip write inverted index for index compaction
skip_inverted_index = _opts.rowset_ctx->skip_inverted_index.count(column.unique_id()) > 0;
// skip write inverted index for index compaction column
skip_inverted_index =
_opts.rowset_ctx->columns_to_do_index_compaction.count(column.unique_id()) > 0;
}
// skip write inverted index on load if skip_write_index_on_load is true
if (_opts.write_type == DataWriteType::TYPE_DIRECT && schema->skip_write_index_on_load()) {
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,9 @@ Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletCo
opts.need_bitmap_index = column.has_bitmap_index();
bool skip_inverted_index = false;
if (_opts.rowset_ctx != nullptr) {
// skip write inverted index for index compaction
skip_inverted_index = _opts.rowset_ctx->skip_inverted_index.contains(column.unique_id());
// skip write inverted index for index compaction column
skip_inverted_index =
_opts.rowset_ctx->columns_to_do_index_compaction.contains(column.unique_id());
}
// skip write inverted index on load if skip_write_index_on_load is true
if (_opts.write_type == DataWriteType::TYPE_DIRECT &&
Expand Down

0 comments on commit 0512983

Please sign in to comment.