Skip to content

Commit

Permalink
[feature](audit-log) add audit-log in insert into (apache#27641)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange authored Nov 29, 2023
1 parent 32367c6 commit d9d5468
Show file tree
Hide file tree
Showing 12 changed files with 99 additions and 12 deletions.
3 changes: 2 additions & 1 deletion be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,8 @@ Status PipelineFragmentContext::send_report(bool done) {
_runtime_state.get(),
std::bind(&PipelineFragmentContext::update_status, this, std::placeholders::_1),
std::bind(&PipelineFragmentContext::cancel, this, std::placeholders::_1,
std::placeholders::_2)},
std::placeholders::_2),
_dml_query_statistics()},
shared_from_this());
}

Expand Down
12 changes: 12 additions & 0 deletions be/src/pipeline/pipeline_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ class PipelineFragmentContext : public std::enable_shared_from_this<PipelineFrag

uint64_t create_time() const { return _create_time; }

void set_query_statistics(std::shared_ptr<QueryStatistics> query_statistics) {
_query_statistics = query_statistics;
}

protected:
Status _create_sink(int sender_id, const TDataSink& t_data_sink, RuntimeState* state);
Status _build_pipelines(ExecNode*, PipelinePtr);
Expand Down Expand Up @@ -222,10 +226,18 @@ class PipelineFragmentContext : public std::enable_shared_from_this<PipelineFrag

private:
static bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
std::shared_ptr<QueryStatistics> _dml_query_statistics() {
if (_query_statistics && _query_statistics->collect_dml_statistics()) {
return _query_statistics;
}
return nullptr;
}
std::vector<std::unique_ptr<PipelineTask>> _tasks;
bool _group_commit;

uint64_t _create_time;

std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
};
} // namespace pipeline
} // namespace doris
3 changes: 2 additions & 1 deletion be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,11 @@ PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t index, RuntimeState*
_root(_operators.back()),
_sink(sink) {
_pipeline_task_watcher.start();
_query_statistics.reset(new QueryStatistics());
_query_statistics.reset(new QueryStatistics(state->query_options().query_type));
_sink->set_query_statistics(_query_statistics);
_collect_query_statistics_with_every_batch =
_pipeline->collect_query_statistics_with_every_batch();
fragment_context->set_query_statistics(_query_statistics);
}

PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state,
Expand Down
3 changes: 2 additions & 1 deletion be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1032,7 +1032,8 @@ Status PipelineXFragmentContext::send_report(bool done) {
_runtime_state.get(),
std::bind(&PipelineFragmentContext::update_status, this, std::placeholders::_1),
std::bind(&PipelineFragmentContext::cancel, this, std::placeholders::_1,
std::placeholders::_2)},
std::placeholders::_2),
nullptr},
shared_from_this());
}

Expand Down
8 changes: 8 additions & 0 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,14 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
params.__set_finished_scan_ranges(req.runtime_state->num_finished_range());

DCHECK(req.runtime_state != nullptr);

if (req.query_statistics) {
TQueryStatistics queryStatistics;
DCHECK(req.query_statistics->collect_dml_statistics());
req.query_statistics->to_thrift(&queryStatistics);
params.__set_query_statistics(queryStatistics);
}

if (req.runtime_state->query_type() == TQueryType::LOAD && !req.done && req.status.ok()) {
// this is a load plan, and load is not finished, just make a brief report
params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
Expand Down
25 changes: 22 additions & 3 deletions be/src/runtime/plan_fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
VLOG_NOTICE << "plan_root=\n" << _plan->debug_string();
_prepared = true;

_query_statistics.reset(new QueryStatistics());
_query_statistics.reset(new QueryStatistics(request.query_options.query_type));
if (_sink != nullptr) {
_sink->set_query_statistics(_query_statistics);
}
Expand Down Expand Up @@ -440,7 +440,25 @@ bool PlanFragmentExecutor::is_timeout(const VecDateTimeValue& now) const {

void PlanFragmentExecutor::_collect_query_statistics() {
_query_statistics->clear();
Status status = _plan->collect_query_statistics(_query_statistics.get());
Status status;
/// TODO(yxc):
// The judgment of enable_local_exchange here is a bug, it should not need to be checked. I will fix this later.
bool _is_local = false;
if (_runtime_state->query_options().__isset.enable_local_exchange) {
_is_local = _runtime_state->query_options().enable_local_exchange;
}

if (_is_local) {
if (_runtime_state->num_per_fragment_instances() == 1) {
status = _plan->collect_query_statistics(_query_statistics.get());
} else {
status = _plan->collect_query_statistics(_query_statistics.get(),
_runtime_state->per_fragment_instance_idx());
}
} else {
status = _plan->collect_query_statistics(_query_statistics.get());
}

if (!status.ok()) {
LOG(INFO) << "collect query statistics failed, st=" << status;
return;
Expand Down Expand Up @@ -548,7 +566,8 @@ void PlanFragmentExecutor::send_report(bool done) {
_runtime_state.get(),
std::bind(&PlanFragmentExecutor::update_status, this, std::placeholders::_1),
std::bind(&PlanFragmentExecutor::cancel, this, std::placeholders::_1,
std::placeholders::_2)};
std::placeholders::_2),
_dml_query_statistics()};
// This will send a report even if we are cancelled. If the query completed correctly
// but fragments still need to be cancelled (e.g. limit reached), the coordinator will
// be waiting for a final report and profile.
Expand Down
7 changes: 7 additions & 0 deletions be/src/runtime/plan_fragment_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,13 @@ class PlanFragmentExecutor {

void _collect_query_statistics();

std::shared_ptr<QueryStatistics> _dml_query_statistics() {
if (_query_statistics && _query_statistics->collect_dml_statistics()) {
return _query_statistics;
}
return nullptr;
}

void _collect_node_statistics();
};

Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "common/object_pool.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/query_statistics.h"
#include "runtime/runtime_filter_mgr.h"
#include "runtime/runtime_predicate.h"
#include "task_group/task_group.h"
Expand Down Expand Up @@ -59,6 +60,7 @@ struct ReportStatusRequest {
RuntimeState* runtime_state;
std::function<Status(Status)> update_fn;
std::function<void(const PPlanFragmentCancelReason&, const std::string&)> cancel_fn;
std::shared_ptr<QueryStatistics> query_statistics;
};
// Save the common components of fragments in a query.
// Some components like DescriptorTbl may be very large
Expand Down
9 changes: 9 additions & 0 deletions be/src/runtime/query_statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) {
}
}

