diff --git a/be/src/olap/parallel_scanner_builder.cpp b/be/src/olap/parallel_scanner_builder.cpp index 33e2762aa44621..88c69ab5c9a584 100644 --- a/be/src/olap/parallel_scanner_builder.cpp +++ b/be/src/olap/parallel_scanner_builder.cpp @@ -24,6 +24,7 @@ #include "cloud/config.h" #include "common/status.h" #include "olap/rowset/beta_rowset.h" +#include "olap/segment_loader.h" #include "pipeline/exec/olap_scan_operator.h" #include "vec/exec/scan/new_olap_scanner.h" @@ -63,21 +64,18 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list& auto rowset = reader->rowset(); const auto rowset_id = rowset->rowset_id(); - DCHECK(_segment_cache_handles.contains(rowset_id)); - auto& segment_cache_handle = _segment_cache_handles[rowset_id]; + const auto& segments_rows = _all_segments_rows[rowset_id]; if (rowset->num_rows() == 0) { continue; } - const auto& segments = segment_cache_handle.get_segments(); int segment_start = 0; auto split = RowSetSplits(reader->clone()); - for (size_t i = 0; i != segments.size(); ++i) { - const auto& segment = segments[i]; + for (size_t i = 0; i != segments_rows.size(); ++i) { + const size_t rows_of_segment = segments_rows[i]; RowRanges row_ranges; - const size_t rows_of_segment = segment->num_rows(); int64_t offset_in_segment = 0; // try to split large segments into RowRanges @@ -125,7 +123,7 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list& // The non-empty `row_ranges` means there are some rows left in this segment not added into `split`. if (!row_ranges.is_empty()) { DCHECK_GT(rows_collected, 0); - DCHECK_EQ(row_ranges.to(), segment->num_rows()); + DCHECK_EQ(row_ranges.to(), rows_of_segment); split.segment_row_ranges.emplace_back(std::move(row_ranges)); } } @@ -133,7 +131,7 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list& DCHECK_LE(rows_collected, _rows_per_scanner); if (rows_collected > 0) { split.segment_offsets.first = segment_start; - split.segment_offsets.second = segments.size(); + split.segment_offsets.second = segments_rows.size(); DCHECK_GT(split.segment_offsets.second, split.segment_offsets.first); DCHECK_EQ(split.segment_row_ranges.size(), split.segment_offsets.second - split.segment_offsets.first); @@ -181,11 +179,15 @@ Status ParallelScannerBuilder::_load() { auto rowset = rs_split.rs_reader->rowset(); RETURN_IF_ERROR(rowset->load()); const auto rowset_id = rowset->rowset_id(); - auto& segment_cache_handle = _segment_cache_handles[rowset_id]; + SegmentCacheHandle segment_cache_handle; RETURN_IF_ERROR(SegmentLoader::instance()->load_segments( std::dynamic_pointer_cast(rowset), &segment_cache_handle, enable_segment_cache, false)); + + for (const auto& segment : segment_cache_handle.get_segments()) { + _all_segments_rows[rowset_id].emplace_back(segment->num_rows()); + } _total_rows += rowset->num_rows(); } } diff --git a/be/src/olap/parallel_scanner_builder.h b/be/src/olap/parallel_scanner_builder.h index 934d769ed59aa0..7c6b5648e89e04 100644 --- a/be/src/olap/parallel_scanner_builder.h +++ b/be/src/olap/parallel_scanner_builder.h @@ -83,7 +83,7 @@ class ParallelScannerBuilder { size_t _rows_per_scanner {_min_rows_per_scanner}; - std::map _segment_cache_handles; + std::map> _all_segments_rows; std::shared_ptr _scanner_profile; RuntimeState* _state;