Commit: be code2

zhangstar333 committed Dec 14, 2023
1 parent ab953de, commit e64a585
Showing 1 changed file with 16 additions and 5 deletions.

be/src/vec/sink/vresult_sink.cpp (16 additions, 5 deletions)

@@ -60,6 +60,7 @@ VResultSink::VResultSink(const RowDescriptor& row_desc, const std::vector<TExpr>
     _name = "ResultSink";
     _limit = sink.__isset.limit ? sink.limit : -1;
     _offset = sink.__isset.offset ? sink.offset : 0;
+    LOG(WARNING) << _name << ", limit: " << _limit << ", _offset: " << _offset;
 }

 VResultSink::~VResultSink() = default;
@@ -141,28 +142,36 @@ Status VResultSink::send(RuntimeState* state, Block* block, bool eos) {
     COUNTER_UPDATE(_output_rows_counter, block->rows());

     int block_rows = block->rows();
-    if (_num_row_skipped + block_rows < _offset) { // skip this block
-        LOG(WARNING) << "block_rows: " << block_rows << " _num_row_skipped: " << _num_row_skipped;
+    // Doris normally sends the block directly, because the limit has already been applied by an upstream node.
+    // But when enable_found_rows = true, the limit must also be applied here: only `limit` rows are sent to the FE,
+    // and later blocks are read solely to count rows, not sent to the FE.
+    if (_num_row_skipped + block_rows <= _offset) { // skip this block
         _num_row_skipped += block_rows;
+        LOG(WARNING) << "skip this block: " << block_rows;
         block->set_num_rows(0);
         return Status::OK();
     } else if (_num_row_skipped < _offset) { // skip some rows
         auto offset = _offset - _num_row_skipped;
         _num_row_skipped = _offset;
         block->set_num_rows(block_rows - offset);
         block_rows = block->rows();
+        LOG(WARNING) << "skip some rows, left: " << block_rows;
     }

     _total_return_rows += block_rows;
+    LOG(WARNING) << "block_rows: " << block_rows << ", _total_return_rows: " << _total_return_rows;
     if (!_reached_limit) {
-        if (_limit != -1 && _total_return_rows >= _limit) { // need cut some rows
+        if (_limit != -1 && _total_return_rows > _limit) { // need to cut some rows
             block->set_num_rows((_limit + block_rows) - _total_return_rows);
+            LOG(WARNING) << "cut some rows, left: " << block->rows();
             _reached_limit = true;
         }
     } else {
+        LOG(WARNING) << "no more data needed: " << block->rows();
         // The limit has been reached, so no data can be sent; just return after updating _total_return_rows.
         return Status::OK();
     }

+    LOG(WARNING) << "_writer->append_block: " << block->rows();
     if (_fetch_option.use_two_phase_fetch && block->rows() > 0) {
         DCHECK(_sink_type == TResultSinkType::MYSQL_PROTOCAL);
         RETURN_IF_ERROR(second_phase_fetch_data(state, block));
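
The skip/trim arithmetic in the hunk above is easier to verify in isolation. What follows is a minimal, self-contained C++ sketch of the same branch structure (the struct and function names are hypothetical, and the sink's state is compressed into a plain struct; this is not the Doris implementation), traced for offset = 3 and limit = 10 over 5-row blocks.

    #include <cstdint>
    #include <iostream>

    // Sketch of the per-block offset/limit bookkeeping in send(); names are illustrative.
    struct LimitState {
        int64_t offset = 0;            // rows to skip before anything is returned
        int64_t limit = -1;            // -1 means "no limit"
        int64_t num_row_skipped = 0;   // rows consumed by the offset so far
        int64_t total_return_rows = 0; // rows seen past the offset, even beyond the limit
        bool reached_limit = false;

        // Returns how many rows of a block should actually be sent to the FE.
        int64_t rows_to_send(int64_t block_rows) {
            if (num_row_skipped + block_rows <= offset) { // skip the whole block
                num_row_skipped += block_rows;
                return 0;
            } else if (num_row_skipped < offset) { // skip only a prefix of the block
                block_rows -= offset - num_row_skipped;
                num_row_skipped = offset;
            }
            total_return_rows += block_rows; // counted even when nothing is sent
            if (reached_limit) {
                return 0; // past the limit: keep counting, send nothing
            }
            if (limit != -1 && total_return_rows > limit) { // trim the tail of the block
                block_rows = (limit + block_rows) - total_return_rows;
                reached_limit = true;
            }
            return block_rows;
        }
    };

    int main() {
        LimitState s;
        s.offset = 3;
        s.limit = 10;
        for (int i = 0; i < 4; ++i) {                          // four 5-row blocks, 20 rows in
            std::cout << "send " << s.rows_to_send(5) << "\n"; // prints 2, 5, 3, 0
        }
        // 17 counted rows + 3 skipped rows = all 20 input rows, which is what found_rows needs.
        std::cout << "found rows = " << (s.total_return_rows + s.num_row_skipped) << "\n";
    }

With these inputs the four calls send 2, 5, 3, and 0 rows (10 in total, matching the limit), while total_return_rows still reaches 17, so the skipped offset rows must be added back to recover the full count of 20.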
@@ -198,8 +207,10 @@ Status VResultSink::close(RuntimeState* state, Status exec_status) {
         _sender->update_num_written_rows(_writer->get_written_rows());
     }
     _sender->update_max_peak_memory_bytes();
+    // e.g. LIMIT 3, 10: need to check whether adding _num_row_skipped here is correct ???
     _sender->update_total_return_rows(_total_return_rows + _num_row_skipped);
+    LOG(WARNING)
+            << "----------------------update_total_return_rows-----------------------------"
+            << (_total_return_rows + _num_row_skipped);
     RETURN_IF_ERROR(_sender->close(final_status));
 }
 RETURN_IF_ERROR(state->exec_env()->result_mgr()->cancel_at_time(
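
For context on the "e.g. LIMIT 3, 10" question in close(): under MySQL semantics, LIMIT 3, 10 means an offset of 3 and a row count of 10, and FOUND_ROWS() after SQL_CALC_FOUND_ROWS must report the number of matching rows before the offset and limit were applied. In the trace above, the sink ends with _num_row_skipped = 3 and _total_return_rows = 17, so _total_return_rows + _num_row_skipped = 20 recovers the full match count; adding _num_row_skipped therefore looks necessary, assuming the FE expects the pre-offset count.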
