Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[opt](index compaction)Optimize logic of picking columns for index compaction (#42051) #42285

Merged
merged 1 commit into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 26 additions & 47 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include "olap/cumulative_compaction_policy.h"
#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/data_dir.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/beta_rowset_writer.h"
Expand Down Expand Up @@ -172,10 +173,11 @@ Status Compaction::merge_input_rowsets() {

// write merged rows to output rowset
// The test results show that merger is low-memory-footprint, there is no need to tracker its mem pool
// if ctx.skip_inverted_index.size() > 0, it means we need to do inverted index compaction.
// if ctx.columns_to_do_index_compaction.size() > 0, it means we need to do inverted index compaction.
// the row ID conversion matrix needs to be used for inverted index compaction.
if (!ctx.skip_inverted_index.empty() || (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write())) {
if (!ctx.columns_to_do_index_compaction.empty() ||
(_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write())) {
_stats.rowid_conversion = &_rowid_conversion;
}

Expand Down Expand Up @@ -476,46 +478,12 @@ Status CompactionMixin::execute_compact_impl(int64_t permits) {
Status Compaction::do_inverted_index_compaction() {
const auto& ctx = _output_rs_writer->context();
if (!config::inverted_index_compaction_enable || _input_row_num <= 0 ||
!_stats.rowid_conversion || ctx.skip_inverted_index.empty()) {
!_stats.rowid_conversion || ctx.columns_to_do_index_compaction.empty()) {
return Status::OK();
}

OlapStopWatch inverted_watch;

int64_t cur_max_version = 0;
{
std::shared_lock rlock(_tablet->get_header_lock());
cur_max_version = _tablet->max_version_unlocked();
}

DeleteBitmap output_rowset_delete_bitmap(_tablet->tablet_id());
std::set<RowLocation> missed_rows;
std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>> location_map;
// Convert the delete bitmap of the input rowsets to output rowset.
_tablet->calc_compaction_output_rowset_delete_bitmap(
_input_rowsets, _rowid_conversion, 0, cur_max_version + 1, &missed_rows, &location_map,
_tablet->tablet_meta()->delete_bitmap(), &output_rowset_delete_bitmap);

if (!_allow_delete_in_cumu_compaction) {
if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION &&
_stats.merged_rows != missed_rows.size() && _tablet->tablet_state() == TABLET_RUNNING) {
std::string err_msg = fmt::format(
"cumulative compaction: the merged rows({}) is not equal to missed "
"rows({}) in rowid conversion, tablet_id: {}, table_id:{}",
_stats.merged_rows, missed_rows.size(), _tablet->tablet_id(),
_tablet->table_id());
if (config::enable_mow_compaction_correctness_check_core) {
CHECK(false) << err_msg;
} else {
DCHECK(false) << err_msg;
}
// log here just for debugging, do not return error
LOG(WARNING) << err_msg;
}
}

RETURN_IF_ERROR(_tablet->check_rowid_conversion(_output_rowset, location_map));

// translation vec
// <<dest_idx_num, dest_docId>>
// the first level vector: index indicates src segment.
Expand Down Expand Up @@ -712,7 +680,7 @@ Status Compaction::do_inverted_index_compaction() {
};

Status status = Status::OK();
for (auto&& column_uniq_id : ctx.skip_inverted_index) {
for (auto&& column_uniq_id : ctx.columns_to_do_index_compaction) {
auto col = _cur_tablet_schema->column_by_uid(column_uniq_id);
const auto* index_meta = _cur_tablet_schema->get_inverted_index(col);

Expand Down Expand Up @@ -803,13 +771,25 @@ Status Compaction::do_inverted_index_compaction() {
return Status::OK();
}

void Compaction::construct_skip_inverted_index(RowsetWriterContext& ctx) {
void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) {
for (const auto& index : _cur_tablet_schema->indexes()) {
if (index.index_type() != IndexType::INVERTED) {
continue;
}

auto col_unique_id = index.col_unique_ids()[0];
auto col_unique_ids = index.col_unique_ids();
// check if column unique ids is empty to avoid crash
if (col_unique_ids.empty()) {
LOG(WARNING) << "tablet[" << _tablet->tablet_id() << "] index[" << index.index_id()
<< "] has no column unique id, will skip index compaction."
<< " tablet_schema=" << _cur_tablet_schema->dump_full_schema();
continue;
}
auto col_unique_id = col_unique_ids[0];
// Avoid doing inverted index compaction on non-slice type columns
if (!field_is_slice_type(_cur_tablet_schema->column_by_uid(col_unique_id).type())) {
continue;
}
auto has_inverted_index = [&](const RowsetSharedPtr& src_rs) {
auto* rowset = static_cast<BetaRowset*>(src_rs.get());
if (rowset->is_skip_index_compaction(col_unique_id)) {
Expand Down Expand Up @@ -871,7 +851,7 @@ void Compaction::construct_skip_inverted_index(RowsetWriterContext& ctx) {
reader->close();

// why is 3?
// bkd index will write at least 3 files
// slice type index file at least has 3 files: null_bitmap, segments_N, segments.gen
if (files.size() < 3) {
LOG(WARNING) << "tablet[" << _tablet->tablet_id() << "] column_unique_id["
<< col_unique_id << "]," << index_file_path
Expand All @@ -891,9 +871,8 @@ void Compaction::construct_skip_inverted_index(RowsetWriterContext& ctx) {
bool all_have_inverted_index = std::all_of(_input_rowsets.begin(), _input_rowsets.end(),
std::move(has_inverted_index));

if (all_have_inverted_index &&
field_is_slice_type(_cur_tablet_schema->column_by_uid(col_unique_id).type())) {
ctx.skip_inverted_index.insert(col_unique_id);
if (all_have_inverted_index) {
ctx.columns_to_do_index_compaction.insert(col_unique_id);
}
}
}
Expand All @@ -906,7 +885,7 @@ Status CompactionMixin::construct_output_rowset_writer(RowsetWriterContext& ctx)
_tablet->keys_type() == KeysType::DUP_KEYS)) &&
_cur_tablet_schema->get_inverted_index_storage_format() ==
InvertedIndexStorageFormatPB::V1) {
construct_skip_inverted_index(ctx);
construct_index_compaction_columns(ctx);
}
ctx.version = _output_version;
ctx.rowset_state = VISIBLE;
Expand Down Expand Up @@ -1202,7 +1181,7 @@ Status CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext&
_tablet->keys_type() == KeysType::DUP_KEYS)) &&
_cur_tablet_schema->get_inverted_index_storage_format() ==
InvertedIndexStorageFormatPB::V1) {
construct_skip_inverted_index(ctx);
construct_index_compaction_columns(ctx);
}

// Use the storage resource of the previous rowset
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class Compaction {

Status do_inverted_index_compaction();

void construct_skip_inverted_index(RowsetWriterContext& ctx);
void construct_index_compaction_columns(RowsetWriterContext& ctx);

virtual Status construct_output_rowset_writer(RowsetWriterContext& ctx) = 0;

Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/rowset_writer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ struct RowsetWriterContext {

int64_t newest_write_timestamp = -1;
bool enable_unique_key_merge_on_write = false;
// store column_unique_id to skip write inverted index
std::set<int32_t> skip_inverted_index;
// store column_unique_id to do index compaction
std::set<int32_t> columns_to_do_index_compaction;
DataWriteType write_type = DataWriteType::TYPE_DEFAULT;
BaseTabletSPtr tablet = nullptr;

Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,9 @@ Status SegmentWriter::_create_column_writer(uint32_t cid, const TabletColumn& co
opts.need_bitmap_index = column.has_bitmap_index();
bool skip_inverted_index = false;
if (_opts.rowset_ctx != nullptr) {
// skip write inverted index for index compaction
skip_inverted_index = _opts.rowset_ctx->skip_inverted_index.count(column.unique_id()) > 0;
// skip write inverted index for index compaction column
skip_inverted_index =
_opts.rowset_ctx->columns_to_do_index_compaction.count(column.unique_id()) > 0;
}
// skip write inverted index on load if skip_write_index_on_load is true
if (_opts.write_type == DataWriteType::TYPE_DIRECT && schema->skip_write_index_on_load()) {
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,9 @@ Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletCo
opts.need_bitmap_index = column.has_bitmap_index();
bool skip_inverted_index = false;
if (_opts.rowset_ctx != nullptr) {
// skip write inverted index for index compaction
skip_inverted_index = _opts.rowset_ctx->skip_inverted_index.contains(column.unique_id());
// skip write inverted index for index compaction column
skip_inverted_index =
_opts.rowset_ctx->columns_to_do_index_compaction.contains(column.unique_id());
}
// skip write inverted index on load if skip_write_index_on_load is true
if (_opts.write_type == DataWriteType::TYPE_DIRECT &&
Expand Down
Loading