Skip to content

Commit

Permalink
5
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Apr 20, 2024
1 parent d24486c commit 19dbd6c
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 7 deletions.
89 changes: 82 additions & 7 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,16 +163,40 @@ void Compaction::init_profile(const std::string& label) {
}

Status Compaction::merge_input_rowsets() {
LOG(WARNING) << "execute_compact111111 222 13 " << _mem_tracker << ", "
<< _mem_tracker.use_count() << ", " << _mem_tracker->label() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker().use_count()
<< ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label();
std::vector<RowsetReaderSharedPtr> input_rs_readers;
input_rs_readers.reserve(_input_rowsets.size());
for (auto& rowset : _input_rowsets) {
RowsetReaderSharedPtr rs_reader;
RETURN_IF_ERROR(rowset->create_reader(&rs_reader));
LOG(WARNING) << "execute_compact111111 222 24 " << _mem_tracker << ", "
<< _mem_tracker.use_count() << ", " << _mem_tracker->label() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker().use_count()
<< ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label();
input_rs_readers.push_back(std::move(rs_reader));
}
LOG(WARNING) << "execute_compact111111 222 14 " << _mem_tracker << ", "
<< _mem_tracker.use_count() << ", " << _mem_tracker->label() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker().use_count()
<< ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label();

RowsetWriterContext ctx;
RETURN_IF_ERROR(construct_output_rowset_writer(ctx));
LOG(WARNING) << "execute_compact111111 222 15 " << _mem_tracker << ", "
<< _mem_tracker.use_count() << ", " << _mem_tracker->label() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker().use_count()
<< ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label();

// write merged rows to output rowset
// The test results show that merger is low-memory-footprint, there is no need to tracker its mem pool
Expand All @@ -190,9 +214,23 @@ Status Compaction::merge_input_rowsets() {
res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), *_cur_tablet_schema,
input_rs_readers, _output_rs_writer.get(),
get_avg_segment_rows(), &_stats);
LOG(WARNING)
<< "execute_compact111111 222 16 " << _mem_tracker << ", "
<< _mem_tracker.use_count() << ", " << _mem_tracker->label() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker().use_count()
<< ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label();
} else {
res = Merger::vmerge_rowsets(_tablet, compaction_type(), *_cur_tablet_schema,
input_rs_readers, _output_rs_writer.get(), &_stats);
LOG(WARNING)
<< "execute_compact111111 222 17 " << _mem_tracker << ", "
<< _mem_tracker.use_count() << ", " << _mem_tracker->label() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker().use_count()
<< ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label();
}
}

Expand All @@ -209,6 +247,12 @@ Status Compaction::merge_input_rowsets() {
RETURN_NOT_OK_STATUS_WITH_WARN(_output_rs_writer->build(_output_rowset),
fmt::format("rowset writer build failed. output_version: {}",
_output_version.to_string()));
LOG(WARNING) << "execute_compact111111 222 18 " << _mem_tracker << ", "
<< _mem_tracker.use_count() << ", " << _mem_tracker->label() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker().use_count()
<< ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label();

//RETURN_IF_ERROR(_engine.meta_mgr().commit_rowset(*_output_rowset->rowset_meta().get()));

Expand All @@ -218,20 +262,45 @@ Status Compaction::merge_input_rowsets() {
// Output start version > 2 means we must set the delete predicate in the output rowset
if (_allow_delete_in_cumu_compaction && _output_rowset->version().first > 2) {
DeletePredicatePB delete_predicate;
std::accumulate(_input_rowsets.begin(), _input_rowsets.end(), &delete_predicate,
[](DeletePredicatePB* delete_predicate, const RowsetSharedPtr& rs) {
if (rs->rowset_meta()->has_delete_predicate()) {
delete_predicate->MergeFrom(rs->rowset_meta()->delete_predicate());
}
return delete_predicate;
});
std::accumulate(
_input_rowsets.begin(), _input_rowsets.end(), &delete_predicate,
[this](DeletePredicatePB* delete_predicate, const RowsetSharedPtr& rs) {
if (rs->rowset_meta()->has_delete_predicate()) {
delete_predicate->MergeFrom(rs->rowset_meta()->delete_predicate());
LOG(WARNING)
<< "execute_compact111111 222 20 " << _mem_tracker << ", "
<< _mem_tracker.use_count() << ", " << _mem_tracker->label() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()
<< ", "
<< thread_context()
->thread_mem_tracker_mgr->limiter_mem_tracker()
.use_count()
<< ", "
<< thread_context()
->thread_mem_tracker_mgr->limiter_mem_tracker()
->label();
}
return delete_predicate;
});
LOG(WARNING) << "execute_compact111111 222 19 " << _mem_tracker << ", "
<< _mem_tracker.use_count() << ", " << _mem_tracker->label() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker().use_count()
<< ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label();
// now version in delete_predicate is deprecated
if (!delete_predicate.in_predicates().empty() ||
!delete_predicate.sub_predicates_v2().empty() ||
!delete_predicate.sub_predicates().empty()) {
_output_rowset->rowset_meta()->set_delete_predicate(std::move(delete_predicate));
}
}
LOG(WARNING) << "execute_compact111111 222 18 " << _mem_tracker << ", "
<< _mem_tracker.use_count() << ", " << _mem_tracker->label() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker().use_count()
<< ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label();

COUNTER_UPDATE(_output_rowset_data_size_counter, _output_rowset->data_disk_size());
COUNTER_UPDATE(_output_row_num_counter, _output_rowset->num_rows());
Expand Down Expand Up @@ -899,6 +968,12 @@ Status CompactionMixin::construct_output_rowset_writer(RowsetWriterContext& ctx)
_tablet->keys_type() == KeysType::DUP_KEYS))) {
construct_skip_inverted_index(ctx);
}
LOG(WARNING) << "execute_compact111111 222 23 " << _mem_tracker << ", "
<< _mem_tracker.use_count() << ", " << _mem_tracker->label() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker().use_count()
<< ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label();
ctx.version = _output_version;
ctx.rowset_state = VISIBLE;
ctx.segments_overlap = NONOVERLAPPING;
Expand Down
70 changes: 70 additions & 0 deletions be/src/olap/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,32 @@ Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type,
reader_params.reader_type = reader_type;

