Skip to content

Commit

Permalink
[Bug](exchange) fix exchange of tablet shuffle send block error (#44102)
Browse files Browse the repository at this point in the history
Problem Summary:
The _batching_block shares its structure with the output block, which may differ
from the input block in the nullability of some columns.
Sinking it directly can therefore produce nullable types that no longer match the original input.
```
Status VRowDistribution::generate_rows_distribution(
        vectorized::Block& input_block, std::shared_ptr<vectorized::Block>& block,
        int64_t& filtered_rows, bool& has_filtered_rows,
        std::vector<RowPartTabletIds>& row_part_tablet_ids, int64_t& rows_stat_val)
    .......
    // batching block rows which need new partitions. deal together at finish.
    if (!_batching_block) [[unlikely]] {
        std::unique_ptr<Block> tmp_block = block->create_same_struct_block(0);
        _batching_block = MutableBlock::create_unique(std::move(*tmp_block));
    }
```

```
// Aligns the nullability of each column in `block` with the corresponding slot
// of _output_tuple_desc: wraps non-nullable columns in Nullable when the slot
// is nullable, and strips the Nullable wrapper when the slot is not.
// Mutates both the `type` and `column` members of each affected position.
void OlapTableBlockConvertor::_convert_to_dest_desc_block(doris::vectorized::Block* block) {
    // Iterate only over positions present in both the tuple descriptor and the block.
    for (int i = 0; i < _output_tuple_desc->slots().size() && i < block->columns(); ++i) {
        SlotDescriptor* desc = _output_tuple_desc->slots()[i];
        // Only touch columns whose nullability disagrees with the slot descriptor.
        if (desc->is_nullable() != block->get_by_position(i).type->is_nullable()) {
            if (desc->is_nullable()) {
                // Slot is nullable but the column is not: wrap type and column.
                block->get_by_position(i).type =
                        vectorized::make_nullable(block->get_by_position(i).type);
                block->get_by_position(i).column =
                        vectorized::make_nullable(block->get_by_position(i).column);
            } else {
                // Slot is non-nullable but the column is: unwrap the nested
                // type/column. assert_cast checks the dynamic type in debug builds.
                block->get_by_position(i).type = assert_cast<const vectorized::DataTypeNullable&>(
                                                         *block->get_by_position(i).type)
                                                         .get_nested_type();
                block->get_by_position(i).column = assert_cast<const vectorized::ColumnNullable&>(
                                                           *block->get_by_position(i).column)
                                                           .get_nested_column_ptr();
            }
        }
    }
}
```
  • Loading branch information
zhangstar333 committed Nov 19, 2024
1 parent 05e1620 commit 6f35903
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 31 deletions.
51 changes: 21 additions & 30 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
_split_block_hash_compute_timer = ADD_TIMER(_profile, "SplitBlockHashComputeTime");
_split_block_distribute_by_channel_timer =
ADD_TIMER(_profile, "SplitBlockDistributeByChannelTime");
_send_new_partition_timer = ADD_TIMER(_profile, "SendNewPartitionTime");
_blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", TUnit::UNIT, 1);
_rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1);
_overall_throughput = _profile->add_derived_counter(
Expand Down Expand Up @@ -318,23 +319,14 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
return Status::OK();
}

