From ee35b4764cfb2f55763b291ea60b7c448f98f273 Mon Sep 17 00:00:00 2001 From: kakachen Date: Wed, 25 Dec 2024 23:29:30 +0800 Subject: [PATCH] [Feature](orc-reader) Implement new merge io facality for orc reader. --- be/src/apache-orc | 2 +- .../vec/exec/format/orc/orc_file_reader.cpp | 106 +++++++++++ be/src/vec/exec/format/orc/orc_file_reader.h | 88 +++++++++ be/src/vec/exec/format/orc/vorc_reader.cpp | 150 +++++++++++++--- be/src/vec/exec/format/orc/vorc_reader.h | 87 ++++++++- .../exec/format/orc/orc_file_reader_test.cpp | 170 ++++++++++++++++++ 6 files changed, 571 insertions(+), 32 deletions(-) create mode 100644 be/src/vec/exec/format/orc/orc_file_reader.cpp create mode 100644 be/src/vec/exec/format/orc/orc_file_reader.h create mode 100644 be/test/vec/exec/format/orc/orc_file_reader_test.cpp diff --git a/be/src/apache-orc b/be/src/apache-orc index 2f937bdc76406f..f4e9229d8f2edc 160000 --- a/be/src/apache-orc +++ b/be/src/apache-orc @@ -1 +1 @@ -Subproject commit 2f937bdc76406f150b484b6e57629aa8a03d48b6 +Subproject commit f4e9229d8f2edc1b2dcc197765902f97b227b63b diff --git a/be/src/vec/exec/format/orc/orc_file_reader.cpp b/be/src/vec/exec/format/orc/orc_file_reader.cpp new file mode 100644 index 00000000000000..6f1411563e7812 --- /dev/null +++ b/be/src/vec/exec/format/orc/orc_file_reader.cpp @@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/format/orc/orc_file_reader.h" + +#include "util/runtime_profile.h" + +namespace doris { +namespace vectorized { + +OrcMergeRangeFileReader::OrcMergeRangeFileReader(RuntimeProfile* profile, + io::FileReaderSPtr inner_reader, + io::PrefetchRange range) + : _profile(profile), _inner_reader(std::move(inner_reader)), _range(std::move(range)) { + _size = _inner_reader->size(); + _statistics.apply_bytes += range.end_offset - range.start_offset; + if (_profile != nullptr) { + const char* random_profile = "MergedSmallIO"; + ADD_TIMER_WITH_LEVEL(_profile, random_profile, 1); + _copy_time = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "CopyTime", random_profile, 1); + _read_time = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "ReadTime", random_profile, 1); + _request_io = + ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "RequestIO", TUnit::UNIT, random_profile, 1); + _merged_io = + ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "MergedIO", TUnit::UNIT, random_profile, 1); + _request_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "RequestBytes", TUnit::BYTES, + random_profile, 1); + _merged_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "MergedBytes", TUnit::BYTES, + random_profile, 1); + _apply_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "ApplyBytes", TUnit::BYTES, + random_profile, 1); + } +} + +Status OrcMergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read, + const io::IOContext* io_ctx) { + auto request_size = result.size; + + _statistics.request_io++; + _statistics.request_bytes += request_size; + + if (request_size == 0) { + *bytes_read = 0; + return Status::OK(); + } + + if (_cache == nullptr) { + auto range_size = _range.end_offset - _range.start_offset; + _cache = std::make_unique(range_size); + + { + SCOPED_RAW_TIMER(&_statistics.read_time); + Slice cache_slice = {_cache.get(), range_size}; + RETURN_IF_ERROR( + _inner_reader->read_at(_range.start_offset, cache_slice, bytes_read, io_ctx)); + _statistics.merged_io++; + _statistics.merged_bytes += *bytes_read; + } + + if (*bytes_read != range_size) [[unlikely]] { + return Status::InternalError( + "OrcMergeRangeFileReader use inner reader read bytes {} not eq expect size {}", + *bytes_read, range_size); + } + + _current_start_offset = _range.start_offset; + } + + SCOPED_RAW_TIMER(&_statistics.copy_time); + int64_t buffer_offset = offset - _current_start_offset; + memcpy(result.data, _cache.get() + buffer_offset, request_size); + *bytes_read = request_size; + return Status::OK(); +} + +void OrcMergeRangeFileReader::_collect_profile_before_close() { + if (_profile != nullptr) { + COUNTER_UPDATE(_copy_time, _statistics.copy_time); + COUNTER_UPDATE(_read_time, _statistics.read_time); + COUNTER_UPDATE(_request_io, _statistics.request_io); + COUNTER_UPDATE(_merged_io, _statistics.merged_io); + COUNTER_UPDATE(_request_bytes, _statistics.request_bytes); + COUNTER_UPDATE(_merged_bytes, _statistics.merged_bytes); + COUNTER_UPDATE(_apply_bytes, _statistics.apply_bytes); + if (_inner_reader != nullptr) { + _inner_reader->collect_profile_before_close(); + } + } +} + +} // namespace vectorized +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/exec/format/orc/orc_file_reader.h b/be/src/vec/exec/format/orc/orc_file_reader.h new file mode 100644 index 00000000000000..d9d90f3e6e4420 --- /dev/null +++ b/be/src/vec/exec/format/orc/orc_file_reader.h @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "io/fs/buffered_reader.h" +#include "io/fs/file_reader.h" + +namespace doris { +namespace vectorized { + +class OrcMergeRangeFileReader : public io::FileReader { +public: + struct Statistics { + int64_t copy_time = 0; + int64_t read_time = 0; + int64_t request_io = 0; + int64_t merged_io = 0; + int64_t request_bytes = 0; + int64_t merged_bytes = 0; + int64_t apply_bytes = 0; + }; + + OrcMergeRangeFileReader(RuntimeProfile* profile, io::FileReaderSPtr inner_reader, + io::PrefetchRange range); + + ~OrcMergeRangeFileReader() override = default; + + Status close() override { + if (!_closed) { + _closed = true; + } + return Status::OK(); + } + + const io::Path& path() const override { return _inner_reader->path(); } + + size_t size() const override { return _size; } + + bool closed() const override { return _closed; } + + // for test only + const Statistics& statistics() const { return _statistics; } + +protected: + Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, + const io::IOContext* io_ctx) override; + + void _collect_profile_before_close() override; + +private: + RuntimeProfile::Counter* _copy_time = nullptr; + RuntimeProfile::Counter* _read_time = nullptr; + RuntimeProfile::Counter* _request_io = nullptr; + RuntimeProfile::Counter* _merged_io = nullptr; + RuntimeProfile::Counter* _request_bytes = nullptr; + RuntimeProfile::Counter* _merged_bytes = nullptr; + RuntimeProfile::Counter* _apply_bytes = nullptr; + + RuntimeProfile* _profile; + io::FileReaderSPtr _inner_reader; + io::PrefetchRange _range; + + std::unique_ptr _cache; + int64_t _current_start_offset = -1; + + size_t _size; + bool _closed = false; + + Statistics _statistics; +}; + +} // namespace vectorized +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 4d41830668960c..33dbc9412a473e 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -72,6 +72,7 @@ #include "vec/data_types/data_type_map.h" #include "vec/data_types/data_type_nullable.h" #include "vec/data_types/data_type_struct.h" +#include "vec/exec/format/orc/orc_file_reader.h" #include "vec/exec/format/table/transactional_hive_common.h" #include "vec/exprs/vbloom_predicate.h" #include "vec/exprs/vdirect_in_predicate.h" @@ -140,6 +141,34 @@ void ORCFileInputStream::read(void* buf, uint64_t length, uint64_t offset) { } } +void StripeStreamInputStream::read(void* buf, uint64_t length, uint64_t offset) { + _statistics->fs_read_calls++; + _statistics->fs_read_bytes += length; + SCOPED_RAW_TIMER(&_statistics->fs_read_time); + uint64_t has_read = 0; + char* out = reinterpret_cast(buf); + while (has_read < length) { + if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) { + throw orc::ParseError("stop"); + } + size_t loop_read; + Slice result(out + has_read, length - has_read); + Status st = _inner_reader->read_at(offset + has_read, result, &loop_read, _io_ctx); + if (!st.ok()) { + throw orc::ParseError( + strings::Substitute("Failed to read $0: $1", _file_name, st.to_string())); + } + if (loop_read == 0) { + break; + } + has_read += loop_read; + } + if (has_read != length) { + throw orc::ParseError(strings::Substitute("Try to read $0 bytes from $1, actually read $2", + length, has_read, _file_name)); + } +} + OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params, const TFileRangeDesc& range, size_t batch_size, const std::string& ctz, io::IOContext* io_ctx, @@ -252,7 +281,8 @@ Status OrcReader::_create_file_reader() { _profile, _system_properties, _file_description, reader_options, io::DelegateReader::AccessMode::RANDOM, _io_ctx)); _file_input_stream = std::make_unique( - _scan_range.path, std::move(inner_reader), &_statistics, _io_ctx, _profile); + _scan_range.path, std::move(inner_reader), &_statistics, _io_ctx, _profile, + _orc_once_max_read_bytes, _orc_max_merge_distance_bytes); } if (_file_input_stream->getLength() == 0) { return Status::EndOfFile("empty orc file: " + _scan_range.path); @@ -303,6 +333,13 @@ Status OrcReader::init_reader( } _slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts; _obj_pool = std::make_shared(); + + if (_state != nullptr) { + _orc_tiny_stripe_threshold_bytes = _state->query_options().orc_tiny_stripe_threshold_bytes; + _orc_once_max_read_bytes = _state->query_options().orc_once_max_read_bytes; + _orc_max_merge_distance_bytes = _state->query_options().orc_max_merge_distance_bytes; + } + { SCOPED_RAW_TIMER(&_statistics.create_reader_time); RETURN_IF_ERROR(_create_file_reader()); @@ -1055,18 +1092,6 @@ Status OrcReader::set_fill_columns( int64_t range_end_offset = _range_start_offset + _range_size; - // If you set "orc_tiny_stripe_threshold_bytes" = 0, the use tiny stripes merge io optimization will not be used. - int64_t orc_tiny_stripe_threshold_bytes = 8L * 1024L * 1024L; - int64_t orc_once_max_read_bytes = 8L * 1024L * 1024L; - int64_t orc_max_merge_distance_bytes = 1L * 1024L * 1024L; - - if (_state != nullptr) { - orc_tiny_stripe_threshold_bytes = - _state->query_options().orc_tiny_stripe_threshold_bytes; - orc_once_max_read_bytes = _state->query_options().orc_once_max_read_bytes; - orc_max_merge_distance_bytes = _state->query_options().orc_max_merge_distance_bytes; - } - bool all_tiny_stripes = true; std::vector tiny_stripe_ranges; @@ -1079,7 +1104,7 @@ Status OrcReader::set_fill_columns( !all_stripes_needed[i]) { continue; } - if (strip_info->getLength() > orc_tiny_stripe_threshold_bytes) { + if (strip_info->getLength() > _orc_tiny_stripe_threshold_bytes) { all_tiny_stripes = false; break; } @@ -1089,8 +1114,8 @@ Status OrcReader::set_fill_columns( if (all_tiny_stripes && number_of_stripes > 0) { std::vector prefetch_merge_ranges = io::PrefetchRange::merge_adjacent_seq_ranges(tiny_stripe_ranges, - orc_max_merge_distance_bytes, - orc_once_max_read_bytes); + _orc_max_merge_distance_bytes, + _orc_once_max_read_bytes); auto range_finder = std::make_shared(std::move(prefetch_merge_ranges)); @@ -2659,17 +2684,23 @@ MutableColumnPtr OrcReader::_convert_dict_column_to_string_column( void ORCFileInputStream::beforeReadStripe( std::unique_ptr current_strip_information, - std::vector selected_columns) { + std::vector selected_columns, + std::unordered_map>& streams) { if (_is_all_tiny_stripes) { return; } if (_file_reader != nullptr) { _file_reader->collect_profile_before_close(); } - // Generate prefetch ranges, build stripe file reader. + for (const auto& stripe_stream : _stripe_streams) { + if (stripe_stream != nullptr) { + stripe_stream->collect_profile_before_close(); + } + } + _stripe_streams.clear(); + uint64_t offset = current_strip_information->getOffset(); - std::vector prefetch_ranges; - size_t total_io_size = 0; + std::unordered_map prefetch_ranges; for (uint64_t stream_id = 0; stream_id < current_strip_information->getNumberOfStreams(); ++stream_id) { std::unique_ptr stream = @@ -2677,19 +2708,74 @@ void ORCFileInputStream::beforeReadStripe( uint32_t columnId = stream->getColumnId(); uint64_t length = stream->getLength(); if (selected_columns[columnId]) { - total_io_size += length; doris::io::PrefetchRange prefetch_range = {offset, offset + length}; - prefetch_ranges.emplace_back(std::move(prefetch_range)); + orc::StreamId streamId(stream->getColumnId(), stream->getKind()); + prefetch_ranges.emplace(std::move(streamId), std::move(prefetch_range)); } offset += length; } - size_t num_columns = std::count_if(selected_columns.begin(), selected_columns.end(), - [](bool selected) { return selected; }); - if (total_io_size / num_columns < io::MergeRangeFileReader::SMALL_IO) { - // The underlying page reader will prefetch data in column. - _file_reader.reset(new io::MergeRangeFileReader(_profile, _inner_reader, prefetch_ranges)); - } else { - _file_reader = _inner_reader; + _build_input_stripe_streams(prefetch_ranges, streams); +} + +void ORCFileInputStream::_build_input_stripe_streams( + const std::unordered_map& ranges, + std::unordered_map>& streams) { + if (ranges.empty()) { + return; + } + + std::unordered_map small_ranges; + std::unordered_map large_ranges; + + for (const auto& range : ranges) { + if (range.second.end_offset - range.second.start_offset <= _orc_once_max_read_bytes) { + small_ranges.emplace(range.first, range.second); + } else { + large_ranges.emplace(range.first, range.second); + } + } + + _build_small_ranges_input_stripe_streams(small_ranges, streams); + _build_large_ranges_input_stripe_streams(large_ranges, streams); +} + +void ORCFileInputStream::_build_small_ranges_input_stripe_streams( + const std::unordered_map& ranges, + std::unordered_map>& streams) { + std::vector all_ranges; + all_ranges.reserve(ranges.size()); + std::transform(ranges.begin(), ranges.end(), std::back_inserter(all_ranges), + [](const auto& pair) { return pair.second; }); + + auto merged_ranges = io::PrefetchRange::merge_adjacent_seq_ranges( + all_ranges, _orc_max_merge_distance_bytes, _orc_once_max_read_bytes); + + for (const auto& merged_range : merged_ranges) { + auto merge_range_file_reader = + std::make_shared(_profile, _file_reader, merged_range); + + for (const auto& [key, range] : ranges) { + if (range.start_offset >= merged_range.start_offset && + range.end_offset <= merged_range.end_offset) { + auto stripe_stream_input_stream = std::make_shared( + getName(), merge_range_file_reader, _statistics, _io_ctx, _profile); + streams.emplace(key, stripe_stream_input_stream); + _stripe_streams.emplace_back(stripe_stream_input_stream); + } + } + } +} + +void ORCFileInputStream::_build_large_ranges_input_stripe_streams( + const std::unordered_map& ranges, + std::unordered_map>& streams) { + for (const auto& range : ranges) { + auto stripe_stream_input_stream = std::make_shared( + getName(), _file_reader, _statistics, _io_ctx, _profile); + streams.emplace(range.first, + std::make_shared(getName(), _file_reader, + _statistics, _io_ctx, _profile)); + _stripe_streams.emplace_back(stripe_stream_input_stream); } } @@ -2697,7 +2783,13 @@ void ORCFileInputStream::_collect_profile_before_close() { if (_file_reader != nullptr) { _file_reader->collect_profile_before_close(); } + for (const auto& stripe_stream : _stripe_streams) { + if (stripe_stream != nullptr) { + stripe_stream->collect_profile_before_close(); + } + } } + void OrcReader::_execute_filter_position_delete_rowids(IColumn::Filter& filter) { if (_position_delete_ordered_rowids == nullptr) { return; diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index 6bbf3bead1efce..cb566d08eb05a0 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -650,16 +650,73 @@ class OrcReader : public GenericReader { std::unordered_map _vslot_ref_to_orc_predicate_data_type; std::unordered_map _vliteral_to_orc_literal; + + // If you set "orc_tiny_stripe_threshold_bytes" = 0, the use tiny stripes merge io optimization will not be used. + int64_t _orc_tiny_stripe_threshold_bytes = 8L * 1024L * 1024L; + int64_t _orc_once_max_read_bytes = 8L * 1024L * 1024L; + int64_t _orc_max_merge_distance_bytes = 1L * 1024L * 1024L; +}; + +class StripeStreamInputStream : public orc::InputStream, public ProfileCollector { +public: + StripeStreamInputStream(const std::string& file_name, io::FileReaderSPtr inner_reader, + OrcReader::Statistics* statistics, const io::IOContext* io_ctx, + RuntimeProfile* profile) + : _file_name(file_name), + _inner_reader(inner_reader), + _statistics(statistics), + _io_ctx(io_ctx), + _profile(profile) {} + + ~StripeStreamInputStream() override { + if (_inner_reader != nullptr) { + _inner_reader->collect_profile_before_close(); + } + } + + uint64_t getLength() const override { return _inner_reader->size(); } + + uint64_t getNaturalReadSize() const override { return config::orc_natural_read_size_mb << 20; } + + void read(void* buf, uint64_t length, uint64_t offset) override; + + const std::string& getName() const override { return _file_name; } + + RuntimeProfile* profile() const { return _profile; } + + void beforeReadStripe( + std::unique_ptr current_strip_information, + std::vector selected_columns, + std::unordered_map>& streams) override {} + +protected: + void _collect_profile_at_runtime() override {}; + void _collect_profile_before_close() override { + if (_inner_reader != nullptr) { + _inner_reader->collect_profile_before_close(); + } + }; + +private: + const std::string& _file_name; + io::FileReaderSPtr _inner_reader; + // Owned by OrcReader + OrcReader::Statistics* _statistics = nullptr; + const io::IOContext* _io_ctx = nullptr; + RuntimeProfile* _profile = nullptr; }; class ORCFileInputStream : public orc::InputStream, public ProfileCollector { public: ORCFileInputStream(const std::string& file_name, io::FileReaderSPtr inner_reader, OrcReader::Statistics* statistics, const io::IOContext* io_ctx, - RuntimeProfile* profile) + RuntimeProfile* profile, int64_t orc_once_max_read_bytes, + int64_t orc_max_merge_distance_bytes) : _file_name(file_name), _inner_reader(inner_reader), _file_reader(inner_reader), + _orc_once_max_read_bytes(orc_once_max_read_bytes), + _orc_max_merge_distance_bytes(orc_max_merge_distance_bytes), _statistics(statistics), _io_ctx(io_ctx), _profile(profile) {} @@ -668,6 +725,12 @@ class ORCFileInputStream : public orc::InputStream, public ProfileCollector { if (_file_reader != nullptr) { _file_reader->collect_profile_before_close(); } + for (const auto& stripe_stream : _stripe_streams) { + if (stripe_stream != nullptr) { + stripe_stream->collect_profile_before_close(); + } + } + _stripe_streams.clear(); } uint64_t getLength() const override { return _file_reader->size(); } @@ -679,7 +742,9 @@ class ORCFileInputStream : public orc::InputStream, public ProfileCollector { const std::string& getName() const override { return _file_name; } void beforeReadStripe(std::unique_ptr current_strip_information, - std::vector selected_columns) override; + std::vector selected_columns, + std::unordered_map>& + stripe_streams) override; void set_all_tiny_stripes() { _is_all_tiny_stripes = true; } @@ -692,13 +757,31 @@ class ORCFileInputStream : public orc::InputStream, public ProfileCollector { void _collect_profile_before_close() override; private: + void _build_input_stripe_streams( + const std::unordered_map& ranges, + std::unordered_map>& streams); + + void _build_small_ranges_input_stripe_streams( + const std::unordered_map& ranges, + std::unordered_map>& streams); + + void _build_large_ranges_input_stripe_streams( + const std::unordered_map& ranges, + std::unordered_map>& streams); + const std::string& _file_name; io::FileReaderSPtr _inner_reader; io::FileReaderSPtr _file_reader; bool _is_all_tiny_stripes = false; + int64_t _orc_once_max_read_bytes; + int64_t _orc_max_merge_distance_bytes; + + std::vector> _stripe_streams; + // Owned by OrcReader OrcReader::Statistics* _statistics = nullptr; const io::IOContext* _io_ctx = nullptr; RuntimeProfile* _profile = nullptr; }; + } // namespace doris::vectorized diff --git a/be/test/vec/exec/format/orc/orc_file_reader_test.cpp b/be/test/vec/exec/format/orc/orc_file_reader_test.cpp new file mode 100644 index 00000000000000..9e1003c397f07f --- /dev/null +++ b/be/test/vec/exec/format/orc/orc_file_reader_test.cpp @@ -0,0 +1,170 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/format/orc/orc_file_reader.h" + +#include + +#include "io/fs/local_file_system.h" +#include "util/slice.h" + +namespace doris { +namespace vectorized { + +class MockFileReader : public io::FileReader { +public: + MockFileReader() = default; + ~MockFileReader() override = default; + + Status close() override { + _closed = true; + return Status::OK(); + } + + const io::Path& path() const override { return _path; } + + size_t size() const override { return _data.size(); } + + bool closed() const override { return _closed; } + + void set_data(const std::string& data) { _data = data; } + +protected: + Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, + const io::IOContext* io_ctx) override { + if (offset >= _data.size()) { + *bytes_read = 0; + return Status::OK(); + } + *bytes_read = std::min(result.size, _data.size() - offset); + memcpy(result.data, _data.data() + offset, *bytes_read); + return Status::OK(); + } + +private: + std::string _data; + bool _closed = false; + io::Path _path = "/tmp/mock"; +}; + +class OrcMergeRangeFileReaderTest : public testing::Test { +protected: + void SetUp() override { _mock_reader = std::make_shared(); } + + std::shared_ptr _mock_reader; +}; + +TEST_F(OrcMergeRangeFileReaderTest, basic_init) { + std::string test_data(1024, 'A'); + _mock_reader->set_data(test_data); + + io::PrefetchRange range {0, 1024}; + OrcMergeRangeFileReader reader(nullptr, _mock_reader, range); + EXPECT_EQ(1024, reader.size()); + EXPECT_FALSE(reader.closed()); +} + +TEST_F(OrcMergeRangeFileReaderTest, read_with_cache) { + std::string test_data(1024, 'A'); + _mock_reader->set_data(test_data); + + io::PrefetchRange range {0, 1024}; + const size_t test_size = 128; + + OrcMergeRangeFileReader reader(nullptr, _mock_reader, range); + + // Read from cache + char buffer[test_size]; + Slice result(buffer, test_size); + size_t bytes_read = 0; + + // Read from start + ASSERT_TRUE(reader.read_at(0, result, &bytes_read, nullptr).ok()); + EXPECT_EQ(bytes_read, test_size); + EXPECT_EQ(std::string(buffer, test_size), std::string(test_size, 'A')); + + // Read from middle + ASSERT_TRUE(reader.read_at(512, result, &bytes_read, nullptr).ok()); + EXPECT_EQ(bytes_read, test_size); + EXPECT_EQ(std::string(buffer, test_size), std::string(test_size, 'A')); + + // Verify statistics + EXPECT_EQ(reader.statistics().merged_io, 1); + EXPECT_EQ(reader.statistics().merged_bytes, 1024); +} + +TEST_F(OrcMergeRangeFileReaderTest, read_empty_data) { + _mock_reader->set_data(""); + + io::PrefetchRange range {0, 1024}; + OrcMergeRangeFileReader reader(nullptr, _mock_reader, range); + + char buffer[128]; + Slice result(buffer, 128); + size_t bytes_read = 0; + + ASSERT_FALSE(reader.read_at(0, result, &bytes_read, nullptr).ok()); + EXPECT_EQ(bytes_read, 0); +} + +TEST_F(OrcMergeRangeFileReaderTest, close) { + std::string test_data(1024, 'A'); + _mock_reader->set_data(test_data); + + io::PrefetchRange range {0, 1024}; + OrcMergeRangeFileReader reader(nullptr, _mock_reader, range); + ASSERT_FALSE(reader.closed()); + + ASSERT_TRUE(reader.close().ok()); + ASSERT_TRUE(reader.closed()); +} + +TEST_F(OrcMergeRangeFileReaderTest, multiple_reads_from_cache) { + std::string test_data; + for (int i = 0; i < 1024; i++) { + test_data.push_back(i % 256); + } + _mock_reader->set_data(test_data); + + io::PrefetchRange range {0, 1024}; + OrcMergeRangeFileReader reader(nullptr, _mock_reader, range); + + // Perform multiple reads with different sizes and offsets + const std::vector> read_patterns = { + {0, 128}, // Start, 128 bytes + {256, 64}, // Middle, 64 bytes + {1000, 24}, // Near end, 24 bytes + {512, 256}, // Middle, large read + }; + + for (const auto& pattern : read_patterns) { + std::vector buffer(pattern.second); + Slice result(buffer.data(), pattern.second); + size_t bytes_read = 0; + + ASSERT_TRUE(reader.read_at(pattern.first, result, &bytes_read, nullptr).ok()); + EXPECT_EQ(bytes_read, pattern.second); + EXPECT_EQ(memcmp(buffer.data(), test_data.data() + pattern.first, pattern.second), 0); + } + + // Verify that we only did one actual read + EXPECT_EQ(reader.statistics().merged_io, 1); + EXPECT_EQ(reader.statistics().merged_bytes, 1024); +} + +} // namespace vectorized +} // namespace doris