Skip to content

Commit

Permalink
be code
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangstar333 committed Dec 12, 2023
1 parent e158753 commit ab953de
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 16 deletions.
3 changes: 3 additions & 0 deletions be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl
if (tnode.__isset.output_tuple_id) {
_output_row_descriptor.reset(new RowDescriptor(descs, {tnode.output_tuple_id}, {true}));
}
if (tnode.__isset.calc_found_rows && tnode.calc_found_rows) {
_limit = -1; //TODO: check how to handle offset ???
}
}

ExecNode::~ExecNode() = default;
Expand Down
6 changes: 6 additions & 0 deletions be/src/runtime/buffer_control_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ class BufferControlBlock {
}
}

void update_total_return_rows(int64_t rows) {
if (_query_statistics != nullptr) {
_query_statistics->set_total_return_rows(rows);
}
}

void update_max_peak_memory_bytes() {
if (_query_statistics != nullptr) {
int64_t max_peak_memory_bytes = _query_statistics->calculate_max_peak_memory_bytes();
Expand Down
7 changes: 4 additions & 3 deletions be/src/runtime/query_statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ void QueryStatistics::merge(const QueryStatistics& other) {
scan_rows += other.scan_rows;
scan_bytes += other.scan_bytes;
cpu_ms += other.cpu_ms;
for (auto& other_node_statistics : other._nodes_statistics_map) {
for (const auto& other_node_statistics : other._nodes_statistics_map) {
int64_t node_id = other_node_statistics.first;
auto node_statistics = add_nodes_statistics(node_id);
auto* node_statistics = add_nodes_statistics(node_id);
node_statistics->merge(*other_node_statistics.second);
}
}
Expand All @@ -55,8 +55,9 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) {
statistics->set_cpu_ms(cpu_ms);
statistics->set_returned_rows(returned_rows);
statistics->set_max_peak_memory_bytes(max_peak_memory_bytes);
statistics->set_total_return_rows(total_return_rows);
for (auto iter = _nodes_statistics_map.begin(); iter != _nodes_statistics_map.end(); ++iter) {
auto node_statistics = statistics->add_nodes_statistics();
auto* node_statistics = statistics->add_nodes_statistics();
node_statistics->set_node_id(iter->first);
iter->second->to_pb(node_statistics);
}
Expand Down
24 changes: 13 additions & 11 deletions be/src/runtime/query_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,7 @@ class NodeStatistics {
class QueryStatistics {
public:
QueryStatistics(TQueryType::type query_type = TQueryType::type::SELECT)
: scan_rows(0),
scan_bytes(0),
cpu_ms(0),
returned_rows(0),
max_peak_memory_bytes(0),
_query_type(query_type) {}
: _query_type(query_type) {}
virtual ~QueryStatistics();

void merge(const QueryStatistics& other);
Expand All @@ -90,6 +85,8 @@ class QueryStatistics {

void set_returned_rows(int64_t num_rows) { this->returned_rows = num_rows; }

void set_total_return_rows(int64_t rows) { this->total_return_rows = rows; }

void set_max_peak_memory_bytes(int64_t max_peak_memory_bytes) {
this->max_peak_memory_bytes = max_peak_memory_bytes;
}
Expand All @@ -106,6 +103,7 @@ class QueryStatistics {
scan_rows = 0;
scan_bytes = 0;
cpu_ms = 0;
total_return_rows = 0;
returned_rows = 0;
max_peak_memory_bytes = 0;
clearNodeStatistics();
Expand All @@ -124,15 +122,19 @@ class QueryStatistics {

private:
friend class QueryStatisticsRecvr;
int64_t scan_rows;
int64_t scan_bytes;
int64_t cpu_ms;
int64_t scan_rows = 0;
int64_t scan_bytes = 0;
int64_t cpu_ms = 0;
// eg: select sum(k1) from table group by k2 limit 3;
// total_return_rows = count(select sum(k1) from table group by k2);
// returned_rows = 3;
int64_t total_return_rows = 0;
// number rows returned by query.
// only set once by result sink when closing.
int64_t returned_rows;
int64_t returned_rows = 0;
// Maximum memory peak for all backends.
// only set once by result sink when closing.
int64_t max_peak_memory_bytes;
int64_t max_peak_memory_bytes = 0;
// The statistics of the query on each backend.
using NodeStatisticsMap = std::unordered_map<int64_t, NodeStatistics*>;
NodeStatisticsMap _nodes_statistics_map;
Expand Down
32 changes: 30 additions & 2 deletions be/src/vec/sink/vresult_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ VResultSink::VResultSink(const RowDescriptor& row_desc, const std::vector<TExpr>
}
_fetch_option = sink.fetch_option;
_name = "ResultSink";
_limit = sink.__isset.limit ? sink.limit : -1;
_offset = sink.__isset.offset ? sink.offset : 0;
}

VResultSink::~VResultSink() = default;
Expand Down Expand Up @@ -137,6 +139,30 @@ Status VResultSink::send(RuntimeState* state, Block* block, bool eos) {
SCOPED_TIMER(_exec_timer);
COUNTER_UPDATE(_blocks_sent_counter, 1);
COUNTER_UPDATE(_output_rows_counter, block->rows());

int block_rows = block->rows();
if (_num_row_skipped + block_rows < _offset) { // skip this block
_num_row_skipped += block_rows;
block->set_num_rows(0);
return Status::OK();
} else if (_num_row_skipped < _offset) { // skip some rows
auto offset = _offset - _num_row_skipped;
_num_row_skipped = _offset;
block->set_num_rows(block_rows - offset);
block_rows = block->rows();
}

_total_return_rows += block_rows;
if (!_reached_limit) {
if (_limit != -1 && _total_return_rows >= _limit) { // need cut some rows
block->set_num_rows((_limit + block_rows) - _total_return_rows);
_reached_limit = true;
}
} else {
// have reach limit, so could not send any data, just return to update _total_return_rows
return Status::OK();
}

if (_fetch_option.use_two_phase_fetch && block->rows() > 0) {
DCHECK(_sink_type == TResultSinkType::MYSQL_PROTOCAL);
RETURN_IF_ERROR(second_phase_fetch_data(state, block));
Expand Down Expand Up @@ -172,9 +198,11 @@ Status VResultSink::close(RuntimeState* state, Status exec_status) {
_sender->update_num_written_rows(_writer->get_written_rows());
}
_sender->update_max_peak_memory_bytes();
static_cast<void>(_sender->close(final_status));
// limit 3, 10 : need check to add _num_row_skipped ???
_sender->update_total_return_rows(_total_return_rows + _num_row_skipped);
RETURN_IF_ERROR(_sender->close(final_status));
}
static_cast<void>(state->exec_env()->result_mgr()->cancel_at_time(
RETURN_IF_ERROR(state->exec_env()->result_mgr()->cancel_at_time(
time(nullptr) + config::result_buffer_cancelled_interval_time,
state->fragment_instance_id()));
return DataSink::close(state, exec_status);
Expand Down
6 changes: 6 additions & 0 deletions be/src/vec/sink/vresult_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <gen_cpp/Types_types.h>
#include <stddef.h>

#include <cstdint>
#include <map>
#include <memory>
#include <string>
Expand Down Expand Up @@ -157,6 +158,11 @@ class VResultSink : public DataSink {

// for fetch data by rowids
TFetchOption _fetch_option;
int64_t _offset = 0;
int64_t _limit = -1;
bool _reached_limit = false;
int64_t _total_return_rows = 0;
int64_t _num_row_skipped = 0;
};
} // namespace vectorized

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public static void logAuditLog(ConnectContext ctx, String origStmt, StatementBas
long endTime = System.currentTimeMillis();
long elapseMs = endTime - ctx.getStartTime();

long totalReturnRows = statistics == null ? 0 : statistics.getTotalReturnRows();
ctx.setTotalReturnRows(totalReturnRows);
ctx.getAuditEventBuilder().setEventType(EventType.AFTER_QUERY)
.setDb(ClusterNamespace.getNameFromFullName(ctx.getDatabase()))
.setState(ctx.getState().toString())
Expand Down
1 change: 1 addition & 0 deletions gensrc/proto/data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ message PQueryStatistics {
optional int64 cpu_ms = 4;
optional int64 max_peak_memory_bytes = 5;
repeated PNodeStatistics nodes_statistics = 6;
optional int64 total_return_rows = 7;
}

message PRowBatch {
Expand Down

0 comments on commit ab953de

Please sign in to comment.