TabletReader::ReadSource read_source;
LOG(WARNING) << "execute_compact111111 222 31 "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker().use_count()
<< ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label();
read_source.rs_splits.reserve(src_rowset_readers.size());
for (const RowsetReaderSharedPtr& rs_reader : src_rowset_readers) {
read_source.rs_splits.emplace_back(rs_reader);
}
read_source.fill_delete_predicates();
reader_params.set_read_source(std::move(read_source));
LOG(WARNING) << "execute_compact111111 222 37 "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker().use_count()
<< ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label();

reader_params.version = dst_rowset_writer->version();

TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>();
merge_tablet_schema->copy_from(cur_tablet_schema);
LOG(WARNING) << "execute_compact111111 222 30 "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker().use_count()
<< ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label();

// Merge the columns in delete predicate that not in latest schema in to current tablet schema
for (auto& del_pred_rs : reader_params.delete_predicates) {
Expand All @@ -111,11 +126,21 @@ Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type,
if (stats_output && stats_output->rowid_conversion) {
reader_params.record_rowids = true;
}
LOG(WARNING) << "execute_compact111111 222 25 "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker().use_count()
<< ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label();

reader_params.return_columns.resize(cur_tablet_schema.num_columns());
std::iota(reader_params.return_columns.begin(), reader_params.return_columns.end(), 0);
reader_params.origin_return_columns = &reader_params.return_columns;
RETURN_IF_ERROR(reader.init(reader_params));
LOG(WARNING) << "execute_compact111111 222 29 "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker().use_count()
<< ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label();

if (reader_params.record_rowids) {
stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id());
Expand All @@ -129,6 +154,11 @@ Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type,
}

vectorized::Block block = cur_tablet_schema.create_block(reader_params.return_columns);
LOG(WARNING) << "execute_compact111111 222 26 "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker().use_count()
<< ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label();
size_t output_rows = 0;
bool eof = false;
while (!eof && !ExecEnv::GetInstance()->storage_engine().stopped()) {
Expand All @@ -146,10 +176,20 @@ Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type,
stats_output->rowid_conversion->add(reader.current_block_row_locations(),
segment_num_rows);
}
LOG(WARNING) << "execute_compact111111 222 38 "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker().use_count()
<< ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label();

output_rows += block.rows();
block.clear_column_data();
}
LOG(WARNING) << "execute_compact111111 222 27 "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker().use_count()
<< ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label();
if (ExecEnv::GetInstance()->storage_engine().stopped()) {
return Status::Error<INTERNAL_ERROR>("tablet {} failed to do compaction, engine stopped",
tablet->tablet_id());
Expand All @@ -164,6 +204,11 @@ Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type,
RETURN_NOT_OK_STATUS_WITH_WARN(dst_rowset_writer->flush(),
"failed to flush rowset when merging rowsets of tablet " +
std::to_string(tablet->tablet_id()));
LOG(WARNING) << "execute_compact111111 222 28 "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker().use_count()
<< ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label();

return Status::OK();
}
Expand Down Expand Up @@ -391,11 +436,26 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t
Statistics* stats_output) {
LOG(INFO) << "Start to do vertical compaction, tablet_id: " << tablet->tablet_id();
std::vector<std::vector<uint32_t>> column_groups;
LOG(WARNING) << "execute_compact111111 222 32 "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker().use_count()
<< ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label();
vertical_split_columns(tablet_schema, &column_groups);
LOG(WARNING) << "execute_compact111111 222 33 "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker().use_count()
<< ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label();

std::vector<uint32_t> key_group_cluster_key_idxes;
_generate_key_group_cluster_key_idxes(tablet_schema, column_groups,
key_group_cluster_key_idxes);
LOG(WARNING) << "execute_compact111111 222 34 "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker().use_count()
<< ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label();

vectorized::RowSourcesBuffer row_sources_buf(tablet->tablet_id(), tablet->tablet_path(),
reader_type);
Expand All @@ -407,10 +467,20 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t
tablet, reader_type, tablet_schema, is_key, column_groups[i], &row_sources_buf,
src_rowset_readers, dst_rowset_writer, max_rows_per_segment, stats_output,
key_group_cluster_key_idxes));
LOG(WARNING) << "execute_compact111111 222 35 "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker().use_count()
<< ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label();
if (is_key) {
RETURN_IF_ERROR(row_sources_buf.flush());
}
RETURN_IF_ERROR(row_sources_buf.seek_to_begin());
LOG(WARNING) << "execute_compact111111 222 36 "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker() << ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker().use_count()
<< ", "
<< thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label();
}

// finish compact, build output rowset
Expand Down

0 comments on commit 19dbd6c

Please sign in to comment.