Skip to content

Commit

Permalink
Merge branch 'master' into nereids_recover_all
Browse files Browse the repository at this point in the history
  • Loading branch information
Vallishp authored Nov 19, 2024
2 parents cba8acb + b3cb480 commit b950a7a
Show file tree
Hide file tree
Showing 74 changed files with 12,357 additions and 429 deletions.
4 changes: 4 additions & 0 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1566,6 +1566,10 @@ Status BaseTablet::check_rowid_conversion(
VLOG_DEBUG << "check_rowid_conversion, location_map is empty";
return Status::OK();
}
if (!tablet_schema()->cluster_key_idxes().empty()) {
VLOG_DEBUG << "skip check_rowid_conversion for mow tables with cluster keys";
return Status::OK();
}
std::vector<segment_v2::SegmentSharedPtr> dst_segments;

RETURN_IF_ERROR(
Expand Down
6 changes: 5 additions & 1 deletion be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,12 @@ Status MemTable::_sort_by_cluster_keys() {
for (int i = 0; i < row_in_blocks.size(); i++) {
row_pos_vec.emplace_back(row_in_blocks[i]->_row_pos);
}
std::vector<int> column_offset;
for (int i = 0; i < _column_offset.size(); ++i) {
column_offset.emplace_back(i);
}
return _output_mutable_block.add_rows(&in_block, row_pos_vec.data(),
row_pos_vec.data() + in_block.rows(), &_column_offset);
row_pos_vec.data() + in_block.rows(), &column_offset);
}

