Skip to content

Commit

Permalink
branch-3.0: [opt](scan) Release instances of Segment to avoid consuming a large amount of memory in ParallelScannerBuilder apache#44189 (apache#44445)
Browse files Browse the repository at this point in the history

Cherry-picked from apache#44189

Co-authored-by: Jerry Hu <[email protected]>
  • Loading branch information
github-actions[bot] and mrhhsg authored Nov 22, 2024
1 parent a1c039e commit 2122f13
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 10 deletions.
20 changes: 11 additions & 9 deletions be/src/olap/parallel_scanner_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "cloud/config.h"
#include "common/status.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/segment_loader.h"
#include "pipeline/exec/olap_scan_operator.h"
#include "vec/exec/scan/new_olap_scanner.h"

Expand Down Expand Up @@ -63,21 +64,18 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list<VScannerSPtr>&
auto rowset = reader->rowset();
const auto rowset_id = rowset->rowset_id();

DCHECK(_segment_cache_handles.contains(rowset_id));
auto& segment_cache_handle = _segment_cache_handles[rowset_id];
const auto& segments_rows = _all_segments_rows[rowset_id];

if (rowset->num_rows() == 0) {
continue;
}

const auto& segments = segment_cache_handle.get_segments();
int segment_start = 0;
auto split = RowSetSplits(reader->clone());

for (size_t i = 0; i != segments.size(); ++i) {
const auto& segment = segments[i];
for (size_t i = 0; i != segments_rows.size(); ++i) {
const size_t rows_of_segment = segments_rows[i];
RowRanges row_ranges;
const size_t rows_of_segment = segment->num_rows();
int64_t offset_in_segment = 0;

// try to split large segments into RowRanges
Expand Down Expand Up @@ -125,15 +123,15 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list<VScannerSPtr>&
// The non-empty `row_ranges` means there are some rows left in this segment not added into `split`.
if (!row_ranges.is_empty()) {
DCHECK_GT(rows_collected, 0);
DCHECK_EQ(row_ranges.to(), segment->num_rows());
DCHECK_EQ(row_ranges.to(), rows_of_segment);
split.segment_row_ranges.emplace_back(std::move(row_ranges));
}
}

DCHECK_LE(rows_collected, _rows_per_scanner);
if (rows_collected > 0) {
split.segment_offsets.first = segment_start;
split.segment_offsets.second = segments.size();
split.segment_offsets.second = segments_rows.size();
DCHECK_GT(split.segment_offsets.second, split.segment_offsets.first);
DCHECK_EQ(split.segment_row_ranges.size(),
split.segment_offsets.second - split.segment_offsets.first);
Expand Down Expand Up @@ -181,11 +179,15 @@ Status ParallelScannerBuilder::_load() {
auto rowset = rs_split.rs_reader->rowset();
RETURN_IF_ERROR(rowset->load());
const auto rowset_id = rowset->rowset_id();
auto& segment_cache_handle = _segment_cache_handles[rowset_id];
SegmentCacheHandle segment_cache_handle;

RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
std::dynamic_pointer_cast<BetaRowset>(rowset), &segment_cache_handle,
enable_segment_cache, false));

for (const auto& segment : segment_cache_handle.get_segments()) {
_all_segments_rows[rowset_id].emplace_back(segment->num_rows());
}
_total_rows += rowset->num_rows();
}
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/parallel_scanner_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class ParallelScannerBuilder {

size_t _rows_per_scanner {_min_rows_per_scanner};

std::map<RowsetId, SegmentCacheHandle> _segment_cache_handles;
std::map<RowsetId, std::vector<size_t>> _all_segments_rows;

std::shared_ptr<RuntimeProfile> _scanner_profile;
RuntimeState* _state;
Expand Down

0 comments on commit 2122f13

Please sign in to comment.