Skip to content

Commit

Permalink
[feature-wip] (memory tracker) (step4) Switch TLS mem tracker to sepa…
Browse files Browse the repository at this point in the history
…rate more detailed memory usage (apache#8669)

Based on apache#8605, Separate out the memory usage of each operator from the Query/Load/StorageEngine mem tracker.
  • Loading branch information
xinyiZzz authored Apr 8, 2022
1 parent 7fb4b6a commit 519305c
Show file tree
Hide file tree
Showing 79 changed files with 378 additions and 150 deletions.
4 changes: 4 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,10 @@ CONF_mInt32(remote_storage_read_buffer_mb, "16");
// Whether Hook TCmalloc new/delete, currently consume/release tls mem tracker in Hook.
CONF_Bool(track_new_delete, "true");

// If true, switch TLS MemTracker to count more detailed memory,
// including caches such as ExecNode operators and TabletManager.
CONF_Bool(memory_verbose_track, "true");

// Default level of MemTracker to show in web page
// now MemTracker support two level:
// OVERVIEW: 0
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/analytic_eval_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ Status AnalyticEvalNode::init(const TPlanNode& tnode, RuntimeState* state) {
Status AnalyticEvalNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
DCHECK(child(0)->row_desc().is_prefix_of(row_desc()));
_child_tuple_desc = child(0)->row_desc().tuple_descriptors()[0];
_curr_tuple_pool.reset(new MemPool(mem_tracker().get()));
Expand Down Expand Up @@ -184,6 +185,7 @@ Status AnalyticEvalNode::prepare(RuntimeState* state) {

Status AnalyticEvalNode::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(ExecNode::open(state));
RETURN_IF_CANCELLED(state);
//RETURN_IF_ERROR(QueryMaintenance(state));
Expand Down Expand Up @@ -812,6 +814,7 @@ inline int64_t AnalyticEvalNode::num_output_rows_ready() const {

Status AnalyticEvalNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
//RETURN_IF_ERROR(QueryMaintenance(state));
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/assert_num_rows_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ Status AssertNumRowsNode::prepare(RuntimeState* state) {

Status AssertNumRowsNode::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(ExecNode::open(state));
// ISSUE-3435
RETURN_IF_ERROR(child(0)->open(state));
Expand All @@ -58,6 +59,7 @@ Status AssertNumRowsNode::open(RuntimeState* state) {
Status AssertNumRowsNode::get_next(RuntimeState* state, RowBatch* output_batch, bool* eos) {
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
output_batch->reset();
child(0)->get_next(state, output_batch, eos);
_num_rows_returned += output_batch->num_rows();
Expand Down
19 changes: 10 additions & 9 deletions be/src/exec/base_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ BaseScanner::BaseScanner(RuntimeState* state, RuntimeProfile* profile,
_counter(counter),
_src_tuple(nullptr),
_src_tuple_row(nullptr),
#if BE_TEST
_mem_tracker(new MemTracker()),
#else
_mem_tracker(MemTracker::create_tracker(
-1, state->query_type() == TQueryType::LOAD
? "BaseScanner:" + std::to_string(state->load_job_id())
: "BaseScanner:Select")),
#endif
_mem_pool(std::make_unique<MemPool>(_mem_tracker.get())),
_dest_tuple_desc(nullptr),
_pre_filter_texprs(pre_filter_texprs),
_strict_mode(false),
Expand All @@ -48,15 +57,7 @@ BaseScanner::BaseScanner(RuntimeState* state, RuntimeProfile* profile,
_read_timer(nullptr),
_materialize_timer(nullptr),
_success(false),
_scanner_eof(false) {
#ifndef BE_TEST
_mem_pool.reset(new MemPool(state->query_type() == TQueryType::LOAD
? "BaseScanner:" + std::to_string(state->load_job_id())
: "BaseScanner:Select"));
#else
_mem_pool.reset(new MemPool());
#endif
}
_scanner_eof(false) {}

Status BaseScanner::open() {
RETURN_IF_ERROR(init_expr_ctxes());
Expand Down
5 changes: 3 additions & 2 deletions be/src/exec/blocking_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include "gen_cpp/PlanNodes_types.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "util/runtime_profile.h"

namespace doris {
Expand All @@ -46,6 +45,7 @@ BlockingJoinNode::~BlockingJoinNode() {
Status BlockingJoinNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());

_build_pool.reset(new MemPool(mem_tracker().get()));
_build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
Expand Down Expand Up @@ -88,8 +88,9 @@ void BlockingJoinNode::build_side_thread(RuntimeState* state, std::promise<Statu
}

Status BlockingJoinNode::open(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::open(state));
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(ExecNode::open(state));
// RETURN_IF_ERROR(Expr::open(_conjuncts, state));

RETURN_IF_CANCELLED(state);
Expand Down
4 changes: 3 additions & 1 deletion be/src/exec/broker_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
#include "runtime/dpp_sink_internal.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "util/runtime_profile.h"

namespace doris {
Expand Down Expand Up @@ -61,6 +60,7 @@ Status BrokerScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
Status BrokerScanNode::prepare(RuntimeState* state) {
VLOG_QUERY << "BrokerScanNode prepare";
RETURN_IF_ERROR(ScanNode::prepare(state));
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
// get tuple desc
_runtime_state = state;
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
Expand Down Expand Up @@ -88,6 +88,7 @@ Status BrokerScanNode::prepare(RuntimeState* state) {

Status BrokerScanNode::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(ExecNode::open(state));
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
RETURN_IF_CANCELLED(state);
Expand All @@ -108,6 +109,7 @@ Status BrokerScanNode::start_scanners() {

Status BrokerScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
// check if CANCELLED.
if (state->is_cancelled()) {
std::unique_lock<std::mutex> l(_batch_queue_lock);
Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/cross_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include "gen_cpp/PlanNodes_types.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "util/debug_util.h"
#include "util/runtime_profile.h"

Expand All @@ -35,6 +34,7 @@ CrossJoinNode::CrossJoinNode(ObjectPool* pool, const TPlanNode& tnode, const Des
Status CrossJoinNode::prepare(RuntimeState* state) {
DCHECK(_join_op == TJoinOp::CROSS_JOIN);
RETURN_IF_ERROR(BlockingJoinNode::prepare(state));
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
_build_batch_pool.reset(new ObjectPool());
return Status::OK();
}
Expand Down Expand Up @@ -89,6 +89,7 @@ Status CrossJoinNode::get_next(RuntimeState* state, RowBatch* output_batch, bool
// TOOD(zhaochun)
// RETURN_IF_ERROR(state->check_query_state());
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());

if (reached_limit() || _eos) {
*eos = true;
Expand Down
5 changes: 4 additions & 1 deletion be/src/exec/csv_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ Status CsvScanNode::prepare(RuntimeState* state) {
}

RETURN_IF_ERROR(ScanNode::prepare(state));
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());

// add timer
_split_check_timer = ADD_TIMER(_runtime_profile, "split check timer");
Expand Down Expand Up @@ -210,6 +211,8 @@ Status CsvScanNode::prepare(RuntimeState* state) {
}

Status CsvScanNode::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(ExecNode::open(state));
VLOG_CRITICAL << "CsvScanNode::Open";

Expand All @@ -225,7 +228,6 @@ Status CsvScanNode::open(RuntimeState* state) {

RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
RETURN_IF_CANCELLED(state);
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(_csv_scanner->open());

return Status::OK();
Expand All @@ -244,6 +246,7 @@ Status CsvScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());

if (reached_limit()) {
*eos = true;
Expand Down
4 changes: 3 additions & 1 deletion be/src/exec/es_http_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
#include "runtime/dpp_sink_internal.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "service/backend_options.h"
#include "util/runtime_profile.h"

Expand Down Expand Up @@ -68,6 +67,7 @@ Status EsHttpScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
Status EsHttpScanNode::prepare(RuntimeState* state) {
VLOG_QUERY << "EsHttpScanNode prepare";
RETURN_IF_ERROR(ScanNode::prepare(state));
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());

_scanner_profile.reset(new RuntimeProfile("EsHttpScanNode"));
runtime_profile()->add_child(_scanner_profile.get(), true, nullptr);
Expand Down Expand Up @@ -124,6 +124,7 @@ Status EsHttpScanNode::build_conjuncts_list() {

Status EsHttpScanNode::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(ExecNode::open(state));
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
RETURN_IF_CANCELLED(state);
Expand Down Expand Up @@ -199,6 +200,7 @@ Status EsHttpScanNode::collect_scanners_status() {

Status EsHttpScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
if (state->is_cancelled()) {
std::unique_lock<std::mutex> l(_batch_queue_lock);
if (update_status(Status::Cancelled("Cancelled"))) {
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/es_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ Status EsScanNode::prepare(RuntimeState* state) {
VLOG_CRITICAL << "EsScanNode::Prepare";

RETURN_IF_ERROR(ScanNode::prepare(state));
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
if (_tuple_desc == nullptr) {
std::stringstream ss;
Expand All @@ -85,6 +86,7 @@ Status EsScanNode::open(RuntimeState* state) {
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
RETURN_IF_CANCELLED(state);
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(ExecNode::open(state));

// TExtOpenParams.row_schema
Expand Down Expand Up @@ -205,6 +207,7 @@ Status EsScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos)
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());

// create tuple
MemPool* tuple_pool = row_batch->tuple_data_pool();
Expand Down
5 changes: 3 additions & 2 deletions be/src/exec/except_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include "exprs/expr.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"

namespace doris {
ExceptNode::ExceptNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
Expand All @@ -40,8 +39,9 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) {
}

Status ExceptNode::open(RuntimeState* state) {
RETURN_IF_ERROR(SetOperationNode::open(state));
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Except Node, while probing the hash table.");
RETURN_IF_ERROR(SetOperationNode::open(state));
// if a table is empty, the result must be empty
if (_hash_tbl->size() == 0) {
_hash_tbl_iterator = _hash_tbl->begin();
Expand Down Expand Up @@ -88,6 +88,7 @@ Status ExceptNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eos)
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
*eos = true;
if (reached_limit()) {
return Status::OK();
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/exchange_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ Status ExchangeNode::init(const TPlanNode& tnode, RuntimeState* state) {

Status ExchangeNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::prepare(state));
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
_convert_row_batch_timer = ADD_TIMER(runtime_profile(), "ConvertRowBatchTime");
// TODO: figure out appropriate buffer size
DCHECK_GT(_num_senders, 0);
Expand All @@ -75,6 +76,7 @@ Status ExchangeNode::prepare(RuntimeState* state) {

Status ExchangeNode::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(ExecNode::open(state));
if (_is_merging) {
RETURN_IF_ERROR(_sort_exec_exprs.open(state));
Expand Down Expand Up @@ -129,6 +131,7 @@ Status ExchangeNode::fill_input_row_batch(RuntimeState* state) {
Status ExchangeNode::get_next(RuntimeState* state, RowBatch* output_batch, bool* eos) {
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());

if (reached_limit()) {
_stream_recvr->transfer_all_resources(output_batch);
Expand Down
1 change: 0 additions & 1 deletion be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
#include "runtime/mem_tracker.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "util/debug_util.h"
#include "util/runtime_profile.h"
#include "vec/core/block.h"
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/exec_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@
#include "runtime/descriptors.h"
#include "runtime/mem_pool.h"
#include "runtime/query_statistics.h"
#include "runtime/thread_context.h"
#include "service/backend_options.h"
#include "util/blocking_queue.hpp"
#include "util/runtime_profile.h"
#include "util/uid_util.h" // for print_id

#include "vec/exprs/vexpr_context.h"

namespace doris {
Expand Down
4 changes: 3 additions & 1 deletion be/src/exec/hash_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
#include "runtime/row_batch.h"
#include "runtime/runtime_filter_mgr.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "util/defer_op.h"
#include "util/runtime_profile.h"

Expand Down Expand Up @@ -96,6 +95,7 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {

Status HashJoinNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::prepare(state));
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());

_build_pool.reset(new MemPool(mem_tracker().get()));
_build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
Expand Down Expand Up @@ -220,6 +220,7 @@ Status HashJoinNode::open(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::open(state));
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(Expr::open(_build_expr_ctxs, state));
RETURN_IF_ERROR(Expr::open(_probe_expr_ctxs, state));
Expand Down Expand Up @@ -306,6 +307,7 @@ Status HashJoinNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eo
// it may cause the memory to exceed the limit.
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Hash join, while execute get_next.");
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());

if (reached_limit()) {
*eos = true;
Expand Down
5 changes: 3 additions & 2 deletions be/src/exec/intersect_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include "exprs/expr.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"

namespace doris {
IntersectNode::IntersectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
Expand All @@ -44,8 +43,9 @@ Status IntersectNode::init(const TPlanNode& tnode, RuntimeState* state) {
// 2 probe with child(1), then filter the hash table and find the matched item, use them to rebuild a hash table
// repeat [2] this for all the rest child
Status IntersectNode::open(RuntimeState* state) {
RETURN_IF_ERROR(SetOperationNode::open(state));
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Intersect Node, while probing the hash table.");
RETURN_IF_ERROR(SetOperationNode::open(state));
// if a table is empty, the result must be empty
if (_hash_tbl->size() == 0) {
_hash_tbl_iterator = _hash_tbl->begin();
Expand Down Expand Up @@ -88,6 +88,7 @@ Status IntersectNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* e
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
*eos = true;
if (reached_limit()) {
return Status::OK();
Expand Down
Loading

0 comments on commit 519305c

Please sign in to comment.