Skip to content

Commit

Permalink
[feature](move-memtable) support pipelineX in sink v2 (apache#27067)
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen authored Nov 16, 2023
1 parent 5498917 commit e29d8cb
Show file tree
Hide file tree
Showing 10 changed files with 948 additions and 677 deletions.
8 changes: 2 additions & 6 deletions be/src/exec/data_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,13 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
break;
}
case TDataSinkType::OLAP_TABLE_SINK: {
Status status = Status::OK();
DCHECK(thrift_sink.__isset.olap_table_sink);
if (state->query_options().enable_memtable_on_sink_node &&
!_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) {
sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, output_exprs, &status));
sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, output_exprs, false));
} else {
sink->reset(new vectorized::VOlapTableSink(pool, row_desc, output_exprs, false));
}
RETURN_IF_ERROR(status);
break;
}
case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK: {
Expand Down Expand Up @@ -301,15 +299,13 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
break;
}
case TDataSinkType::OLAP_TABLE_SINK: {
Status status = Status::OK();
DCHECK(thrift_sink.__isset.olap_table_sink);
if (state->query_options().enable_memtable_on_sink_node &&
!_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) {
sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, output_exprs, &status));
sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, output_exprs, false));
} else {
sink->reset(new vectorized::VOlapTableSink(pool, row_desc, output_exprs, false));
}
RETURN_IF_ERROR(status);
break;
}
case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
Expand Down
50 changes: 50 additions & 0 deletions be/src/pipeline/exec/olap_table_sink_v2_operator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "olap_table_sink_v2_operator.h"

#include "common/status.h"

namespace doris::pipeline {

OperatorPtr OlapTableSinkV2OperatorBuilder::build_operator() {
return std::make_shared<OlapTableSinkV2Operator>(this, _sink);
}

Status OlapTableSinkV2LocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
auto& p = _parent->cast<Parent>();
RETURN_IF_ERROR(_writer->init_properties(p._pool, p._group_commit));
return Status::OK();
}

Status OlapTableSinkV2LocalState::close(RuntimeState* state, Status exec_status) {
if (Base::_closed) {
return Status::OK();
}
SCOPED_TIMER(_close_timer);
SCOPED_TIMER(exec_time_counter());
if (_closed) {
return _close_status;
}
_close_status = Base::close(state, exec_status);
return _close_status;
}

} // namespace doris::pipeline
74 changes: 71 additions & 3 deletions be/src/pipeline/exec/olap_table_sink_v2_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#pragma once

#include "operator.h"
#include "pipeline/pipeline_x/operator.h"
#include "vec/sink/vtablet_sink_v2.h"

namespace doris {
Expand All @@ -41,9 +42,76 @@ class OlapTableSinkV2Operator final : public DataSinkOperator<OlapTableSinkV2Ope
bool can_write() override { return true; } // TODO: need use mem_limit
};

OperatorPtr OlapTableSinkV2OperatorBuilder::build_operator() {
return std::make_shared<OlapTableSinkV2Operator>(this, _sink);
}
class OlapTableSinkV2OperatorX;

class OlapTableSinkV2LocalState final
: public AsyncWriterSink<vectorized::VTabletWriterV2, OlapTableSinkV2OperatorX> {
public:
using Base = AsyncWriterSink<vectorized::VTabletWriterV2, OlapTableSinkV2OperatorX>;
using Parent = OlapTableSinkV2OperatorX;
ENABLE_FACTORY_CREATOR(OlapTableSinkV2LocalState);
OlapTableSinkV2LocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
: Base(parent, state) {};
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
return Base::open(state);
}

Status close(RuntimeState* state, Status exec_status) override;
friend class OlapTableSinkV2OperatorX;

private:
Status _close_status = Status::OK();
};

class OlapTableSinkV2OperatorX final : public DataSinkOperatorX<OlapTableSinkV2LocalState> {
public:
using Base = DataSinkOperatorX<OlapTableSinkV2LocalState>;
OlapTableSinkV2OperatorX(ObjectPool* pool, int operator_id, const RowDescriptor& row_desc,
const std::vector<TExpr>& t_output_expr, bool group_commit)
: Base(operator_id, 0),
_row_desc(row_desc),
_t_output_expr(t_output_expr),
_group_commit(group_commit),
_pool(pool) {};

Status init(const TDataSink& thrift_sink) override {
RETURN_IF_ERROR(Base::init(thrift_sink));
// From the thrift expressions create the real exprs.
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs));
return Status::OK();
}

Status prepare(RuntimeState* state) override {
RETURN_IF_ERROR(Base::prepare(state));
return vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc);
}

Status open(RuntimeState* state) override {
RETURN_IF_ERROR(Base::open(state));
return vectorized::VExpr::open(_output_vexpr_ctxs, state);
}

Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
return local_state.sink(state, in_block, source_state);
}

