Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangstar333 committed Nov 11, 2024
1 parent a586bac commit 7551d08
Show file tree
Hide file tree
Showing 9 changed files with 1,273 additions and 47 deletions.
4 changes: 4 additions & 0 deletions be/src/pipeline/exec/join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "pipeline/exec/hashjoin_probe_operator.h"
#include "pipeline/exec/nested_loop_join_probe_operator.h"
#include "pipeline/exec/nested_loop_join_probe_operator_cross.h"
#include "pipeline/exec/operator.h"
#include "pipeline/exec/partitioned_hash_join_probe_operator.h"

Expand Down Expand Up @@ -272,6 +273,9 @@ template class JoinProbeOperatorX<HashJoinProbeLocalState>;
template class JoinProbeLocalState<NestedLoopJoinSharedState, NestedLoopJoinProbeLocalState>;
template class JoinProbeOperatorX<NestedLoopJoinProbeLocalState>;

template class JoinProbeLocalState<NestedLoopJoinSharedState, NestedLoopJoinProbeLocalStateCross>;
template class JoinProbeOperatorX<NestedLoopJoinProbeLocalStateCross>;

template class JoinProbeLocalState<PartitionedHashJoinSharedState,
PartitionedHashJoinProbeLocalState>;
template class JoinProbeOperatorX<PartitionedHashJoinProbeLocalState>;
Expand Down
73 changes: 72 additions & 1 deletion be/src/pipeline/exec/nested_loop_join_build_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,69 @@ struct RuntimeFilterBuild {
NestedLoopJoinBuildSinkLocalState* _parent = nullptr;
};


BlockAccumulator::BlockAccumulator(size_t desired_size) : _desired_size(desired_size) {}

void BlockAccumulator::set_desired_size(size_t desired_size) {
_desired_size = desired_size;
}

void BlockAccumulator::reset() {
_output.clear();
_tmp_block.reset();
_accumulate_count = 0;
}

Status BlockAccumulator::push(vectorized::Block block) {
size_t input_rows = block.rows();
Status ok = Status::OK();
// TODO: optimize for zero-copy scenario
// Cut the input block into pieces if larger than desired
for (size_t start = 0; start < input_rows;) {
size_t remain_rows = input_rows - start;
size_t need_rows = 0;
if (_tmp_block) {
need_rows = std::min(_desired_size - _tmp_block->rows(), remain_rows);
ok = _tmp_block->add_rows(&block, start, need_rows);
} else {
need_rows = std::min(_desired_size, remain_rows);
_tmp_block = vectorized::MutableBlock::create_unique(block.clone_empty());
ok = _tmp_block->add_rows(&block, start, need_rows);
}

if (_tmp_block->rows() >= _desired_size) {
_output.emplace_back(_tmp_block->to_block());
_tmp_block.reset();
}
start += need_rows;
}
_accumulate_count++;
return ok;
}

bool BlockAccumulator::empty() const {
return _output.empty();
}

bool BlockAccumulator::reach_limit() const {
return _accumulate_count >= kAccumulateLimit;
}

vectorized::Block BlockAccumulator::pull() {
auto& output_block = _output.front();
_output.pop_front();
_accumulate_count--;
return output_block;
}

void BlockAccumulator::finalize() {
if (_tmp_block) {
_output.emplace_back(_tmp_block->to_block());
_tmp_block.reset();
}
_accumulate_count = 0;
}

NestedLoopJoinBuildSinkLocalState::NestedLoopJoinBuildSinkLocalState(DataSinkOperatorXBase* parent,
RuntimeState* state)
: JoinBuildSinkLocalState<NestedLoopJoinSharedState, NestedLoopJoinBuildSinkLocalState>(
Expand All @@ -69,6 +132,7 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta
RETURN_IF_ERROR(state->register_producer_runtime_filter(
p._runtime_filter_descs[i], p._need_local_merge, &_runtime_filters[i], false));
}
accumulator = std::make_unique<BlockAccumulator>(4096);
return Status::OK();
}

Expand Down Expand Up @@ -131,14 +195,21 @@ Status NestedLoopJoinBuildSinkOperatorX::sink(doris::RuntimeState* state, vector
if (rows != 0) {
local_state._build_rows += rows;
local_state._total_mem_usage += mem_usage;
local_state._shared_state->build_blocks.emplace_back(std::move(*block));
RETURN_IF_ERROR(local_state.accumulator->push(std::move(*block)));
// local_state._shared_state->build_blocks.emplace_back(std::move(*block));
if (_match_all_build || _is_right_semi_anti) {
local_state._shared_state->build_side_visited_flags.emplace_back(
vectorized::ColumnUInt8::create(rows, 0));
}
}

if (eos) {
local_state.accumulator->finalize();
while (!local_state.accumulator->empty()) {
local_state._shared_state->build_blocks.emplace_back(std::move(local_state.accumulator->_output.front()));
local_state.accumulator->_output.pop_front();
CHECK(local_state._shared_state->build_blocks.back().rows() !=0);
}
RuntimeFilterBuild rf_ctx(&local_state);
RETURN_IF_ERROR(rf_ctx(state));

Expand Down
25 changes: 24 additions & 1 deletion be/src/pipeline/exec/nested_loop_join_build_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,29 @@

namespace doris::pipeline {


// Accumulate small block into desired size
class BlockAccumulator {
public:
static inline size_t kAccumulateLimit = 64;

BlockAccumulator() = default;
BlockAccumulator(size_t desired_size);
void set_desired_size(size_t desired_size);
void reset();
void finalize();
bool empty() const;
bool reach_limit() const;
Status push(vectorized::Block block);
vectorized::Block pull();

// private:
size_t _desired_size;
std::unique_ptr<vectorized::MutableBlock> _tmp_block;
std::deque<vectorized::Block> _output;
size_t _accumulate_count = 0;
};

class NestedLoopJoinBuildSinkOperatorX;

class NestedLoopJoinBuildSinkLocalState final
Expand Down Expand Up @@ -51,7 +74,7 @@ class NestedLoopJoinBuildSinkLocalState final
friend class NestedLoopJoinBuildSinkOperatorX;
uint64_t _build_rows = 0;
uint64_t _total_mem_usage = 0;

std::unique_ptr<BlockAccumulator> accumulator;
vectorized::VExprContextSPtrs _filter_src_expr_ctxs;
};

Expand Down
Loading

0 comments on commit 7551d08

Please sign in to comment.