From 6f359031a9372b44268a47da6fa71ac3bffef478 Mon Sep 17 00:00:00 2001
From: zhangstar333
Date: Mon, 18 Nov 2024 15:47:25 +0800
Subject: [PATCH] [Bug](exchange) fix exchange of tablet shuffle send block error (#44102)

Problem Summary:
The _batching_block is created with the same structure as block, which is
used for output, so the nullability of some of its columns may differ from
that of the input block. Sinking it directly can therefore send columns
whose nullable type no longer matches the original input.

```
Status VRowDistribution::generate_rows_distribution(
        vectorized::Block& input_block, std::shared_ptr<vectorized::Block>& block,
        int64_t& filtered_rows, bool& has_filtered_rows,
        std::vector<RowPartTabletIds>& row_part_tablet_ids, int64_t& rows_stat_val)
.......
    // batching block rows which need new partitions. deal together at finish.
    if (!_batching_block) [[unlikely]] {
        std::unique_ptr<Block> tmp_block = block->create_same_struct_block(0);
        _batching_block = MutableBlock::create_unique(std::move(*tmp_block));
    }
```

```
void OlapTableBlockConvertor::_convert_to_dest_desc_block(doris::vectorized::Block* block) {
    for (int i = 0; i < _output_tuple_desc->slots().size() && i < block->columns(); ++i) {
        SlotDescriptor* desc = _output_tuple_desc->slots()[i];
        if (desc->is_nullable() != block->get_by_position(i).type->is_nullable()) {
            if (desc->is_nullable()) {
                block->get_by_position(i).type =
                        vectorized::make_nullable(block->get_by_position(i).type);
                block->get_by_position(i).column =
                        vectorized::make_nullable(block->get_by_position(i).column);
            } else {
                block->get_by_position(i).type =
                        assert_cast<const vectorized::DataTypeNullable&>(
                                *block->get_by_position(i).type)
                                .get_nested_type();
                block->get_by_position(i).column =
                        assert_cast<const vectorized::ColumnNullable&>(
                                *block->get_by_position(i).column)
                                .get_nested_column_ptr();
            }
        }
    }
}
```
---
 .../pipeline/exec/exchange_sink_operator.cpp  | 51 ++++++++-----------
 be/src/pipeline/exec/exchange_sink_operator.h |  3 +-
 be/src/vec/sink/vrow_distribution.h           |  1 +
 3 files changed, 24 insertions(+), 31 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index b26c69ad560fa1..ff8bcdd9236236 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -119,6 +119,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
     _split_block_hash_compute_timer = ADD_TIMER(_profile, "SplitBlockHashComputeTime");
     _split_block_distribute_by_channel_timer =
             ADD_TIMER(_profile, "SplitBlockDistributeByChannelTime");
+    _send_new_partition_timer = ADD_TIMER(_profile, "SendNewPartitionTime");
     _blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", TUnit::UNIT, 1);
     _rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1);
     _overall_throughput = _profile->add_derived_counter(
@@ -318,23 +319,14 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
     return Status::OK();
 }
 
-Status ExchangeSinkLocalState::_send_new_partition_batch() {
-    if (_row_distribution.need_deal_batching()) { // maybe try_close more than 1 time
-        RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
-        vectorized::Block tmp_block =
-                _row_distribution._batching_block->to_block(); // Borrow out, for lval ref
-        auto& p = _parent->cast<ExchangeSinkOperatorX>();
-        // these order is unique.
-        // 1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block.
-        // 2. deal batched block
-        // 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that.
-        _row_distribution.clear_batching_stats();
-        RETURN_IF_ERROR(p.sink(_state, &tmp_block, false));
-        // Recovery back
-        _row_distribution._batching_block->set_mutable_columns(tmp_block.mutate_columns());
-        _row_distribution._batching_block->clear_column_data();
-        _row_distribution._deal_batched = false;
-    }
+Status ExchangeSinkLocalState::_send_new_partition_batch(vectorized::Block* input_block) {
+    RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
+    auto& p = _parent->cast<ExchangeSinkOperatorX>();
+    // Recovery back
+    _row_distribution.clear_batching_stats();
+    _row_distribution._batching_block->clear_column_data();
+    _row_distribution._deal_batched = false;
+    RETURN_IF_ERROR(p.sink(_state, input_block, false));
     return Status::OK();
 }
 
@@ -551,7 +543,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
         }
     } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
         // check out of limit
-        RETURN_IF_ERROR(local_state._send_new_partition_batch());
         std::shared_ptr<vectorized::Block> convert_block = std::make_shared<vectorized::Block>();
         const auto& num_channels = local_state._partition_count;
         std::vector<std::vector<uint32>> channel2rows;
@@ -566,21 +557,21 @@
         RETURN_IF_ERROR(local_state._row_distribution.generate_rows_distribution(
                 *block, convert_block, filtered_rows, has_filtered_rows,
                 local_state._row_part_tablet_ids, local_state._number_input_rows));
-
-        const auto& row_ids = local_state._row_part_tablet_ids[0].row_ids;
-        const auto& tablet_ids = local_state._row_part_tablet_ids[0].tablet_ids;
-        for (int idx = 0; idx < row_ids.size(); ++idx) {
-            const auto& row = row_ids[idx];
-            const auto& tablet_id_hash =
-                    HashUtil::zlib_crc_hash(&tablet_ids[idx], sizeof(int64), 0);
-            channel2rows[tablet_id_hash % num_channels].emplace_back(row);
+        if (local_state._row_distribution.batching_rows() > 0) {
+            SCOPED_TIMER(local_state._send_new_partition_timer);
+            RETURN_IF_ERROR(local_state._send_new_partition_batch(block));
+        } else {
+            const auto& row_ids = local_state._row_part_tablet_ids[0].row_ids;
+            const auto& tablet_ids = local_state._row_part_tablet_ids[0].tablet_ids;
+            for (int idx = 0; idx < row_ids.size(); ++idx) {
+                const auto& row = row_ids[idx];
+                const auto& tablet_id_hash =
+                        HashUtil::zlib_crc_hash(&tablet_ids[idx], sizeof(int64), 0);
+                channel2rows[tablet_id_hash % num_channels].emplace_back(row);
+            }
         }
     }
 
-    if (eos) {
-        local_state._row_distribution._deal_batched = true;
-        RETURN_IF_ERROR(local_state._send_new_partition_batch());
-    }
     // the convert_block maybe different with block after execute exprs
     // when send data we still use block
     RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels, num_channels,
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 8d9382dadd0e5e..c055b131d8a8ca 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -144,7 +144,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
     static Status empty_callback_function(void* sender, TCreatePartitionResult* result) {
         return Status::OK();
     }
-    Status _send_new_partition_batch();
+    Status _send_new_partition_batch(vectorized::Block* input_block);
 
     std::vector<vectorized::Channel*> channels;
     std::vector<std::shared_ptr<vectorized::Channel>> channel_shared_ptrs;
@@ -179,6 +179,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
     // Used to counter send bytes under local data exchange
     RuntimeProfile::Counter* _local_bytes_send_counter = nullptr;
     RuntimeProfile::Counter* _merge_block_timer = nullptr;
+    RuntimeProfile::Counter* _send_new_partition_timer = nullptr;
 
     RuntimeProfile::Counter* _wait_queue_timer = nullptr;
     RuntimeProfile::Counter* _wait_broadcast_buffer_timer = nullptr;
diff --git a/be/src/vec/sink/vrow_distribution.h b/be/src/vec/sink/vrow_distribution.h
index 248982c02026dc..9e4cce6b528e17 100644
--- a/be/src/vec/sink/vrow_distribution.h
+++ b/be/src/vec/sink/vrow_distribution.h
@@ -131,6 +131,7 @@ class VRowDistribution {
                                       std::vector<RowPartTabletIds>& row_part_tablet_ids,
                                       int64_t& rows_stat_val);
     bool need_deal_batching() const { return _deal_batched && _batching_rows > 0; }
+    size_t batching_rows() const { return _batching_rows; }
     // create partitions when need for auto-partition table using #_partitions_need_create.
     Status automatic_create_partition();
     void clear_batching_stats();
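
For context beyond the patch itself, the sketch below is a minimal standalone
illustration (plain C++, not Doris code: Column, Block, and
convert_to_dest_desc are simplified stand-ins for vectorized::Block and
OlapTableBlockConvertor::_convert_to_dest_desc_block) of the mismatch
described in the Problem Summary: once the output block's columns have been
rewrapped to match the destination descriptor's nullability, a block built
from that structure no longer agrees with the input block, so sinking it as
input data sends the wrong nullable type.

```
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for a column wrapped (or not) in a nullable type.
struct Column {
    std::string name;
    bool nullable;
};

// Hypothetical stand-in for vectorized::Block: an ordered list of columns.
struct Block {
    std::vector<Column> columns;
    // Mirrors create_same_struct_block(): same structure, no data.
    Block create_same_struct_block() const { return Block{columns}; }
};

// Mirrors _convert_to_dest_desc_block(): force each column's nullability to
// match the destination descriptor, mutating the block in place.
void convert_to_dest_desc(Block& block, const std::vector<bool>& dest_nullable) {
    for (size_t i = 0; i < block.columns.size() && i < dest_nullable.size(); ++i) {
        block.columns[i].nullable = dest_nullable[i];
    }
}

int main() {
    Block input{{{"k1", false}}};          // input block: k1 is NOT nullable
    Block output = input;                  // shared output block ("convert_block")
    convert_to_dest_desc(output, {true});  // dest descriptor: k1 IS nullable

    // The batching block copies the *output* structure, so it now disagrees
    // with the input block: sinking it as input data would feed a nullable
    // column where a non-nullable one is expected.
    Block batching = output.create_same_struct_block();
    assert(batching.columns[0].nullable != input.columns[0].nullable);
    return 0;
}
```

This is why the patch passes the original input_block into
_send_new_partition_batch() rather than rebuilding a block from
_batching_block for sinking.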