Skip to content

Commit

Permalink
[enhance](io) Set segment file size to rowset meta (apache#31853)
Browse files Browse the repository at this point in the history
  • Loading branch information
platoneko authored Mar 14, 2024
1 parent 65945a0 commit bd330a5
Show file tree
Hide file tree
Showing 23 changed files with 430 additions and 620 deletions.
8 changes: 6 additions & 2 deletions be/src/cloud/cloud_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ Status CloudRowsetWriter::init(const RowsetWriterContext& rowset_writer_context)
_rowset_meta->set_tablet_schema(_context.tablet_schema);
_context.segment_collector = std::make_shared<SegmentCollectorT<BaseBetaRowsetWriter>>(this);
_context.file_writer_creator = std::make_shared<FileWriterCreatorT<BaseBetaRowsetWriter>>(this);
RETURN_IF_ERROR(_segment_creator.init(_context));
return Status::OK();
}

Expand Down Expand Up @@ -105,7 +104,12 @@ Status CloudRowsetWriter::build(RowsetSharedPtr& rowset) {
_rowset_meta->set_newest_write_timestamp(UnixSeconds());
}

// TODO(plat1ko): Record segment file size in rowset meta
if (auto seg_file_size = _seg_files.segments_file_size(_segment_start_id);
!seg_file_size.has_value()) [[unlikely]] {
LOG(ERROR) << "expected segment file sizes, but none presents: " << seg_file_size.error();
} else {
_rowset_meta->add_segments_file_size(seg_file_size.value());
}

