diff --git a/.asf.yaml b/.asf.yaml index 59f6fd20e75068..f8b7e0913cf61b 100644 --- a/.asf.yaml +++ b/.asf.yaml @@ -59,6 +59,7 @@ github: - BE UT (Doris BE UT) - Build Broker - Build Documents + - ShellCheck - clickbench-new (clickbench) - Build Third Party Libraries (Linux) - Build Third Party Libraries (macOS) diff --git a/be/src/vec/sink/vrow_distribution.cpp b/be/src/vec/sink/vrow_distribution.cpp new file mode 100644 index 00000000000000..78d3b062045648 --- /dev/null +++ b/be/src/vec/sink/vrow_distribution.cpp @@ -0,0 +1,306 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#include "vec/sink/vrow_distribution.h"
+
+// NOTE(review): the two angle-bracket include targets below were stripped when this diff
+// was extracted; they are restored from the thrift types used in this file - confirm.
+#include <gen_cpp/FrontendService.h>
+#include <gen_cpp/FrontendService_types.h>
+
+#include "runtime/client_cache.h"
+#include "runtime/exec_env.h"
+#include "runtime/runtime_state.h"
+#include "util/thrift_rpc_helper.h"
+#include "vec/columns/column_const.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_vector.h"
+#include "vec/sink/writer/vtablet_writer.h"
+
+// NOTE(review): template arguments in this file were also stripped by the extraction and
+// are restored from how each value is used - confirm against the repository.
+namespace doris::vectorized {
+
+// Returns the pair {partition function context, partition function} held by _vpartition.
+std::pair<vectorized::VExprContextSPtr, vectorized::VExprSPtr>
+VRowDistribution::_get_partition_function() {
+    return {_vpartition->get_part_func_ctx(), _vpartition->get_partition_function()};
+}
+
+// Rebuilds _partitions_need_create from the rows listed in _missing_map: the string form of
+// each listed row of `col` (as rendered by `value_type`) is de-duplicated, and every distinct
+// value becomes one single-column partition request.
+void VRowDistribution::_save_missing_values(vectorized::ColumnPtr col,
+                                            vectorized::DataTypePtr value_type) {
+    _partitions_need_create.clear();
+    // de-duplication
+    std::set<std::string> deduper;
+    for (auto row : _missing_map) {
+        deduper.emplace(value_type->to_string(*col, row));
+    }
+    _partitions_need_create.reserve(deduper.size());
+    for (const auto& value : deduper) {
+        TStringLiteral node;
+        node.value = value;
+        // only 1 partition column now
+        _partitions_need_create.emplace_back(std::vector<TStringLiteral> {node});
+    }
+}
+
+// Sends a createPartition RPC for _partitions_need_create to the master FE address taken
+// from ExecEnv. When the response status is OK, the returned partitions are added to
+// _vpartition and the owner is notified through _on_partitions_created(_caller, &result).
+// Returns the first failure among the RPC, add_partitions and the callback; otherwise the
+// status carried in the response.
+Status VRowDistribution::_automatic_create_partition() {
+    SCOPED_TIMER(_add_partition_request_timer);
+    TCreatePartitionRequest request;
+    TCreatePartitionResult result;
+    request.__set_txn_id(_txn_id);
+    request.__set_db_id(_vpartition->db_id());
+    request.__set_table_id(_vpartition->table_id());
+    request.__set_partitionValues(_partitions_need_create);
+
+    VLOG(1) << "automatic partition rpc begin request " << request;
+    TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address;
+    int time_out = _state->execution_timeout() * 1000;
+    RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
+            master_addr.hostname, master_addr.port,
+            [&request, &result](FrontendServiceConnection& client) {
+                client->createPartition(result, request);
+            },
+            time_out));
+
+    Status status(Status::create(result.status));
+    VLOG(1) << "automatic partition rpc end response " << result;
+    if (result.status.status_code == TStatusCode::OK) {
+        // add new created partitions
+        RETURN_IF_ERROR(_vpartition->add_partitions(result.partitions));
+        RETURN_IF_ERROR(_on_partitions_created(_caller, &result));
+    }
+
+    return status;
+}
+
+// For every row that is not skipped, stores in tablet_ids[row] the tablet of index
+// `index_idx` selected by _partitions[row] and _tablet_indexes[row]. Entries that belong to
+// skipped rows are not written here and must not be read by callers.
+void VRowDistribution::_get_tablet_ids(vectorized::Block* block, int32_t index_idx,
+                                       std::vector<int64_t>& tablet_ids) {
+    const size_t num_rows = block->rows();
+    // resize(), not reserve(): the loop assigns through operator[], which is only defined
+    // for positions below size(). reserve() changes capacity only.
+    tablet_ids.resize(num_rows);
+    for (size_t row_idx = 0; row_idx < num_rows; row_idx++) {
+        if (_skip[row_idx]) {
+            continue;
+        }
+        const auto& partition = _partitions[row_idx];
+        const auto& tablet_index = _tablet_indexes[row_idx];
+        const auto& index = partition->indexes[index_idx];
+
+        tablet_ids[row_idx] = index.tablets[tablet_index];
+    }
+}
+
+// Appends (row, partition id, tablet id) for every row of `block` that is not skipped.
+void VRowDistribution::_filter_block_by_skip(vectorized::Block* block,
+                                             RowPartTabletIds& row_part_tablet_id) {
+    auto& row_ids = row_part_tablet_id.row_ids;
+    auto& partition_ids = row_part_tablet_id.partition_ids;
+    auto& tablet_ids = row_part_tablet_id.tablet_ids;
+
+    const size_t num_rows = block->rows();
+    for (size_t i = 0; i < num_rows; i++) {
+        if (!_skip[i]) {
+            row_ids.emplace_back(i);
+            partition_ids.emplace_back(_partitions[i]->id);
+            tablet_ids.emplace_back(_tablet_ids[i]);
+        }
+    }
+}
+
+// Evaluates `where_clause` on `block` and appends (row, partition id, tablet id) for every
+// row that passes the predicate and is not skipped. Columns that the evaluation appended to
+// `block` are erased again before returning, whatever the kind of the result column.
+Status VRowDistribution::_filter_block_by_skip_and_where_clause(
+        vectorized::Block* block, const vectorized::VExprContextSPtr& where_clause,
+        RowPartTabletIds& row_part_tablet_id) {
+    // TODO
+    //SCOPED_RAW_TIMER(&_stat.where_clause_ns);
+    int result_index = -1;
+    const size_t column_number = block->columns();
+    RETURN_IF_ERROR(where_clause->execute(block, &result_index));
+
+    // Holding the column pointer keeps the filter readable until the cleanup at the end.
+    auto filter_column = block->get_by_position(result_index).column;
+
+    auto& row_ids = row_part_tablet_id.row_ids;
+    auto& partition_ids = row_part_tablet_id.partition_ids;
+    auto& tablet_ids = row_part_tablet_id.tablet_ids;
+    const size_t num_rows = block->rows();
+    if (auto* nullable_column =
+                vectorized::check_and_get_column<vectorized::ColumnNullable>(*filter_column)) {
+        for (size_t i = 0; i < num_rows; i++) {
+            if (nullable_column->get_bool_inline(i) && !_skip[i]) {
+                row_ids.emplace_back(i);
+                partition_ids.emplace_back(_partitions[i]->id);
+                tablet_ids.emplace_back(_tablet_ids[i]);
+            }
+        }
+    } else if (auto* const_column =
+                       vectorized::check_and_get_column<vectorized::ColumnConst>(*filter_column)) {
+        // A constant predicate keeps either every non-skipped row or no row at all. The
+        // false case used to return right here and left the temporary result column(s)
+        // inside the block; it now falls through to the cleanup below.
+        if (const_column->get_bool(0)) {
+            // should we optimize?
+            _filter_block_by_skip(block, row_part_tablet_id);
+        }
+    } else {
+        auto& filter = assert_cast<const vectorized::ColumnUInt8&>(*filter_column).get_data();
+        for (size_t i = 0; i < num_rows; i++) {
+            if (filter[i] != 0 && !_skip[i]) {
+                row_ids.emplace_back(i);
+                partition_ids.emplace_back(_partitions[i]->id);
+                tablet_ids.emplace_back(_tablet_ids[i]);
+            }
+        }
+    }
+
+    // Erase from the back down to the original width. Counting with "i > column_number"
+    // and erasing i - 1 also terminates when column_number is 0, which an unsigned
+    // "i >= column_number" loop would not.
+    for (size_t i = block->columns(); i > column_number; --i) {
+        block->erase(i - 1);
+    }
+    return Status::OK();
+}
+
+// Fills row_part_tablet_ids[i] for every index i of _schema: tablet ids are resolved for
+// that index first, then rows are selected either by the skip flags alone or by the skip
+// flags combined with the index's where clause when it has one.
+Status VRowDistribution::_filter_block(vectorized::Block* block,
+                                       std::vector<RowPartTabletIds>& row_part_tablet_ids) {
+    const auto num_indexes = static_cast<int32_t>(_schema->indexes().size());
+    for (int32_t i = 0; i < num_indexes; i++) {
+        _get_tablet_ids(block, i, _tablet_ids);
+        auto& where_clause = _schema->indexes()[i]->where_clause;
+        if (where_clause != nullptr) {
+            RETURN_IF_ERROR(_filter_block_by_skip_and_where_clause(block, where_clause,
+                                                                   row_part_tablet_ids[i]));
+        } else {
+            _filter_block_by_skip(block, row_part_tablet_ids[i]);
+        }
+    }
+    return Status::OK();
+}
+
+// Row distribution for tables without automatic partition creation: runs the tablet finder,
+// additionally skips rows flagged in the block convertor's filter map when
+// `has_filtered_rows` is set, then fills `row_part_tablet_ids`.
+Status VRowDistribution::_generate_rows_distribution_for_non_auto_parititon(
+        vectorized::Block* block, bool has_filtered_rows,
+        std::vector<RowPartTabletIds>& row_part_tablet_ids) {
+    const size_t num_rows = block->rows();
+
+    bool stop_processing = false;
+    RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
+                                                 _tablet_indexes, stop_processing, _skip));
+    if (has_filtered_rows) {
+        // fetch the filter map once instead of once per row
+        const auto* filter_map = _block_convertor->filter_map();
+        for (size_t i = 0; i < num_rows; i++) {
+            _skip[i] = _skip[i] || filter_map[i];
+        }
+    }
+    RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids));
+    return Status::OK();
+}
+
+// Row distribution for auto-partition tables. The tablet finder is additionally given
+// _missing_map. If it stays empty, the rows are distributed exactly like in the non-auto
+// case. Otherwise the missing partition values are collected, the FE is asked to create the
+// partitions, and Status::NeedSendAgain is returned so that the block is sent again.
+// `partition_col_idx` is the block position of the computed partition column.
+Status VRowDistribution::_generate_rows_distribution_for_auto_parititon(
+        vectorized::Block* block, int partition_col_idx, bool has_filtered_rows,
+        std::vector<RowPartTabletIds>& row_part_tablet_ids) {
+    const size_t num_rows = block->rows();
+    const auto& partition_keys = _vpartition->get_partition_keys();
+
+    //TODO: use loop to create missing_vals for multi column.
+    CHECK(partition_keys.size() == 1) << "now support only 1 partition column for auto partitions.";
+    // bind by reference: only the column size is needed, no copy of the column entry
+    const auto& partition_col = block->get_by_position(partition_keys[0]);
+    _missing_map.clear();
+    _missing_map.reserve(partition_col.column->size());
+    bool stop_processing = false;
+    //TODO: we could use the buffer to save tablets we found so that no need to find them again when we created partitions and try to append block next time.
+    RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
+                                                 _tablet_indexes, stop_processing, _skip,
+                                                 &_missing_map));
+    if (_missing_map.empty()) {
+        // the distribution is only calculated when no partition value is missing
+        if (has_filtered_rows) {
+            const auto* filter_map = _block_convertor->filter_map();
+            for (size_t i = 0; i < num_rows; i++) {
+                _skip[i] = _skip[i] || filter_map[i];
+            }
+        }
+        RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids));
+    } else { // for missing partition keys, calc the missing partition and save in _partitions_need_create
+        // The caller leaves partition_col_idx at -1 unless the partition projection was
+        // executed; indexing the block with -1 would read out of range.
+        if (partition_col_idx < 0) {
+            return Status::InternalError(
+                    "auto partition needs the calculated partition column to create partitions");
+        }
+        const auto part_func = _get_partition_function().second;
+        auto return_type = part_func->data_type();
+
+        // expose the data column
+        vectorized::ColumnPtr range_left_col = block->get_by_position(partition_col_idx).column;
+        if (const auto* nullable =
+                    check_and_get_column<vectorized::ColumnNullable>(*range_left_col)) {
+            range_left_col = nullable->get_nested_column_ptr();
+            return_type = assert_cast<const vectorized::DataTypeNullable*>(return_type.get())
+                                  ->get_nested_type();
+        }
+        // calc the end value and save them.
+        _save_missing_values(range_left_col, return_type);
+        // then call FE to create it. then FragmentExecutor will redo the load.
+        RETURN_IF_ERROR(_automatic_create_partition());
+        // In the next round, we will _generate_rows_distribution_payload again to get right payload of new tablet
+        LOG(INFO) << "Auto created partition. Send block again.";
+        return Status::NeedSendAgain("");
+    } // creating done
+
+    return Status::OK();
+}
+
+// Sizes `row_part_tablet_ids` to one entry per index of _schema, empties every entry and
+// reserves room for `rows` rows in each of its three vectors.
+void VRowDistribution::_reset_row_part_tablet_ids(
+        std::vector<RowPartTabletIds>& row_part_tablet_ids, int64_t rows) {
+    row_part_tablet_ids.resize(_schema->indexes().size());
+    for (auto& row_part_tablet_id : row_part_tablet_ids) {
+        auto& row_ids = row_part_tablet_id.row_ids;
+        auto& partition_ids = row_part_tablet_id.partition_ids;
+        auto& tablet_ids = row_part_tablet_id.tablet_ids;
+
+        row_ids.clear();
+        partition_ids.clear();
+        tablet_ids.clear();
+        row_ids.reserve(rows);
+        partition_ids.reserve(rows);
+        tablet_ids.reserve(rows);
+    }
+}
+
+// Entry point: validates and converts `input_block` into `block`, resolves partition and
+// tablet for every row and fills `row_part_tablet_ids` with one entry per index.
+//   filtered_rows      out: rows filtered by the block convertor and the tablet finder
+//                      during this call (difference of their counters before and after);
+//                      only written when the call returns OK.
+//   has_filtered_rows  out: set by validate_and_convert_block.
+// Any error is returned as is; for auto-partition tables this includes
+// Status::NeedSendAgain after partitions were created.
+Status VRowDistribution::generate_rows_distribution(
+        vectorized::Block& input_block, std::shared_ptr<vectorized::Block>& block,
+        int64_t& filtered_rows, bool& has_filtered_rows,
+        std::vector<RowPartTabletIds>& row_part_tablet_ids) {
+    auto input_rows = input_block.rows();
+    _reset_row_part_tablet_ids(row_part_tablet_ids, input_rows);
+
+    int64_t prev_filtered_rows =
+            _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows();
+    RETURN_IF_ERROR(_block_convertor->validate_and_convert_block(
+            _state, &input_block, block, *_vec_output_expr_ctxs, input_rows, has_filtered_rows));
+
+    _tablet_finder->clear_for_new_batch();
+    _row_distribution_watch.start();
+    auto num_rows = block->rows();
+    _tablet_finder->filter_bitmap().Reset(num_rows);
+
+    //reuse vars for find_tablets
+    _partitions.assign(num_rows, nullptr);
+    _skip.assign(num_rows, false);
+    _tablet_indexes.assign(num_rows, 0);
+
+    // if there's projection of partition calc, we need to calc it first.
+    auto [part_ctx, part_func] = _get_partition_function();
+    int partition_col_idx = -1;
+    if (_vpartition->is_projection_partition()) {
+        // calc the start value of missing partition ranges.
+        RETURN_IF_ERROR(part_func->execute(part_ctx.get(), block.get(), &partition_col_idx));
+        VLOG_DEBUG << "Partition-calculated block:" << block->dump_data();
+        // change the column to compare to transformed.
+        _vpartition->set_transformed_slots({static_cast<uint16_t>(partition_col_idx)});
+    }
+
+    if (_vpartition->is_auto_partition()) {
+        RETURN_IF_ERROR(_generate_rows_distribution_for_auto_parititon(
+                block.get(), partition_col_idx, has_filtered_rows, row_part_tablet_ids));
+    } else { // not auto partition
+        RETURN_IF_ERROR(_generate_rows_distribution_for_non_auto_parititon(
+                block.get(), has_filtered_rows, row_part_tablet_ids));
+    }
+    _row_distribution_watch.stop();
+    filtered_rows = _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows() -
+                    prev_filtered_rows;
+    return Status::OK();
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/sink/vrow_distribution.h b/be/src/vec/sink/vrow_distribution.h
new file mode 100644
index 00000000000000..5da964d44fcbdd
--- /dev/null
+++ b/be/src/vec/sink/vrow_distribution.h
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+// NOTE(review): every <...> include target in this header, the argument of the IWYU pragma
+// and all template arguments were stripped when this diff was extracted. The headers and
+// template arguments below are restored from the symbols this header uses - confirm
+// against the repository.
+// IWYU pragma: no_include
+#include <gen_cpp/FrontendService.h>
+#include <gen_cpp/FrontendService_types.h>
+#include <gen_cpp/PaloInternalService_types.h>
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "common/status.h"
+#include "exec/tablet_info.h"
+#include "runtime/types.h"
+#include "util/runtime_profile.h"
+#include "util/stopwatch.hpp"
+#include "vec/core/block.h"
+#include "vec/data_types/data_type.h"
+#include "vec/exprs/vexpr_fwd.h"
+#include "vec/sink/vtablet_block_convertor.h"
+#include "vec/sink/vtablet_finder.h"
+
+namespace doris::vectorized {
+
+class IndexChannel;
+class VNodeChannel;
+
+// Rows selected for one index: entry k says that row row_ids[k] of the block goes to tablet
+// tablet_ids[k] of partition partition_ids[k].
+class RowPartTabletIds {
+public:
+    std::vector<int64_t> row_ids;
+    std::vector<int64_t> partition_ids;
+    std::vector<int64_t> tablet_ids;
+};
+
+// Callback invoked after the FE created partitions. The first argument is the opaque
+// `caller` pointer registered in VRowDistributionContext, the second the FE response.
+using OnPartitionsCreated = Status (*)(void*, TCreatePartitionResult*);
+
+// Plain bundle of the collaborators VRowDistribution::init() copies. Nothing here is owned
+// by the context except the shared `schema`. Every pointer defaults to nullptr so that a
+// field the owner forgets to set is not left indeterminate.
+class VRowDistributionContext {
+public:
+    RuntimeState* state = nullptr;
+    OlapTableBlockConvertor* block_convertor = nullptr;
+    OlapTabletFinder* tablet_finder = nullptr;
+    VOlapTablePartitionParam* vpartition = nullptr;
+    RuntimeProfile::Counter* add_partition_request_timer = nullptr;
+    int64_t txn_id = -1;
+    ObjectPool* pool = nullptr;
+    OlapTableLocationParam* location = nullptr;
+    const VExprContextSPtrs* vec_output_expr_ctxs = nullptr;
+    OnPartitionsCreated on_partitions_created = nullptr;
+    // passed back unchanged as the first argument of on_partitions_created
+    void* caller = nullptr;
+    std::shared_ptr<OlapTableSchemaParam> schema;
+};
+
+// Computes, for a block of rows, which rows go to which partition and tablet of every
+// index, creating missing partitions through the FE for auto-partition tables.
+// init() must be called before generate_rows_distribution().
+class VRowDistribution {
+public:
+    VRowDistribution() = default;
+    virtual ~VRowDistribution() = default;
+
+    // Copies every field of `ctx`; `ctx` itself is not retained.
+    void init(VRowDistributionContext* ctx) {
+        _state = ctx->state;
+        _block_convertor = ctx->block_convertor;
+        _tablet_finder = ctx->tablet_finder;
+        _vpartition = ctx->vpartition;
+        _add_partition_request_timer = ctx->add_partition_request_timer;
+        _txn_id = ctx->txn_id;
+        _pool = ctx->pool;
+        _location = ctx->location;
+        _vec_output_expr_ctxs = ctx->vec_output_expr_ctxs;
+        _on_partitions_created = ctx->on_partitions_created;
+        _caller = ctx->caller;
+        _schema = ctx->schema;
+    }
+
+    // Converts `input_block` into `block` and fills `row_part_tablet_ids` with one entry per
+    // index. Covers:
+    //  - auto partition
+    //  - mv where clause
+    // Consumers: v1 needs index->node->row_ids - tabletids, v2 needs index,tablet->rowids.
+    Status generate_rows_distribution(vectorized::Block& input_block,
+                                      std::shared_ptr<vectorized::Block>& block,
+                                      int64_t& filtered_rows, bool& has_filtered_rows,
+                                      std::vector<RowPartTabletIds>& row_part_tablet_ids);
+
+private:
+    std::pair<vectorized::VExprContextSPtr, vectorized::VExprSPtr> _get_partition_function();
+
+    // Collects the distinct partition values of the rows in _missing_map into
+    // _partitions_need_create.
+    void _save_missing_values(vectorized::ColumnPtr col, vectorized::DataTypePtr value_type);
+
+    // create partitions when need for auto-partition table using #_partitions_need_create.
+    Status _automatic_create_partition();
+
+    void _get_tablet_ids(vectorized::Block* block, int32_t index_idx,
+                         std::vector<int64_t>& tablet_ids);
+
+    void _filter_block_by_skip(vectorized::Block* block, RowPartTabletIds& row_part_tablet_id);
+
+    Status _filter_block_by_skip_and_where_clause(vectorized::Block* block,
+                                                  const vectorized::VExprContextSPtr& where_clause,
+                                                  RowPartTabletIds& row_part_tablet_id);
+
+    Status _filter_block(vectorized::Block* block,
+                         std::vector<RowPartTabletIds>& row_part_tablet_ids);
+
+    Status _generate_rows_distribution_for_auto_parititon(
+            vectorized::Block* block, int partition_col_idx, bool has_filtered_rows,
+            std::vector<RowPartTabletIds>& row_part_tablet_ids);
+
+    Status _generate_rows_distribution_for_non_auto_parititon(
+            vectorized::Block* block, bool has_filtered_rows,
+            std::vector<RowPartTabletIds>& row_part_tablet_ids);
+
+    void _reset_row_part_tablet_ids(std::vector<RowPartTabletIds>& row_part_tablet_ids,
+                                    int64_t rows);
+
+private:
+    RuntimeState* _state = nullptr;
+
+    // support only one partition column now
+    std::vector<std::vector<TStringLiteral>> _partitions_need_create;
+
+    MonotonicStopWatch _row_distribution_watch;
+    OlapTableBlockConvertor* _block_convertor = nullptr;
+    OlapTabletFinder* _tablet_finder = nullptr;
+    VOlapTablePartitionParam* _vpartition = nullptr;
+    RuntimeProfile::Counter* _add_partition_request_timer = nullptr;
+    int64_t _txn_id = -1;
+    ObjectPool* _pool = nullptr;
+    OlapTableLocationParam* _location = nullptr;
+    // std::function _on_partition_created;
+    // int64_t _number_output_rows = 0;
+    const VExprContextSPtrs* _vec_output_expr_ctxs = nullptr;
+    OnPartitionsCreated _on_partitions_created = nullptr;
+    void* _caller = nullptr;
+    std::shared_ptr<OlapTableSchemaParam> _schema;
+
+    // reuse for find_tablet.
+    std::vector<VOlapTablePartition*> _partitions;
+    std::vector<bool> _skip;
+    std::vector<uint32_t> _tablet_indexes;
+    std::vector<int64_t> _tablet_ids;
+    std::vector<int64_t> _missing_map; // indices of missing values in partition_col
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp b/be/src/vec/sink/vtablet_sink_v2.cpp index 696a66ac3fbad4..69a372e0e3886f 100644 --- a/be/src/vec/sink/vtablet_sink_v2.cpp +++ b/be/src/vec/sink/vtablet_sink_v2.cpp @@ -77,6 +77,42 @@ VOlapTableSinkV2::VOlapTableSinkV2(ObjectPool* pool, const RowDescriptor& row_de VOlapTableSinkV2::~VOlapTableSinkV2() = default; +Status VOlapTableSinkV2::on_partitions_created(TCreatePartitionResult* result) { + // add new tablet locations. it will use by address. so add to pool + auto* new_locations = _pool->add(new std::vector(result->tablets)); + _location->add_locations(*new_locations); + + // update new node info + _nodes_info->add_nodes(result->nodes); + + // incremental open stream + + return Status::OK(); +} + +static Status on_partitions_created(void* writer, TCreatePartitionResult* result) { + return static_cast(writer)->on_partitions_created(result); +} + +void VOlapTableSinkV2::_init_row_distribution() { + VRowDistributionContext ctx; + + ctx.state = _state; + ctx.block_convertor = _block_convertor.get(); + ctx.tablet_finder = _tablet_finder.get(); + ctx.vpartition = _vpartition; + ctx.add_partition_request_timer = _add_partition_request_timer; + ctx.txn_id = _txn_id; + ctx.pool = _pool; + ctx.location = _location; + ctx.vec_output_expr_ctxs = &_output_vexpr_ctxs; + ctx.on_partitions_created = &vectorized::on_partitions_created; + ctx.caller = (void*)this; + ctx.schema = _schema; + + _row_distribution.init(&ctx); +} + Status VOlapTableSinkV2::init(const TDataSink& t_sink) { DCHECK(t_sink.__isset.olap_table_sink); auto& table_sink = t_sink.olap_table_sink; @@ -104,7 +140,9 @@ Status VOlapTableSinkV2::init(const TDataSink& t_sink) { } _vpartition = _pool->add(new 
doris::VOlapTablePartitionParam(_schema, table_sink.partition)); _tablet_finder = std::make_unique(_vpartition, find_tablet_mode); - return _vpartition->init(); + RETURN_IF_ERROR(_vpartition->init()); + + return Status::OK(); } Status VOlapTableSinkV2::prepare(RuntimeState* state) { @@ -168,6 +206,7 @@ Status VOlapTableSinkV2::open(RuntimeState* state) { _build_tablet_node_mapping(); RETURN_IF_ERROR(_open_streams(state->backend_id())); + _init_row_distribution(); return Status::OK(); } @@ -222,30 +261,25 @@ void VOlapTableSinkV2::_build_tablet_node_mapping() { } } -void VOlapTableSinkV2::_generate_rows_for_tablet( - RowsForTablet& rows_for_tablet, const std::vector& partitions, - const std::vector& tablet_indexes, const std::vector& skip, - size_t row_cnt) { - for (int row_idx = 0; row_idx < row_cnt; row_idx++) { - if (skip[row_idx]) { - continue; - } - - auto& partition = partitions[row_idx]; - auto& tablet_index = tablet_indexes[row_idx]; +void VOlapTableSinkV2::_generate_rows_for_tablet(std::vector& row_part_tablet_ids, + RowsForTablet& rows_for_tablet) { + for (int index_idx = 0; index_idx < row_part_tablet_ids.size(); index_idx++) { + auto& row_ids = row_part_tablet_ids[index_idx].row_ids; + auto& partition_ids = row_part_tablet_ids[index_idx].partition_ids; + auto& tablet_ids = row_part_tablet_ids[index_idx].tablet_ids; - for (const auto& index : partition->indexes) { - auto tablet_id = index.tablets[tablet_index]; + for (int i = 0; i < row_ids.size(); i++) { + auto& tablet_id = tablet_ids[i]; auto it = rows_for_tablet.find(tablet_id); if (it == rows_for_tablet.end()) { Rows rows; - rows.partition_id = partition->id; - rows.index_id = index.index_id; - rows.row_idxes.reserve(row_cnt); + rows.partition_id = partition_ids[i]; + rows.index_id = _schema->indexes()[index_idx]->index_id; + rows.row_idxes.reserve(row_ids.size()); auto [tmp_it, _] = rows_for_tablet.insert({tablet_id, rows}); it = tmp_it; } - it->second.row_idxes.push_back(row_idx); + 
it->second.row_idxes.push_back(row_ids[i]); _number_output_rows++; } } @@ -285,37 +319,18 @@ Status VOlapTableSinkV2::send(RuntimeState* state, vectorized::Block* input_bloc DorisMetrics::instance()->load_rows->increment(input_rows); DorisMetrics::instance()->load_bytes->increment(input_bytes); - std::shared_ptr block; bool has_filtered_rows = false; - RETURN_IF_ERROR(_block_convertor->validate_and_convert_block( - state, input_block, block, _output_vexpr_ctxs, input_rows, has_filtered_rows)); - - // clear and release the references of columns - input_block->clear(); + int64_t filtered_rows = 0; SCOPED_RAW_TIMER(&_send_data_ns); // This is just for passing compilation. - bool stop_processing = false; - RowsForTablet rows_for_tablet; - _tablet_finder->clear_for_new_batch(); _row_distribution_watch.start(); - const auto num_rows = input_rows; - const auto* __restrict filter_map = _block_convertor->filter_map(); - //reuse vars - _partitions.assign(num_rows, nullptr); - _skip.assign(num_rows, false); - _tablet_indexes.assign(num_rows, 0); - - RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), num_rows, _partitions, - _tablet_indexes, stop_processing, _skip)); - - if (has_filtered_rows) { - for (int i = 0; i < num_rows; i++) { - _skip[i] = _skip[i] || filter_map[i]; - } - } - _generate_rows_for_tablet(rows_for_tablet, _partitions, _tablet_indexes, _skip, num_rows); + std::shared_ptr block; + RETURN_IF_ERROR(_row_distribution.generate_rows_distribution( + *input_block, block, filtered_rows, has_filtered_rows, _row_part_tablet_ids)); + RowsForTablet rows_for_tablet; + _generate_rows_for_tablet(_row_part_tablet_ids, rows_for_tablet); _row_distribution_watch.stop(); diff --git a/be/src/vec/sink/vtablet_sink_v2.h b/be/src/vec/sink/vtablet_sink_v2.h index 63f0985eb090ce..a67c4e65cd2cae 100644 --- a/be/src/vec/sink/vtablet_sink_v2.h +++ b/be/src/vec/sink/vtablet_sink_v2.h @@ -64,6 +64,7 @@ #include "vec/core/block.h" #include "vec/data_types/data_type.h" 
#include "vec/exprs/vexpr_fwd.h" +#include "vec/sink/vrow_distribution.h" namespace doris { class DeltaWriterV2; @@ -112,17 +113,20 @@ class VOlapTableSinkV2 final : public DataSink { Status open(RuntimeState* state) override; Status close(RuntimeState* state, Status close_status) override; + Status send(RuntimeState* state, vectorized::Block* block, bool eos = false) override; + Status on_partitions_created(TCreatePartitionResult* result); + private: + void _init_row_distribution(); + Status _open_streams(int64_t src_id); void _build_tablet_node_mapping(); - void _generate_rows_for_tablet(RowsForTablet& rows_for_tablet, - const std::vector& partitions, - const std::vector& tablet_indexes, - const std::vector& skip, size_t row_cnt); + void _generate_rows_for_tablet(std::vector& row_part_tablet_ids, + RowsForTablet& rows_for_tablet); Status _write_memtable(std::shared_ptr block, int64_t tablet_id, const Rows& rows, const Streams& streams); @@ -168,11 +172,6 @@ class VOlapTableSinkV2 final : public DataSink { int64_t _number_input_rows = 0; int64_t _number_output_rows = 0; - // reuse for find_tablet - std::vector _partitions; - std::vector _skip; - std::vector _tablet_indexes; - MonotonicStopWatch _row_distribution_watch; RuntimeProfile::Counter* _input_rows_counter = nullptr; @@ -187,6 +186,7 @@ class VOlapTableSinkV2 final : public DataSink { RuntimeProfile::Counter* _close_timer = nullptr; RuntimeProfile::Counter* _close_writer_timer = nullptr; RuntimeProfile::Counter* _close_load_timer = nullptr; + RuntimeProfile::Counter* _add_partition_request_timer = nullptr; // Save the status of close() method Status _close_status; @@ -204,6 +204,10 @@ class VOlapTableSinkV2 final : public DataSink { std::unordered_map> _streams_for_node; size_t _stream_index = 0; std::shared_ptr _delta_writer_for_tablet; + + VRowDistribution _row_distribution; + // reuse to avoid frequent memory allocation and release. 
+ std::vector _row_part_tablet_ids; }; } // namespace vectorized diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index d0720255695b3f..786c0e21105ac9 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -152,6 +152,7 @@ Status IndexChannel::init(RuntimeState* state, const std::vectorprepare(state, *_parent->_output_row_desc)); RETURN_IF_ERROR(_where_clause->open(state)); } + return Status::OK(); } @@ -488,52 +489,6 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload, _cur_mutable_block = vectorized::MutableBlock::create_unique(block->clone_empty()); } - std::unique_ptr temp_payload = nullptr; - if (_index_channel != nullptr && _index_channel->get_where_clause() != nullptr) { - SCOPED_RAW_TIMER(&_stat.where_clause_ns); - temp_payload.reset(new Payload( - std::unique_ptr(new vectorized::IColumn::Selector()), - std::vector())); - int result_index = -1; - size_t column_number = block->columns(); - RETURN_IF_ERROR(_index_channel->get_where_clause()->execute(block, &result_index)); - - auto& row_ids = *payload->first; - auto& tablets_ids = payload->second; - - auto filter_column = block->get_by_position(result_index).column; - - if (auto* nullable_column = - vectorized::check_and_get_column(*filter_column)) { - for (size_t i = 0; i < payload->second.size(); i++) { - if (nullable_column->get_bool_inline(row_ids[i])) { - temp_payload->first->emplace_back(row_ids[i]); - temp_payload->second.emplace_back(tablets_ids[i]); - } - } - payload = temp_payload.get(); - } else if (auto* const_column = vectorized::check_and_get_column( - *filter_column)) { - bool ret = const_column->get_bool(0); - if (!ret) { - return Status::OK(); - } - } else { - auto& filter = assert_cast(*filter_column).get_data(); - for (size_t i = 0; i < payload->second.size(); i++) { - if (filter[row_ids[i]] != 0) { - temp_payload->first->emplace_back(row_ids[i]); - 
temp_payload->second.emplace_back(tablets_ids[i]); - } - } - payload = temp_payload.get(); - } - - for (size_t i = block->columns() - 1; i >= column_number; i--) { - block->erase(i); - } - } - SCOPED_RAW_TIMER(&_stat.append_node_channel_ns); if (is_append) { // Do not split the data of the block by tablets but append it to a single delta writer. @@ -1095,6 +1050,43 @@ Status VTabletWriter::open(doris::RuntimeState* state, doris::RuntimeProfile* pr return Status::OK(); } +Status VTabletWriter::on_partitions_created(TCreatePartitionResult* result) { + // add new tablet locations. it will use by address. so add to pool + auto* new_locations = _pool->add(new std::vector(result->tablets)); + _location->add_locations(*new_locations); + + // update new node info + _nodes_info->add_nodes(result->nodes); + + // incremental open node channel + RETURN_IF_ERROR(_incremental_open_node_channel(result->partitions)); + + return Status::OK(); +} + +static Status on_partitions_created(void* writer, TCreatePartitionResult* result) { + return static_cast(writer)->on_partitions_created(result); +} + +void VTabletWriter::_init_row_distribution() { + VRowDistributionContext ctx; + + ctx.state = _state; + ctx.block_convertor = _block_convertor.get(); + ctx.tablet_finder = _tablet_finder.get(); + ctx.vpartition = _vpartition; + ctx.add_partition_request_timer = _add_partition_request_timer; + ctx.txn_id = _txn_id; + ctx.pool = _pool; + ctx.location = _location; + ctx.vec_output_expr_ctxs = &_vec_output_expr_ctxs; + ctx.on_partitions_created = &vectorized::on_partitions_created; + ctx.caller = (void*)this; + ctx.schema = _schema; + + _row_distribution.init(&ctx); +} + Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) { DCHECK(_t_sink.__isset.olap_table_sink); auto& table_sink = _t_sink.olap_table_sink; @@ -1243,49 +1235,12 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) { 
RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->create_wal_writer(_wal_id, _wal_writer)); } + _init_row_distribution(); + _inited = true; return Status::OK(); } -Status VTabletWriter::_automatic_create_partition() { - SCOPED_TIMER(_add_partition_request_timer); - TCreatePartitionRequest request; - TCreatePartitionResult result; - request.__set_txn_id(_txn_id); - request.__set_db_id(_vpartition->db_id()); - request.__set_table_id(_vpartition->table_id()); - request.__set_partitionValues(_partitions_need_create); - - VLOG(1) << "automatic partition rpc begin request " << request; - TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; - int time_out = _state->execution_timeout() * 1000; - RETURN_IF_ERROR(ThriftRpcHelper::rpc( - master_addr.hostname, master_addr.port, - [&request, &result](FrontendServiceConnection& client) { - client->createPartition(result, request); - }, - time_out)); - - Status status(Status::create(result.status)); - VLOG(1) << "automatic partition rpc end response " << result; - if (result.status.status_code == TStatusCode::OK) { - // add new created partitions - RETURN_IF_ERROR(_vpartition->add_partitions(result.partitions)); - - // add new tablet locations. it will use by address. so add to pool - auto* new_locations = _pool->add(new std::vector(result.tablets)); - _location->add_locations(*new_locations); - - // update new node info - _nodes_info->add_nodes(result.nodes); - - // incremental open node channel - RETURN_IF_ERROR(_incremental_open_node_channel(result.partitions)); - } - - return status; -} - Status VTabletWriter::_incremental_open_node_channel( const std::vector& partitions) { // do what we did in prepare() for partitions. indexes which don't change when we create new partition is orthogonal to partitions. 
@@ -1337,129 +1292,11 @@ Status VTabletWriter::_incremental_open_node_channel( return Status::OK(); } -// Generate channel payload for sinking data to differenct node channel -// Payload = std::pair, std::vector>; -// first = row_id, second = vector -void VTabletWriter::_generate_row_distribution_payload( - ChannelDistributionPayload& channel_to_payload, - const std::vector& partitions, - const std::vector& tablet_indexes, const std::vector& skip, - size_t row_cnt) { - for (int row_idx = 0; row_idx < row_cnt; row_idx++) { - if (skip[row_idx]) { - continue; - } - const auto& partition = partitions[row_idx]; - const auto& tablet_index = tablet_indexes[row_idx]; - - for (int index_num = 0; index_num < partition->indexes.size(); - ++index_num) { // partition->indexes = [index, tablets...] - - auto tablet_id = partition->indexes[index_num].tablets[tablet_index]; - auto it = _channels[index_num]->_channels_by_tablet.find( - tablet_id); // (tablet_id, VNodeChannel) where this tablet locate - - DCHECK(it != _channels[index_num]->_channels_by_tablet.end()) - << "unknown tablet, tablet_id=" << tablet_index; - - std::vector>& tablet_locations = it->second; - std::unordered_map& payloads_this_index = - channel_to_payload[index_num]; // payloads of this index in every node - - for (const auto& locate_node : tablet_locations) { - auto payload_it = - payloads_this_index.find(locate_node.get()); // - if (payload_it == payloads_this_index.end()) { - auto [tmp_it, _] = payloads_this_index.emplace( - locate_node.get(), - Payload {std::make_unique(), - std::vector()}); - payload_it = tmp_it; - payload_it->second.first->reserve(row_cnt); - payload_it->second.second.reserve(row_cnt); - } - payload_it->second.first->push_back(row_idx); - payload_it->second.second.push_back(tablet_id); - } - _number_output_rows++; - } - } -} - -Status VTabletWriter::_single_partition_generate(RuntimeState* state, vectorized::Block* block, - ChannelDistributionPayload& channel_to_payload, - size_t 
num_rows, bool has_filtered_rows) { - // only need to calculate one value for single partition. - std::vector partitions(1, nullptr); - std::vector skip(1, false); - std::vector tablet_indexes(1, 0); - bool stop_processing = false; - - RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, 1, partitions, tablet_indexes, - stop_processing, skip)); - - const VOlapTablePartition* partition = nullptr; - uint32_t tablet_index = 0; - for (size_t i = 0; i < num_rows; i++) { - if (!skip[i]) { - partition = partitions[i]; - tablet_index = tablet_indexes[i]; - break; - } - } - if (partition == nullptr) { - return Status::OK(); - } - - for (int j = 0; j < partition->indexes.size(); ++j) { - auto tid = partition->indexes[j].tablets[tablet_index]; - auto it = _channels[j]->_channels_by_tablet.find(tid); - DCHECK(it != _channels[j]->_channels_by_tablet.end()) - << "unknown tablet, tablet_id=" << tablet_index; - int64_t row_cnt = 0; - for (const auto& channel : it->second) { - if (!channel_to_payload[j].contains(channel.get())) { - channel_to_payload[j].insert( - {channel.get(), Payload {std::make_unique(), - std::vector()}}); - } - auto& selector = channel_to_payload[j][channel.get()].first; - auto& tablet_ids = channel_to_payload[j][channel.get()].second; - for (int32_t i = 0; i < num_rows; ++i) { - if (UNLIKELY(has_filtered_rows) && _block_convertor->filter_map()[i]) { - continue; - } - selector->push_back(i); - } - tablet_ids.resize(selector->size(), tid); - row_cnt = selector->size(); - } - _number_output_rows += row_cnt; - } - return Status::OK(); -} - std::pair VTabletWriter::_get_partition_function() { return {_vpartition->get_part_func_ctx(), _vpartition->get_partition_function()}; } -void VTabletWriter::_save_missing_values(vectorized::ColumnPtr col, - vectorized::DataTypePtr value_type, - std::vector filter) { - _partitions_need_create.clear(); - std::set deduper; - // de-duplication - for (auto row : filter) { - deduper.emplace(value_type->to_string(*col, row)); 
- } - for (auto& value : deduper) { - TStringLiteral node; - node.value = value; - _partitions_need_create.emplace_back(std::vector {node}); // only 1 partition column now - } -} - Status VTabletWriter::_cancel_channel_and_check_intolerable_failure( Status status, const std::string& err_msg, const std::shared_ptr ich, const std::shared_ptr nch) { @@ -1705,6 +1542,46 @@ Status VTabletWriter::close(Status exec_status) { return _close_status; } +void VTabletWriter::_generate_one_index_channel_payload( + RowPartTabletIds& row_part_tablet_id, int32_t index_idx, + ChannelDistributionPayload& channel_payload) { + auto& row_ids = row_part_tablet_id.row_ids; + auto& tablet_ids = row_part_tablet_id.tablet_ids; + + size_t row_cnt = row_ids.size(); + + for (int i = 0; i < row_ids.size(); i++) { + // (tablet_id, VNodeChannel) where this tablet locate + auto it = _channels[index_idx]->_channels_by_tablet.find(tablet_ids[i]); + DCHECK(it != _channels[index_idx]->_channels_by_tablet.end()) + << "unknown tablet, tablet_id=" << tablet_ids[i]; + + std::vector>& tablet_locations = it->second; + for (const auto& locate_node : tablet_locations) { + auto payload_it = channel_payload.find(locate_node.get()); // + if (payload_it == channel_payload.end()) { + auto [tmp_it, _] = channel_payload.emplace( + locate_node.get(), + Payload {std::make_unique(), + std::vector()}); + payload_it = tmp_it; + payload_it->second.first->reserve(row_cnt); + payload_it->second.second.reserve(row_cnt); + } + payload_it->second.first->push_back(row_ids[i]); + payload_it->second.second.push_back(tablet_ids[i]); + } + } +} + +void VTabletWriter::_generate_index_channels_payloads( + std::vector& row_part_tablet_ids, + ChannelDistributionPayloadVec& payload) { + for (int i = 0; i < _schema->indexes().size(); i++) { + _generate_one_index_channel_payload(row_part_tablet_ids[i], i, payload[i]); + } +} + Status VTabletWriter::append_block(doris::vectorized::Block& input_block) { 
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); Status status = Status::OK(); @@ -1719,6 +1596,19 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) { return status; } SCOPED_TIMER(_profile->total_time_counter()); + + std::shared_ptr block; + bool has_filtered_rows = false; + int64_t filtered_rows = 0; + + RETURN_IF_ERROR(_row_distribution.generate_rows_distribution( + input_block, block, filtered_rows, has_filtered_rows, _row_part_tablet_ids)); + + ChannelDistributionPayloadVec channel_to_payload; + + channel_to_payload.resize(_channels.size()); + _generate_index_channels_payloads(_row_part_tablet_ids, channel_to_payload); + _number_input_rows += rows; // update incrementally so that FE can get the progress. // the real 'num_rows_load_total' will be set when sink being closed. @@ -1727,112 +1617,6 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) { DorisMetrics::instance()->load_rows->increment(rows); DorisMetrics::instance()->load_bytes->increment(bytes); - std::shared_ptr block; - bool has_filtered_rows = false; - int64_t filtered_rows = - _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows(); - RETURN_IF_ERROR(_block_convertor->validate_and_convert_block( - _state, &input_block, block, _vec_output_expr_ctxs, rows, has_filtered_rows)); - - SCOPED_RAW_TIMER(&_send_data_ns); - // This is just for passing compilation. 
- bool stop_processing = false; - ChannelDistributionPayload channel_to_payload; - channel_to_payload.resize(_channels.size()); - _tablet_finder->clear_for_new_batch(); - _row_distribution_watch.start(); - auto num_rows = block->rows(); - _tablet_finder->filter_bitmap().Reset(num_rows); - size_t partition_num = _vpartition->get_partitions().size(); - if (!_vpartition->is_auto_partition() && partition_num == 1 && - _tablet_finder->is_find_tablet_every_sink()) { - RETURN_IF_ERROR(_single_partition_generate(_state, block.get(), channel_to_payload, - num_rows, has_filtered_rows)); - } else { - // if there's projection of partition calc, we need to calc it first. - auto [part_ctx, part_func] = _get_partition_function(); - int result_idx = -1; - if (_vpartition->is_projection_partition()) { - // calc the start value of missing partition ranges. - RETURN_IF_ERROR(part_func->execute(part_ctx.get(), block.get(), &result_idx)); - VLOG_DEBUG << "Partition-calculated block:" << block->dump_data(); - // change the column to compare to transformed. - _vpartition->set_transformed_slots({(uint16_t)result_idx}); - } - - if (_vpartition->is_auto_partition()) { - std::vector partition_keys = _vpartition->get_partition_keys(); - //TODO: use loop to create missing_vals for multi column. - CHECK(partition_keys.size() == 1) - << "now support only 1 partition column for auto partitions."; - auto partition_col = block->get_by_position(partition_keys[0]); - - std::vector missing_map; // indice of missing values in partition_col - missing_map.reserve(partition_col.column->size()); - - // try to find tablet and save missing value - std::vector partitions(num_rows, nullptr); - std::vector skip(num_rows, false); - std::vector tablet_indexes(num_rows, 0); - - //TODO: we could use the buffer to save tablets we found so that no need to find them again when we created partitions and try to append block next time. 
- RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), num_rows, partitions, - tablet_indexes, stop_processing, skip, - &missing_map)); - - if (missing_map.empty()) { - // we don't calculate it distribution when have missing values - if (has_filtered_rows) { - for (int i = 0; i < num_rows; i++) { - skip[i] = skip[i] || _block_convertor->filter_map()[i]; - } - } - _generate_row_distribution_payload(channel_to_payload, partitions, tablet_indexes, - skip, num_rows); - } else { // for missing partition keys, calc the missing partition and save in _partitions_need_create - auto return_type = part_func->data_type(); - - // expose the data column - vectorized::ColumnPtr range_left_col = block->get_by_position(result_idx).column; - if (const auto* nullable = - check_and_get_column(*range_left_col)) { - range_left_col = nullable->get_nested_column_ptr(); - return_type = - assert_cast(return_type.get()) - ->get_nested_type(); - } - // calc the end value and save them. - _save_missing_values(range_left_col, return_type, missing_map); - // then call FE to create it. then FragmentExecutor will redo the load. - RETURN_IF_ERROR(_automatic_create_partition()); - // now we need to rollback the metrics - _number_input_rows -= rows; - _state->update_num_rows_load_total(-rows); - _state->update_num_bytes_load_total(-bytes); - DorisMetrics::instance()->load_rows->increment(-rows); - DorisMetrics::instance()->load_bytes->increment(-bytes); - // In the next round, we will _generate_row_distribution_payload again to get right payload of new tablet - LOG(INFO) << "Auto created partition. 
Send block again."; - return Status::NeedSendAgain(""); - } // creating done - } else { // not auto partition - std::vector partitions(num_rows, nullptr); - std::vector skip(num_rows, false); - std::vector tablet_indexes(num_rows, 0); - - RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), num_rows, partitions, - tablet_indexes, stop_processing, skip)); - - if (has_filtered_rows) { - for (int i = 0; i < num_rows; i++) { - skip[i] = skip[i] || _block_convertor->filter_map()[i]; - } - } - _generate_row_distribution_payload(channel_to_payload, partitions, tablet_indexes, skip, - num_rows); - } - } - _row_distribution_watch.stop(); // Random distribution and the block belongs to a single tablet, we could optimize to append the whole // block into node channel. bool load_block_to_single_tablet = @@ -1855,10 +1639,8 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) { } if (_group_commit) { - _group_commit_block(&input_block, num_rows, - _block_convertor->num_filtered_rows() + - _tablet_finder->num_filtered_rows() - filtered_rows, - _state, block.get(), _block_convertor.get(), _tablet_finder.get()); + _group_commit_block(&input_block, block->rows(), filtered_rows, _state, block.get(), + _block_convertor.get(), _tablet_finder.get()); } // TODO: Before load, we need to projection unuseful column // auto slots = _schema->tuple_desc()->slots(); diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h index c8d5d1c2ce9ae9..7fee45be371080 100644 --- a/be/src/vec/sink/writer/vtablet_writer.h +++ b/be/src/vec/sink/writer/vtablet_writer.h @@ -74,6 +74,7 @@ #include "vec/data_types/data_type.h" #include "vec/exprs/vexpr_fwd.h" #include "vec/runtime/vfile_format_transformer.h" +#include "vec/sink/vrow_distribution.h" #include "vec/sink/vtablet_block_convertor.h" #include "vec/sink/vtablet_finder.h" #include "vec/sink/writer/async_result_writer.h" @@ -204,9 +205,6 @@ class ReusableClosure final : public 
google::protobuf::Closure { class IndexChannel; class VTabletWriter; -// pair -using Payload = std::pair, std::vector>; - class VNodeChannelStat { public: VNodeChannelStat& operator+=(const VNodeChannelStat& stat) { @@ -221,6 +219,9 @@ class VNodeChannelStat { int64_t append_node_channel_ns = 0; }; +// pair +using Payload = std::pair, std::vector>; + // every NodeChannel keeps a data transmission channel with one BE. for multiple times open, it has a dozen of requests and corresponding closures. class VNodeChannel { public: @@ -485,6 +486,7 @@ class IndexChannel { private: friend class VNodeChannel; friend class VTabletWriter; + friend class VRowDistribution; VTabletWriter* _parent; int64_t _index_id; @@ -546,32 +548,34 @@ class VTabletWriter final : public AsyncResultWriter { bool is_close_done(); + Status on_partitions_created(TCreatePartitionResult* result); + private: friend class VNodeChannel; friend class IndexChannel; - using ChannelDistributionPayload = std::vector>; + using ChannelDistributionPayload = std::unordered_map; + using ChannelDistributionPayloadVec = std::vector>; + + void _init_row_distribution(); Status _init(RuntimeState* state, RuntimeProfile* profile); - // payload for every row - void _generate_row_distribution_payload(ChannelDistributionPayload& channel_to_payload, - const std::vector& partitions, - const std::vector& tablet_indexes, - const std::vector& skip, size_t row_cnt); + void _generate_one_index_channel_payload(RowPartTabletIds& row_part_tablet_tuple, + int32_t index_idx, + ChannelDistributionPayload& channel_payload); - Status _single_partition_generate(RuntimeState* state, vectorized::Block* block, - ChannelDistributionPayload& channel_to_payload, - size_t num_rows, bool has_filtered_rows); + void _generate_index_channels_payloads(std::vector& row_part_tablet_ids, + ChannelDistributionPayloadVec& payload); Status _cancel_channel_and_check_intolerable_failure(Status status, const std::string& err_msg, const std::shared_ptr ich, 
const std::shared_ptr nch); - void _cancel_all_channel(Status status); - std::pair _get_partition_function(); + void _cancel_all_channel(Status status); + void _save_missing_values(vectorized::ColumnPtr col, vectorized::DataTypePtr value_type, std::vector filter); @@ -689,6 +693,11 @@ class VTabletWriter final : public AsyncResultWriter { RuntimeProfile* _profile = nullptr; // not owned, set when open bool _group_commit = false; std::shared_ptr _wal_writer = nullptr; + + VRowDistribution _row_distribution; + // reuse to avoid frequent memory allocation and release. + std::vector _row_part_tablet_ids; + int64_t _tb_id; int64_t _db_id; int64_t _wal_id;