void MemTable::_sort_one_column(std::vector<RowInBlock*>& row_in_blocks, Tie& tie,
Expand Down
5 changes: 4 additions & 1 deletion be/src/olap/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,10 @@ Status Merger::vertical_compact_one_group(
}

reader_params.tablet_schema = merge_tablet_schema;
bool has_cluster_key = false;
if (!tablet->tablet_schema()->cluster_key_idxes().empty()) {
reader_params.delete_bitmap = &tablet->tablet_meta()->delete_bitmap();
has_cluster_key = true;
}

if (is_key && stats_output && stats_output->rowid_conversion) {
Expand Down Expand Up @@ -290,7 +292,8 @@ Status Merger::vertical_compact_one_group(
"failed to read next block when merging rowsets of tablet " +
std::to_string(tablet->tablet_id()));
RETURN_NOT_OK_STATUS_WITH_WARN(
dst_rowset_writer->add_columns(&block, column_group, is_key, max_rows_per_segment),
dst_rowset_writer->add_columns(&block, column_group, is_key, max_rows_per_segment,
has_cluster_key),
"failed to write block when merging rowsets of tablet " +
std::to_string(tablet->tablet_id()));

Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/primary_key_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ Status PrimaryKeyIndexBuilder::add_item(const Slice& key) {
if (UNLIKELY(_num_rows == 0)) {
_min_key.append(key.get_data(), key.get_size());
}
DCHECK(key.compare(_max_key) > 0)
<< "found duplicate key or key is not sorted! current key: " << key
<< ", last max key: " << _max_key;
_max_key.clear();
_max_key.append(key.get_data(), key.get_size());
_num_rows++;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class RowsetWriter {
"RowsetWriter not support add_block");
}
virtual Status add_columns(const vectorized::Block* block, const std::vector<uint32_t>& col_ids,
bool is_key, uint32_t max_rows_per_segment) {
bool is_key, uint32_t max_rows_per_segment, bool has_cluster_key) {
return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>(
"RowsetWriter not support add_columns");
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/segment_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ Status SegmentFlusher::close() {
bool SegmentFlusher::need_buffering() {
// buffering variants for schema change
return _context.write_type == DataWriteType::TYPE_SCHEMA_CHANGE &&
_context.tablet_schema->num_variant_columns() > 0;
(_context.tablet_schema->num_variant_columns() > 0 ||
!_context.tablet_schema->cluster_key_idxes().empty());
}

Status SegmentFlusher::_add_rows(std::unique_ptr<segment_v2::SegmentWriter>& segment_writer,
Expand Down
12 changes: 6 additions & 6 deletions be/src/olap/rowset/segment_v2/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ Status ColumnReader::next_batch_of_zone_map(size_t* n, vectorized::MutableColumn
} else {
if (is_string) {
auto sv = (StringRef*)min_value->cell_ptr();
dst->insert_many_data(sv->data, sv->size, size);
dst->insert_data_repeatedly(sv->data, sv->size, size);
} else {
// TODO: the work may cause performance problem, opt latter
for (int i = 0; i < size; ++i) {
Expand Down Expand Up @@ -1508,7 +1508,7 @@ void DefaultValueColumnIterator::insert_default_data(const TypeInfo* type_info,
value.cast_to_date();

int64 = binary_cast<VecDateTimeValue, vectorized::Int64>(value);
dst->insert_many_data(data_ptr, data_len, n);
dst->insert_data_repeatedly(data_ptr, data_len, n);
break;
}
case FieldType::OLAP_FIELD_TYPE_DATETIME: {
Expand All @@ -1526,7 +1526,7 @@ void DefaultValueColumnIterator::insert_default_data(const TypeInfo* type_info,
value.to_datetime();

int64 = binary_cast<VecDateTimeValue, vectorized::Int64>(value);
dst->insert_many_data(data_ptr, data_len, n);
dst->insert_data_repeatedly(data_ptr, data_len, n);
break;
}
case FieldType::OLAP_FIELD_TYPE_DECIMAL: {
Expand All @@ -1538,7 +1538,7 @@ void DefaultValueColumnIterator::insert_default_data(const TypeInfo* type_info,
sizeof(FieldTypeTraits<FieldType::OLAP_FIELD_TYPE_DECIMAL>::CppType)); //decimal12_t
decimal12_t* d = (decimal12_t*)mem_value;
int128 = DecimalV2Value(d->integer, d->fraction).value();
dst->insert_many_data(data_ptr, data_len, n);
dst->insert_data_repeatedly(data_ptr, data_len, n);
break;
}
case FieldType::OLAP_FIELD_TYPE_STRING:
Expand All @@ -1548,7 +1548,7 @@ void DefaultValueColumnIterator::insert_default_data(const TypeInfo* type_info,
case FieldType::OLAP_FIELD_TYPE_AGG_STATE: {
char* data_ptr = ((Slice*)mem_value)->data;
size_t data_len = ((Slice*)mem_value)->size;
dst->insert_many_data(data_ptr, data_len, n);
dst->insert_data_repeatedly(data_ptr, data_len, n);
break;
}
case FieldType::OLAP_FIELD_TYPE_ARRAY: {
Expand All @@ -1566,7 +1566,7 @@ void DefaultValueColumnIterator::insert_default_data(const TypeInfo* type_info,
default: {
char* data_ptr = (char*)mem_value;
size_t data_len = type_size;
dst->insert_many_data(data_ptr, data_len, n);
dst->insert_data_repeatedly(data_ptr, data_len, n);
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index_searcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ Status FulltextIndexSearcherBuilder::build(lucene::store::Directory* directory,
reader = lucene::index::IndexReader::open(
directory, config::inverted_index_read_buffer_size, close_directory);
} catch (const CLuceneError& e) {
std::vector<std::string> file_names;
directory->list(&file_names);
LOG(ERROR) << fmt::format("Directory list: {}", fmt::join(file_names, ", "));
std::string msg = "FulltextIndexSearcherBuilder build error: " + std::string(e.what());
if (e.number() == CL_ERR_EmptyIndexSegment) {
return Status::Error<ErrorCode::INVERTED_INDEX_FILE_CORRUPTED>(msg);
Expand Down
8 changes: 6 additions & 2 deletions be/src/olap/rowset/vertical_beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ template <class T>
requires std::is_base_of_v<BaseBetaRowsetWriter, T>
Status VerticalBetaRowsetWriter<T>::add_columns(const vectorized::Block* block,
const std::vector<uint32_t>& col_ids, bool is_key,
uint32_t max_rows_per_segment) {
uint32_t max_rows_per_segment,
bool has_cluster_key) {
auto& context = this->_context;

VLOG_NOTICE << "VerticalBetaRowsetWriter::add_columns, columns: " << block->columns();
Expand All @@ -71,7 +72,10 @@ Status VerticalBetaRowsetWriter<T>::add_columns(const vectorized::Block* block,
_cur_writer_idx = 0;
RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->append_block(block, 0, num_rows));
} else if (is_key) {
if (_segment_writers[_cur_writer_idx]->num_rows_written() > max_rows_per_segment) {
// TODO for cluster key, always create new segment writer because the primary keys are
// sorted in SegmentWriter::_generate_primary_key_index, will cause too many segments
if (_segment_writers[_cur_writer_idx]->num_rows_written() > max_rows_per_segment ||
has_cluster_key) {
// segment is full, need flush columns and create new segment writer
RETURN_IF_ERROR(_flush_columns(_segment_writers[_cur_writer_idx].get(), true));

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/vertical_beta_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class VerticalBetaRowsetWriter final : public T {
~VerticalBetaRowsetWriter() override = default;

Status add_columns(const vectorized::Block* block, const std::vector<uint32_t>& col_ids,
bool is_key, uint32_t max_rows_per_segment) override;
bool is_key, uint32_t max_rows_per_segment, bool has_cluster_key) override;

// flush last segment's column
Status flush_columns(bool is_key) override;
Expand Down
15 changes: 4 additions & 11 deletions be/src/vec/columns/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,16 +126,9 @@ class IColumn : public COW<IColumn> {
return nullptr;
}

// shrink the end zeros for CHAR type or ARRAY<CHAR> type
virtual MutablePtr get_shrinked_column() {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"Method get_shrinked_column is not supported for " + get_name());
return nullptr;
}

// check the column whether could shrinked
// now support only in char type, or the nested type in complex type: array{char}, struct{char}, map{char}
virtual bool could_shrinked_column() { return false; }
// shrink the end zeros for ColumnStr(also for who has it nested). so nest column will call it for all nested.
// for non-str col, will reach here(do nothing). only ColumnStr will really shrink itself.
virtual void shrink_padding_chars() {}

/// Some columns may require finalization before using of other operations.
virtual void finalize() {}
Expand Down Expand Up @@ -290,7 +283,7 @@ class IColumn : public COW<IColumn> {
"Method insert_many_raw_data is not supported for " + get_name());
}

void insert_many_data(const char* pos, size_t length, size_t data_num) {
void insert_data_repeatedly(const char* pos, size_t length, size_t data_num) {
for (size_t i = 0; i < data_num; ++i) {
insert_data(pos, length);
}
Expand Down
12 changes: 2 additions & 10 deletions be/src/vec/columns/column_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,8 @@ ColumnArray::ColumnArray(MutableColumnPtr&& nested_column) : data(std::move(nest
offsets = ColumnOffsets::create();
}

bool ColumnArray::could_shrinked_column() {
return data->could_shrinked_column();
}

MutableColumnPtr ColumnArray::get_shrinked_column() {
if (could_shrinked_column()) {
return ColumnArray::create(data->get_shrinked_column(), offsets->assume_mutable());
} else {
return ColumnArray::create(data->assume_mutable(), offsets->assume_mutable());
}
void ColumnArray::shrink_padding_chars() {
data->shrink_padding_chars();
}

std::string ColumnArray::get_name() const {
Expand Down
3 changes: 1 addition & 2 deletions be/src/vec/columns/column_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,7 @@ class ColumnArray final : public COWHelper<IColumn, ColumnArray> {
return Base::create(std::forward<Args>(args)...);
}

MutableColumnPtr get_shrinked_column() override;
bool could_shrinked_column() override;
void shrink_padding_chars() override;

/** On the index i there is an offset to the beginning of the i + 1 -th element. */
using ColumnOffsets = ColumnVector<Offset64>;
Expand Down
24 changes: 3 additions & 21 deletions be/src/vec/columns/column_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -502,27 +502,9 @@ ColumnPtr ColumnMap::replicate(const Offsets& offsets) const {
return res;
}

bool ColumnMap::could_shrinked_column() {
return keys_column->could_shrinked_column() || values_column->could_shrinked_column();
}

MutableColumnPtr ColumnMap::get_shrinked_column() {
MutableColumns new_columns(2);

if (keys_column->could_shrinked_column()) {
new_columns[0] = keys_column->get_shrinked_column();
} else {
new_columns[0] = keys_column->get_ptr();
}

if (values_column->could_shrinked_column()) {
new_columns[1] = values_column->get_shrinked_column();
} else {
new_columns[1] = values_column->get_ptr();
}

return ColumnMap::create(new_columns[0]->assume_mutable(), new_columns[1]->assume_mutable(),
offsets_column->assume_mutable());
void ColumnMap::shrink_padding_chars() {
keys_column->shrink_padding_chars();
values_column->shrink_padding_chars();
}

void ColumnMap::reserve(size_t n) {
Expand Down
3 changes: 1 addition & 2 deletions be/src/vec/columns/column_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,7 @@ class ColumnMap final : public COWHelper<IColumn, ColumnMap> {
const char* deserialize_and_insert_from_arena(const char* pos) override;

void update_hash_with_value(size_t n, SipHash& hash) const override;
MutableColumnPtr get_shrinked_column() override;
bool could_shrinked_column() override;
void shrink_padding_chars() override;
ColumnPtr filter(const Filter& filt, ssize_t result_size_hint) const override;
size_t filter(const Filter& filter) override;
ColumnPtr permute(const Permutation& perm, size_t limit) const override;
Expand Down
13 changes: 2 additions & 11 deletions be/src/vec/columns/column_nullable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,8 @@ ColumnNullable::ColumnNullable(MutableColumnPtr&& nested_column_, MutableColumnP
_need_update_has_null = true;
}

bool ColumnNullable::could_shrinked_column() {
return get_nested_column_ptr()->could_shrinked_column();
}

MutableColumnPtr ColumnNullable::get_shrinked_column() {
if (could_shrinked_column()) {
return ColumnNullable::create(get_nested_column_ptr()->get_shrinked_column(),
get_null_map_column_ptr());
} else {
return ColumnNullable::create(get_nested_column_ptr(), get_null_map_column_ptr());
}
void ColumnNullable::shrink_padding_chars() {
get_nested_column_ptr()->shrink_padding_chars();
}

void ColumnNullable::update_xxHash_with_value(size_t start, size_t end, uint64_t& hash,
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/columns/column_nullable.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ class ColumnNullable final : public COWHelper<IColumn, ColumnNullable>, public N
return Base::create(std::forward<Args>(args)...);
}

MutableColumnPtr get_shrinked_column() override;
bool could_shrinked_column() override;
void shrink_padding_chars() override;

bool is_variable_length() const override { return nested_column->is_variable_length(); }

std::string get_name() const override { return "Nullable(" + nested_column->get_name() + ")"; }
Expand Down
6 changes: 0 additions & 6 deletions be/src/vec/columns/column_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -446,12 +446,6 @@ class ColumnObject final : public COWHelper<IColumn, ColumnObject> {
void update_crc_with_value(size_t start, size_t end, uint32_t& hash,
const uint8_t* __restrict null_data) const override;

// Not implemented
MutableColumnPtr get_shrinked_column() override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"get_shrinked_column" + get_name());
}

Int64 get_int(size_t /*n*/) const override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "get_int" + get_name());
}
Expand Down
31 changes: 21 additions & 10 deletions be/src/vec/columns/column_string.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <algorithm>
#include <boost/iterator/iterator_facade.hpp>
#include <cstring>

#include "util/memcpy_inlined.h"
#include "util/simd/bits.h"
Expand Down Expand Up @@ -81,16 +82,26 @@ MutableColumnPtr ColumnStr<T>::clone_resized(size_t to_size) const {
}

template <typename T>
MutableColumnPtr ColumnStr<T>::get_shrinked_column() {
auto shrinked_column = ColumnStr<T>::create();
shrinked_column->get_offsets().reserve(offsets.size());
shrinked_column->get_chars().reserve(chars.size());
for (int i = 0; i < size(); i++) {
StringRef str = get_data_at(i);
reinterpret_cast<ColumnStr<T>*>(shrinked_column.get())
->insert_data(str.data, strnlen(str.data, str.size));
}
return shrinked_column;
void ColumnStr<T>::shrink_padding_chars() {
if (size() == 0) {
return;
}
char* data = reinterpret_cast<char*>(chars.data());
auto* offset = offsets.data();
size_t size = offsets.size();

// deal the 0-th element. no need to move.
auto next_start = offset[0];
offset[0] = strnlen(data, size_at(0));
for (size_t i = 1; i < size; i++) {
// get the i-th length and whole move it to cover the last's trailing void
auto length = strnlen(data + next_start, offset[i] - next_start);
memmove(data + offset[i - 1], data + next_start, length);
// offset i will be changed. so save the old value for (i+1)-th to get its length.
next_start = offset[i];
offset[i] = offset[i - 1] + length;
}
chars.resize_fill(offsets.back()); // just call it to shrink memory here. no possible to expand.
}

// This method is only called by MutableBlock::merge_ignore_overflow
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/columns/column_string.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class ColumnStr final : public COWHelper<IColumn, ColumnStr<T>> {
/// For convenience, every string ends with terminating zero byte. Note that strings could contain zero bytes in the middle.
Chars chars;

// Start position of i-th element.
size_t ALWAYS_INLINE offset_at(ssize_t i) const { return offsets[i - 1]; }

/// Size of i-th element, including terminating zero.
Expand Down Expand Up @@ -117,8 +118,7 @@ class ColumnStr final : public COWHelper<IColumn, ColumnStr<T>> {

MutableColumnPtr clone_resized(size_t to_size) const override;

MutableColumnPtr get_shrinked_column() override;
bool could_shrinked_column() override { return true; }
void shrink_padding_chars() override;

Field operator[](size_t n) const override {
assert(n < size());
Expand Down
Loading

0 comments on commit b950a7a

Please sign in to comment.