Skip to content

Commit

Permalink
refactor variant flush
Browse files Browse the repository at this point in the history
refactor writer
  • Loading branch information
eldenmoon committed Dec 3, 2024
1 parent 29787a2 commit 25ca020
Show file tree
Hide file tree
Showing 10 changed files with 723 additions and 162 deletions.
57 changes: 53 additions & 4 deletions be/src/olap/rowset/segment_v2/column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "olap/rowset/segment_v2/page_builder.h"
#include "olap/rowset/segment_v2/page_io.h"
#include "olap/rowset/segment_v2/page_pointer.h"
#include "olap/rowset/segment_v2/variant_column_writer_impl.h"
#include "olap/rowset/segment_v2/zone_map_index.h"
#include "olap/tablet_schema.h"
#include "olap/types.h"
Expand Down Expand Up @@ -292,6 +293,14 @@ Status ColumnWriter::create_agg_state_writer(const ColumnWriterOptions& opts,
return Status::OK();
}

Status ColumnWriter::create_variant_writer(const ColumnWriterOptions& opts,
const TabletColumn* column, io::FileWriter* file_writer,
std::unique_ptr<ColumnWriter>* writer) {
*writer = std::unique_ptr<ColumnWriter>(new VariantColumnWriter(
opts, column, std::unique_ptr<Field>(FieldFactory::create(*column))));
return Status::OK();
}

//Todo(Amory): here should according nullable and offset and need sub to simply this function
Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn* column,
io::FileWriter* file_writer, std::unique_ptr<ColumnWriter>* writer) {
Expand Down Expand Up @@ -320,10 +329,7 @@ Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn*
return Status::OK();
}
case FieldType::OLAP_FIELD_TYPE_VARIANT: {
// Use ScalarColumnWriter to write it's only root data
std::unique_ptr<ColumnWriter> writer_local = std::unique_ptr<ColumnWriter>(
new ScalarColumnWriter(opts, std::move(field), file_writer));
*writer = std::move(writer_local);
RETURN_IF_ERROR(create_variant_writer(opts, column, file_writer, writer));
return Status::OK();
}
default:
Expand Down Expand Up @@ -1158,4 +1164,47 @@ Status MapColumnWriter::write_inverted_index() {
return Status::OK();
}

VariantColumnWriter::VariantColumnWriter(const ColumnWriterOptions& opts,
const TabletColumn* column, std::unique_ptr<Field> field)
: ColumnWriter(std::move(field), opts.meta->is_nullable()) {
_impl = std::make_unique<VariantColumnWriterImpl>(opts, column);
};

Status VariantColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
_next_rowid += num_rows;
return _impl->append_data(ptr, num_rows);
}

uint64_t VariantColumnWriter::estimate_buffer_size() {
return _impl->estimate_buffer_size();
}

Status VariantColumnWriter::finish() {
return _impl->finish();
}
Status VariantColumnWriter::write_data() {
return _impl->write_data();
}
Status VariantColumnWriter::write_ordinal_index() {
return _impl->write_ordinal_index();
}

Status VariantColumnWriter::write_zone_map() {
return _impl->write_zone_map();
}

Status VariantColumnWriter::write_bitmap_index() {
return _impl->write_bitmap_index();
}
Status VariantColumnWriter::write_inverted_index() {
return _impl->write_inverted_index();
}
Status VariantColumnWriter::write_bloom_filter_index() {
return _impl->write_bloom_filter_index();
}
Status VariantColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
size_t num_rows) {
return _impl->append_nullable(null_map, ptr, num_rows);
}

} // namespace doris::segment_v2
51 changes: 50 additions & 1 deletion be/src/olap/rowset/segment_v2/column_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
#include <vector>

#include "common/status.h" // for Status
#include "olap/field.h" // for Field
#include "exec/decompressor.h"
#include "olap/field.h" // for Field
#include "olap/rowset/segment_v2/common.h"
#include "olap/rowset/segment_v2/inverted_index_writer.h"
#include "util/bitmap.h" // for BitmapChange
Expand All @@ -40,6 +41,7 @@ namespace doris {
class BlockCompressionCodec;
class TabletColumn;
class TabletIndex;
struct RowsetWriterContext;

namespace io {
class FileWriter;
Expand All @@ -66,6 +68,11 @@ struct ColumnWriterOptions {
std::vector<const TabletIndex*> indexes; // unused
const TabletIndex* inverted_index = nullptr;
InvertedIndexFileWriter* inverted_index_file_writer;
// variant column writer used
SegmentFooterPB* footer = nullptr;
io::FileWriter* file_writer = nullptr;
CompressionTypePB compression_type = UNKNOWN_COMPRESSION;
RowsetWriterContext* rowset_ctx = nullptr;
std::string to_string() const {
std::stringstream ss;
ss << std::boolalpha << "meta=" << meta->DebugString()
Expand All @@ -84,6 +91,7 @@ class OrdinalIndexWriter;
class PageBuilder;
class BloomFilterIndexWriter;
class ZoneMapIndexWriter;
class VariantColumnWriterImpl;

class ColumnWriter {
public:
Expand All @@ -98,6 +106,9 @@ class ColumnWriter {
static Status create_map_writer(const ColumnWriterOptions& opts, const TabletColumn* column,
io::FileWriter* file_writer,
std::unique_ptr<ColumnWriter>* writer);
static Status create_variant_writer(const ColumnWriterOptions& opts, const TabletColumn* column,
io::FileWriter* file_writer,
std::unique_ptr<ColumnWriter>* writer);
static Status create_agg_state_writer(const ColumnWriterOptions& opts,
const TabletColumn* column, io::FileWriter* file_writer,
std::unique_ptr<ColumnWriter>* writer);
Expand Down Expand Up @@ -462,5 +473,43 @@ class MapColumnWriter final : public ColumnWriter {
ColumnWriterOptions _opts;
};

class VariantColumnWriter : public ColumnWriter {
public:
explicit VariantColumnWriter(const ColumnWriterOptions& opts, const TabletColumn* column,
std::unique_ptr<Field> field);

~VariantColumnWriter() override = default;

Status init() override { return Status::OK(); }

Status append_data(const uint8_t** ptr, size_t num_rows) override;

uint64_t estimate_buffer_size() override;

Status finish() override;
Status write_data() override;
Status write_ordinal_index() override;

Status write_zone_map() override;

Status write_bitmap_index() override;
Status write_inverted_index() override;
Status write_bloom_filter_index() override;
ordinal_t get_next_rowid() const override { return _next_rowid; }

Status append_nulls(size_t num_rows) override {
return Status::NotSupported("variant writer can not append_nulls");
}
Status append_nullable(const uint8_t* null_map, const uint8_t** ptr, size_t num_rows) override;

Status finish_current_page() override {
return Status::NotSupported("variant writer has no data, can not finish_current_page");
}

private:
std::unique_ptr<VariantColumnWriterImpl> _impl;
ordinal_t _next_rowid = 0;
};

} // namespace segment_v2
} // namespace doris
Loading

0 comments on commit 25ca020

Please sign in to comment.