From bd330a599fef2808ad70065ed09fa5a3307cb9e9 Mon Sep 17 00:00:00 2001 From: plat1ko Date: Thu, 14 Mar 2024 11:46:30 +0800 Subject: [PATCH] [enhance](io) Set segment file size to rowset meta (#31853) --- be/src/cloud/cloud_rowset_writer.cpp | 8 +- be/src/cloud/cloud_tablet.cpp | 2 +- be/src/cloud/cloud_vertical_rowset_writer.cpp | 71 ----- be/src/cloud/cloud_vertical_rowset_writer.h | 60 ----- be/src/io/fs/benchmark/base_benchmark.h | 2 +- be/src/olap/base_tablet.cpp | 2 +- be/src/olap/rowset/beta_rowset.cpp | 5 +- be/src/olap/rowset/beta_rowset_writer.cpp | 105 +++++++- be/src/olap/rowset/beta_rowset_writer.h | 31 ++- be/src/olap/rowset/beta_rowset_writer_v2.cpp | 3 +- be/src/olap/rowset/beta_rowset_writer_v2.h | 6 +- be/src/olap/rowset/rowset.cpp | 12 - be/src/olap/rowset/rowset.h | 2 +- be/src/olap/rowset/rowset_factory.cpp | 5 +- be/src/olap/rowset/rowset_meta.cpp | 34 +++ be/src/olap/rowset/rowset_meta.h | 17 +- be/src/olap/rowset/segment_creator.cpp | 126 ++++----- be/src/olap/rowset/segment_creator.h | 21 +- .../rowset/vertical_beta_rowset_writer.cpp | 185 +++++++++++-- .../olap/rowset/vertical_beta_rowset_writer.h | 24 +- .../vertical_beta_rowset_writer_helper.cpp | 244 ------------------ .../vertical_beta_rowset_writer_helper.h | 83 ------ be/src/vec/runtime/ipv4_value.h | 2 +- 23 files changed, 430 insertions(+), 620 deletions(-) delete mode 100644 be/src/cloud/cloud_vertical_rowset_writer.cpp delete mode 100644 be/src/cloud/cloud_vertical_rowset_writer.h delete mode 100644 be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp delete mode 100644 be/src/olap/rowset/vertical_beta_rowset_writer_helper.h diff --git a/be/src/cloud/cloud_rowset_writer.cpp b/be/src/cloud/cloud_rowset_writer.cpp index 98f7752b660020..97cc7457c36e45 100644 --- a/be/src/cloud/cloud_rowset_writer.cpp +++ b/be/src/cloud/cloud_rowset_writer.cpp @@ -58,7 +58,6 @@ Status CloudRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) _rowset_meta->set_tablet_schema(_context.tablet_schema); _context.segment_collector = std::make_shared>(this); _context.file_writer_creator = std::make_shared>(this); - RETURN_IF_ERROR(_segment_creator.init(_context)); return Status::OK(); } @@ -105,7 +104,12 @@ Status CloudRowsetWriter::build(RowsetSharedPtr& rowset) { _rowset_meta->set_newest_write_timestamp(UnixSeconds()); } - // TODO(plat1ko): Record segment file size in rowset meta + if (auto seg_file_size = _seg_files.segments_file_size(_segment_start_id); + !seg_file_size.has_value()) [[unlikely]] { + LOG(ERROR) << "expected segment file sizes, but none presents: " << seg_file_size.error(); + } else { + _rowset_meta->add_segments_file_size(seg_file_size.value()); + } RETURN_NOT_OK_STATUS_WITH_WARN( RowsetFactory::create_rowset(_context.tablet_schema, _context.rowset_dir, _rowset_meta, diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 6805194b950453..ca58b22a2ac7d6 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -379,7 +379,7 @@ Result> CloudTablet::create_transient_rowset_write context.rowset_dir = remote_tablet_path(tablet_id()); context.enable_unique_key_merge_on_write = enable_unique_key_merge_on_write(); context.txn_expiration = txn_expiration; - context.fs = _engine.latest_fs(); + context.fs = rowset.rowset_meta()->fs(); return RowsetFactory::create_rowset_writer(_engine, context, false) .transform([&](auto&& writer) { writer->set_segment_start_id(rowset.num_segments()); diff --git a/be/src/cloud/cloud_vertical_rowset_writer.cpp b/be/src/cloud/cloud_vertical_rowset_writer.cpp deleted file mode 100644 index 861670abd7537d..00000000000000 --- a/be/src/cloud/cloud_vertical_rowset_writer.cpp +++ /dev/null @@ -1,71 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "cloud/cloud_vertical_rowset_writer.h" - -#include -#include - -#include -#include -#include -#include -#include -#include -#include - -#include "common/compiler_util.h" // IWYU pragma: keep -#include "common/logging.h" -#include "io/fs/file_system.h" -#include "io/fs/file_writer.h" -#include "olap/rowset/beta_rowset.h" -#include "olap/rowset/rowset_meta.h" -#include "olap/rowset/rowset_writer_context.h" -#include "olap/rowset/vertical_beta_rowset_writer_helper.h" -#include "util/slice.h" -#include "util/spinlock.h" -#include "vec/core/block.h" - -namespace doris { -using namespace ErrorCode; - -CloudVerticalRowsetWriter::CloudVerticalRowsetWriter() : CloudRowsetWriter() { - _helper = std::make_shared( - &_segment_writers, _already_built, _rowset_meta, &_num_segment, _context, - &_num_rows_written, &_segments_encoded_key_bounds, &_segment_num_rows, - &_total_index_size, &_file_writers, &_total_data_size, &_lock); -} - -CloudVerticalRowsetWriter::~CloudVerticalRowsetWriter() { - _helper->destruct_writer(); -} - -Status CloudVerticalRowsetWriter::add_columns(const vectorized::Block* block, - const std::vector& col_ids, bool is_key, - uint32_t max_rows_per_segment) { - return _helper->add_columns(block, col_ids, is_key, max_rows_per_segment); -} - -Status CloudVerticalRowsetWriter::flush_columns(bool is_key) { - return _helper->flush_columns(is_key); -} - -Status CloudVerticalRowsetWriter::final_flush() { - return _helper->final_flush(); -} - -} // namespace doris diff --git a/be/src/cloud/cloud_vertical_rowset_writer.h b/be/src/cloud/cloud_vertical_rowset_writer.h deleted file mode 100644 index f6804c011f6ea8..00000000000000 --- a/be/src/cloud/cloud_vertical_rowset_writer.h +++ /dev/null @@ -1,60 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include - -#include -#include - -#include "cloud/cloud_rowset_writer.h" -#include "cloud/cloud_storage_engine.h" -#include "common/status.h" -#include "olap/rowset/segment_v2/segment_writer.h" -#include "olap/rowset/vertical_beta_rowset_writer.h" -#include "olap/rowset/vertical_beta_rowset_writer_helper.h" - -namespace doris { -namespace vectorized { -class Block; -} // namespace vectorized - -class CloudVerticalRowsetWriter final : public CloudRowsetWriter { -public: - CloudVerticalRowsetWriter(); - ~CloudVerticalRowsetWriter() override; - - Status add_columns(const vectorized::Block* block, const std::vector& col_ids, - bool is_key, uint32_t max_rows_per_segment) override; - - // flush last segment's column - Status flush_columns(bool is_key) override; - - // flush when all column finished, flush column footer - Status final_flush() override; - - int64_t num_rows() const override { return _total_key_group_rows; } - -private: - std::vector> _segment_writers; - size_t _total_key_group_rows = 0; - std::shared_ptr _helper; -}; - -} // namespace doris \ No newline at end of file diff --git a/be/src/io/fs/benchmark/base_benchmark.h b/be/src/io/fs/benchmark/base_benchmark.h index 4cd0d5b41917f8..8f263b80d93de0 100644 --- a/be/src/io/fs/benchmark/base_benchmark.h +++ b/be/src/io/fs/benchmark/base_benchmark.h @@ -128,7 +128,7 @@ class BaseBenchmark { size_t size = std::min(buffer_size, (size_t)remaining_size); data.size = size; status = reader->read_at(offset, data, &bytes_read); - if (!status.ok() || bytes_read < 0) { + if (!status.ok()) { bm_log("reader read_at error: {}", status.to_string()); break; } diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index e5a41abdbd90d3..3b446e82947503 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -1260,7 +1260,7 @@ Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, const Tablet RETURN_IF_ERROR(rowset_writer->flush()); RowsetSharedPtr transient_rowset; RETURN_IF_ERROR(rowset_writer->build(transient_rowset)); - rowset->merge_rowset_meta(transient_rowset->rowset_meta()); + rowset->rowset_meta()->merge_rowset_meta(*transient_rowset->rowset_meta()); // erase segment cache cause we will add a segment to rowset SegmentLoader::instance()->erase_segments(rowset->rowset_id(), rowset->num_segments()); diff --git a/be/src/olap/rowset/beta_rowset.cpp b/be/src/olap/rowset/beta_rowset.cpp index 58e495be22265c..c8b9e0bef17d46 100644 --- a/be/src/olap/rowset/beta_rowset.cpp +++ b/be/src/olap/rowset/beta_rowset.cpp @@ -152,6 +152,7 @@ Status BetaRowset::load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* se .cache_type = config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE : io::FileCachePolicy::NO_CACHE, .is_doris_table = true, + .file_size = _rowset_meta->segment_file_size(seg_id), }; auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(), _schema, reader_options, segment); @@ -418,7 +419,9 @@ bool BetaRowset::check_current_rowset_segment() { io::FileReaderOptions reader_options { .cache_type = config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE : io::FileCachePolicy::NO_CACHE, - .is_doris_table = true}; + .is_doris_table = true, + .file_size = _rowset_meta->segment_file_size(seg_id), + }; auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(), _schema, reader_options, &segment); if (!s.ok()) { diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 609a06e40bbb12..41d597d4dc7e0c 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -26,6 +26,7 @@ #include // time #include #include +#include #include #include @@ -96,12 +97,103 @@ void build_rowset_meta_with_spec_field(RowsetMeta& rowset_meta, } // namespace +SegmentFileCollection::~SegmentFileCollection() = default; + +Status SegmentFileCollection::add(int seg_id, io::FileWriterPtr&& writer) { + std::lock_guard lock(_lock); + if (_closed) [[unlikely]] { + DCHECK(false) << writer->path(); + return Status::InternalError("add to closed SegmentFileCollection"); + } + + _file_writers.emplace(seg_id, std::move(writer)); + return Status::OK(); +} + +io::FileWriter* SegmentFileCollection::get(int seg_id) const { + std::lock_guard lock(_lock); + if (auto it = _file_writers.find(seg_id); it != _file_writers.end()) { + return it->second.get(); + } else { + return nullptr; + } +} + +Status SegmentFileCollection::close() { + { + std::lock_guard lock(_lock); + if (_closed) [[unlikely]] { + DCHECK(false); + return Status::InternalError("double close SegmentFileCollection"); + } + _closed = true; + } + + for (auto&& [_, writer] : _file_writers) { + RETURN_IF_ERROR(writer->close()); + } + + return Status::OK(); +} + +Result> SegmentFileCollection::segments_file_size(int seg_id_offset) { + std::lock_guard lock(_lock); + if (!_closed) [[unlikely]] { + DCHECK(false); + return ResultError(Status::InternalError("get segments file size without closed")); + } + + Status st; + std::vector seg_file_size(_file_writers.size(), 0); + bool succ = std::all_of(_file_writers.begin(), _file_writers.end(), [&](auto&& it) { + auto&& [seg_id, writer] = it; + + int idx = seg_id - seg_id_offset; + if (idx >= seg_file_size.size()) [[unlikely]] { + auto err_msg = fmt::format( + "invalid seg_id={} num_file_writers={} seg_id_offset={} path={}", seg_id, + seg_file_size.size(), seg_id_offset, writer->path().native()); + DCHECK(false) << err_msg; + st = Status::InternalError(err_msg); + return false; + } + + auto& fsize = seg_file_size[idx]; + if (fsize != 0) { + // File size should not been set + auto err_msg = + fmt::format("duplicate seg_id={} path={}", seg_id, writer->path().native()); + DCHECK(false) << err_msg; + st = Status::InternalError(err_msg); + return false; + } + + fsize = writer->bytes_appended(); + if (fsize <= 0) { + auto err_msg = + fmt::format("invalid segment fsize={} path={}", fsize, writer->path().native()); + DCHECK(false) << err_msg; + st = Status::InternalError(err_msg); + return false; + } + + return true; + }); + + if (succ) { + return seg_file_size; + } + + return ResultError(st); +} + BaseBetaRowsetWriter::BaseBetaRowsetWriter() : _num_segment(0), _segment_start_id(0), _num_rows_written(0), _total_data_size(0), - _total_index_size(0) {} + _total_index_size(0), + _segment_creator(_context, _seg_files) {} BetaRowsetWriter::BetaRowsetWriter(StorageEngine& engine) : _engine(engine), _segcompaction_worker(std::make_shared(this)) {} @@ -158,7 +250,6 @@ Status BaseBetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_conte _rowset_meta->set_tablet_schema(_context.tablet_schema); _context.segment_collector = std::make_shared>(this); _context.file_writer_creator = std::make_shared>(this); - RETURN_IF_ERROR(_segment_creator.init(_context)); return Status::OK(); } @@ -533,12 +624,7 @@ RowsetSharedPtr BaseBetaRowsetWriter::manual_build(const RowsetMetaSharedPtr& sp } Status BaseBetaRowsetWriter::_close_file_writers() { - // TODO(lingbin): move to more better place, or in a CreateBlockBatch? - for (auto& file_writer : _file_writers) { - RETURN_NOT_OK_STATUS_WITH_WARN( - file_writer->close(), - fmt::format("failed to close file writer, path={}", file_writer->path().string())); - } + // Flush and close segment files RETURN_NOT_OK_STATUS_WITH_WARN(_segment_creator.close(), "failed to close segment creator when build new rowset"); return Status::OK(); @@ -667,6 +753,7 @@ Status BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool ch // TODO write zonemap to meta rowset_meta->set_empty((num_rows_written + _num_rows_written) == 0); rowset_meta->set_creation_time(time(nullptr)); + return Status::OK(); } @@ -805,7 +892,7 @@ Status BaseBetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStati } if (_context.mow_context != nullptr) { // ensure that the segment file writing is complete - auto* file_writer = _segment_creator.get_file_writer(segment_id); + auto* file_writer = _seg_files.get(segment_id); if (file_writer) { RETURN_IF_ERROR(file_writer->close()); } diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index cffb951451e1ea..9d61d32a28d5e3 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -56,6 +56,29 @@ class SegmentWriter; using SegCompactionCandidates = std::vector; using SegCompactionCandidatesSharedPtr = std::shared_ptr; +class SegmentFileCollection { +public: + ~SegmentFileCollection(); + + Status add(int seg_id, io::FileWriterPtr&& writer); + + // Return `nullptr` if no file writer matches `seg_id` + io::FileWriter* get(int seg_id) const; + + // Close all file writers + Status close(); + + // Get segments file size in segment id order. + // `seg_id_offset` is the offset of the segment id relative to the subscript of `_file_writers`, + // for more details, see `Tablet::create_transient_rowset_writer`. + Result> segments_file_size(int seg_id_offset); + +private: + mutable SpinLock _lock; + std::unordered_map _file_writers; + bool _closed {false}; +}; + class BaseBetaRowsetWriter : public RowsetWriter { public: BaseBetaRowsetWriter(); @@ -100,7 +123,7 @@ class BaseBetaRowsetWriter : public RowsetWriter { RowsetTypePB type() const override { return RowsetTypePB::BETA_ROWSET; } Status get_segment_num_rows(std::vector* segment_num_rows) const override { - std::lock_guard l(_lock); + std::lock_guard l(_segid_statistics_map_mutex); *segment_num_rows = _segment_num_rows; return Status::OK(); } @@ -143,11 +166,11 @@ class BaseBetaRowsetWriter : public RowsetWriter { std::mutex _segment_set_mutex; // mutex for _segment_set int32_t _segment_start_id; // basic write start from 0, partial update may be different - mutable SpinLock _lock; // protect following vectors. + SegmentFileCollection _seg_files; + // record rows number of every segment already written, using for rowid // conversion when compaction in unique key with MoW model std::vector _segment_num_rows; - std::vector _file_writers; // for unique key table with merge-on-write std::vector _segments_encoded_key_bounds; @@ -158,7 +181,7 @@ class BaseBetaRowsetWriter : public RowsetWriter { // TODO rowset Zonemap std::map _segid_statistics_map; - std::mutex _segid_statistics_map_mutex; + mutable std::mutex _segid_statistics_map_mutex; bool _is_pending = false; bool _already_built = false; diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.cpp b/be/src/olap/rowset/beta_rowset_writer_v2.cpp index 225ba490a35d85..1fee4e04034b10 100644 --- a/be/src/olap/rowset/beta_rowset_writer_v2.cpp +++ b/be/src/olap/rowset/beta_rowset_writer_v2.cpp @@ -58,7 +58,7 @@ namespace doris { using namespace ErrorCode; BetaRowsetWriterV2::BetaRowsetWriterV2(const std::vector>& streams) - : _streams(streams) {} + : _segment_creator(_context, _seg_files), _streams(streams) {} BetaRowsetWriterV2::~BetaRowsetWriterV2() = default; @@ -66,7 +66,6 @@ Status BetaRowsetWriterV2::init(const RowsetWriterContext& rowset_writer_context _context = rowset_writer_context; _context.segment_collector = std::make_shared>(this); _context.file_writer_creator = std::make_shared>(this); - RETURN_IF_ERROR(_segment_creator.init(_context)); return Status::OK(); } diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.h b/be/src/olap/rowset/beta_rowset_writer_v2.h index 7ad5f6e828655e..7553a1ab17ed3d 100644 --- a/be/src/olap/rowset/beta_rowset_writer_v2.h +++ b/be/src/olap/rowset/beta_rowset_writer_v2.h @@ -39,9 +39,9 @@ #include "common/status.h" #include "io/fs/file_reader_writer_fwd.h" #include "olap/olap_common.h" +#include "olap/rowset/beta_rowset_writer.h" #include "olap/rowset/rowset.h" #include "olap/rowset/rowset_meta.h" -#include "olap/rowset/rowset_writer.h" #include "olap/rowset/rowset_writer_context.h" #include "olap/rowset/segment_creator.h" #include "segment_v2/segment.h" @@ -142,11 +142,11 @@ class BetaRowsetWriterV2 : public RowsetWriter { // record rows number of every segment already written, using for rowid // conversion when compaction in unique key with MoW model std::vector _segment_num_rows; - std::vector _file_writers; + // for unique key table with merge-on-write std::vector _segments_encoded_key_bounds; - // TODO rowset Zonemap + SegmentFileCollection _seg_files; SegmentCreator _segment_creator; diff --git a/be/src/olap/rowset/rowset.cpp b/be/src/olap/rowset/rowset.cpp index 781510f2a21d56..8e696eaec25bc5 100644 --- a/be/src/olap/rowset/rowset.cpp +++ b/be/src/olap/rowset/rowset.cpp @@ -84,18 +84,6 @@ bool Rowset::check_rowset_segment() { return check_current_rowset_segment(); } -void Rowset::merge_rowset_meta(const RowsetMetaSharedPtr& other) { - _rowset_meta->set_num_segments(num_segments() + other->num_segments()); - _rowset_meta->set_num_rows(num_rows() + other->num_rows()); - _rowset_meta->set_data_disk_size(data_disk_size() + other->data_disk_size()); - _rowset_meta->set_index_disk_size(index_disk_size() + other->index_disk_size()); - std::vector key_bounds; - other->get_segments_key_bounds(&key_bounds); - for (auto key_bound : key_bounds) { - _rowset_meta->add_segment_key_bounds(key_bound); - } -} - std::string Rowset::get_rowset_info_str() { std::string disk_size = PrettyPrinter::print( static_cast(_rowset_meta->total_disk_size()), TUnit::BYTES); diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h index b95aad700aec4c..072341405c7342 100644 --- a/be/src/olap/rowset/rowset.h +++ b/be/src/olap/rowset/rowset.h @@ -171,7 +171,7 @@ class Rowset : public std::enable_shared_from_this { // used for partial update, when publish, partial update may add a new rowset // and we should update rowset meta - void merge_rowset_meta(const RowsetMetaSharedPtr& other); + Status merge_rowset_meta(const RowsetMeta& other); // close to clear the resource owned by rowset // including: open files, indexes and so on diff --git a/be/src/olap/rowset/rowset_factory.cpp b/be/src/olap/rowset/rowset_factory.cpp index 9d8b253b7f3bdd..59ec9ae2295ffd 100644 --- a/be/src/olap/rowset/rowset_factory.cpp +++ b/be/src/olap/rowset/rowset_factory.cpp @@ -23,7 +23,6 @@ #include "beta_rowset.h" #include "cloud/cloud_rowset_writer.h" -#include "cloud/cloud_vertical_rowset_writer.h" #include "cloud/config.h" #include "io/fs/file_writer.h" // IWYU pragma: keep #include "olap/rowset/beta_rowset_writer.h" @@ -58,7 +57,7 @@ Result> RowsetFactory::create_rowset_writer( if (context.rowset_type == BETA_ROWSET) { std::unique_ptr writer; if (is_vertical) { - writer = std::make_unique(engine); + writer = std::make_unique>(engine); } else { writer = std::make_unique(engine); } @@ -75,7 +74,7 @@ Result> RowsetFactory::create_rowset_writer( // TODO(plat1ko): cloud vertical rowset writer std::unique_ptr writer; if (is_vertical) { - writer = std::make_unique(); + writer = std::make_unique>(); } else { writer = std::make_unique(); } diff --git a/be/src/olap/rowset/rowset_meta.cpp b/be/src/olap/rowset/rowset_meta.cpp index 412bf56e6f5b5f..994d5351ae1460 100644 --- a/be/src/olap/rowset/rowset_meta.cpp +++ b/be/src/olap/rowset/rowset_meta.cpp @@ -176,6 +176,40 @@ void RowsetMeta::_init() { } } +void RowsetMeta::add_segments_file_size(const std::vector& seg_file_size) { + _rowset_meta_pb.set_enable_segments_file_size(true); + for (auto fsize : seg_file_size) { + _rowset_meta_pb.add_segments_file_size(fsize); + } +} + +int64_t RowsetMeta::segment_file_size(int seg_id) { + DCHECK(_rowset_meta_pb.segments_file_size().empty() || + _rowset_meta_pb.segments_file_size_size() > seg_id) + << _rowset_meta_pb.segments_file_size_size() << ' ' << seg_id; + return _rowset_meta_pb.enable_segments_file_size() + ? (_rowset_meta_pb.segments_file_size_size() > seg_id + ? _rowset_meta_pb.segments_file_size(seg_id) + : -1) + : -1; +} + +void RowsetMeta::merge_rowset_meta(const RowsetMeta& other) { + set_num_segments(num_segments() + other.num_segments()); + set_num_rows(num_rows() + other.num_rows()); + set_data_disk_size(data_disk_size() + other.data_disk_size()); + set_index_disk_size(index_disk_size() + other.index_disk_size()); + for (auto&& key_bound : other.get_segments_key_bounds()) { + add_segment_key_bounds(key_bound); + } + if (_rowset_meta_pb.enable_segments_file_size() && + other._rowset_meta_pb.enable_segments_file_size()) { + for (auto fsize : other.segments_file_size()) { + _rowset_meta_pb.add_segments_file_size(fsize); + } + } +} + bool operator==(const RowsetMeta& a, const RowsetMeta& b) { if (a._rowset_id != b._rowset_id) return false; if (a._is_removed_from_rowset_meta != b._is_removed_from_rowset_meta) return false; diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h index 884a2bd4de047e..a590335eed5f88 100644 --- a/be/src/olap/rowset/rowset_meta.h +++ b/be/src/olap/rowset/rowset_meta.h @@ -270,7 +270,7 @@ class RowsetMeta { } } - auto& get_segments_key_bounds() { return _rowset_meta_pb.segments_key_bounds(); } + auto& get_segments_key_bounds() const { return _rowset_meta_pb.segments_key_bounds(); } bool get_first_segment_key_bound(KeyBoundsPB* key_bounds) { // for compatibility, old version has not segment key bounds @@ -296,8 +296,8 @@ class RowsetMeta { } } - void add_segment_key_bounds(const KeyBoundsPB& segments_key_bounds) { - *_rowset_meta_pb.add_segments_key_bounds() = segments_key_bounds; + void add_segment_key_bounds(KeyBoundsPB segments_key_bounds) { + *_rowset_meta_pb.add_segments_key_bounds() = std::move(segments_key_bounds); set_segments_overlap(OVERLAPPING); } @@ -320,6 +320,17 @@ class RowsetMeta { int64_t compaction_level() { return _rowset_meta_pb.compaction_level(); } + // `seg_file_size` MUST ordered by segment id + void add_segments_file_size(const std::vector& seg_file_size); + + // Return -1 if segment file size is unknown + int64_t segment_file_size(int seg_id); + + const auto& segments_file_size() const { return _rowset_meta_pb.segments_file_size(); } + + // Used for partial update, when publish, partial update may add a new rowset and we should update rowset meta + void merge_rowset_meta(const RowsetMeta& other); + // Because the member field '_handle' is a raw pointer, use member func 'init' to replace copy ctor RowsetMeta(const RowsetMeta&) = delete; RowsetMeta operator=(const RowsetMeta&) = delete; diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp index 9a62055ac3fc6a..01011dc2d30ed2 100644 --- a/be/src/olap/rowset/segment_creator.cpp +++ b/be/src/olap/rowset/segment_creator.cpp @@ -47,15 +47,11 @@ namespace doris { using namespace ErrorCode; -SegmentFlusher::SegmentFlusher() = default; +SegmentFlusher::SegmentFlusher(RowsetWriterContext& context, SegmentFileCollection& seg_files) + : _context(context), _seg_files(seg_files) {} SegmentFlusher::~SegmentFlusher() = default; -Status SegmentFlusher::init(RowsetWriterContext& rowset_writer_context) { - _context = &rowset_writer_context; - return Status::OK(); -} - Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_t segment_id, int64_t* flush_size) { if (block->rows() == 0) { @@ -64,13 +60,13 @@ Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_ // Expand variant columns vectorized::Block flush_block(*block); TabletSchemaSPtr flush_schema; - if (_context->write_type != DataWriteType::TYPE_COMPACTION && - _context->tablet_schema->num_variant_columns() > 0) { + if (_context.write_type != DataWriteType::TYPE_COMPACTION && + _context.tablet_schema->num_variant_columns() > 0) { RETURN_IF_ERROR(_expand_variant_to_subcolumns(flush_block, flush_schema)); } bool no_compression = flush_block.bytes() <= config::segment_compression_threshold_kb * 1024; if (config::enable_vertical_segment_writer && - _context->tablet_schema->cluster_key_idxes().empty()) { + _context.tablet_schema->cluster_key_idxes().empty()) { std::unique_ptr writer; RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression, flush_schema)); RETURN_IF_ERROR(_add_rows(writer, &flush_block, 0, flush_block.rows())); @@ -92,10 +88,10 @@ Status SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block, } std::vector variant_column_pos; - if (_context->partial_update_info && _context->partial_update_info->is_partial_update) { + if (_context.partial_update_info && _context.partial_update_info->is_partial_update) { // check columns that used to do partial updates should not include variant - for (int i : _context->partial_update_info->update_cids) { - const auto& col = *_context->original_tablet_schema->columns()[i]; + for (int i : _context.partial_update_info->update_cids) { + const auto& col = *_context.original_tablet_schema->columns()[i]; if (!col.is_key() && col.name() != DELETE_SIGN) { return Status::InvalidArgument( "Not implement partial update for variant only support delete currently"); @@ -103,8 +99,8 @@ Status SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block, } } else { // find positions of variant columns - for (int i = 0; i < _context->original_tablet_schema->columns().size(); ++i) { - if (_context->original_tablet_schema->columns()[i]->is_variant_type()) { + for (int i = 0; i < _context.original_tablet_schema->columns().size(); ++i) { + if (_context.original_tablet_schema->columns()[i]->is_variant_type()) { variant_column_pos.push_back(i); } } @@ -115,32 +111,32 @@ Status SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block, } vectorized::schema_util::ParseContext ctx; - ctx.record_raw_json_column = _context->original_tablet_schema->store_row_column(); + ctx.record_raw_json_column = _context.original_tablet_schema->store_row_column(); RETURN_IF_ERROR(vectorized::schema_util::parse_and_encode_variant_columns( block, variant_column_pos, ctx)); flush_schema = std::make_shared(); - flush_schema->copy_from(*_context->original_tablet_schema); + flush_schema->copy_from(*_context.original_tablet_schema); vectorized::Block flush_block(std::move(block)); vectorized::schema_util::rebuild_schema_and_block( - _context->original_tablet_schema, variant_column_pos, flush_block, flush_schema); + _context.original_tablet_schema, variant_column_pos, flush_block, flush_schema); { // Update rowset schema, tablet's tablet schema will be updated when build Rowset // Eg. flush schema: A(int), B(float), C(int), D(int) // ctx.tablet_schema: A(bigint), B(double) // => update_schema: A(bigint), B(double), C(int), D(int) - std::lock_guard lock(*(_context->schema_lock)); + std::lock_guard lock(*(_context.schema_lock)); TabletSchemaSPtr update_schema; RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema( - {_context->tablet_schema, flush_schema}, nullptr, update_schema)); + {_context.tablet_schema, flush_schema}, nullptr, update_schema)); CHECK_GE(update_schema->num_columns(), flush_schema->num_columns()) << "Rowset merge schema columns count is " << update_schema->num_columns() << ", but flush_schema is larger " << flush_schema->num_columns() << " update_schema: " << update_schema->dump_structure() << " flush_schema: " << flush_schema->dump_structure(); - _context->tablet_schema.swap(update_schema); - VLOG_DEBUG << "dump rs schema: " << _context->tablet_schema->dump_structure(); + _context.tablet_schema.swap(update_schema); + VLOG_DEBUG << "dump rs schema: " << _context.tablet_schema->dump_structure(); } block.swap(flush_block); // NOLINT(bugprone-use-after-move) @@ -150,22 +146,13 @@ Status SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block, } Status SegmentFlusher::close() { - std::lock_guard l(_lock); - for (auto& [segment_id, file_writer] : _file_writers) { - Status status = file_writer->close(); - if (!status.ok()) { - LOG(WARNING) << "failed to close file writer, path=" << file_writer->path() - << " res=" << status; - return status; - } - } - return Status::OK(); + return _seg_files.close(); } bool SegmentFlusher::need_buffering() { // buffering variants for schema change - return _context->write_type == DataWriteType::TYPE_SCHEMA_CHANGE && - _context->tablet_schema->num_variant_columns() > 0; + return _context.write_type == DataWriteType::TYPE_SCHEMA_CHANGE && + _context.tablet_schema->num_variant_columns() > 0; } Status SegmentFlusher::_add_rows(std::unique_ptr& segment_writer, @@ -189,24 +176,21 @@ Status SegmentFlusher::_create_segment_writer(std::unique_ptrfile_writer_creator->create(segment_id, file_writer)); + RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, file_writer)); segment_v2::SegmentWriterOptions writer_options; - writer_options.enable_unique_key_merge_on_write = _context->enable_unique_key_merge_on_write; - writer_options.rowset_ctx = _context; - writer_options.write_type = _context->write_type; + writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write; + writer_options.rowset_ctx = &_context; + writer_options.write_type = _context.write_type; if (no_compression) { writer_options.compression_type = NO_COMPRESSION; } - const auto& tablet_schema = flush_schema ? flush_schema : _context->tablet_schema; - writer.reset(new segment_v2::SegmentWriter( - file_writer.get(), segment_id, tablet_schema, _context->tablet, _context->data_dir, - _context->max_rows_per_segment, writer_options, _context->mow_context)); - { - std::lock_guard l(_lock); - _file_writers.emplace(segment_id, std::move(file_writer)); - } + const auto& tablet_schema = flush_schema ? flush_schema : _context.tablet_schema; + writer = std::make_unique( + file_writer.get(), segment_id, tablet_schema, _context.tablet, _context.data_dir, + _context.max_rows_per_segment, writer_options, _context.mow_context); + RETURN_IF_ERROR(_seg_files.add(segment_id, std::move(file_writer))); auto s = writer->init(); if (!s.ok()) { LOG(WARNING) << "failed to init segment writer: " << s.to_string(); @@ -220,24 +204,21 @@ Status SegmentFlusher::_create_segment_writer( std::unique_ptr& writer, int32_t segment_id, bool no_compression, TabletSchemaSPtr flush_schema) { io::FileWriterPtr file_writer; - RETURN_IF_ERROR(_context->file_writer_creator->create(segment_id, file_writer)); + RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, file_writer)); segment_v2::VerticalSegmentWriterOptions writer_options; - writer_options.enable_unique_key_merge_on_write = _context->enable_unique_key_merge_on_write; - writer_options.rowset_ctx = _context; - writer_options.write_type = _context->write_type; + writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write; + writer_options.rowset_ctx = &_context; + writer_options.write_type = _context.write_type; if (no_compression) { writer_options.compression_type = NO_COMPRESSION; } - const auto& tablet_schema = flush_schema ? flush_schema : _context->tablet_schema; - writer.reset(new segment_v2::VerticalSegmentWriter( - file_writer.get(), segment_id, tablet_schema, _context->tablet, _context->data_dir, - _context->max_rows_per_segment, writer_options, _context->mow_context)); - { - std::lock_guard l(_lock); - _file_writers.emplace(segment_id, std::move(file_writer)); - } + const auto& tablet_schema = flush_schema ? flush_schema : _context.tablet_schema; + writer = std::make_unique( + file_writer.get(), segment_id, tablet_schema, _context.tablet, _context.data_dir, + _context.max_rows_per_segment, writer_options, _context.mow_context); + RETURN_IF_ERROR(_seg_files.add(segment_id, std::move(file_writer))); auto s = writer->init(); if (!s.ok()) { LOG(WARNING) << "failed to init segment writer: " << s.to_string(); @@ -245,9 +226,9 @@ Status SegmentFlusher::_create_segment_writer( return s; } - VLOG_DEBUG << "create new segment writer, tablet_id:" << _context->tablet_id + VLOG_DEBUG << "create new segment writer, tablet_id:" << _context.tablet_id << " segment id: " << segment_id << " filename: " << writer->data_dir_path() - << " rowset_id:" << _context->rowset_id; + << " rowset_id:" << _context.rowset_id; return Status::OK(); } @@ -266,9 +247,9 @@ Status SegmentFlusher::_flush_segment_writer( if (!s.ok()) { return Status::Error(s.code(), "failed to finalize segment: {}", s.to_string()); } - VLOG_DEBUG << "tablet_id:" << _context->tablet_id + VLOG_DEBUG << "tablet_id:" << _context.tablet_id << " flushing filename: " << writer->data_dir_path() - << " rowset_id:" << _context->rowset_id; + << " rowset_id:" << _context.rowset_id; KeyBoundsPB key_bounds; Slice min_key = writer->min_encoded_key(); @@ -286,7 +267,7 @@ Status SegmentFlusher::_flush_segment_writer( writer.reset(); - RETURN_IF_ERROR(_context->segment_collector->add(segment_id, segstat, flush_schema)); + RETURN_IF_ERROR(_context.segment_collector->add(segment_id, segstat, flush_schema)); if (flush_size) { *flush_size = segment_size + index_size; @@ -308,9 +289,9 @@ Status SegmentFlusher::_flush_segment_writer(std::unique_ptrtablet_id - << " flushing rowset_dir: " << _context->rowset_dir - << " rowset_id:" << _context->rowset_id; + VLOG_DEBUG << "tablet_id:" << _context.tablet_id + << " flushing rowset_dir: " << _context.rowset_dir + << " rowset_id:" << _context.rowset_id; KeyBoundsPB key_bounds; Slice min_key = writer->min_encoded_key(); @@ -328,7 +309,7 @@ Status SegmentFlusher::_flush_segment_writer(std::unique_ptrsegment_collector->add(segment_id, segstat, flush_schema)); + RETURN_IF_ERROR(_context.segment_collector->add(segment_id, segstat, flush_schema)); if (flush_size) { *flush_size = segment_size + index_size; @@ -345,13 +326,6 @@ Status SegmentFlusher::create_writer(std::unique_ptr& wr return Status::OK(); } -io::FileWriter* SegmentFlusher::get_file_writer(int32_t segment_id) { - if (!_file_writers.contains(segment_id)) { - return nullptr; - } - return _file_writers[segment_id].get(); -} - SegmentFlusher::Writer::Writer(SegmentFlusher* flusher, std::unique_ptr& segment_writer) : _flusher(flusher), _writer(std::move(segment_writer)) {}; @@ -366,10 +340,8 @@ int64_t SegmentFlusher::Writer::max_row_to_add(size_t row_avg_size_in_bytes) { return _writer->max_row_to_add(row_avg_size_in_bytes); } -Status SegmentCreator::init(RowsetWriterContext& rowset_writer_context) { - RETURN_IF_ERROR(_segment_flusher.init(rowset_writer_context)); - return Status::OK(); -} +SegmentCreator::SegmentCreator(RowsetWriterContext& context, SegmentFileCollection& seg_files) + : _segment_flusher(context, seg_files) {} Status SegmentCreator::add_block(const vectorized::Block* block) { if (block->rows() == 0) { diff --git a/be/src/olap/rowset/segment_creator.h b/be/src/olap/rowset/segment_creator.h index fe439d3bc7a869..b53497d4aba5f3 100644 --- a/be/src/olap/rowset/segment_creator.h +++ b/be/src/olap/rowset/segment_creator.h @@ -43,6 +43,7 @@ class VerticalSegmentWriter; struct SegmentStatistics; class BetaRowsetWriter; +class SegmentFileCollection; class FileWriterCreator { public: @@ -88,12 +89,10 @@ class SegmentCollectorT : public SegmentCollector { class SegmentFlusher { public: - SegmentFlusher(); + SegmentFlusher(RowsetWriterContext& context, SegmentFileCollection& seg_files); ~SegmentFlusher(); - Status init(RowsetWriterContext& rowset_writer_context); - // Return the file size flushed to disk in "flush_size" // This method is thread-safe. Status flush_single_block(const vectorized::Block* block, int32_t segment_id, @@ -103,8 +102,6 @@ class SegmentFlusher { int64_t num_rows_filtered() const { return _num_rows_filtered; } - io::FileWriter* get_file_writer(int32_t segment_id); - Status close(); public: @@ -153,10 +150,8 @@ class SegmentFlusher { int64_t* flush_size = nullptr); private: - RowsetWriterContext* _context; - - mutable SpinLock _lock; // protect following vectors. - std::unordered_map _file_writers; + RowsetWriterContext& _context; + SegmentFileCollection& _seg_files; // written rows by add_block/add_row std::atomic _num_rows_written = 0; @@ -165,12 +160,10 @@ class SegmentFlusher { class SegmentCreator { public: - SegmentCreator() = default; + SegmentCreator(RowsetWriterContext& context, SegmentFileCollection& seg_files); ~SegmentCreator() = default; - Status init(RowsetWriterContext& rowset_writer_context); - void set_segment_start_id(uint32_t start_id) { _next_segment_id = start_id; } Status add_block(const vectorized::Block* block); @@ -199,10 +192,6 @@ class SegmentCreator { Status close(); - io::FileWriter* get_file_writer(int32_t segment_id) { - return _segment_flusher.get_file_writer(segment_id); - } - private: std::atomic _next_segment_id = 0; SegmentFlusher _segment_flusher; diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp index 5fdbdcc49afd16..4544223546a434 100644 --- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp +++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp @@ -27,6 +27,7 @@ #include #include +#include "cloud/cloud_rowset_writer.h" #include "common/compiler_util.h" // IWYU pragma: keep #include "common/logging.h" #include "io/fs/file_system.h" @@ -41,30 +42,182 @@ namespace doris { using namespace ErrorCode; -VerticalBetaRowsetWriter::VerticalBetaRowsetWriter(StorageEngine& engine) - : BetaRowsetWriter(engine) { - _helper = std::make_shared( - &_segment_writers, _already_built, _rowset_meta, &_num_segment, _context, - &_num_rows_written, &_segments_encoded_key_bounds, &_segment_num_rows, - &_total_index_size, &_file_writers, &_total_data_size, &_lock); +template class VerticalBetaRowsetWriter; +template class VerticalBetaRowsetWriter; + +template + requires std::is_base_of_v +Status VerticalBetaRowsetWriter::add_columns(const vectorized::Block* block, + const std::vector& col_ids, bool is_key, + uint32_t max_rows_per_segment) { + auto& context = this->_context; + + VLOG_NOTICE << "VerticalBetaRowsetWriter::add_columns, columns: " << block->columns(); + size_t num_rows = block->rows(); + if (num_rows == 0) { + return Status::OK(); + } + if (UNLIKELY(max_rows_per_segment > context.max_rows_per_segment)) { + max_rows_per_segment = context.max_rows_per_segment; + } + + if (_segment_writers.empty()) { + // it must be key columns + DCHECK(is_key); + std::unique_ptr writer; + RETURN_IF_ERROR(_create_segment_writer(col_ids, is_key, &writer)); + _segment_writers.emplace_back(std::move(writer)); + _cur_writer_idx = 0; + RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->append_block(block, 0, num_rows)); + } else if (is_key) { + if (_segment_writers[_cur_writer_idx]->num_rows_written() > max_rows_per_segment) { + // segment is full, need flush columns and create new segment writer + RETURN_IF_ERROR(_flush_columns(_segment_writers[_cur_writer_idx].get(), true)); + + std::unique_ptr writer; + RETURN_IF_ERROR(_create_segment_writer(col_ids, is_key, &writer)); + _segment_writers.emplace_back(std::move(writer)); + ++_cur_writer_idx; + } + RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->append_block(block, 0, num_rows)); + } else { + // value columns + uint32_t num_rows_written = _segment_writers[_cur_writer_idx]->num_rows_written(); + VLOG_NOTICE << "num_rows_written: " << num_rows_written + << ", _cur_writer_idx: " << _cur_writer_idx; + uint32_t num_rows_key_group = _segment_writers[_cur_writer_idx]->row_count(); + // init if it's first value column write in current segment + if (_cur_writer_idx == 0 && num_rows_written == 0) { + VLOG_NOTICE << "init first value column segment writer"; + RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->init(col_ids, is_key)); + } + // when splitting segment, need to make rows align between key columns and value columns + size_t start_offset = 0; + size_t limit = num_rows; + if (num_rows_written + num_rows >= num_rows_key_group && + _cur_writer_idx < _segment_writers.size() - 1) { + RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->append_block( + block, 0, num_rows_key_group - num_rows_written)); + RETURN_IF_ERROR(_flush_columns(_segment_writers[_cur_writer_idx].get())); + start_offset = num_rows_key_group - num_rows_written; + limit = num_rows - start_offset; + ++_cur_writer_idx; + // switch to next writer + RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->init(col_ids, is_key)); + num_rows_written = 0; + num_rows_key_group = _segment_writers[_cur_writer_idx]->row_count(); + } + if (limit > 0) { + RETURN_IF_ERROR( + _segment_writers[_cur_writer_idx]->append_block(block, start_offset, limit)); + DCHECK(_segment_writers[_cur_writer_idx]->num_rows_written() <= + _segment_writers[_cur_writer_idx]->row_count()); + } + } + if (is_key) { + this->_num_rows_written += num_rows; + } + return Status::OK(); +} + +template + requires std::is_base_of_v +Status VerticalBetaRowsetWriter::_flush_columns(segment_v2::SegmentWriter* segment_writer, + bool is_key) { + uint64_t index_size = 0; + VLOG_NOTICE << "flush columns index: " << _cur_writer_idx; + RETURN_IF_ERROR(segment_writer->finalize_columns_data()); + RETURN_IF_ERROR(segment_writer->finalize_columns_index(&index_size)); + if (is_key) { + _total_key_group_rows += segment_writer->row_count(); + // record segment key bound + KeyBoundsPB key_bounds; + Slice min_key = segment_writer->min_encoded_key(); + Slice max_key = segment_writer->max_encoded_key(); + DCHECK_LE(min_key.compare(max_key), 0); + key_bounds.set_min_key(min_key.to_string()); + key_bounds.set_max_key(max_key.to_string()); + this->_segments_encoded_key_bounds.emplace_back(std::move(key_bounds)); + this->_segment_num_rows.resize(_cur_writer_idx + 1); + this->_segment_num_rows[_cur_writer_idx] = _segment_writers[_cur_writer_idx]->row_count(); + } + this->_total_index_size += + static_cast(index_size) + segment_writer->get_inverted_index_file_size(); + return Status::OK(); } -VerticalBetaRowsetWriter::~VerticalBetaRowsetWriter() { - _helper->destruct_writer(); +template + requires std::is_base_of_v +Status VerticalBetaRowsetWriter::flush_columns(bool is_key) { + if (_segment_writers.empty()) { + return Status::OK(); + } + + DCHECK(_cur_writer_idx < _segment_writers.size() && _segment_writers[_cur_writer_idx]); + RETURN_IF_ERROR(_flush_columns(_segment_writers[_cur_writer_idx].get(), is_key)); + _cur_writer_idx = 0; + return Status::OK(); } -Status VerticalBetaRowsetWriter::add_columns(const vectorized::Block* block, - const std::vector& col_ids, bool is_key, - uint32_t max_rows_per_segment) { - return _helper->add_columns(block, col_ids, is_key, max_rows_per_segment); +template + requires std::is_base_of_v +Status VerticalBetaRowsetWriter::_create_segment_writer( + const std::vector& column_ids, bool is_key, + std::unique_ptr* writer) { + auto& context = this->_context; + + int seg_id = this->_num_segment.fetch_add(1, std::memory_order_relaxed); + auto path = BetaRowset::segment_file_path(context.rowset_dir, context.rowset_id, seg_id); + auto fs = this->_rowset_meta->fs(); + if (!fs) { + return Status::Error("get fs failed"); + } + io::FileWriterPtr file_writer; + Status st = fs->create_file(path, &file_writer); + if (!st.ok()) { + LOG(WARNING) << "failed to create writable file. path=" << path << ", err: " << st; + return st; + } + + DCHECK(file_writer != nullptr); + segment_v2::SegmentWriterOptions writer_options; + writer_options.enable_unique_key_merge_on_write = context.enable_unique_key_merge_on_write; + writer_options.rowset_ctx = &context; + *writer = std::make_unique( + file_writer.get(), seg_id, context.tablet_schema, context.tablet, context.data_dir, + context.max_rows_per_segment, writer_options, nullptr); + RETURN_IF_ERROR(this->_seg_files.add(seg_id, std::move(file_writer))); + + auto s = (*writer)->init(column_ids, is_key); + if (!s.ok()) { + LOG(WARNING) << "failed to init segment writer: " << s.to_string(); + writer->reset(nullptr); + return s; + } + return Status::OK(); } -Status VerticalBetaRowsetWriter::flush_columns(bool is_key) { - return _helper->flush_columns(is_key); +template + requires std::is_base_of_v +Status VerticalBetaRowsetWriter::final_flush() { + for (auto& segment_writer : _segment_writers) { + uint64_t segment_size = 0; + //uint64_t footer_position = 0; + auto st = segment_writer->finalize_footer(&segment_size); + if (!st.ok()) { + LOG(WARNING) << "Fail to finalize segment footer, " << st; + return st; + } + this->_total_data_size += segment_size + segment_writer->get_inverted_index_file_size(); + segment_writer.reset(); + } + return Status::OK(); } -Status VerticalBetaRowsetWriter::final_flush() { - return _helper->final_flush(); +template + requires std::is_base_of_v +Status VerticalBetaRowsetWriter::_close_file_writers() { + return this->_seg_files.close(); } } // namespace doris diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.h b/be/src/olap/rowset/vertical_beta_rowset_writer.h index b2056cc8fba438..dcb4ae5a8b5d16 100644 --- a/be/src/olap/rowset/vertical_beta_rowset_writer.h +++ b/be/src/olap/rowset/vertical_beta_rowset_writer.h @@ -17,16 +17,13 @@ #pragma once -#include -#include - #include +#include #include #include "common/status.h" #include "olap/rowset/beta_rowset_writer.h" #include "olap/rowset/segment_v2/segment_writer.h" -#include "olap/rowset/vertical_beta_rowset_writer_helper.h" namespace doris { namespace vectorized { @@ -34,11 +31,14 @@ class Block; } // namespace vectorized // for vertical compaction -// TODO(plat1ko): Inherited from template type `T`, `T` is `BetaRowsetWriter` or `CloudBetaRowsetWriter` -class VerticalBetaRowsetWriter final : public BetaRowsetWriter { +template + requires std::is_base_of_v +class VerticalBetaRowsetWriter final : public T { public: - VerticalBetaRowsetWriter(StorageEngine& engine); - ~VerticalBetaRowsetWriter() override; + template + explicit VerticalBetaRowsetWriter(Args&&... args) : T(std::forward(args)...) {} + + ~VerticalBetaRowsetWriter() override = default; Status add_columns(const vectorized::Block* block, const std::vector& col_ids, bool is_key, uint32_t max_rows_per_segment) override; @@ -51,10 +51,16 @@ class VerticalBetaRowsetWriter final : public BetaRowsetWriter { int64_t num_rows() const override { return _total_key_group_rows; } + Status _close_file_writers() override; + private: + Status _flush_columns(segment_v2::SegmentWriter* segment_writer, bool is_key = false); + Status _create_segment_writer(const std::vector& column_ids, bool is_key, + std::unique_ptr* writer); + std::vector> _segment_writers; + size_t _cur_writer_idx = 0; size_t _total_key_group_rows = 0; - std::shared_ptr _helper; }; } // namespace doris \ No newline at end of file diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp b/be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp deleted file mode 100644 index e8d03530662545..00000000000000 --- a/be/src/olap/rowset/vertical_beta_rowset_writer_helper.cpp +++ /dev/null @@ -1,244 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "olap/rowset/vertical_beta_rowset_writer_helper.h" - -#include -#include - -#include -#include -#include -#include -#include -#include - -#include "common/compiler_util.h" // IWYU pragma: keep -#include "common/logging.h" -#include "io/fs/file_system.h" -#include "io/fs/file_writer.h" -#include "olap/rowset/beta_rowset.h" -#include "olap/rowset/rowset_meta.h" -#include "olap/rowset/rowset_writer_context.h" -#include "util/slice.h" -#include "util/spinlock.h" -#include "vec/core/block.h" - -namespace doris { -using namespace ErrorCode; - -VerticalBetaRowsetWriterHelper::VerticalBetaRowsetWriterHelper( - std::vector>* segment_writers, - bool& already_built, RowsetMetaSharedPtr& rowset_meta, std::atomic* num_segment, - RowsetWriterContext& context, std::atomic* num_rows_written, - std::vector* segments_encoded_key_bounds, - std::vector* segment_num_rows, std::atomic* total_index_size, - std::vector* file_writers, std::atomic* total_data_size, - SpinLock* lock) - : _segment_writers(segment_writers), - _already_built(already_built), - _rowset_meta(rowset_meta), - _num_segment(num_segment), - _context(context), - _num_rows_written(num_rows_written), - _segments_encoded_key_bounds(segments_encoded_key_bounds), - _segment_num_rows(segment_num_rows), - _total_index_size(total_index_size), - _file_writers(file_writers), - _total_data_size(total_data_size), - _lock(lock) {} - -void VerticalBetaRowsetWriterHelper::destruct_writer() { - if (!_already_built) { - const auto& fs = _rowset_meta->fs(); - if (!fs || !_rowset_meta->is_local()) { // Remote fs will delete them asynchronously - return; - } - for (auto& segment_writer : *_segment_writers) { - segment_writer.reset(); - } - for (int i = 0; i < *_num_segment; ++i) { - auto path = BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, i); - // Even if an error is encountered, these files that have not been cleaned up - // will be cleaned up by the GC background. So here we only print the error - // message when we encounter an error. - WARN_IF_ERROR(fs->delete_file(path), fmt::format("Failed to delete file={}", path)); - } - } -} - -Status VerticalBetaRowsetWriterHelper::add_columns(const vectorized::Block* block, - const std::vector& col_ids, - bool is_key, uint32_t max_rows_per_segment) { - VLOG_NOTICE << "VerticalBetaRowsetWriter::add_columns, columns: " << block->columns(); - size_t num_rows = block->rows(); - if (num_rows == 0) { - return Status::OK(); - } - if (UNLIKELY(max_rows_per_segment > _context.max_rows_per_segment)) { - max_rows_per_segment = _context.max_rows_per_segment; - } - - if (_segment_writers->empty()) { - // it must be key columns - DCHECK(is_key); - std::unique_ptr writer; - RETURN_IF_ERROR(_create_segment_writer(col_ids, is_key, &writer)); - _segment_writers->emplace_back(std::move(writer)); - _cur_writer_idx = 0; - RETURN_IF_ERROR((*_segment_writers)[_cur_writer_idx]->append_block(block, 0, num_rows)); - } else if (is_key) { - if ((*_segment_writers)[_cur_writer_idx]->num_rows_written() > max_rows_per_segment) { - // segment is full, need flush columns and create new segment writer - RETURN_IF_ERROR(_flush_columns(&(*_segment_writers)[_cur_writer_idx], true)); - - std::unique_ptr writer; - RETURN_IF_ERROR(_create_segment_writer(col_ids, is_key, &writer)); - _segment_writers->emplace_back(std::move(writer)); - ++_cur_writer_idx; - } - RETURN_IF_ERROR((*_segment_writers)[_cur_writer_idx]->append_block(block, 0, num_rows)); - } else { - // value columns - uint32_t num_rows_written = (*_segment_writers)[_cur_writer_idx]->num_rows_written(); - VLOG_NOTICE << "num_rows_written: " << num_rows_written - << ", _cur_writer_idx: " << _cur_writer_idx; - uint32_t num_rows_key_group = (*_segment_writers)[_cur_writer_idx]->row_count(); - // init if it's first value column write in current segment - if (_cur_writer_idx == 0 && num_rows_written == 0) { - VLOG_NOTICE << "init first value column segment writer"; - RETURN_IF_ERROR((*_segment_writers)[_cur_writer_idx]->init(col_ids, is_key)); - } - // when splitting segment, need to make rows align between key columns and value columns - size_t start_offset = 0; - size_t limit = num_rows; - if (num_rows_written + num_rows >= num_rows_key_group && - _cur_writer_idx < _segment_writers->size() - 1) { - RETURN_IF_ERROR((*_segment_writers)[_cur_writer_idx]->append_block( - block, 0, num_rows_key_group - num_rows_written)); - RETURN_IF_ERROR(_flush_columns(&(*_segment_writers)[_cur_writer_idx])); - start_offset = num_rows_key_group - num_rows_written; - limit = num_rows - start_offset; - ++_cur_writer_idx; - // switch to next writer - RETURN_IF_ERROR((*_segment_writers)[_cur_writer_idx]->init(col_ids, is_key)); - num_rows_written = 0; - num_rows_key_group = (*_segment_writers)[_cur_writer_idx]->row_count(); - } - if (limit > 0) { - RETURN_IF_ERROR( - (*_segment_writers)[_cur_writer_idx]->append_block(block, start_offset, limit)); - DCHECK((*_segment_writers)[_cur_writer_idx]->num_rows_written() <= - (*_segment_writers)[_cur_writer_idx]->row_count()); - } - } - if (is_key) { - *_num_rows_written += num_rows; - } - return Status::OK(); -} - -Status VerticalBetaRowsetWriterHelper::_flush_columns( - std::unique_ptr* segment_writer, bool is_key) { - uint64_t index_size = 0; - VLOG_NOTICE << "flush columns index: " << _cur_writer_idx; - RETURN_IF_ERROR((*segment_writer)->finalize_columns_data()); - RETURN_IF_ERROR((*segment_writer)->finalize_columns_index(&index_size)); - if (is_key) { - _total_key_group_rows += (*segment_writer)->row_count(); - // record segment key bound - KeyBoundsPB key_bounds; - Slice min_key = (*segment_writer)->min_encoded_key(); - Slice max_key = (*segment_writer)->max_encoded_key(); - DCHECK_LE(min_key.compare(max_key), 0); - key_bounds.set_min_key(min_key.to_string()); - key_bounds.set_max_key(max_key.to_string()); - _segments_encoded_key_bounds->emplace_back(key_bounds); - _segment_num_rows->resize(_cur_writer_idx + 1); - (*_segment_num_rows)[_cur_writer_idx] = (*_segment_writers)[_cur_writer_idx]->row_count(); - } - *_total_index_size += - static_cast(index_size) + (*segment_writer)->get_inverted_index_file_size(); - return Status::OK(); -} - -Status VerticalBetaRowsetWriterHelper::flush_columns(bool is_key) { - if (_segment_writers->empty()) { - return Status::OK(); - } - - DCHECK(_cur_writer_idx < _segment_writers->size() && (*_segment_writers)[_cur_writer_idx]); - RETURN_IF_ERROR(_flush_columns(&(*_segment_writers)[_cur_writer_idx], is_key)); - _cur_writer_idx = 0; - return Status::OK(); -} - -Status VerticalBetaRowsetWriterHelper::_create_segment_writer( - const std::vector& column_ids, bool is_key, - std::unique_ptr* writer) { - auto path = BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, - (*_num_segment)++); - auto fs = _rowset_meta->fs(); - if (!fs) { - return Status::Error("get fs failed"); - } - io::FileWriterPtr file_writer; - io::FileWriterOptions opts; - opts.create_empty_file = false; - Status st = fs->create_file(path, &file_writer, &opts); - if (!st.ok()) { - LOG(WARNING) << "failed to create writable file. path=" << path << ", err: " << st; - return st; - } - - DCHECK(file_writer != nullptr); - segment_v2::SegmentWriterOptions writer_options; - writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write; - writer_options.rowset_ctx = &_context; - writer->reset(new segment_v2::SegmentWriter( - file_writer.get(), *_num_segment, _context.tablet_schema, _context.tablet, - _context.data_dir, _context.max_rows_per_segment, writer_options, nullptr)); - { - std::lock_guard l(*_lock); - _file_writers->push_back(std::move(file_writer)); - } - - auto s = (*writer)->init(column_ids, is_key); - if (!s.ok()) { - LOG(WARNING) << "failed to init segment writer: " << s.to_string(); - writer->reset(nullptr); - return s; - } - return Status::OK(); -} - -Status VerticalBetaRowsetWriterHelper::final_flush() { - for (auto& segment_writer : *_segment_writers) { - uint64_t segment_size = 0; - //uint64_t footer_position = 0; - auto st = segment_writer->finalize_footer(&segment_size); - if (!st.ok()) { - LOG(WARNING) << "Fail to finalize segment footer, " << st; - return st; - } - *_total_data_size += segment_size + segment_writer->get_inverted_index_file_size(); - segment_writer.reset(); - } - return Status::OK(); -} - -} // namespace doris diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer_helper.h b/be/src/olap/rowset/vertical_beta_rowset_writer_helper.h deleted file mode 100644 index 10982bf6b2af47..00000000000000 --- a/be/src/olap/rowset/vertical_beta_rowset_writer_helper.h +++ /dev/null @@ -1,83 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include - -#include -#include - -#include "common/status.h" -#include "olap/rowset/beta_rowset_writer.h" -#include "olap/rowset/segment_v2/segment_writer.h" - -namespace doris { -namespace vectorized { -class Block; -} // namespace vectorized - -class VerticalBetaRowsetWriterHelper { -public: - VerticalBetaRowsetWriterHelper( - std::vector>* segment_writers, - bool& already_built, RowsetMetaSharedPtr& rowset_meta, - std::atomic* num_segment, RowsetWriterContext& context, - std::atomic* _num_rows_written, - std::vector* _segments_encoded_key_bounds, - std::vector* _segment_num_rows, std::atomic* _total_index_size, - std::vector* _file_writers, std::atomic* _total_data_size, - SpinLock* _lock); - ~VerticalBetaRowsetWriterHelper() = default; - - Status add_columns(const vectorized::Block* block, const std::vector& col_ids, - bool is_key, uint32_t max_rows_per_segment); - - Status flush_columns(bool is_key); - - Status final_flush(); - - int64_t num_rows() const { return _total_key_group_rows; } - - void destruct_writer(); - -private: - Status _flush_columns(std::unique_ptr* segment_writer, - bool is_key = false); - Status _create_segment_writer(const std::vector& column_ids, bool is_key, - std::unique_ptr* writer); - -private: - std::vector>* _segment_writers; - size_t _cur_writer_idx = 0; - size_t _total_key_group_rows = 0; - - bool& _already_built; - RowsetMetaSharedPtr& _rowset_meta; - std::atomic* _num_segment; - RowsetWriterContext& _context; - std::atomic* _num_rows_written; - std::vector* _segments_encoded_key_bounds; - std::vector* _segment_num_rows; - std::atomic* _total_index_size; - std::vector* _file_writers; - std::atomic* _total_data_size; - SpinLock* _lock; -}; - -} // namespace doris \ No newline at end of file diff --git a/be/src/vec/runtime/ipv4_value.h b/be/src/vec/runtime/ipv4_value.h index 2a5a5ae91ef8ee..ffce9e2bb67641 100644 --- a/be/src/vec/runtime/ipv4_value.h +++ b/be/src/vec/runtime/ipv4_value.h @@ -48,7 +48,7 @@ class IPv4Value { if (len == 0) { return false; } - int64_t parse_value; + int64_t parse_value = 0; size_t begin = 0; size_t end = len - 1; while (begin < len && std::isspace(ipv4_str[begin])) {