Skip to content

Commit

Permalink
[minor](pipelineX) refine error message for broadcast shuffle buffer (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored and seawinde committed Nov 6, 2023
1 parent 57975b4 commit 7f1ddf8
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 5 deletions.
5 changes: 4 additions & 1 deletion be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,10 @@ Status ExchangeSinkLocalState::get_next_available_buffer(
return Status::OK();
}
}
return Status::InternalError("No broadcast buffer left!");
return Status::InternalError("No broadcast buffer left! Available blocks: " +
std::to_string(_broadcast_dependency->available_blocks()) +
" and number of buffer is " +
std::to_string(_broadcast_pb_blocks.size()));
}

template <typename Channels, typename HashValueType>
Expand Down
6 changes: 4 additions & 2 deletions be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class ExchangeSinkQueueDependency final : public WriteDependency {
public:
ENABLE_FACTORY_CREATOR(ExchangeSinkQueueDependency);
ExchangeSinkQueueDependency(int id) : WriteDependency(id, "ResultQueueDependency") {}
~ExchangeSinkQueueDependency() = default;
~ExchangeSinkQueueDependency() override = default;

void* shared_state() override { return nullptr; }
};
Expand All @@ -77,7 +77,7 @@ class BroadcastDependency final : public WriteDependency {
public:
ENABLE_FACTORY_CREATOR(BroadcastDependency);
BroadcastDependency(int id) : WriteDependency(id, "BroadcastDependency"), _available_block(0) {}
virtual ~BroadcastDependency() = default;
~BroadcastDependency() override = default;

[[nodiscard]] WriteDependency* write_blocked_by() override {
if (config::enable_fuzzy_mode && _available_block == 0 &&
Expand Down Expand Up @@ -107,6 +107,8 @@ class BroadcastDependency final : public WriteDependency {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not reach here!");
}

int available_blocks() const { return _available_block; }

private:
std::atomic<int> _available_block;
};
Expand Down
3 changes: 1 addition & 2 deletions be/src/vec/sink/vdata_stream_sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -505,9 +505,8 @@ class PipChannel final : public Channel<Parent> {
if (eos) {
if (_eos_send) {
return Status::OK();
} else {
_eos_send = true;
}
_eos_send = true;
}
if (eos || block->get_block()->column_metas_size()) {
RETURN_IF_ERROR(_buffer->add_block({this, block, eos}, sent));
Expand Down

0 comments on commit 7f1ddf8

Please sign in to comment.