Skip to content

Commit

Permalink
[opt](index compaction)Add dual write inverted index file switch (apa…
Browse files Browse the repository at this point in the history
…che#42280)

## Proposed changes

To check correctness of index file produced by index compaction process,
we add a switch `dual_write_inverted_index_enable` (default is `false`).
When both `inverted_index_compaction_enable` and
`dual_write_inverted_index_enable` are `true`, Doris will produce index
file through both normal compaction process and index compaction
process, and compares both index files, which are theoretically
identical. Doris will log FATAL and crash after the check failed.

We add this feature only for test, *DO NOT* use it in production.
  • Loading branch information
qidaye authored Oct 25, 2024
1 parent 89286af commit e9a14d7
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 3 deletions.
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1042,6 +1042,8 @@ DEFINE_Int32(max_depth_in_bkd_tree, "32");
DEFINE_mBool(inverted_index_compaction_enable, "false");
// Only for debug, do not use in production
DEFINE_mBool(debug_inverted_index_compaction, "false");
// Only for debug, do not use in production
DEFINE_mBool(dual_write_inverted_index_enable, "false");
// index by RAM directory
DEFINE_mBool(inverted_index_ram_dir_enable, "true");
// use num_broadcast_buffer blocks as buffer to do broadcast
Expand Down
4 changes: 4 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1090,7 +1090,11 @@ DECLARE_Int32(max_depth_in_bkd_tree);
// index compaction
DECLARE_mBool(inverted_index_compaction_enable);
// Only for debug, do not use in production
// Debug switch for collecting intermediate data in inverted index compaction
DECLARE_mBool(debug_inverted_index_compaction);
// Only for debug, do not use in production
// Debug switch for writing inverted index both in compaction process and index compaction process
DECLARE_mBool(dual_write_inverted_index_enable);
// index by RAM directory
DECLARE_mBool(inverted_index_ram_dir_enable);
// use num_broadcast_buffer blocks as buffer to do broadcast
Expand Down
151 changes: 150 additions & 1 deletion be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "olap/compaction.h"

#include <fmt/format.h>
#include <gen_cpp/olap_common.pb.h>
#include <gen_cpp/olap_file.pb.h>
#include <glog/logging.h>

Expand All @@ -35,6 +36,8 @@
#include <shared_mutex>
#include <utility>

#include "CLucene/config/repl_wchar.h"
#include "CLucene/index/Terms.h"
#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_storage_engine.h"
#include "common/config.h"
Expand Down Expand Up @@ -619,6 +622,7 @@ Status Compaction::do_inverted_index_compaction() {
// Some columns have already been indexed
// key: seg_id, value: inverted index file size
std::unordered_map<int, int64_t> compacted_idx_file_size;
auto tmp_file_dir = ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir();
for (int seg_id = 0; seg_id < dest_segment_num; ++seg_id) {
std::string index_path_prefix {
InvertedIndexDescriptor::get_index_file_path_prefix(ctx.segment_path(seg_id))};
Expand All @@ -644,6 +648,12 @@ Status Compaction::do_inverted_index_compaction() {
}
compacted_idx_file_size[seg_id] = fsize;
}
// if dual_write_inverted_index_enable is true, we need to write inverted index to tmp dir
if (config::dual_write_inverted_index_enable) {
auto tmp_index_path_prefix =
tmp_file_dir / (dest_rowset_id.to_string() + "_" + std::to_string(seg_id));
index_path_prefix = tmp_index_path_prefix;
}
auto inverted_index_file_writer = std::make_unique<InvertedIndexFileWriter>(
ctx.fs(), index_path_prefix, ctx.rowset_id.to_string(), seg_id,
_cur_tablet_schema->get_inverted_index_storage_format());
Expand All @@ -667,7 +677,6 @@ Status Compaction::do_inverted_index_compaction() {
}

// use tmp file dir to store index files
auto tmp_file_dir = ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir();
auto index_tmp_path = tmp_file_dir / dest_rowset_id.to_string();
LOG(INFO) << "start index compaction"
<< ". tablet=" << _tablet->tablet_id() << ", source index size=" << src_segment_num
Expand Down Expand Up @@ -754,6 +763,70 @@ Status Compaction::do_inverted_index_compaction() {
return status;
}

// check idx file correctness only when dual_write_inverted_index_enable is true
if (config::dual_write_inverted_index_enable) {
for (auto&& column_uniq_id : ctx.columns_to_do_index_compaction) {
auto col = _cur_tablet_schema->column_by_uid(column_uniq_id);
const auto* index_meta = _cur_tablet_schema->get_inverted_index(col);
for (int dest_segment_id = 0; dest_segment_id < dest_segment_num; dest_segment_id++) {
// create index file reader for normal compaction index file
std::string index_path_prefix {InvertedIndexDescriptor::get_index_file_path_prefix(
ctx.segment_path(dest_segment_id))};
io::Path cfs_path;
if (_cur_tablet_schema->get_inverted_index_storage_format() !=
doris::InvertedIndexStorageFormatPB::V1) {
cfs_path = InvertedIndexDescriptor::get_index_file_path_v2(index_path_prefix);
} else {
cfs_path = InvertedIndexDescriptor::get_index_file_path_v1(
index_path_prefix, index_meta->index_id(),
index_meta->get_index_suffix());
}
auto inverted_index_file_reader = std::make_unique<InvertedIndexFileReader>(
ctx.fs(), index_path_prefix,
_cur_tablet_schema->get_inverted_index_storage_format());
bool open_idx_file_cache = false;
auto st = inverted_index_file_reader->init(config::inverted_index_read_buffer_size,
open_idx_file_cache);
if (!st.ok()) {
LOG(FATAL) << "inverted_index_file_reader init failed in index compaction "
"correctness check, error:"
<< st;
}
auto index_reader = DORIS_TRY(inverted_index_file_reader->open(index_meta));

// create index file reader for tmp index compaction index file
auto tmp_index_path_prefix = tmp_file_dir / (dest_rowset_id.to_string() + "_" +
std::to_string(dest_segment_id));
auto tmp_inverted_index_file_reader = std::make_unique<InvertedIndexFileReader>(
doris::io::global_local_filesystem(), tmp_index_path_prefix,
_cur_tablet_schema->get_inverted_index_storage_format());
st = tmp_inverted_index_file_reader->init(config::inverted_index_read_buffer_size,
open_idx_file_cache);
if (!st.ok()) {
LOG(FATAL) << "tmp_inverted_index_file_reader init failed in index compaction "
"correctness check, error:"
<< st;
}
auto tmp_index_reader = DORIS_TRY(tmp_inverted_index_file_reader->open(index_meta));

st = check_idx_file_correctness(*index_reader, *tmp_index_reader);
if (!st.ok()) {
LOG(FATAL) << "index compaction correctness check failed"
<< ", tablet=" << _tablet->tablet_id() << ", index_path=" << cfs_path
<< ", tmp_index_path="
<< (tmp_index_path_prefix.string() + "_" +
std::to_string(index_meta->index_id()) + ".idx")
<< ", error=" << st.msg();
}
LOG(INFO) << "index compaction correctness check succeed"
<< ", tablet=" << _tablet->tablet_id() << ", index_path=" << cfs_path
<< ", tmp_index_path="
<< (tmp_index_path_prefix.string() + "_" +
std::to_string(index_meta->index_id()) + ".idx");
}
}
}

// index compaction should update total disk size and index disk size
_output_rowset->rowset_meta()->set_data_disk_size(_output_rowset->data_disk_size() +
inverted_index_file_size);
Expand All @@ -776,6 +849,82 @@ Status Compaction::do_inverted_index_compaction() {
return Status::OK();
}

Status Compaction::check_idx_file_correctness(DorisCompoundReader& index_reader,
DorisCompoundReader& tmp_index_reader) {
lucene::index::IndexReader* idx_reader = lucene::index::IndexReader::open(&index_reader);
lucene::index::IndexReader* tmp_idx_reader =
lucene::index::IndexReader::open(&tmp_index_reader);

// compare numDocs
if (idx_reader->numDocs() != tmp_idx_reader->numDocs()) {
return Status::InternalError(
"index compaction correctness check failed, numDocs not equal, idx_numDocs={}, "
"tmp_idx_numDocs={}",
idx_reader->numDocs(), tmp_idx_reader->numDocs());
}

lucene::index::TermEnum* term_enum = idx_reader->terms();
lucene::index::TermEnum* tmp_term_enum = tmp_idx_reader->terms();

// iterate TermEnum
while (term_enum->next() && tmp_term_enum->next()) {
std::string token = lucene_wcstoutf8string(term_enum->term(false)->text(),
term_enum->term(false)->textLength());
std::string field = lucene_wcstoutf8string(term_enum->term(false)->field(),
lenOfString(term_enum->term(false)->field()));
std::string tmp_token = lucene_wcstoutf8string(tmp_term_enum->term(false)->text(),
tmp_term_enum->term(false)->textLength());
std::string tmp_field =
lucene_wcstoutf8string(tmp_term_enum->term(false)->field(),
lenOfString(tmp_term_enum->term(false)->field()));
// compare token and field
if (field != tmp_field) {
return Status::InternalError(
"index compaction correctness check failed, fields not equal, field={}, "
"tmp_field={}",
field, field);
}
if (token != tmp_token) {
return Status::InternalError(
"index compaction correctness check failed, tokens not equal, token={}, "
"tmp_token={}",
token, tmp_token);
}

// get term's docId and freq
lucene::index::TermDocs* term_docs = idx_reader->termDocs(term_enum->term());
lucene::index::TermDocs* tmp_term_docs = tmp_idx_reader->termDocs(tmp_term_enum->term());

// compare term's docId and freq
while (term_docs->next() && tmp_term_docs->next()) {
if (term_docs->doc() != tmp_term_docs->doc() ||
term_docs->freq() != tmp_term_docs->freq()) {
return Status::InternalError(
"index compaction correctness check failed, docId or freq not equal, "
"docId={}, tmp_docId={}, freq={}, tmp_freq={}",
term_docs->doc(), tmp_term_docs->doc(), term_docs->freq(),
tmp_term_docs->freq());
}
}

// check if there are remaining docs
if (term_docs->next() || tmp_term_docs->next()) {
return Status::InternalError(
"index compaction correctness check failed, number of docs not equal for "
"term={}, tmp_term={}",
token, tmp_token);
}
}

// check if there are remaining terms
if (term_enum->next() || tmp_term_enum->next()) {
return Status::InternalError(
"index compaction correctness check failed, number of terms not equal");
}

return Status::OK();
}

void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) {
for (const auto& index : _cur_tablet_schema->indexes()) {
if (index.index_type() != IndexType::INVERTED) {
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ class Compaction {

Status do_inverted_index_compaction();

Status check_idx_file_correctness(DorisCompoundReader& index_reader,
DorisCompoundReader& tmp_index_reader);

void construct_index_compaction_columns(RowsetWriterContext& ctx);

virtual Status construct_output_rowset_writer(RowsetWriterContext& ctx) = 0;
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ Status SegmentWriter::_create_column_writer(uint32_t cid, const TabletColumn& co

opts.need_bitmap_index = column.has_bitmap_index();
bool skip_inverted_index = false;
if (_opts.rowset_ctx != nullptr) {
// if dual_write_inverted_index_enable is true, do not skip write inverted index on index compaction columns
if (_opts.rowset_ctx != nullptr && !config::dual_write_inverted_index_enable) {
// skip write inverted index for index compaction column
skip_inverted_index =
_opts.rowset_ctx->columns_to_do_index_compaction.count(column.unique_id()) > 0;
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletCo

opts.need_bitmap_index = column.has_bitmap_index();
bool skip_inverted_index = false;
if (_opts.rowset_ctx != nullptr) {
// if dual_write_inverted_index_enable is true, do not skip write inverted index on index compaction columns
if (_opts.rowset_ctx != nullptr && !config::dual_write_inverted_index_enable) {
// skip write inverted index for index compaction column
skip_inverted_index =
_opts.rowset_ctx->columns_to_do_index_compaction.contains(column.unique_id());
Expand Down

0 comments on commit e9a14d7

Please sign in to comment.