void QueryStatistics::to_thrift(TQueryStatistics* statistics) const {
DCHECK(statistics != nullptr);
statistics->scan_bytes = scan_bytes;
statistics->scan_rows = scan_rows;
statistics->cpu_ms = cpu_ms;
statistics->returned_rows = returned_rows;
statistics->max_peak_memory_bytes = max_peak_memory_bytes;
}

void QueryStatistics::from_pb(const PQueryStatistics& statistics) {
scan_rows = statistics.scan_rows();
scan_bytes = statistics.scan_bytes();
Expand Down
19 changes: 15 additions & 4 deletions be/src/runtime/query_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#pragma once

#include <gen_cpp/FrontendService_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <stdint.h>

#include <map>
Expand Down Expand Up @@ -57,9 +59,14 @@ class NodeStatistics {
// or plan's statistics and QueryStatisticsRecvr is responsible for collecting it.
class QueryStatistics {
public:
QueryStatistics()
: scan_rows(0), scan_bytes(0), cpu_ms(0), returned_rows(0), max_peak_memory_bytes(0) {}
~QueryStatistics();
QueryStatistics(TQueryType::type query_type = TQueryType::type::SELECT)
: scan_rows(0),
scan_bytes(0),
cpu_ms(0),
returned_rows(0),
max_peak_memory_bytes(0),
_query_type(query_type) {}
virtual ~QueryStatistics();

void merge(const QueryStatistics& other);

Expand Down Expand Up @@ -107,11 +114,14 @@ class QueryStatistics {
}

void to_pb(PQueryStatistics* statistics);

void to_thrift(TQueryStatistics* statistics) const;
void from_pb(const PQueryStatistics& statistics);
bool collected() const { return _collected; }
void set_collected() { _collected = true; }

// LOAD does not need to collect information on the exchange node.
bool collect_dml_statistics() { return _query_type == TQueryType::LOAD; }

private:
friend class QueryStatisticsRecvr;
int64_t scan_rows;
Expand All @@ -127,6 +137,7 @@ class QueryStatistics {
using NodeStatisticsMap = std::unordered_map<int64_t, NodeStatistics*>;
NodeStatisticsMap _nodes_statistics_map;
bool _collected = false;
const TQueryType::type _query_type;
};
using QueryStatisticsPtr = std::shared_ptr<QueryStatistics>;
// It is used for collecting sub plan query statistics in DataStreamRecvr.
Expand Down
8 changes: 6 additions & 2 deletions be/src/vec/exec/vexchange_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,16 @@ void VExchangeNode::release_resource(RuntimeState* state) {

Status VExchangeNode::collect_query_statistics(QueryStatistics* statistics) {
RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics));
statistics->merge(_sub_plan_query_statistics_recvr.get());
if (!statistics->collect_dml_statistics()) {
statistics->merge(_sub_plan_query_statistics_recvr.get());
}
return Status::OK();
}
Status VExchangeNode::collect_query_statistics(QueryStatistics* statistics, int sender_id) {
RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics));
statistics->merge(_sub_plan_query_statistics_recvr.get(), sender_id);
if (!statistics->collect_dml_statistics()) {
statistics->merge(_sub_plan_query_statistics_recvr.get(), sender_id);
}
return Status::OK();
}
Status VExchangeNode::close(RuntimeState* state) {
Expand Down
12 changes: 12 additions & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,16 @@ struct TDetailedReportParams {
3: optional RuntimeProfile.TRuntimeProfileTree loadChannelProfile
}


struct TQueryStatistics {
// A thrift structure identical to the PQueryStatistics structure.
1: optional i64 scan_rows
2: optional i64 scan_bytes
3: optional i64 returned_rows
4: optional i64 cpu_ms
5: optional i64 max_peak_memory_bytes
}

// The results of an INSERT query, sent to the coordinator as part of
// TReportExecStatusParams
struct TReportExecStatusParams {
Expand Down Expand Up @@ -458,6 +468,8 @@ struct TReportExecStatusParams {
22: optional i32 finished_scan_ranges

23: optional list<TDetailedReportParams> detailed_report

24: optional TQueryStatistics query_statistics
}

struct TFeResult {
Expand Down

0 comments on commit d9d5468

Please sign in to comment.