private:
friend class OlapTableSinkV2LocalState;
template <typename Writer, typename Parent>
friend class AsyncWriterSink;
const RowDescriptor& _row_desc;
vectorized::VExprContextSPtrs _output_vexpr_ctxs;
const std::vector<TExpr>& _t_output_expr;
const bool _group_commit;
ObjectPool* _pool;
};

} // namespace pipeline
} // namespace doris
3 changes: 3 additions & 0 deletions be/src/pipeline/pipeline_x/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "pipeline/exec/nested_loop_join_probe_operator.h"
#include "pipeline/exec/olap_scan_operator.h"
#include "pipeline/exec/olap_table_sink_operator.h"
#include "pipeline/exec/olap_table_sink_v2_operator.h"
#include "pipeline/exec/partition_sort_sink_operator.h"
#include "pipeline/exec/partition_sort_source_operator.h"
#include "pipeline/exec/repeat_operator.h"
Expand Down Expand Up @@ -544,6 +545,7 @@ DECLARE_OPERATOR_X(ResultSinkLocalState)
DECLARE_OPERATOR_X(JdbcTableSinkLocalState)
DECLARE_OPERATOR_X(ResultFileSinkLocalState)
DECLARE_OPERATOR_X(OlapTableSinkLocalState)
DECLARE_OPERATOR_X(OlapTableSinkV2LocalState)
DECLARE_OPERATOR_X(AnalyticSinkLocalState)
DECLARE_OPERATOR_X(SortSinkLocalState)
DECLARE_OPERATOR_X(LocalExchangeSinkLocalState)
Expand Down Expand Up @@ -624,5 +626,6 @@ template class PipelineXLocalState<LocalExchangeDependency>;
template class AsyncWriterSink<doris::vectorized::VFileResultWriter, ResultFileSinkOperatorX>;
template class AsyncWriterSink<doris::vectorized::VJdbcTableWriter, JdbcTableSinkOperatorX>;
template class AsyncWriterSink<doris::vectorized::VTabletWriter, OlapTableSinkOperatorX>;
template class AsyncWriterSink<doris::vectorized::VTabletWriterV2, OlapTableSinkV2OperatorX>;

} // namespace doris::pipeline
30 changes: 27 additions & 3 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
#include "pipeline/exec/nested_loop_join_probe_operator.h"
#include "pipeline/exec/olap_scan_operator.h"
#include "pipeline/exec/olap_table_sink_operator.h"
#include "pipeline/exec/olap_table_sink_v2_operator.h"
#include "pipeline/exec/partition_sort_sink_operator.h"
#include "pipeline/exec/partition_sort_source_operator.h"
#include "pipeline/exec/repeat_operator.h"
Expand Down Expand Up @@ -268,9 +269,10 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
break;
}
case TDataSinkType::OLAP_TABLE_SINK: {
if (state->query_options().enable_memtable_on_sink_node) {
return Status::InternalError(
"Unsuported OLAP_TABLE_SINK with enable_memtable_on_sink_node ");
if (state->query_options().enable_memtable_on_sink_node &&
!_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) {
_sink.reset(new OlapTableSinkV2OperatorX(pool, next_operator_id(), row_desc,
output_exprs, false));
} else {
_sink.reset(new OlapTableSinkOperatorX(pool, next_operator_id(), row_desc, output_exprs,
false));
Expand Down Expand Up @@ -412,6 +414,9 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
_runtime_states[i]->set_per_fragment_instance_idx(local_params.sender_id);
_runtime_states[i]->set_num_per_fragment_instances(request.num_senders);
_runtime_states[i]->resize_op_id_to_local_state(max_operator_id());
_runtime_states[i]->set_load_stream_per_node(request.load_stream_per_node);
_runtime_states[i]->set_total_load_streams(request.total_load_streams);
_runtime_states[i]->set_num_local_sink(request.num_local_sink);
std::map<PipelineId, PipelineXTask*> pipeline_id_to_task;
for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
auto task = std::make_unique<PipelineXTask>(
Expand Down Expand Up @@ -1005,4 +1010,23 @@ Status PipelineXFragmentContext::send_report(bool done) {
std::placeholders::_2)},
shared_from_this());
}

bool PipelineXFragmentContext::_has_inverted_index_or_partial_update(TOlapTableSink sink) {
OlapTableSchemaParam schema;
if (!schema.init(sink.schema).ok()) {
return false;
}
if (schema.is_partial_update()) {
return true;
}
for (const auto& index_schema : schema.indexes()) {
for (const auto& index : index_schema->indexes) {
if (index->index_type() == INVERTED) {
return true;
}
}
}
return false;
}

} // namespace doris::pipeline
3 changes: 3 additions & 0 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ class PipelineXFragmentContext : public PipelineFragmentContext {
const TPipelineFragmentParams& params, const RowDescriptor& row_desc,
RuntimeState* state, DescriptorTbl& desc_tbl,
PipelineId cur_pipeline_id);

bool _has_inverted_index_or_partial_update(TOlapTableSink sink);

OperatorXPtr _root_op = nullptr;
// this is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines.
std::vector<std::vector<std::unique_ptr<PipelineXTask>>> _tasks;
Expand Down
Loading

0 comments on commit e29d8cb

Please sign in to comment.