Status ExchangeSinkLocalState::_send_new_partition_batch() {
if (_row_distribution.need_deal_batching()) { // maybe try_close more than 1 time
RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
vectorized::Block tmp_block =
_row_distribution._batching_block->to_block(); // Borrow out, for lval ref
auto& p = _parent->cast<ExchangeSinkOperatorX>();
// these order is unique.
// 1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block.
// 2. deal batched block
// 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that.
_row_distribution.clear_batching_stats();
RETURN_IF_ERROR(p.sink(_state, &tmp_block, false));
// Recovery back
_row_distribution._batching_block->set_mutable_columns(tmp_block.mutate_columns());
_row_distribution._batching_block->clear_column_data();
_row_distribution._deal_batched = false;
}
// Deals with rows that were batched because their target partitions did not
// exist yet: creates the missing partitions, resets the batching state, and
// re-sinks `input_block` through the parent operator so the rows are
// redistributed with the new partitions in place.
// NOTE(review): the batching state is cleared BEFORE the re-entrant sink call,
// presumably so the recursive sink does not start a new batching round for the
// same rows — confirm against VRowDistribution before reordering these lines.
Status ExchangeSinkLocalState::_send_new_partition_batch(vectorized::Block* input_block) {
RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
auto& p = _parent->cast<ExchangeSinkOperatorX>();
// Recovery back
_row_distribution.clear_batching_stats();
_row_distribution._batching_block->clear_column_data();
_row_distribution._deal_batched = false;
// Re-enter the sink with the original input block (not the batching block),
// avoiding the nullable-type mismatch described in the commit message.
RETURN_IF_ERROR(p.sink(_state, input_block, false));
return Status::OK();
}

Expand Down Expand Up @@ -551,7 +543,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
}
} else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
// check out of limit
RETURN_IF_ERROR(local_state._send_new_partition_batch());
std::shared_ptr<vectorized::Block> convert_block = std::make_shared<vectorized::Block>();
const auto& num_channels = local_state._partition_count;
std::vector<std::vector<uint32>> channel2rows;
Expand All @@ -566,21 +557,21 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
RETURN_IF_ERROR(local_state._row_distribution.generate_rows_distribution(
*block, convert_block, filtered_rows, has_filtered_rows,
local_state._row_part_tablet_ids, local_state._number_input_rows));

const auto& row_ids = local_state._row_part_tablet_ids[0].row_ids;
const auto& tablet_ids = local_state._row_part_tablet_ids[0].tablet_ids;
for (int idx = 0; idx < row_ids.size(); ++idx) {
const auto& row = row_ids[idx];
const auto& tablet_id_hash =
HashUtil::zlib_crc_hash(&tablet_ids[idx], sizeof(int64), 0);
channel2rows[tablet_id_hash % num_channels].emplace_back(row);
if (local_state._row_distribution.batching_rows() > 0) {
SCOPED_TIMER(local_state._send_new_partition_timer);
RETURN_IF_ERROR(local_state._send_new_partition_batch(block));
} else {
const auto& row_ids = local_state._row_part_tablet_ids[0].row_ids;
const auto& tablet_ids = local_state._row_part_tablet_ids[0].tablet_ids;
for (int idx = 0; idx < row_ids.size(); ++idx) {
const auto& row = row_ids[idx];
const auto& tablet_id_hash =
HashUtil::zlib_crc_hash(&tablet_ids[idx], sizeof(int64), 0);
channel2rows[tablet_id_hash % num_channels].emplace_back(row);
}
}
}

if (eos) {
local_state._row_distribution._deal_batched = true;
RETURN_IF_ERROR(local_state._send_new_partition_batch());
}
// the convert_block maybe different with block after execute exprs
// when send data we still use block
RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels, num_channels,
Expand Down
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
static Status empty_callback_function(void* sender, TCreatePartitionResult* result) {
return Status::OK();
}
Status _send_new_partition_batch();
Status _send_new_partition_batch(vectorized::Block* input_block);
std::vector<vectorized::PipChannel<ExchangeSinkLocalState>*> channels;
std::vector<std::shared_ptr<vectorized::PipChannel<ExchangeSinkLocalState>>>
channel_shared_ptrs;
Expand Down Expand Up @@ -179,6 +179,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
// Used to counter send bytes under local data exchange
RuntimeProfile::Counter* _local_bytes_send_counter = nullptr;
RuntimeProfile::Counter* _merge_block_timer = nullptr;
RuntimeProfile::Counter* _send_new_partition_timer = nullptr;

RuntimeProfile::Counter* _wait_queue_timer = nullptr;
RuntimeProfile::Counter* _wait_broadcast_buffer_timer = nullptr;
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/sink/vrow_distribution.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ class VRowDistribution {
std::vector<RowPartTabletIds>& row_part_tablet_ids,
int64_t& rows_stat_val);
// True when batched-row handling has been requested (_deal_batched set) and
// there is at least one row waiting in the batch.
bool need_deal_batching() const { return _deal_batched && _batching_rows > 0; }
// Number of rows currently accumulated for batching (0 means nothing pending).
size_t batching_rows() const { return _batching_rows; }
// create partitions when need for auto-partition table using #_partitions_need_create.
Status automatic_create_partition();
void clear_batching_stats();
Expand Down

0 comments on commit 6f35903

Please sign in to comment.