From a2419a8eb408c0c890d48e0905ee199eb3deb9a5 Mon Sep 17 00:00:00 2001 From: Yongqiang YANG <98214048+dataroaring@users.noreply.github.com> Date: Wed, 8 Nov 2023 11:51:40 +0800 Subject: [PATCH 1/2] [enhancement](sink) refactor code of auto partition and where clause and enable them on sinkv2 (#26432) For better performance and elasticity, we move memtable from loadchannel to sink, VTabletSinkV2 is introduced, then there are VTabletWriter and VTabletSinkV2 distributing rows to tablets. where clauses on mvs are executed in VTabletWriter, while VTabletSinkV2 needs it too. So common code is moved to row distribution. Actually, we can layer code by rows' data flow, then the code is much more understood and maintainable. ScanNode -> Sink/Writer (RowDistribution -> IndexChannel / DeltaWriter) --- be/src/vec/sink/vrow_distribution.cpp | 306 ++++++++++++++++ be/src/vec/sink/vrow_distribution.h | 160 +++++++++ be/src/vec/sink/vtablet_sink_v2.cpp | 101 +++--- be/src/vec/sink/vtablet_sink_v2.h | 22 +- be/src/vec/sink/writer/vtablet_writer.cpp | 408 +++++----------------- be/src/vec/sink/writer/vtablet_writer.h | 37 +- 6 files changed, 655 insertions(+), 379 deletions(-) create mode 100644 be/src/vec/sink/vrow_distribution.cpp create mode 100644 be/src/vec/sink/vrow_distribution.h diff --git a/be/src/vec/sink/vrow_distribution.cpp b/be/src/vec/sink/vrow_distribution.cpp new file mode 100644 index 00000000000000..78d3b062045648 --- /dev/null +++ b/be/src/vec/sink/vrow_distribution.cpp @@ -0,0 +1,306 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/sink/vrow_distribution.h" + +#include +#include + +#include "runtime/client_cache.h" +#include "runtime/exec_env.h" +#include "runtime/runtime_state.h" +#include "util/thrift_rpc_helper.h" +#include "vec/columns/column_const.h" +#include "vec/columns/column_nullable.h" +#include "vec/columns/column_vector.h" +#include "vec/sink/writer/vtablet_writer.h" + +namespace doris::vectorized { + +std::pair +VRowDistribution::_get_partition_function() { + return {_vpartition->get_part_func_ctx(), _vpartition->get_partition_function()}; +} + +void VRowDistribution::_save_missing_values(vectorized::ColumnPtr col, + vectorized::DataTypePtr value_type) { + _partitions_need_create.clear(); + std::set deduper; + // de-duplication + for (auto row : _missing_map) { + deduper.emplace(value_type->to_string(*col, row)); + } + for (auto& value : deduper) { + TStringLiteral node; + node.value = value; + _partitions_need_create.emplace_back(std::vector {node}); // only 1 partition column now + } +} + +Status VRowDistribution::_automatic_create_partition() { + SCOPED_TIMER(_add_partition_request_timer); + TCreatePartitionRequest request; + TCreatePartitionResult result; + request.__set_txn_id(_txn_id); + request.__set_db_id(_vpartition->db_id()); + request.__set_table_id(_vpartition->table_id()); + request.__set_partitionValues(_partitions_need_create); + + VLOG(1) << "automatic partition rpc begin request " << request; + TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; + int time_out = _state->execution_timeout() * 1000; + RETURN_IF_ERROR(ThriftRpcHelper::rpc( + master_addr.hostname, master_addr.port, + [&request, &result](FrontendServiceConnection& client) { + client->createPartition(result, request); + }, + time_out)); + + Status status(Status::create(result.status)); + VLOG(1) << "automatic partition rpc end response " << result; + if (result.status.status_code == TStatusCode::OK) { + // add new created partitions + RETURN_IF_ERROR(_vpartition->add_partitions(result.partitions)); + RETURN_IF_ERROR(_on_partitions_created(_caller, &result)); + } + + return status; +} + +void VRowDistribution::_get_tablet_ids(vectorized::Block* block, int32_t index_idx, + std::vector& tablet_ids) { + tablet_ids.reserve(block->rows()); + for (int row_idx = 0; row_idx < block->rows(); row_idx++) { + if (_skip[row_idx]) { + continue; + } + auto& partition = _partitions[row_idx]; + auto& tablet_index = _tablet_indexes[row_idx]; + auto& index = partition->indexes[index_idx]; + + auto tablet_id = index.tablets[tablet_index]; + tablet_ids[row_idx] = tablet_id; + } +} + +void VRowDistribution::_filter_block_by_skip(vectorized::Block* block, + RowPartTabletIds& row_part_tablet_id) { + auto& row_ids = row_part_tablet_id.row_ids; + auto& partition_ids = row_part_tablet_id.partition_ids; + auto& tablet_ids = row_part_tablet_id.tablet_ids; + + for (size_t i = 0; i < block->rows(); i++) { + if (!_skip[i]) { + row_ids.emplace_back(i); + partition_ids.emplace_back(_partitions[i]->id); + tablet_ids.emplace_back(_tablet_ids[i]); + } + } +} + +Status VRowDistribution::_filter_block_by_skip_and_where_clause( + vectorized::Block* block, const vectorized::VExprContextSPtr& where_clause, + RowPartTabletIds& row_part_tablet_id) { + // TODO + //SCOPED_RAW_TIMER(&_stat.where_clause_ns); + int result_index = -1; + size_t column_number = block->columns(); + RETURN_IF_ERROR(where_clause->execute(block, &result_index)); + + auto filter_column = block->get_by_position(result_index).column; + + auto& row_ids = row_part_tablet_id.row_ids; + auto& partition_ids = row_part_tablet_id.partition_ids; + auto& tablet_ids = row_part_tablet_id.tablet_ids; + if (auto* nullable_column = + vectorized::check_and_get_column(*filter_column)) { + for (size_t i = 0; i < block->rows(); i++) { + if (nullable_column->get_bool_inline(i) && !_skip[i]) { + row_ids.emplace_back(i); + partition_ids.emplace_back(_partitions[i]->id); + tablet_ids.emplace_back(_tablet_ids[i]); + } + } + } else if (auto* const_column = + vectorized::check_and_get_column(*filter_column)) { + bool ret = const_column->get_bool(0); + if (!ret) { + return Status::OK(); + } + // should we optimize? + _filter_block_by_skip(block, row_part_tablet_id); + } else { + auto& filter = assert_cast(*filter_column).get_data(); + for (size_t i = 0; i < block->rows(); i++) { + if (filter[i] != 0 && !_skip[i]) { + row_ids.emplace_back(i); + partition_ids.emplace_back(_partitions[i]->id); + tablet_ids.emplace_back(_tablet_ids[i]); + } + } + } + + for (size_t i = block->columns() - 1; i >= column_number; i--) { + block->erase(i); + } + return Status::OK(); +} + +Status VRowDistribution::_filter_block(vectorized::Block* block, + std::vector& row_part_tablet_ids) { + for (int i = 0; i < _schema->indexes().size(); i++) { + _get_tablet_ids(block, i, _tablet_ids); + auto& where_clause = _schema->indexes()[i]->where_clause; + if (where_clause != nullptr) { + RETURN_IF_ERROR(_filter_block_by_skip_and_where_clause(block, where_clause, + row_part_tablet_ids[i])); + } else { + _filter_block_by_skip(block, row_part_tablet_ids[i]); + } + } + return Status::OK(); +} + +Status VRowDistribution::_generate_rows_distribution_for_non_auto_parititon( + vectorized::Block* block, bool has_filtered_rows, + std::vector& row_part_tablet_ids) { + auto num_rows = block->rows(); + + bool stop_processing = false; + RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, + _tablet_indexes, stop_processing, _skip)); + if (has_filtered_rows) { + for (int i = 0; i < num_rows; i++) { + _skip[i] = _skip[i] || _block_convertor->filter_map()[i]; + } + } + RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids)); + return Status::OK(); +} + +Status VRowDistribution::_generate_rows_distribution_for_auto_parititon( + vectorized::Block* block, int partition_col_idx, bool has_filtered_rows, + std::vector& row_part_tablet_ids) { + auto num_rows = block->rows(); + std::vector partition_keys = _vpartition->get_partition_keys(); + + //TODO: use loop to create missing_vals for multi column. + CHECK(partition_keys.size() == 1) << "now support only 1 partition column for auto partitions."; + auto partition_col = block->get_by_position(partition_keys[0]); + _missing_map.clear(); + _missing_map.reserve(partition_col.column->size()); + bool stop_processing = false; + //TODO: we could use the buffer to save tablets we found so that no need to find them again when we created partitions and try to append block next time. + RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, + _tablet_indexes, stop_processing, _skip, + &_missing_map)); + if (_missing_map.empty()) { + // we don't calculate it distribution when have missing values + if (has_filtered_rows) { + for (int i = 0; i < num_rows; i++) { + _skip[i] = _skip[i] || _block_convertor->filter_map()[i]; + } + } + RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids)); + } else { // for missing partition keys, calc the missing partition and save in _partitions_need_create + auto [part_ctx, part_func] = _get_partition_function(); + auto return_type = part_func->data_type(); + + // expose the data column + vectorized::ColumnPtr range_left_col = block->get_by_position(partition_col_idx).column; + if (const auto* nullable = + check_and_get_column(*range_left_col)) { + range_left_col = nullable->get_nested_column_ptr(); + return_type = assert_cast(return_type.get()) + ->get_nested_type(); + } + // calc the end value and save them. + _save_missing_values(range_left_col, return_type); + // then call FE to create it. then FragmentExecutor will redo the load. + RETURN_IF_ERROR(_automatic_create_partition()); + // In the next round, we will _generate_rows_distribution_payload again to get right payload of new tablet + LOG(INFO) << "Auto created partition. Send block again."; + return Status::NeedSendAgain(""); + } // creating done + + return Status::OK(); +} + +void VRowDistribution::_reset_row_part_tablet_ids( + std::vector& row_part_tablet_ids, int64_t rows) { + row_part_tablet_ids.resize(_schema->indexes().size()); + for (auto& row_part_tablet_id : row_part_tablet_ids) { + auto& row_ids = row_part_tablet_id.row_ids; + auto& partition_ids = row_part_tablet_id.partition_ids; + auto& tablet_ids = row_part_tablet_id.tablet_ids; + + row_ids.clear(); + partition_ids.clear(); + tablet_ids.clear(); + row_ids.reserve(rows); + partition_ids.reserve(rows); + tablet_ids.reserve(rows); + } +} + +Status VRowDistribution::generate_rows_distribution( + vectorized::Block& input_block, std::shared_ptr& block, + int64_t& filtered_rows, bool& has_filtered_rows, + std::vector& row_part_tablet_ids) { + auto input_rows = input_block.rows(); + _reset_row_part_tablet_ids(row_part_tablet_ids, input_rows); + + int64_t prev_filtered_rows = + _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows(); + RETURN_IF_ERROR(_block_convertor->validate_and_convert_block( + _state, &input_block, block, *_vec_output_expr_ctxs, input_rows, has_filtered_rows)); + + _tablet_finder->clear_for_new_batch(); + _row_distribution_watch.start(); + auto num_rows = block->rows(); + _tablet_finder->filter_bitmap().Reset(num_rows); + + //reuse vars for find_tablets + _partitions.assign(num_rows, nullptr); + _skip.assign(num_rows, false); + _tablet_indexes.assign(num_rows, 0); + + // if there's projection of partition calc, we need to calc it first. + auto [part_ctx, part_func] = _get_partition_function(); + int partition_col_idx = -1; + if (_vpartition->is_projection_partition()) { + // calc the start value of missing partition ranges. + RETURN_IF_ERROR(part_func->execute(part_ctx.get(), block.get(), &partition_col_idx)); + VLOG_DEBUG << "Partition-calculated block:" << block->dump_data(); + // change the column to compare to transformed. + _vpartition->set_transformed_slots({(uint16_t)partition_col_idx}); + } + + if (_vpartition->is_auto_partition()) { + RETURN_IF_ERROR(_generate_rows_distribution_for_auto_parititon( + block.get(), partition_col_idx, has_filtered_rows, row_part_tablet_ids)); + } else { // not auto partition + RETURN_IF_ERROR(_generate_rows_distribution_for_non_auto_parititon( + block.get(), has_filtered_rows, row_part_tablet_ids)); + } + _row_distribution_watch.stop(); + filtered_rows = _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows() - + prev_filtered_rows; + return Status::OK(); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/sink/vrow_distribution.h b/be/src/vec/sink/vrow_distribution.h new file mode 100644 index 00000000000000..5da964d44fcbdd --- /dev/null +++ b/be/src/vec/sink/vrow_distribution.h @@ -0,0 +1,160 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +// IWYU pragma: no_include +#include +#include +#include + +#include +#include +#include + +#include "common/status.h" +#include "exec/tablet_info.h" +#include "runtime/types.h" +#include "util/runtime_profile.h" +#include "util/stopwatch.hpp" +#include "vec/core/block.h" +#include "vec/data_types/data_type.h" +#include "vec/exprs/vexpr_fwd.h" +#include "vec/sink/vtablet_block_convertor.h" +#include "vec/sink/vtablet_finder.h" + +namespace doris::vectorized { + +class IndexChannel; +class VNodeChannel; + +// +class RowPartTabletIds { +public: + std::vector row_ids; + std::vector partition_ids; + std::vector tablet_ids; +}; + +typedef Status (*OnPartitionsCreated)(void*, TCreatePartitionResult*); + +class VRowDistributionContext { +public: + RuntimeState* state = nullptr; + OlapTableBlockConvertor* block_convertor = nullptr; + OlapTabletFinder* tablet_finder = nullptr; + VOlapTablePartitionParam* vpartition = nullptr; + RuntimeProfile::Counter* add_partition_request_timer = nullptr; + int64_t txn_id = -1; + ObjectPool* pool; + OlapTableLocationParam* location; + const VExprContextSPtrs* vec_output_expr_ctxs; + OnPartitionsCreated on_partitions_created; + void* caller; + std::shared_ptr schema; +}; + +class VRowDistribution { +public: + VRowDistribution() {} + virtual ~VRowDistribution() {} + + void init(VRowDistributionContext* ctx) { + _state = ctx->state; + _block_convertor = ctx->block_convertor; + _tablet_finder = ctx->tablet_finder; + _vpartition = ctx->vpartition; + _add_partition_request_timer = ctx->add_partition_request_timer; + _txn_id = ctx->txn_id; + _pool = ctx->pool; + _location = ctx->location; + _vec_output_expr_ctxs = ctx->vec_output_expr_ctxs; + _on_partitions_created = ctx->on_partitions_created; + _caller = ctx->caller; + _schema = ctx->schema; + } + + // auto partition + // mv where clause + // v1 needs index->node->row_ids - tabletids + // v2 needs index,tablet->rowids + Status generate_rows_distribution(vectorized::Block& input_block, + std::shared_ptr& block, + int64_t& filtered_rows, bool& has_filtered_rows, + std::vector& row_part_tablet_ids); + +private: + std::pair _get_partition_function(); + + void _save_missing_values(vectorized::ColumnPtr col, vectorized::DataTypePtr value_type); + + // create partitions when need for auto-partition table using #_partitions_need_create. + Status _automatic_create_partition(); + + void _get_tablet_ids(vectorized::Block* block, int32_t index_idx, + std::vector& tablet_ids); + + void _filter_block_by_skip(vectorized::Block* block, RowPartTabletIds& row_part_tablet_id); + + Status _filter_block_by_skip_and_where_clause(vectorized::Block* block, + const vectorized::VExprContextSPtr& where_clause, + RowPartTabletIds& row_part_tablet_id); + + Status _filter_block(vectorized::Block* block, + std::vector& row_part_tablet_ids); + + Status _generate_rows_distribution_for_auto_parititon( + vectorized::Block* block, int partition_col_idx, bool has_filtered_rows, + std::vector& row_part_tablet_ids); + + Status _generate_rows_distribution_for_non_auto_parititon( + vectorized::Block* block, bool has_filtered_rows, + std::vector& row_part_tablet_ids); + + void _reset_row_part_tablet_ids(std::vector& row_part_tablet_ids, + int64_t rows); + +private: + RuntimeState* _state = nullptr; + + // support only one partition column now + std::vector> _partitions_need_create; + + MonotonicStopWatch _row_distribution_watch; + OlapTableBlockConvertor* _block_convertor = nullptr; + OlapTabletFinder* _tablet_finder = nullptr; + VOlapTablePartitionParam* _vpartition = nullptr; + RuntimeProfile::Counter* _add_partition_request_timer = nullptr; + int64_t _txn_id = -1; + ObjectPool* _pool; + OlapTableLocationParam* _location = nullptr; + // std::function _on_partition_created; + // int64_t _number_output_rows = 0; + const VExprContextSPtrs* _vec_output_expr_ctxs; + OnPartitionsCreated _on_partitions_created = nullptr; + void* _caller; + std::shared_ptr _schema; + + // reuse for find_tablet. + std::vector _partitions; + std::vector _skip; + std::vector _tablet_indexes; + std::vector _tablet_ids; + std::vector _missing_map; // indice of missing values in partition_col +}; + +} // namespace doris::vectorized diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp b/be/src/vec/sink/vtablet_sink_v2.cpp index 696a66ac3fbad4..69a372e0e3886f 100644 --- a/be/src/vec/sink/vtablet_sink_v2.cpp +++ b/be/src/vec/sink/vtablet_sink_v2.cpp @@ -77,6 +77,42 @@ VOlapTableSinkV2::VOlapTableSinkV2(ObjectPool* pool, const RowDescriptor& row_de VOlapTableSinkV2::~VOlapTableSinkV2() = default; +Status VOlapTableSinkV2::on_partitions_created(TCreatePartitionResult* result) { + // add new tablet locations. it will use by address. so add to pool + auto* new_locations = _pool->add(new std::vector(result->tablets)); + _location->add_locations(*new_locations); + + // update new node info + _nodes_info->add_nodes(result->nodes); + + // incremental open stream + + return Status::OK(); +} + +static Status on_partitions_created(void* writer, TCreatePartitionResult* result) { + return static_cast(writer)->on_partitions_created(result); +} + +void VOlapTableSinkV2::_init_row_distribution() { + VRowDistributionContext ctx; + + ctx.state = _state; + ctx.block_convertor = _block_convertor.get(); + ctx.tablet_finder = _tablet_finder.get(); + ctx.vpartition = _vpartition; + ctx.add_partition_request_timer = _add_partition_request_timer; + ctx.txn_id = _txn_id; + ctx.pool = _pool; + ctx.location = _location; + ctx.vec_output_expr_ctxs = &_output_vexpr_ctxs; + ctx.on_partitions_created = &vectorized::on_partitions_created; + ctx.caller = (void*)this; + ctx.schema = _schema; + + _row_distribution.init(&ctx); +} + Status VOlapTableSinkV2::init(const TDataSink& t_sink) { DCHECK(t_sink.__isset.olap_table_sink); auto& table_sink = t_sink.olap_table_sink; @@ -104,7 +140,9 @@ Status VOlapTableSinkV2::init(const TDataSink& t_sink) { } _vpartition = _pool->add(new doris::VOlapTablePartitionParam(_schema, table_sink.partition)); _tablet_finder = std::make_unique(_vpartition, find_tablet_mode); - return _vpartition->init(); + RETURN_IF_ERROR(_vpartition->init()); + + return Status::OK(); } Status VOlapTableSinkV2::prepare(RuntimeState* state) { @@ -168,6 +206,7 @@ Status VOlapTableSinkV2::open(RuntimeState* state) { _build_tablet_node_mapping(); RETURN_IF_ERROR(_open_streams(state->backend_id())); + _init_row_distribution(); return Status::OK(); } @@ -222,30 +261,25 @@ void VOlapTableSinkV2::_build_tablet_node_mapping() { } } -void VOlapTableSinkV2::_generate_rows_for_tablet( - RowsForTablet& rows_for_tablet, const std::vector& partitions, - const std::vector& tablet_indexes, const std::vector& skip, - size_t row_cnt) { - for (int row_idx = 0; row_idx < row_cnt; row_idx++) { - if (skip[row_idx]) { - continue; - } - - auto& partition = partitions[row_idx]; - auto& tablet_index = tablet_indexes[row_idx]; +void VOlapTableSinkV2::_generate_rows_for_tablet(std::vector& row_part_tablet_ids, + RowsForTablet& rows_for_tablet) { + for (int index_idx = 0; index_idx < row_part_tablet_ids.size(); index_idx++) { + auto& row_ids = row_part_tablet_ids[index_idx].row_ids; + auto& partition_ids = row_part_tablet_ids[index_idx].partition_ids; + auto& tablet_ids = row_part_tablet_ids[index_idx].tablet_ids; - for (const auto& index : partition->indexes) { - auto tablet_id = index.tablets[tablet_index]; + for (int i = 0; i < row_ids.size(); i++) { + auto& tablet_id = tablet_ids[i]; auto it = rows_for_tablet.find(tablet_id); if (it == rows_for_tablet.end()) { Rows rows; - rows.partition_id = partition->id; - rows.index_id = index.index_id; - rows.row_idxes.reserve(row_cnt); + rows.partition_id = partition_ids[i]; + rows.index_id = _schema->indexes()[index_idx]->index_id; + rows.row_idxes.reserve(row_ids.size()); auto [tmp_it, _] = rows_for_tablet.insert({tablet_id, rows}); it = tmp_it; } - it->second.row_idxes.push_back(row_idx); + it->second.row_idxes.push_back(row_ids[i]); _number_output_rows++; } } @@ -285,37 +319,18 @@ Status VOlapTableSinkV2::send(RuntimeState* state, vectorized::Block* input_bloc DorisMetrics::instance()->load_rows->increment(input_rows); DorisMetrics::instance()->load_bytes->increment(input_bytes); - std::shared_ptr block; bool has_filtered_rows = false; - RETURN_IF_ERROR(_block_convertor->validate_and_convert_block( - state, input_block, block, _output_vexpr_ctxs, input_rows, has_filtered_rows)); - - // clear and release the references of columns - input_block->clear(); + int64_t filtered_rows = 0; SCOPED_RAW_TIMER(&_send_data_ns); // This is just for passing compilation. - bool stop_processing = false; - RowsForTablet rows_for_tablet; - _tablet_finder->clear_for_new_batch(); _row_distribution_watch.start(); - const auto num_rows = input_rows; - const auto* __restrict filter_map = _block_convertor->filter_map(); - //reuse vars - _partitions.assign(num_rows, nullptr); - _skip.assign(num_rows, false); - _tablet_indexes.assign(num_rows, 0); - - RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), num_rows, _partitions, - _tablet_indexes, stop_processing, _skip)); - - if (has_filtered_rows) { - for (int i = 0; i < num_rows; i++) { - _skip[i] = _skip[i] || filter_map[i]; - } - } - _generate_rows_for_tablet(rows_for_tablet, _partitions, _tablet_indexes, _skip, num_rows); + std::shared_ptr block; + RETURN_IF_ERROR(_row_distribution.generate_rows_distribution( + *input_block, block, filtered_rows, has_filtered_rows, _row_part_tablet_ids)); + RowsForTablet rows_for_tablet; + _generate_rows_for_tablet(_row_part_tablet_ids, rows_for_tablet); _row_distribution_watch.stop(); diff --git a/be/src/vec/sink/vtablet_sink_v2.h b/be/src/vec/sink/vtablet_sink_v2.h index 63f0985eb090ce..a67c4e65cd2cae 100644 --- a/be/src/vec/sink/vtablet_sink_v2.h +++ b/be/src/vec/sink/vtablet_sink_v2.h @@ -64,6 +64,7 @@ #include "vec/core/block.h" #include "vec/data_types/data_type.h" #include "vec/exprs/vexpr_fwd.h" +#include "vec/sink/vrow_distribution.h" namespace doris { class DeltaWriterV2; @@ -112,17 +113,20 @@ class VOlapTableSinkV2 final : public DataSink { Status open(RuntimeState* state) override; Status close(RuntimeState* state, Status close_status) override; + Status send(RuntimeState* state, vectorized::Block* block, bool eos = false) override; + Status on_partitions_created(TCreatePartitionResult* result); + private: + void _init_row_distribution(); + Status _open_streams(int64_t src_id); void _build_tablet_node_mapping(); - void _generate_rows_for_tablet(RowsForTablet& rows_for_tablet, - const std::vector& partitions, - const std::vector& tablet_indexes, - const std::vector& skip, size_t row_cnt); + void _generate_rows_for_tablet(std::vector& row_part_tablet_ids, + RowsForTablet& rows_for_tablet); Status _write_memtable(std::shared_ptr block, int64_t tablet_id, const Rows& rows, const Streams& streams); @@ -168,11 +172,6 @@ class VOlapTableSinkV2 final : public DataSink { int64_t _number_input_rows = 0; int64_t _number_output_rows = 0; - // reuse for find_tablet - std::vector _partitions; - std::vector _skip; - std::vector _tablet_indexes; - MonotonicStopWatch _row_distribution_watch; RuntimeProfile::Counter* _input_rows_counter = nullptr; @@ -187,6 +186,7 @@ class VOlapTableSinkV2 final : public DataSink { RuntimeProfile::Counter* _close_timer = nullptr; RuntimeProfile::Counter* _close_writer_timer = nullptr; RuntimeProfile::Counter* _close_load_timer = nullptr; + RuntimeProfile::Counter* _add_partition_request_timer = nullptr; // Save the status of close() method Status _close_status; @@ -204,6 +204,10 @@ class VOlapTableSinkV2 final : public DataSink { std::unordered_map> _streams_for_node; size_t _stream_index = 0; std::shared_ptr _delta_writer_for_tablet; + + VRowDistribution _row_distribution; + // reuse to avoid frequent memory allocation and release. + std::vector _row_part_tablet_ids; }; } // namespace vectorized diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index d0720255695b3f..786c0e21105ac9 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -152,6 +152,7 @@ Status IndexChannel::init(RuntimeState* state, const std::vectorprepare(state, *_parent->_output_row_desc)); RETURN_IF_ERROR(_where_clause->open(state)); } + return Status::OK(); } @@ -488,52 +489,6 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload, _cur_mutable_block = vectorized::MutableBlock::create_unique(block->clone_empty()); } - std::unique_ptr temp_payload = nullptr; - if (_index_channel != nullptr && _index_channel->get_where_clause() != nullptr) { - SCOPED_RAW_TIMER(&_stat.where_clause_ns); - temp_payload.reset(new Payload( - std::unique_ptr(new vectorized::IColumn::Selector()), - std::vector())); - int result_index = -1; - size_t column_number = block->columns(); - RETURN_IF_ERROR(_index_channel->get_where_clause()->execute(block, &result_index)); - - auto& row_ids = *payload->first; - auto& tablets_ids = payload->second; - - auto filter_column = block->get_by_position(result_index).column; - - if (auto* nullable_column = - vectorized::check_and_get_column(*filter_column)) { - for (size_t i = 0; i < payload->second.size(); i++) { - if (nullable_column->get_bool_inline(row_ids[i])) { - temp_payload->first->emplace_back(row_ids[i]); - temp_payload->second.emplace_back(tablets_ids[i]); - } - } - payload = temp_payload.get(); - } else if (auto* const_column = vectorized::check_and_get_column( - *filter_column)) { - bool ret = const_column->get_bool(0); - if (!ret) { - return Status::OK(); - } - } else { - auto& filter = assert_cast(*filter_column).get_data(); - for (size_t i = 0; i < payload->second.size(); i++) { - if (filter[row_ids[i]] != 0) { - temp_payload->first->emplace_back(row_ids[i]); - temp_payload->second.emplace_back(tablets_ids[i]); - } - } - payload = temp_payload.get(); - } - - for (size_t i = block->columns() - 1; i >= column_number; i--) { - block->erase(i); - } - } - SCOPED_RAW_TIMER(&_stat.append_node_channel_ns); if (is_append) { // Do not split the data of the block by tablets but append it to a single delta writer. @@ -1095,6 +1050,43 @@ Status VTabletWriter::open(doris::RuntimeState* state, doris::RuntimeProfile* pr return Status::OK(); } +Status VTabletWriter::on_partitions_created(TCreatePartitionResult* result) { + // add new tablet locations. it will use by address. so add to pool + auto* new_locations = _pool->add(new std::vector(result->tablets)); + _location->add_locations(*new_locations); + + // update new node info + _nodes_info->add_nodes(result->nodes); + + // incremental open node channel + RETURN_IF_ERROR(_incremental_open_node_channel(result->partitions)); + + return Status::OK(); +} + +static Status on_partitions_created(void* writer, TCreatePartitionResult* result) { + return static_cast(writer)->on_partitions_created(result); +} + +void VTabletWriter::_init_row_distribution() { + VRowDistributionContext ctx; + + ctx.state = _state; + ctx.block_convertor = _block_convertor.get(); + ctx.tablet_finder = _tablet_finder.get(); + ctx.vpartition = _vpartition; + ctx.add_partition_request_timer = _add_partition_request_timer; + ctx.txn_id = _txn_id; + ctx.pool = _pool; + ctx.location = _location; + ctx.vec_output_expr_ctxs = &_vec_output_expr_ctxs; + ctx.on_partitions_created = &vectorized::on_partitions_created; + ctx.caller = (void*)this; + ctx.schema = _schema; + + _row_distribution.init(&ctx); +} + Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) { DCHECK(_t_sink.__isset.olap_table_sink); auto& table_sink = _t_sink.olap_table_sink; @@ -1243,49 +1235,12 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) { RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->create_wal_writer(_wal_id, _wal_writer)); } + _init_row_distribution(); + _inited = true; return Status::OK(); } -Status VTabletWriter::_automatic_create_partition() { - SCOPED_TIMER(_add_partition_request_timer); - TCreatePartitionRequest request; - TCreatePartitionResult result; - request.__set_txn_id(_txn_id); - request.__set_db_id(_vpartition->db_id()); - request.__set_table_id(_vpartition->table_id()); - request.__set_partitionValues(_partitions_need_create); - - VLOG(1) << "automatic partition rpc begin request " << request; - TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; - int time_out = _state->execution_timeout() * 1000; - RETURN_IF_ERROR(ThriftRpcHelper::rpc( - master_addr.hostname, master_addr.port, - [&request, &result](FrontendServiceConnection& client) { - client->createPartition(result, request); - }, - time_out)); - - Status status(Status::create(result.status)); - VLOG(1) << "automatic partition rpc end response " << result; - if (result.status.status_code == TStatusCode::OK) { - // add new created partitions - RETURN_IF_ERROR(_vpartition->add_partitions(result.partitions)); - - // add new tablet locations. it will use by address. so add to pool - auto* new_locations = _pool->add(new std::vector(result.tablets)); - _location->add_locations(*new_locations); - - // update new node info - _nodes_info->add_nodes(result.nodes); - - // incremental open node channel - RETURN_IF_ERROR(_incremental_open_node_channel(result.partitions)); - } - - return status; -} - Status VTabletWriter::_incremental_open_node_channel( const std::vector& partitions) { // do what we did in prepare() for partitions. indexes which don't change when we create new partition is orthogonal to partitions. @@ -1337,129 +1292,11 @@ Status VTabletWriter::_incremental_open_node_channel( return Status::OK(); } -// Generate channel payload for sinking data to differenct node channel -// Payload = std::pair, std::vector>; -// first = row_id, second = vector -void VTabletWriter::_generate_row_distribution_payload( - ChannelDistributionPayload& channel_to_payload, - const std::vector& partitions, - const std::vector& tablet_indexes, const std::vector& skip, - size_t row_cnt) { - for (int row_idx = 0; row_idx < row_cnt; row_idx++) { - if (skip[row_idx]) { - continue; - } - const auto& partition = partitions[row_idx]; - const auto& tablet_index = tablet_indexes[row_idx]; - - for (int index_num = 0; index_num < partition->indexes.size(); - ++index_num) { // partition->indexes = [index, tablets...] - - auto tablet_id = partition->indexes[index_num].tablets[tablet_index]; - auto it = _channels[index_num]->_channels_by_tablet.find( - tablet_id); // (tablet_id, VNodeChannel) where this tablet locate - - DCHECK(it != _channels[index_num]->_channels_by_tablet.end()) - << "unknown tablet, tablet_id=" << tablet_index; - - std::vector>& tablet_locations = it->second; - std::unordered_map& payloads_this_index = - channel_to_payload[index_num]; // payloads of this index in every node - - for (const auto& locate_node : tablet_locations) { - auto payload_it = - payloads_this_index.find(locate_node.get()); // - if (payload_it == payloads_this_index.end()) { - auto [tmp_it, _] = payloads_this_index.emplace( - locate_node.get(), - Payload {std::make_unique(), - std::vector()}); - payload_it = tmp_it; - payload_it->second.first->reserve(row_cnt); - payload_it->second.second.reserve(row_cnt); - } - payload_it->second.first->push_back(row_idx); - payload_it->second.second.push_back(tablet_id); - } - _number_output_rows++; - } - } -} - -Status VTabletWriter::_single_partition_generate(RuntimeState* state, vectorized::Block* block, - ChannelDistributionPayload& channel_to_payload, - size_t num_rows, bool has_filtered_rows) { - // only need to calculate one value for single partition. - std::vector partitions(1, nullptr); - std::vector skip(1, false); - std::vector tablet_indexes(1, 0); - bool stop_processing = false; - - RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, 1, partitions, tablet_indexes, - stop_processing, skip)); - - const VOlapTablePartition* partition = nullptr; - uint32_t tablet_index = 0; - for (size_t i = 0; i < num_rows; i++) { - if (!skip[i]) { - partition = partitions[i]; - tablet_index = tablet_indexes[i]; - break; - } - } - if (partition == nullptr) { - return Status::OK(); - } - - for (int j = 0; j < partition->indexes.size(); ++j) { - auto tid = partition->indexes[j].tablets[tablet_index]; - auto it = _channels[j]->_channels_by_tablet.find(tid); - DCHECK(it != _channels[j]->_channels_by_tablet.end()) - << "unknown tablet, tablet_id=" << tablet_index; - int64_t row_cnt = 0; - for (const auto& channel : it->second) { - if (!channel_to_payload[j].contains(channel.get())) { - channel_to_payload[j].insert( - {channel.get(), Payload {std::make_unique(), - std::vector()}}); - } - auto& selector = channel_to_payload[j][channel.get()].first; - auto& tablet_ids = channel_to_payload[j][channel.get()].second; - for (int32_t i = 0; i < num_rows; ++i) { - if (UNLIKELY(has_filtered_rows) && _block_convertor->filter_map()[i]) { - continue; - } - selector->push_back(i); - } - tablet_ids.resize(selector->size(), tid); - row_cnt = selector->size(); - } - _number_output_rows += row_cnt; - } - return Status::OK(); -} - std::pair VTabletWriter::_get_partition_function() { return {_vpartition->get_part_func_ctx(), _vpartition->get_partition_function()}; } -void VTabletWriter::_save_missing_values(vectorized::ColumnPtr col, - vectorized::DataTypePtr value_type, - std::vector filter) { - _partitions_need_create.clear(); - std::set deduper; - // de-duplication - for (auto row : filter) { - deduper.emplace(value_type->to_string(*col, row)); - } - for (auto& value : deduper) { - TStringLiteral node; - node.value = value; - _partitions_need_create.emplace_back(std::vector {node}); // only 1 partition column now - } -} - Status VTabletWriter::_cancel_channel_and_check_intolerable_failure( Status status, const std::string& err_msg, const std::shared_ptr ich, const std::shared_ptr nch) { @@ -1705,6 +1542,46 @@ Status VTabletWriter::close(Status exec_status) { return _close_status; } +void VTabletWriter::_generate_one_index_channel_payload( + RowPartTabletIds& row_part_tablet_id, int32_t index_idx, + ChannelDistributionPayload& channel_payload) { + auto& row_ids = row_part_tablet_id.row_ids; + auto& tablet_ids = row_part_tablet_id.tablet_ids; + + size_t row_cnt = row_ids.size(); + + for (int i = 0; i < row_ids.size(); i++) { + // (tablet_id, VNodeChannel) where this tablet locate + auto it = _channels[index_idx]->_channels_by_tablet.find(tablet_ids[i]); + DCHECK(it != _channels[index_idx]->_channels_by_tablet.end()) + << "unknown tablet, tablet_id=" << tablet_ids[i]; + + std::vector>& tablet_locations = it->second; + for (const auto& locate_node : tablet_locations) { + auto payload_it = channel_payload.find(locate_node.get()); // + if (payload_it == channel_payload.end()) { + auto [tmp_it, _] = channel_payload.emplace( + locate_node.get(), + Payload {std::make_unique(), + std::vector()}); + payload_it = tmp_it; + payload_it->second.first->reserve(row_cnt); + payload_it->second.second.reserve(row_cnt); + } + payload_it->second.first->push_back(row_ids[i]); + payload_it->second.second.push_back(tablet_ids[i]); + } + } +} + +void VTabletWriter::_generate_index_channels_payloads( + std::vector& row_part_tablet_ids, + ChannelDistributionPayloadVec& payload) { + for (int i = 0; i < _schema->indexes().size(); i++) { + _generate_one_index_channel_payload(row_part_tablet_ids[i], i, payload[i]); + } +} + Status VTabletWriter::append_block(doris::vectorized::Block& input_block) { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); Status status = Status::OK(); @@ -1719,6 +1596,19 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) { return status; } SCOPED_TIMER(_profile->total_time_counter()); + + std::shared_ptr block; + bool has_filtered_rows = false; + int64_t filtered_rows = 0; + + RETURN_IF_ERROR(_row_distribution.generate_rows_distribution( + input_block, block, filtered_rows, has_filtered_rows, _row_part_tablet_ids)); + + ChannelDistributionPayloadVec channel_to_payload; + + channel_to_payload.resize(_channels.size()); + _generate_index_channels_payloads(_row_part_tablet_ids, channel_to_payload); + _number_input_rows += rows; // update incrementally so that FE can get the progress. // the real 'num_rows_load_total' will be set when sink being closed. @@ -1727,112 +1617,6 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) { DorisMetrics::instance()->load_rows->increment(rows); DorisMetrics::instance()->load_bytes->increment(bytes); - std::shared_ptr block; - bool has_filtered_rows = false; - int64_t filtered_rows = - _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows(); - RETURN_IF_ERROR(_block_convertor->validate_and_convert_block( - _state, &input_block, block, _vec_output_expr_ctxs, rows, has_filtered_rows)); - - SCOPED_RAW_TIMER(&_send_data_ns); - // This is just for passing compilation. - bool stop_processing = false; - ChannelDistributionPayload channel_to_payload; - channel_to_payload.resize(_channels.size()); - _tablet_finder->clear_for_new_batch(); - _row_distribution_watch.start(); - auto num_rows = block->rows(); - _tablet_finder->filter_bitmap().Reset(num_rows); - size_t partition_num = _vpartition->get_partitions().size(); - if (!_vpartition->is_auto_partition() && partition_num == 1 && - _tablet_finder->is_find_tablet_every_sink()) { - RETURN_IF_ERROR(_single_partition_generate(_state, block.get(), channel_to_payload, - num_rows, has_filtered_rows)); - } else { - // if there's projection of partition calc, we need to calc it first. - auto [part_ctx, part_func] = _get_partition_function(); - int result_idx = -1; - if (_vpartition->is_projection_partition()) { - // calc the start value of missing partition ranges. - RETURN_IF_ERROR(part_func->execute(part_ctx.get(), block.get(), &result_idx)); - VLOG_DEBUG << "Partition-calculated block:" << block->dump_data(); - // change the column to compare to transformed. - _vpartition->set_transformed_slots({(uint16_t)result_idx}); - } - - if (_vpartition->is_auto_partition()) { - std::vector partition_keys = _vpartition->get_partition_keys(); - //TODO: use loop to create missing_vals for multi column. - CHECK(partition_keys.size() == 1) - << "now support only 1 partition column for auto partitions."; - auto partition_col = block->get_by_position(partition_keys[0]); - - std::vector missing_map; // indice of missing values in partition_col - missing_map.reserve(partition_col.column->size()); - - // try to find tablet and save missing value - std::vector partitions(num_rows, nullptr); - std::vector skip(num_rows, false); - std::vector tablet_indexes(num_rows, 0); - - //TODO: we could use the buffer to save tablets we found so that no need to find them again when we created partitions and try to append block next time. - RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), num_rows, partitions, - tablet_indexes, stop_processing, skip, - &missing_map)); - - if (missing_map.empty()) { - // we don't calculate it distribution when have missing values - if (has_filtered_rows) { - for (int i = 0; i < num_rows; i++) { - skip[i] = skip[i] || _block_convertor->filter_map()[i]; - } - } - _generate_row_distribution_payload(channel_to_payload, partitions, tablet_indexes, - skip, num_rows); - } else { // for missing partition keys, calc the missing partition and save in _partitions_need_create - auto return_type = part_func->data_type(); - - // expose the data column - vectorized::ColumnPtr range_left_col = block->get_by_position(result_idx).column; - if (const auto* nullable = - check_and_get_column(*range_left_col)) { - range_left_col = nullable->get_nested_column_ptr(); - return_type = - assert_cast(return_type.get()) - ->get_nested_type(); - } - // calc the end value and save them. - _save_missing_values(range_left_col, return_type, missing_map); - // then call FE to create it. then FragmentExecutor will redo the load. - RETURN_IF_ERROR(_automatic_create_partition()); - // now we need to rollback the metrics - _number_input_rows -= rows; - _state->update_num_rows_load_total(-rows); - _state->update_num_bytes_load_total(-bytes); - DorisMetrics::instance()->load_rows->increment(-rows); - DorisMetrics::instance()->load_bytes->increment(-bytes); - // In the next round, we will _generate_row_distribution_payload again to get right payload of new tablet - LOG(INFO) << "Auto created partition. Send block again."; - return Status::NeedSendAgain(""); - } // creating done - } else { // not auto partition - std::vector partitions(num_rows, nullptr); - std::vector skip(num_rows, false); - std::vector tablet_indexes(num_rows, 0); - - RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), num_rows, partitions, - tablet_indexes, stop_processing, skip)); - - if (has_filtered_rows) { - for (int i = 0; i < num_rows; i++) { - skip[i] = skip[i] || _block_convertor->filter_map()[i]; - } - } - _generate_row_distribution_payload(channel_to_payload, partitions, tablet_indexes, skip, - num_rows); - } - } - _row_distribution_watch.stop(); // Random distribution and the block belongs to a single tablet, we could optimize to append the whole // block into node channel. bool load_block_to_single_tablet = @@ -1855,10 +1639,8 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) { } if (_group_commit) { - _group_commit_block(&input_block, num_rows, - _block_convertor->num_filtered_rows() + - _tablet_finder->num_filtered_rows() - filtered_rows, - _state, block.get(), _block_convertor.get(), _tablet_finder.get()); + _group_commit_block(&input_block, block->rows(), filtered_rows, _state, block.get(), + _block_convertor.get(), _tablet_finder.get()); } // TODO: Before load, we need to projection unuseful column // auto slots = _schema->tuple_desc()->slots(); diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h index c8d5d1c2ce9ae9..7fee45be371080 100644 --- a/be/src/vec/sink/writer/vtablet_writer.h +++ b/be/src/vec/sink/writer/vtablet_writer.h @@ -74,6 +74,7 @@ #include "vec/data_types/data_type.h" #include "vec/exprs/vexpr_fwd.h" #include "vec/runtime/vfile_format_transformer.h" +#include "vec/sink/vrow_distribution.h" #include "vec/sink/vtablet_block_convertor.h" #include "vec/sink/vtablet_finder.h" #include "vec/sink/writer/async_result_writer.h" @@ -204,9 +205,6 @@ class ReusableClosure final : public google::protobuf::Closure { class IndexChannel; class VTabletWriter; -// pair -using Payload = std::pair, std::vector>; - class VNodeChannelStat { public: VNodeChannelStat& operator+=(const VNodeChannelStat& stat) { @@ -221,6 +219,9 @@ class VNodeChannelStat { int64_t append_node_channel_ns = 0; }; +// pair +using Payload = std::pair, std::vector>; + // every NodeChannel keeps a data transmission channel with one BE. for multiple times open, it has a dozen of requests and corresponding closures. class VNodeChannel { public: @@ -485,6 +486,7 @@ class IndexChannel { private: friend class VNodeChannel; friend class VTabletWriter; + friend class VRowDistribution; VTabletWriter* _parent; int64_t _index_id; @@ -546,32 +548,34 @@ class VTabletWriter final : public AsyncResultWriter { bool is_close_done(); + Status on_partitions_created(TCreatePartitionResult* result); + private: friend class VNodeChannel; friend class IndexChannel; - using ChannelDistributionPayload = std::vector>; + using ChannelDistributionPayload = std::unordered_map; + using ChannelDistributionPayloadVec = std::vector>; + + void _init_row_distribution(); Status _init(RuntimeState* state, RuntimeProfile* profile); - // payload for every row - void _generate_row_distribution_payload(ChannelDistributionPayload& channel_to_payload, - const std::vector& partitions, - const std::vector& tablet_indexes, - const std::vector& skip, size_t row_cnt); + void _generate_one_index_channel_payload(RowPartTabletIds& row_part_tablet_tuple, + int32_t index_idx, + ChannelDistributionPayload& channel_payload); - Status _single_partition_generate(RuntimeState* state, vectorized::Block* block, - ChannelDistributionPayload& channel_to_payload, - size_t num_rows, bool has_filtered_rows); + void _generate_index_channels_payloads(std::vector& row_part_tablet_ids, + ChannelDistributionPayloadVec& payload); Status _cancel_channel_and_check_intolerable_failure(Status status, const std::string& err_msg, const std::shared_ptr ich, const std::shared_ptr nch); - void _cancel_all_channel(Status status); - std::pair _get_partition_function(); + void _cancel_all_channel(Status status); + void _save_missing_values(vectorized::ColumnPtr col, vectorized::DataTypePtr value_type, std::vector filter); @@ -689,6 +693,11 @@ class VTabletWriter final : public AsyncResultWriter { RuntimeProfile* _profile = nullptr; // not owned, set when open bool _group_commit = false; std::shared_ptr _wal_writer = nullptr; + + VRowDistribution _row_distribution; + // reuse to avoid frequent memory allocation and release. + std::vector _row_part_tablet_ids; + int64_t _tb_id; int64_t _db_id; int64_t _wal_id; From f8f3bc6a6701fad5d2e94fc6d54c7ded4b8912c9 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Wed, 8 Nov 2023 11:52:08 +0800 Subject: [PATCH 2/2] Revert "[Chore](ci)Temporarily cancel the mandatory restrictions of ShellCheck (#26553)" (#26565) This reverts commit b7c81bc73625b26df746fc2213980c16b9d8f1a0. --- .asf.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.asf.yaml b/.asf.yaml index 59f6fd20e75068..f8b7e0913cf61b 100644 --- a/.asf.yaml +++ b/.asf.yaml @@ -59,6 +59,7 @@ github: - BE UT (Doris BE UT) - Build Broker - Build Documents + - ShellCheck - clickbench-new (clickbench) - Build Third Party Libraries (Linux) - Build Third Party Libraries (macOS)