Skip to content

Commit

Permalink
[pipelineX](fix) Fix core dump if cancelled (apache#29138)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored Dec 28, 2023
1 parent 1aa9ac4 commit 8b225c6
Show file tree
Hide file tree
Showing 9 changed files with 13 additions and 41 deletions.
3 changes: 0 additions & 3 deletions be/src/pipeline/exec/analytic_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -559,9 +559,6 @@ Status AnalyticLocalState::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
for (auto* agg_function : _agg_functions) {
agg_function->close(state);
}

static_cast<void>(_destroy_agg_status());
_agg_arena_pool = nullptr;
Expand Down
32 changes: 11 additions & 21 deletions be/src/pipeline/pipeline_x/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,6 @@ struct BasicSharedState {
Dependency* source_dep = nullptr;
Dependency* sink_dep = nullptr;

std::atomic<int> ref_count = 0;

void ref() { ref_count++; }
virtual Status close(RuntimeState* state) { return Status::OK(); }
virtual ~BasicSharedState() = default;
};
Expand Down Expand Up @@ -296,24 +293,17 @@ struct AggSharedState : public BasicSharedState {
agg_data = std::make_unique<vectorized::AggregatedDataVariants>();
agg_arena_pool = std::make_unique<vectorized::Arena>();
}
~AggSharedState() override = default;
~AggSharedState() override {
if (probe_expr_ctxs.empty()) {
_close_without_key();
} else {
_close_with_serialized_key();
}
}
void init_spill_partition_helper(size_t spill_partition_count_bits) {
spill_partition_helper =
std::make_unique<vectorized::SpillPartitionHelper>(spill_partition_count_bits);
}
Status close(RuntimeState* state) override {
if (ref_count.fetch_sub(1) == 1) {
for (auto* aggregate_evaluator : aggregate_evaluators) {
aggregate_evaluator->close(state);
}
if (probe_expr_ctxs.empty()) {
_close_without_key();
} else {
_close_with_serialized_key();
}
}
return Status::OK();
}

vectorized::AggregatedDataVariantsUPtr agg_data = nullptr;
std::unique_ptr<vectorized::AggregateDataContainer> aggregate_data_container;
Expand Down Expand Up @@ -620,25 +610,25 @@ struct LocalExchangeSharedState : public BasicSharedState {
public:
ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
std::unique_ptr<Exchanger> exchanger {};
std::vector<Dependency*> source_dependencies;
std::vector<DependencySPtr> source_dependencies;
Dependency* sink_dependency;
std::vector<MemTracker*> mem_trackers;
std::atomic<size_t> mem_usage = 0;
std::mutex le_lock;
void sub_running_sink_operators();
void _set_ready_for_read() {
for (auto* dep : source_dependencies) {
for (auto& dep : source_dependencies) {
DCHECK(dep);
dep->set_ready();
}
}

void set_dep_by_channel_id(Dependency* dep, int channel_id) {
void set_dep_by_channel_id(DependencySPtr dep, int channel_id) {
source_dependencies[channel_id] = dep;
}

void set_ready_to_read(int channel_id) {
auto* dep = source_dependencies[channel_id];
auto& dep = source_dependencies[channel_id];
DCHECK(dep) << channel_id;
dep->set_ready();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo&
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
_channel_id = info.task_idx;
_shared_state->set_dep_by_channel_id(_dependency, _channel_id);
_shared_state->set_dep_by_channel_id(info.dependency, _channel_id);
_shared_state->mem_trackers[_channel_id] = _mem_tracker.get();
_exchanger = _shared_state->exchanger.get();
DCHECK(_exchanger != nullptr);
Expand Down
6 changes: 0 additions & 6 deletions be/src/pipeline/pipeline_x/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,15 +335,13 @@ Status PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalState
_dependency->set_shared_state(info.le_state_map[_parent->operator_id()].first);
_shared_state =
(typename DependencyType::SharedState*)_dependency->shared_state().get();
_shared_state->ref();

_shared_state->source_dep = _dependency;
_shared_state->sink_dep = deps.front().get();
} else if constexpr (!is_fake_shared) {
_dependency->set_shared_state(deps.front()->shared_state());
_shared_state =
(typename DependencyType::SharedState*)_dependency->shared_state().get();
_shared_state->ref();

_shared_state->source_dep = _dependency;
_shared_state->sink_dep = deps.front().get();
Expand Down Expand Up @@ -419,10 +417,6 @@ Status PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1);
}
if constexpr (!is_fake_shared) {
_shared_state->ref();
}

} else {
auto& deps = info.dependencys;
deps.front() = std::make_shared<FakeDependency>(0, 0, state->get_query_ctx());
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/pipeline_x/pipeline_x_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ void PipelineXTask::finalize() {
_finished = true;
std::vector<DependencySPtr> {}.swap(_downstream_dependency);
DependencyMap {}.swap(_upstream_dependency);
std::map<int, DependencySPtr> {}.swap(_source_dependency);

_le_state_map.clear();
}
Expand Down
3 changes: 0 additions & 3 deletions be/src/vec/exec/vaggregation_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -534,9 +534,6 @@ Status AggregationNode::sink(doris::RuntimeState* state, vectorized::Block* in_b
}

void AggregationNode::release_resource(RuntimeState* state) {
for (auto* aggregate_evaluator : _aggregate_evaluators) {
aggregate_evaluator->close(state);
}
if (_executor.close) {
_executor.close();
}
Expand Down
3 changes: 0 additions & 3 deletions be/src/vec/exec/vanalytic_eval_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,6 @@ void VAnalyticEvalNode::release_resource(RuntimeState* state) {
if (is_closed()) {
return;
}
for (auto* agg_function : _agg_functions) {
agg_function->close(state);
}

static_cast<void>(_destroy_agg_status());
_release_mem();
Expand Down
2 changes: 0 additions & 2 deletions be/src/vec/exprs/vectorized_agg_fn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,6 @@ Status AggFnEvaluator::open(RuntimeState* state) {
return VExpr::open(_input_exprs_ctxs, state);
}

void AggFnEvaluator::close(RuntimeState* state) {}

void AggFnEvaluator::create(AggregateDataPtr place) {
_function->create(place);
}
Expand Down
2 changes: 0 additions & 2 deletions be/src/vec/exprs/vectorized_agg_fn.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ class AggFnEvaluator {

Status open(RuntimeState* state);

void close(RuntimeState* state);

// create/destroy AGG Data
void create(AggregateDataPtr place);
void destroy(AggregateDataPtr place);
Expand Down

0 comments on commit 8b225c6

Please sign in to comment.