Skip to content

Commit

Permalink
[refactor](pipelineX) Split init and open for local state (apache#24166)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored Sep 11, 2023
1 parent be36183 commit 9b4338f
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 27 deletions.
8 changes: 8 additions & 0 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,14 @@ Status AggSinkLocalState<Derived>::init(RuntimeState* state, LocalSinkStateInfo&
(!p._have_conjuncts) && // no having conjunct
p._needs_finalize; // agg's finalize step
}

return Status::OK();
}

template <typename Derived>
Status AggSinkLocalState<Derived>::open(RuntimeState* state) {
RETURN_IF_ERROR(PipelineXSinkLocalState<AggDependency>::open(state));
_agg_data = _shared_state->agg_data.get();
// move _create_agg_status to open not in during prepare,
// because during prepare and open thread is not the same one,
// this could cause unable to get JVM
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class AggSinkLocalState : public PipelineXSinkLocalState<AggDependency> {
virtual ~AggSinkLocalState() = default;

virtual Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
virtual Status open(RuntimeState* state) override;
virtual Status close(RuntimeState* state) override;

Status try_spill_disk(bool eos = false);
Expand Down
50 changes: 23 additions & 27 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,29 @@ bool ExchangeSinkLocalState::transfer_large_data_by_brpc() const {
Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
RETURN_IF_ERROR(PipelineXSinkLocalState<>::init(state, info));
_sender_id = info.sender_id;

_bytes_sent_counter = ADD_COUNTER(_profile, "BytesSent", TUnit::BYTES);
_uncompressed_bytes_counter = ADD_COUNTER(_profile, "UncompressedRowBatchSize", TUnit::BYTES);
_local_sent_rows = ADD_COUNTER(_profile, "LocalSentRows", TUnit::UNIT);
_serialize_batch_timer = ADD_TIMER(_profile, "SerializeBatchTime");
_compress_timer = ADD_TIMER(_profile, "CompressTime");
_brpc_send_timer = ADD_TIMER(_profile, "BrpcSendTime");
_brpc_wait_timer = ADD_TIMER(_profile, "BrpcSendTime.Wait");
_local_send_timer = ADD_TIMER(_profile, "LocalSendTime");
_split_block_hash_compute_timer = ADD_TIMER(_profile, "SplitBlockHashComputeTime");
_split_block_distribute_by_channel_timer =
ADD_TIMER(_profile, "SplitBlockDistributeByChannelTime");
_blocks_sent_counter = ADD_COUNTER(_profile, "BlocksSent", TUnit::UNIT);
_overall_throughput = _profile->add_derived_counter(
"OverallThroughput", TUnit::BYTES_PER_SECOND,
std::bind<int64_t>(&RuntimeProfile::units_per_second, _bytes_sent_counter,
_profile->total_time_counter()),
"");
_merge_block_timer = ADD_TIMER(profile(), "MergeBlockTime");
_local_bytes_send_counter = ADD_COUNTER(_profile, "LocalBytesSent", TUnit::BYTES);
_memory_usage_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage");
_peak_memory_usage_counter =
_profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage");
return Status::OK();
}

Expand Down Expand Up @@ -126,11 +149,6 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
}
}

std::vector<std::string> instances;
for (const auto& channel : channels) {
instances.emplace_back(channel->get_fragment_instance_id_str());
}
std::string title = "VDataStreamSender (dst_id={}, dst_fragments=[{}])";
SCOPED_TIMER(_profile->total_time_counter());
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());

Expand Down Expand Up @@ -161,28 +179,6 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {

register_channels(_sink_buffer.get());

_bytes_sent_counter = ADD_COUNTER(_profile, "BytesSent", TUnit::BYTES);
_uncompressed_bytes_counter = ADD_COUNTER(_profile, "UncompressedRowBatchSize", TUnit::BYTES);
_local_sent_rows = ADD_COUNTER(_profile, "LocalSentRows", TUnit::UNIT);
_serialize_batch_timer = ADD_TIMER(_profile, "SerializeBatchTime");
_compress_timer = ADD_TIMER(_profile, "CompressTime");
_brpc_send_timer = ADD_TIMER(_profile, "BrpcSendTime");
_brpc_wait_timer = ADD_TIMER(_profile, "BrpcSendTime.Wait");
_local_send_timer = ADD_TIMER(_profile, "LocalSendTime");
_split_block_hash_compute_timer = ADD_TIMER(_profile, "SplitBlockHashComputeTime");
_split_block_distribute_by_channel_timer =
ADD_TIMER(_profile, "SplitBlockDistributeByChannelTime");
_blocks_sent_counter = ADD_COUNTER(_profile, "BlocksSent", TUnit::UNIT);
_overall_throughput = _profile->add_derived_counter(
"OverallThroughput", TUnit::BYTES_PER_SECOND,
std::bind<int64_t>(&RuntimeProfile::units_per_second, _bytes_sent_counter,
_profile->total_time_counter()),
"");
_merge_block_timer = ADD_TIMER(profile(), "MergeBlockTime");
_local_bytes_send_counter = ADD_COUNTER(_profile, "LocalBytesSent", TUnit::BYTES);
_memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage");
_peak_memory_usage_counter =
profile()->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage");
return Status::OK();
}

Expand Down
7 changes: 7 additions & 0 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
p._runtime_filter_descs[i].filter_id, &_runtime_filters[i]));
}

return Status::OK();
}

Status HashJoinBuildSinkLocalState::open(RuntimeState* state) {
RETURN_IF_ERROR(JoinBuildSinkLocalState::open(state));
auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();

for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
if (auto bf = _runtime_filters[i]->get_bloomfilter()) {
RETURN_IF_ERROR(bf->init_with_fixed_length());
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class HashJoinBuildSinkLocalState final
~HashJoinBuildSinkLocalState() = default;

Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;
Status process_build_block(RuntimeState* state, vectorized::Block& block, uint8_t offset);

void init_short_circuit_for_probe();
Expand Down
8 changes: 8 additions & 0 deletions be/src/pipeline/pipeline_x/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,11 @@ class PipelineXLocalStateBase {
return reinterpret_cast<const TARGET&>(*this);
}

// Do initialization. This step should be executed only once and in bthread, so we can do some
// lightweight or non-idempotent operations (e.g. init profile, clone expr ctx from operatorX)
virtual Status init(RuntimeState* state, LocalStateInfo& info) = 0;
// Do initialization. This step can be executed multiple times, so we should make sure it is
// idempotent (e.g. wait for runtime filters).
virtual Status open(RuntimeState* state) { return Status::OK(); }
virtual Status close(RuntimeState* state) = 0;

Expand Down Expand Up @@ -341,8 +345,12 @@ class PipelineXSinkLocalStateBase {
: _parent(parent_), _state(state_) {}
virtual ~PipelineXSinkLocalStateBase() {}

// Do initialization. This step should be executed only once and in bthread, so we can do some
// lightweight or non-idempotent operations (e.g. init profile, clone expr ctx from operatorX)
virtual Status init(RuntimeState* state, LocalSinkStateInfo& info) = 0;

// Do initialization. This step can be executed multiple times, so we should make sure it is
// idempotent (e.g. wait for runtime filters).
virtual Status open(RuntimeState* state) { return Status::OK(); }
virtual Status close(RuntimeState* state) = 0;

Expand Down

0 comments on commit 9b4338f

Please sign in to comment.