Skip to content

Commit

Permalink
removed deprecated code
Browse files Browse the repository at this point in the history
  • Loading branch information
jacktengg committed Dec 5, 2024
1 parent 0d273b7 commit 6bed85d
Show file tree
Hide file tree
Showing 4 changed files with 2 additions and 22 deletions.
10 changes: 1 addition & 9 deletions be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,7 @@
namespace doris::pipeline {
PartitionedAggSinkLocalState::PartitionedAggSinkLocalState(DataSinkOperatorXBase* parent,
RuntimeState* state)
: Base(parent, state) {
_finish_dependency =
std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
parent->get_name() + "_FINISH_DEPENDENCY", true);
}
: Base(parent, state) {}

Status PartitionedAggSinkLocalState::init(doris::RuntimeState* state,
doris::pipeline::LocalSinkStateInfo& info) {
Expand Down Expand Up @@ -69,7 +65,6 @@ Status PartitionedAggSinkLocalState::init(doris::RuntimeState* state,
"AggSinkSpillDependency", true);
state->get_task()->add_spill_dependency(_spill_dependency.get());

_finish_dependency->block();
return Status::OK();
}

Expand Down Expand Up @@ -196,11 +191,9 @@ Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized:
RETURN_IF_ERROR(partition->finish_current_spilling(eos));
}
local_state._dependency->set_ready_to_read();
local_state._finish_dependency->set_ready();
}
} else {
local_state._dependency->set_ready_to_read();
local_state._finish_dependency->set_ready();
}
} else if (local_state._shared_state->is_spilled) {
if (revocable_mem_size(state) >= vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
Expand Down Expand Up @@ -331,7 +324,6 @@ Status PartitionedAggSinkLocalState::revoke_memory(

if (_eos) {
Base::_dependency->set_ready_to_read();
_finish_dependency->set_ready();
}
state->get_query_ctx()->decrease_revoking_tasks_count();
}};
Expand Down
3 changes: 0 additions & 3 deletions be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ class PartitionedAggSinkLocalState
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;
Dependency* finishdependency() override { return _finish_dependency.get(); }

Status revoke_memory(RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context);

Expand Down Expand Up @@ -271,8 +270,6 @@ class PartitionedAggSinkLocalState

std::unique_ptr<RuntimeState> _runtime_state;

std::shared_ptr<Dependency> _finish_dependency;

// temp structures during spilling
vectorized::MutableColumns key_columns_;
vectorized::MutableColumns value_columns_;
Expand Down
9 changes: 1 addition & 8 deletions be/src/pipeline/exec/spill_sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@
namespace doris::pipeline {

SpillSortSinkLocalState::SpillSortSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
: Base(parent, state) {
_finish_dependency = std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
parent->get_name() + "_SPILL_DEPENDENCY");
}
: Base(parent, state) {}

Status SpillSortSinkLocalState::init(doris::RuntimeState* state,
doris::pipeline::LocalSinkStateInfo& info) {
Expand All @@ -46,7 +43,6 @@ Status SpillSortSinkLocalState::init(doris::RuntimeState* state,
RETURN_IF_ERROR(setup_in_memory_sort_op(state));

Base::_shared_state->in_mem_shared_state->sorter->set_enable_spill();
_finish_dependency->block();
return Status::OK();
}

Expand Down Expand Up @@ -181,13 +177,11 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Bloc
RETURN_IF_ERROR(revoke_memory(state, nullptr));
} else {
local_state._dependency->set_ready_to_read();
local_state._finish_dependency->set_ready();
}
} else {
RETURN_IF_ERROR(
local_state._shared_state->in_mem_shared_state->sorter->prepare_for_read());
local_state._dependency->set_ready_to_read();
local_state._finish_dependency->set_ready();
}
}
return Status::OK();
Expand Down Expand Up @@ -251,7 +245,6 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
state->get_query_ctx()->decrease_revoking_tasks_count();
if (_eos) {
_dependency->set_ready_to_read();
_finish_dependency->set_ready();
}
}};

Expand Down
2 changes: 0 additions & 2 deletions be/src/pipeline/exec/spill_sort_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ class SpillSortSinkLocalState : public PipelineXSpillSinkLocalState<SpillSortSha
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;
Dependency* finishdependency() override { return _finish_dependency.get(); }

Status setup_in_memory_sort_op(RuntimeState* state);
[[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos);
Expand All @@ -59,7 +58,6 @@ class SpillSortSinkLocalState : public PipelineXSpillSinkLocalState<SpillSortSha
RuntimeProfile::Counter* _spill_merge_sort_timer = nullptr;

vectorized::SpillStreamSPtr _spilling_stream;
std::shared_ptr<Dependency> _finish_dependency;
};

class SpillSortSinkOperatorX final : public DataSinkOperatorX<SpillSortSinkLocalState> {
Expand Down

0 comments on commit 6bed85d

Please sign in to comment.