Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cherry-pick](branch-21) fix exchange of tablet shuffle send block error (#44102) #44230

Merged
merged 1 commit into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 21 additions & 30 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
_split_block_hash_compute_timer = ADD_TIMER(_profile, "SplitBlockHashComputeTime");
_split_block_distribute_by_channel_timer =
ADD_TIMER(_profile, "SplitBlockDistributeByChannelTime");
_send_new_partition_timer = ADD_TIMER(_profile, "SendNewPartitionTime");
_blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", TUnit::UNIT, 1);
_rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1);
_overall_throughput = _profile->add_derived_counter(
Expand Down Expand Up @@ -318,23 +319,14 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
return Status::OK();
}

Status ExchangeSinkLocalState::_send_new_partition_batch() {
if (_row_distribution.need_deal_batching()) { // maybe try_close more than 1 time
RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
vectorized::Block tmp_block =
_row_distribution._batching_block->to_block(); // Borrow out, for lval ref
auto& p = _parent->cast<ExchangeSinkOperatorX>();
// these order is unique.
// 1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block.
// 2. deal batched block
// 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that.
_row_distribution.clear_batching_stats();
RETURN_IF_ERROR(p.sink(_state, &tmp_block, false));
// Recovery back
_row_distribution._batching_block->set_mutable_columns(tmp_block.mutate_columns());
_row_distribution._batching_block->clear_column_data();
_row_distribution._deal_batched = false;
}
// Creates the pending partitions and then sends `input_block` through
// ExchangeSinkOperatorX::sink() again.
//
// Steps, in the order the code performs them:
//   1. Call _row_distribution.automatic_create_partition().
//   2. Reset the batching state: clear_batching_stats(), clear_column_data() on
//      the batching block, and set _deal_batched back to false.
//   3. Re-invoke sink() on `input_block` with `false` as the third argument
//      (presumably the eos flag -- confirm against sink()'s signature).
//
// A failing Status from step 1 or step 3 is handed to RETURN_IF_ERROR;
// otherwise Status::OK() is returned.
//
// NOTE(review): sink() calls this function whenever batching_rows() > 0, so the
// reset in step 2 has to happen before step 3; presumably that is what keeps the
// re-entry from recursing again -- confirm against
// VRowDistribution::clear_batching_stats().
Status ExchangeSinkLocalState::_send_new_partition_batch(vectorized::Block* input_block) {
RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
auto& p = _parent->cast<ExchangeSinkOperatorX>();
// Reset the batching state before re-entering sink().
_row_distribution.clear_batching_stats();
_row_distribution._batching_block->clear_column_data();
_row_distribution._deal_batched = false;
// Run the same input block through sink() again now that the batching state is clean.
RETURN_IF_ERROR(p.sink(_state, input_block, false));
return Status::OK();
}

Expand Down Expand Up @@ -551,7 +543,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
}
} else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
// check out of limit
RETURN_IF_ERROR(local_state._send_new_partition_batch());
std::shared_ptr<vectorized::Block> convert_block = std::make_shared<vectorized::Block>();
const auto& num_channels = local_state._partition_count;
std::vector<std::vector<uint32>> channel2rows;
Expand All @@ -566,21 +557,21 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
RETURN_IF_ERROR(local_state._row_distribution.generate_rows_distribution(
*block, convert_block, filtered_rows, has_filtered_rows,
local_state._row_part_tablet_ids, local_state._number_input_rows));

const auto& row_ids = local_state._row_part_tablet_ids[0].row_ids;
const auto& tablet_ids = local_state._row_part_tablet_ids[0].tablet_ids;
for (int idx = 0; idx < row_ids.size(); ++idx) {
const auto& row = row_ids[idx];
const auto& tablet_id_hash =
HashUtil::zlib_crc_hash(&tablet_ids[idx], sizeof(int64), 0);
channel2rows[tablet_id_hash % num_channels].emplace_back(row);
if (local_state._row_distribution.batching_rows() > 0) {
SCOPED_TIMER(local_state._send_new_partition_timer);
RETURN_IF_ERROR(local_state._send_new_partition_batch(block));
} else {
const auto& row_ids = local_state._row_part_tablet_ids[0].row_ids;
const auto& tablet_ids = local_state._row_part_tablet_ids[0].tablet_ids;
for (int idx = 0; idx < row_ids.size(); ++idx) {
const auto& row = row_ids[idx];
const auto& tablet_id_hash =
HashUtil::zlib_crc_hash(&tablet_ids[idx], sizeof(int64), 0);
channel2rows[tablet_id_hash % num_channels].emplace_back(row);
}
}
}

if (eos) {
local_state._row_distribution._deal_batched = true;
RETURN_IF_ERROR(local_state._send_new_partition_batch());
}
// the convert_block maybe different with block after execute exprs
// when send data we still use block
RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels, num_channels,
Expand Down
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
// No-op callback: ignores both `sender` and `result` and always returns Status::OK().
static Status empty_callback_function(void* sender, TCreatePartitionResult* result) {
return Status::OK();
}
Status _send_new_partition_batch();
Status _send_new_partition_batch(vectorized::Block* input_block);
std::vector<vectorized::PipChannel<ExchangeSinkLocalState>*> channels;
std::vector<std::shared_ptr<vectorized::PipChannel<ExchangeSinkLocalState>>>
channel_shared_ptrs;
Expand Down Expand Up @@ -179,6 +179,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
// Used to counter send bytes under local data exchange
RuntimeProfile::Counter* _local_bytes_send_counter = nullptr;
RuntimeProfile::Counter* _merge_block_timer = nullptr;
RuntimeProfile::Counter* _send_new_partition_timer = nullptr;

RuntimeProfile::Counter* _wait_queue_timer = nullptr;
RuntimeProfile::Counter* _wait_broadcast_buffer_timer = nullptr;
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/sink/vrow_distribution.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ class VRowDistribution {
std::vector<RowPartTabletIds>& row_part_tablet_ids,
int64_t& rows_stat_val);
// True only when the _deal_batched flag is set and _batching_rows is greater than zero.
bool need_deal_batching() const { return _deal_batched && _batching_rows > 0; }
// Returns the current value of _batching_rows.
size_t batching_rows() const { return _batching_rows; }
// create partitions when need for auto-partition table using #_partitions_need_create.
Status automatic_create_partition();
void clear_batching_stats();
Expand Down
Loading