RETURN_NOT_OK_STATUS_WITH_WARN(
RowsetFactory::create_rowset(_context.tablet_schema, _context.rowset_dir, _rowset_meta,
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_transient_rowset_write
context.rowset_dir = remote_tablet_path(tablet_id());
context.enable_unique_key_merge_on_write = enable_unique_key_merge_on_write();
context.txn_expiration = txn_expiration;
context.fs = _engine.latest_fs();
context.fs = rowset.rowset_meta()->fs();
return RowsetFactory::create_rowset_writer(_engine, context, false)
.transform([&](auto&& writer) {
writer->set_segment_start_id(rowset.num_segments());
Expand Down
71 changes: 0 additions & 71 deletions be/src/cloud/cloud_vertical_rowset_writer.cpp

This file was deleted.

60 changes: 0 additions & 60 deletions be/src/cloud/cloud_vertical_rowset_writer.h

This file was deleted.

2 changes: 1 addition & 1 deletion be/src/io/fs/benchmark/base_benchmark.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class BaseBenchmark {
size_t size = std::min(buffer_size, (size_t)remaining_size);
data.size = size;
status = reader->read_at(offset, data, &bytes_read);
if (!status.ok() || bytes_read < 0) {
if (!status.ok()) {
bm_log("reader read_at error: {}", status.to_string());
break;
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1260,7 +1260,7 @@ Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, const Tablet
RETURN_IF_ERROR(rowset_writer->flush());
RowsetSharedPtr transient_rowset;
RETURN_IF_ERROR(rowset_writer->build(transient_rowset));
rowset->merge_rowset_meta(transient_rowset->rowset_meta());
rowset->rowset_meta()->merge_rowset_meta(*transient_rowset->rowset_meta());

// erase segment cache cause we will add a segment to rowset
SegmentLoader::instance()->erase_segments(rowset->rowset_id(), rowset->num_segments());
Expand Down
5 changes: 4 additions & 1 deletion be/src/olap/rowset/beta_rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ Status BetaRowset::load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* se
.cache_type = config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE
: io::FileCachePolicy::NO_CACHE,
.is_doris_table = true,
.file_size = _rowset_meta->segment_file_size(seg_id),
};
auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(), _schema, reader_options,
segment);
Expand Down Expand Up @@ -418,7 +419,9 @@ bool BetaRowset::check_current_rowset_segment() {
io::FileReaderOptions reader_options {
.cache_type = config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE
: io::FileCachePolicy::NO_CACHE,
.is_doris_table = true};
.is_doris_table = true,
.file_size = _rowset_meta->segment_file_size(seg_id),
};
auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(), _schema,
reader_options, &segment);
if (!s.ok()) {
Expand Down
105 changes: 96 additions & 9 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <ctime> // time
#include <filesystem>
#include <memory>
#include <mutex>
#include <sstream>
#include <utility>

Expand Down Expand Up @@ -96,12 +97,103 @@ void build_rowset_meta_with_spec_field(RowsetMeta& rowset_meta,

} // namespace

// Defaulted out-of-line rather than in the header — NOTE(review): presumably so
// the header can hold io::FileWriterPtr to a forward-declared io::FileWriter;
// confirm against the class declaration in beta_rowset_writer.h.
SegmentFileCollection::~SegmentFileCollection() = default;

// Registers the file writer for `seg_id`, taking ownership of `writer`.
// Fails (and DCHECKs in debug builds) if the collection has already been
// closed, since close() assumes no further writers will appear.
Status SegmentFileCollection::add(int seg_id, io::FileWriterPtr&& writer) {
    std::lock_guard guard(_lock);
    if (!_closed) {
        _file_writers.emplace(seg_id, std::move(writer));
        return Status::OK();
    }

    // Registering after close() indicates a programming error upstream.
    DCHECK(false) << writer->path();
    return Status::InternalError("add to closed SegmentFileCollection");
}

// Looks up the writer registered for `seg_id`.
// Returns a borrowed pointer (ownership stays with the collection), or
// nullptr when no writer with that id has been added.
io::FileWriter* SegmentFileCollection::get(int seg_id) const {
    std::lock_guard guard(_lock);
    auto it = _file_writers.find(seg_id);
    return it == _file_writers.end() ? nullptr : it->second.get();
}

// Closes every registered segment file writer exactly once.
// Returns InternalError (and DCHECKs in debug builds) on a double close,
// or the first writer close() failure.
Status SegmentFileCollection::close() {
    {
        std::lock_guard lock(_lock);
        if (_closed) [[unlikely]] {
            DCHECK(false);
            return Status::InternalError("double close SegmentFileCollection");
        }
        // Mark closed under the lock first: add() checks _closed and rejects
        // new writers, so _file_writers cannot be mutated after this point.
        _closed = true;
    }

    // Iterating without the lock is safe because of the _closed barrier above.
    for (auto&& [_, writer] : _file_writers) {
        RETURN_IF_ERROR(writer->close());
    }

    return Status::OK();
}

// Returns the final file size of each segment writer, indexed by
// (seg_id - seg_id_offset).
// Must be called after close(): sizes are only meaningful once the writers
// have been flushed and closed, and close() guarantees _file_writers is no
// longer mutated, so iterating here is race-free.
// @param seg_id_offset smallest seg_id expected in this collection (non-zero
//                      when segment ids do not start at 0, e.g. appended
//                      segments — see callers passing _segment_start_id).
// @return one size per registered writer on success; otherwise the first
//         validation error (out-of-range id, duplicate id, or empty file).
Result<std::vector<size_t>> SegmentFileCollection::segments_file_size(int seg_id_offset) {
    std::lock_guard lock(_lock);
    if (!_closed) [[unlikely]] {
        DCHECK(false);
        return ResultError(Status::InternalError("get segments file size without closed"));
    }

    Status st;
    std::vector<size_t> seg_file_size(_file_writers.size(), 0);
    bool succ = std::all_of(_file_writers.begin(), _file_writers.end(), [&](auto&& it) {
        auto&& [seg_id, writer] = it;

        int idx = seg_id - seg_id_offset;
        // Reject negative indices (seg_id < seg_id_offset) explicitly instead
        // of relying on the signed->unsigned wraparound that a bare
        // `idx >= seg_file_size.size()` comparison would perform.
        if (idx < 0 || static_cast<size_t>(idx) >= seg_file_size.size()) [[unlikely]] {
            auto err_msg = fmt::format(
                    "invalid seg_id={} num_file_writers={} seg_id_offset={} path={}", seg_id,
                    seg_file_size.size(), seg_id_offset, writer->path().native());
            DCHECK(false) << err_msg;
            st = Status::InternalError(err_msg);
            return false;
        }

        auto& fsize = seg_file_size[idx];
        if (fsize != 0) {
            // File size should not been set
            auto err_msg =
                    fmt::format("duplicate seg_id={} path={}", seg_id, writer->path().native());
            DCHECK(false) << err_msg;
            st = Status::InternalError(err_msg);
            return false;
        }

        fsize = writer->bytes_appended();
        // `fsize` is unsigned, so the original `fsize <= 0` could only ever
        // mean `fsize == 0`; state that explicitly (a zero-length segment file
        // is invalid).
        if (fsize == 0) {
            auto err_msg =
                    fmt::format("invalid segment fsize={} path={}", fsize, writer->path().native());
            DCHECK(false) << err_msg;
            st = Status::InternalError(err_msg);
            return false;
        }

        return true;
    });

    if (succ) {
        return seg_file_size;
    }

    return ResultError(st);
}

BaseBetaRowsetWriter::BaseBetaRowsetWriter()
: _num_segment(0),
_segment_start_id(0),
_num_rows_written(0),
_total_data_size(0),
_total_index_size(0) {}
_total_index_size(0),
_segment_creator(_context, _seg_files) {}

BetaRowsetWriter::BetaRowsetWriter(StorageEngine& engine)
: _engine(engine), _segcompaction_worker(std::make_shared<SegcompactionWorker>(this)) {}
Expand Down Expand Up @@ -158,7 +250,6 @@ Status BaseBetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_conte
_rowset_meta->set_tablet_schema(_context.tablet_schema);
_context.segment_collector = std::make_shared<SegmentCollectorT<BaseBetaRowsetWriter>>(this);
_context.file_writer_creator = std::make_shared<FileWriterCreatorT<BaseBetaRowsetWriter>>(this);
RETURN_IF_ERROR(_segment_creator.init(_context));
return Status::OK();
}

Expand Down Expand Up @@ -533,12 +624,7 @@ RowsetSharedPtr BaseBetaRowsetWriter::manual_build(const RowsetMetaSharedPtr& sp
}

Status BaseBetaRowsetWriter::_close_file_writers() {
// TODO(lingbin): move to more better place, or in a CreateBlockBatch?
for (auto& file_writer : _file_writers) {
RETURN_NOT_OK_STATUS_WITH_WARN(
file_writer->close(),
fmt::format("failed to close file writer, path={}", file_writer->path().string()));
}
// Flush and close segment files
RETURN_NOT_OK_STATUS_WITH_WARN(_segment_creator.close(),
"failed to close segment creator when build new rowset");
return Status::OK();
Expand Down Expand Up @@ -667,6 +753,7 @@ Status BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool ch
// TODO write zonemap to meta
rowset_meta->set_empty((num_rows_written + _num_rows_written) == 0);
rowset_meta->set_creation_time(time(nullptr));

return Status::OK();
}

Expand Down Expand Up @@ -805,7 +892,7 @@ Status BaseBetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStati
}
if (_context.mow_context != nullptr) {
// ensure that the segment file writing is complete
auto* file_writer = _segment_creator.get_file_writer(segment_id);
auto* file_writer = _seg_files.get(segment_id);
if (file_writer) {
RETURN_IF_ERROR(file_writer->close());
}
Expand Down
Loading

0 comments on commit bd330a5

Please sign in to comment.