Skip to content

Commit

Permalink
improve spill audit log
Browse files Browse the repository at this point in the history
  • Loading branch information
jacktengg committed Nov 12, 2024
1 parent 580dee6 commit 160cedf
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 80 deletions.
8 changes: 0 additions & 8 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -303,14 +303,10 @@ class PipelineXSpillLocalState : public PipelineXLocalState<SharedStateArg> {

Status close(RuntimeState* state) override {
if (Base::_query_statistics) {
auto* write_block_bytes = Base::profile()->get_counter("SpillWriteBlockBytes");
auto* write_file_bytes = Base::profile()->get_counter("SpillWriteFileBytes");
auto* read_block_bytes = Base::profile()->get_counter("SpillReadBlockBytes");
auto* read_file_bytes = Base::profile()->get_counter("SpillReadFileBytes");
Base::_query_statistics->add_spill_bytes(
write_block_bytes ? write_block_bytes->value() : 0,
write_file_bytes ? write_file_bytes->value() : 0,
read_block_bytes ? read_block_bytes->value() : 0,
read_file_bytes ? read_file_bytes->value() : 0);
}
return Base::close(state);
Expand Down Expand Up @@ -747,14 +743,10 @@ class PipelineXSpillSinkLocalState : public PipelineXSinkLocalState<SharedStateA

Status close(RuntimeState* state, Status exec_status) override {
if (Base::_query_statistics) {
auto* write_block_bytes = Base::profile()->get_counter("SpillWriteBlockBytes");
auto* write_file_bytes = Base::profile()->get_counter("SpillWriteFileBytes");
auto* read_block_bytes = Base::profile()->get_counter("SpillReadBlockBytes");
auto* read_file_bytes = Base::profile()->get_counter("SpillReadFileBytes");
Base::_query_statistics->add_spill_bytes(
write_block_bytes ? write_block_bytes->value() : 0,
write_file_bytes ? write_file_bytes->value() : 0,
read_block_bytes ? read_block_bytes->value() : 0,
read_file_bytes ? read_file_bytes->value() : 0);
}
return Base::close(state, exec_status);
Expand Down
24 changes: 8 additions & 16 deletions be/src/runtime/query_statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,8 @@ void QueryStatistics::merge(const QueryStatistics& other) {
this->current_used_memory_bytes = other_memory_used;
}

_spill_write_block_bytes += other._spill_write_block_bytes;
_spill_write_file_bytes += other._spill_write_file_bytes;
_spill_read_block_bytes += other._spill_read_block_bytes;
_spill_read_file_bytes += other._spill_read_file_bytes;
_spill_write_bytes_to_local_storage += other._spill_write_bytes_to_local_storage;
_spill_read_bytes_from_local_storage += other._spill_read_bytes_from_local_storage;
}

void QueryStatistics::to_pb(PQueryStatistics* statistics) {
Expand All @@ -60,10 +58,8 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) {
statistics->set_max_peak_memory_bytes(max_peak_memory_bytes);
statistics->set_scan_bytes_from_remote_storage(_scan_bytes_from_remote_storage);
statistics->set_scan_bytes_from_local_storage(_scan_bytes_from_local_storage);
statistics->set_spill_write_block_bytes(_spill_write_block_bytes);
statistics->set_spill_write_file_bytes(_spill_write_file_bytes);
statistics->set_spill_read_block_bytes(_spill_read_block_bytes);
statistics->set_spill_read_file_bytes(_spill_read_file_bytes);
statistics->set_spill_write_bytes_to_local_storage(_spill_write_bytes_to_local_storage);
statistics->set_spill_read_bytes_from_local_storage(_spill_read_bytes_from_local_storage);
}

void QueryStatistics::to_thrift(TQueryStatistics* statistics) const {
Expand All @@ -78,10 +74,8 @@ void QueryStatistics::to_thrift(TQueryStatistics* statistics) const {
statistics->__set_shuffle_send_rows(shuffle_send_rows);
statistics->__set_scan_bytes_from_remote_storage(_scan_bytes_from_remote_storage);
statistics->__set_scan_bytes_from_local_storage(_scan_bytes_from_local_storage);
statistics->__set_spill_write_block_bytes(_spill_write_block_bytes);
statistics->__set_spill_write_file_bytes(_spill_write_file_bytes);
statistics->__set_spill_read_block_bytes(_spill_read_block_bytes);
statistics->__set_spill_read_file_bytes(_spill_read_file_bytes);
statistics->__set_spill_write_bytes_to_local_storage(_spill_write_bytes_to_local_storage);
statistics->__set_spill_read_bytes_from_local_storage(_spill_read_bytes_from_local_storage);
}

void QueryStatistics::from_pb(const PQueryStatistics& statistics) {
Expand All @@ -90,10 +84,8 @@ void QueryStatistics::from_pb(const PQueryStatistics& statistics) {
cpu_nanos = statistics.cpu_ms() * NANOS_PER_MILLIS;
_scan_bytes_from_local_storage = statistics.scan_bytes_from_local_storage();
_scan_bytes_from_remote_storage = statistics.scan_bytes_from_remote_storage();
_spill_write_block_bytes = statistics.spill_write_block_bytes();
_spill_write_file_bytes = statistics.spill_write_file_bytes();
_spill_read_block_bytes = statistics.spill_read_block_bytes();
_spill_read_file_bytes = statistics.spill_read_file_bytes();
_spill_write_bytes_to_local_storage = statistics.spill_write_bytes_to_local_storage();
_spill_read_bytes_from_local_storage = statistics.spill_read_bytes_from_local_storage();
}

QueryStatistics::~QueryStatistics() {}
Expand Down
22 changes: 8 additions & 14 deletions be/src/runtime/query_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,8 @@ class QueryStatistics {
current_used_memory_bytes(0),
shuffle_send_bytes(0),
shuffle_send_rows(0),
_spill_write_block_bytes(0),
_spill_write_file_bytes(0),
_spill_read_block_bytes(0),
_spill_read_file_bytes(0) {}
_spill_write_bytes_to_local_storage(0),
_spill_read_bytes_from_local_storage(0) {}
virtual ~QueryStatistics();

void merge(const QueryStatistics& other);
Expand Down Expand Up @@ -84,12 +82,10 @@ class QueryStatistics {
current_used_memory_bytes = current_used_memory;
}

void add_spill_bytes(int64_t spill_write_block_bytes, int64_t spill_write_file_bytes,
int64_t spill_read_block_bytes, int64_t spill_read_file_bytes) {
_spill_write_block_bytes += spill_write_block_bytes;
_spill_write_file_bytes += spill_write_file_bytes;
_spill_read_block_bytes += spill_read_block_bytes;
_spill_read_file_bytes += spill_read_file_bytes;
void add_spill_bytes(int64_t spill_write_bytes_to_local_storage,
int64_t spill_read_bytes_from_local_storage) {
_spill_write_bytes_to_local_storage += spill_write_bytes_to_local_storage;
_spill_read_bytes_from_local_storage += spill_read_bytes_from_local_storage;
}

void to_pb(PQueryStatistics* statistics);
Expand Down Expand Up @@ -119,10 +115,8 @@ class QueryStatistics {
std::atomic<int64_t> shuffle_send_bytes;
std::atomic<int64_t> shuffle_send_rows;

std::atomic<int64_t> _spill_write_block_bytes;
std::atomic<int64_t> _spill_write_file_bytes;
std::atomic<int64_t> _spill_read_block_bytes;
std::atomic<int64_t> _spill_read_file_bytes;
std::atomic<int64_t> _spill_write_bytes_to_local_storage;
std::atomic<int64_t> _spill_read_bytes_from_local_storage;
};
using QueryStatisticsPtr = std::shared_ptr<QueryStatistics>;
// It is used for collecting sub plan query statistics in DataStreamRecvr.
Expand Down
30 changes: 8 additions & 22 deletions fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,10 @@ public enum EventType {
public long scanBytesFromLocalStorage = -1;
@AuditField(value = "ScanBytesFromRemoteStorage")
public long scanBytesFromRemoteStorage = -1;
@AuditField(value = "SpillWriteBlockBytes")
public long spillWriteBlockBytes = -1;
@AuditField(value = "SpillWriteFileBytes")
public long spillWriteFileBytes = -1;
@AuditField(value = "SpillReadBlockBytes")
public long spillReadBlockBytes = -1;
@AuditField(value = "SpillReadFileBytes")
public long spillReadFileBytes = -1;
@AuditField(value = "SpillWriteBytesToLocalStorage")
public long spillWriteBytesToLocalStorage = -1;
@AuditField(value = "SpillReadBytesFromLocalStorage")
public long spillReadBytesFromLocalStorage = -1;

public long pushToAuditLogQueueTime;

Expand Down Expand Up @@ -278,23 +274,13 @@ public AuditEventBuilder setScanBytesFromRemoteStorage(long scanBytesFromRemoteS
return this;
}

public AuditEventBuilder setSpillWriteBlockBytes(long spillWriteBlockBytes) {
auditEvent.spillWriteBlockBytes = spillWriteBlockBytes;
public AuditEventBuilder setSpillWriteBytesToLocalStorage(long bytes) {
auditEvent.spillWriteBytesToLocalStorage = bytes;
return this;
}

public AuditEventBuilder setSpillWriteFileBytes(long spillWriteFileBytes) {
auditEvent.spillWriteFileBytes = spillWriteFileBytes;
return this;
}

public AuditEventBuilder setSpillReadBlockBytes(long spillReadBlockBytes) {
auditEvent.spillReadBlockBytes = spillReadBlockBytes;
return this;
}

public AuditEventBuilder setSpillReadFileBytes(long spillReadFileBytes) {
auditEvent.spillReadFileBytes = spillReadFileBytes;
public AuditEventBuilder setSpillReadBytesFromLocalStorage(long bytes) {
auditEvent.spillReadBytesFromLocalStorage = bytes;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,8 @@ private static void logAuditLogImpl(ConnectContext ctx, String origStmt, Stateme
.setQueryTime(elapseMs)
.setScanBytes(statistics == null ? 0 : statistics.getScanBytes())
.setScanRows(statistics == null ? 0 : statistics.getScanRows())
.setSpillWriteBlockBytes(statistics == null ? 0 : statistics.getSpillWriteBlockBytes())
.setSpillWriteFileBytes(statistics == null ? 0 : statistics.getSpillWriteFileBytes())
.setSpillReadBlockBytes(statistics == null ? 0 : statistics.getSpillReadBlockBytes())
.setSpillReadFileBytes(statistics == null ? 0 : statistics.getSpillReadFileBytes())
.setSpillWriteBytesToLocalStorage (statistics == null ? 0 : statistics.getSpillWriteBytesToLocalStorage ())
.setSpillReadBytesFromLocalStorage (statistics == null ? 0 : statistics.getSpillReadBytesFromLocalStorage ())
.setCpuTimeMs(statistics == null ? 0 : statistics.getCpuMs())
.setPeakMemoryBytes(statistics == null ? 0 : statistics.getMaxPeakMemoryBytes())
.setReturnRows(ctx.getReturnRows())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,8 @@ protected void runAfterCatalogReady() {
auditEvent.cpuTimeMs = queryStats.cpu_ms;
auditEvent.shuffleSendBytes = queryStats.shuffle_send_bytes;
auditEvent.shuffleSendRows = queryStats.shuffle_send_rows;
auditEvent.spillWriteBlockBytes = queryStats.spill_write_block_bytes;
auditEvent.spillWriteFileBytes = queryStats.spill_write_file_bytes;
auditEvent.spillReadBlockBytes = queryStats.spill_read_block_bytes;
auditEvent.spillReadFileBytes = queryStats.spill_read_file_bytes;
auditEvent.spillWriteBytesToLocalStorage = queryStats.spill_write_bytes_to_local_storage;
auditEvent.spillReadBytesFromLocalStorage = queryStats.spill_read_bytes_from_local_storage;
}
boolean ret = Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent, true);
if (!ret) {
Expand Down Expand Up @@ -230,10 +228,8 @@ private void mergeQueryStatistics(TQueryStatistics dst, TQueryStatistics src) {
if (dst.max_peak_memory_bytes < src.max_peak_memory_bytes) {
dst.max_peak_memory_bytes = src.max_peak_memory_bytes;
}
dst.spill_write_block_bytes += src.spill_write_block_bytes;
dst.spill_write_file_bytes += src.spill_write_file_bytes;
dst.spill_read_block_bytes += src.spill_read_block_bytes;
dst.spill_read_file_bytes += src.spill_read_file_bytes;
dst.spill_write_bytes_to_local_storage += src.spill_write_bytes_to_local_storage ;
dst.spill_read_bytes_from_local_storage += src.spill_read_bytes_from_local_storage ;
}

private void queryAuditEventLogWriteLock() {
Expand Down
6 changes: 2 additions & 4 deletions gensrc/proto/data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,8 @@ message PQueryStatistics {
repeated PNodeStatistics nodes_statistics = 6;
optional int64 scan_bytes_from_local_storage = 7;
optional int64 scan_bytes_from_remote_storage = 8;
optional int64 spill_write_block_bytes = 9;
optional int64 spill_write_file_bytes = 10;
optional int64 spill_read_block_bytes = 11;
optional int64 spill_read_file_bytes = 12;
optional int64 spill_write_bytes_to_local_storage = 9;
optional int64 spill_read_bytes_from_local_storage = 10;
}

message PRowBatch {
Expand Down
6 changes: 2 additions & 4 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -413,10 +413,8 @@ struct TQueryStatistics {
9: optional i64 shuffle_send_rows
10: optional i64 scan_bytes_from_local_storage
11: optional i64 scan_bytes_from_remote_storage
12: optional i64 spill_write_block_bytes
13: optional i64 spill_write_file_bytes
14: optional i64 spill_read_block_bytes
15: optional i64 spill_read_file_bytes
12: optional i64 spill_write_bytes_to_local_storage
13: optional i64 spill_read_bytes_from_local_storage
}

struct TReportWorkloadRuntimeStatusParams {
Expand Down

0 comments on commit 160cedf

Please sign in to comment.