Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Mar 25, 2024
1 parent e98b4b3 commit e4fb60c
Show file tree
Hide file tree
Showing 11 changed files with 23 additions and 25 deletions.
4 changes: 4 additions & 0 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,7 @@ void alter_inverted_index_callback(StorageEngine& engine, const TAgentTaskReques
auto tablet_ptr = engine.tablet_manager()->get_tablet(alter_inverted_index_rq.tablet_id);
if (tablet_ptr != nullptr) {
EngineIndexChangeTask engine_task(engine, alter_inverted_index_rq);
SCOPED_ATTACH_TASK(engine_task.mem_tracker());
status = engine_task.execute();
} else {
status = Status::NotFound("could not find tablet {}", alter_inverted_index_rq.tablet_id);
Expand Down Expand Up @@ -929,6 +930,7 @@ void check_consistency_callback(StorageEngine& engine, const TAgentTaskRequest&
EngineChecksumTask engine_task(engine, check_consistency_req.tablet_id,
check_consistency_req.schema_hash, check_consistency_req.version,
&checksum);
SCOPED_ATTACH_TASK(engine_task.mem_tracker());
Status status = engine_task.execute();
if (!status.ok()) {
LOG_WARNING("failed to check consistency")
Expand Down Expand Up @@ -1582,6 +1584,7 @@ void push_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
std::vector<TTabletInfo> tablet_infos;

EngineBatchLoadTask engine_task(engine, const_cast<TPushReq&>(push_req), &tablet_infos);
SCOPED_ATTACH_TASK(engine_task.mem_tracker());
auto status = engine_task.execute();

// Return result to fe
Expand Down Expand Up @@ -1904,6 +1907,7 @@ void clone_callback(StorageEngine& engine, const TMasterInfo& master_info,

std::vector<TTabletInfo> tablet_infos;
EngineCloneTask engine_task(engine, clone_req, master_info, req.signature, &tablet_infos);
SCOPED_ATTACH_TASK(engine_task.mem_tracker());
auto status = engine_task.execute();
// Return result to fe
TFinishTaskRequest finish_task_request;
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/task/engine_batch_load_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ EngineBatchLoadTask::EngineBatchLoadTask(StorageEngine& engine, TPushReq& push_r
EngineBatchLoadTask::~EngineBatchLoadTask() = default;

Status EngineBatchLoadTask::execute() {
SCOPED_ATTACH_TASK(_mem_tracker);
Status status;
if (_push_req.push_type == TPushType::LOAD_V2) {
RETURN_IF_ERROR(_init());
Expand Down
2 changes: 0 additions & 2 deletions be/src/olap/task/engine_batch_load_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include "olap/task/engine_task.h"

namespace doris {
class MemTrackerLimiter;
class TPushReq;
class TTabletInfo;
class StorageEngine;
Expand Down Expand Up @@ -68,7 +67,6 @@ class EngineBatchLoadTask final : public EngineTask {
std::vector<TTabletInfo>* _tablet_infos;
std::string _remote_file_path;
std::string _local_file_path;
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
}; // class EngineBatchLoadTask
} // namespace doris
#endif // DORIS_BE_SRC_OLAP_TASK_ENGINE_BATCH_LOAD_TASK_H
10 changes: 5 additions & 5 deletions be/src/olap/task/engine_checksum_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ EngineChecksumTask::EngineChecksumTask(StorageEngine& engine, TTabletId tablet_i
_tablet_id(tablet_id),
_schema_hash(schema_hash),
_version(version),
_checksum(checksum),
_mem_tracker(std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::LOAD,
"EngineChecksumTask#tabletId=" + std::to_string(tablet_id))) {}
_checksum(checksum) {
_mem_tracker = std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::LOAD,
"EngineChecksumTask#tabletId=" + std::to_string(tablet_id));
}

EngineChecksumTask::~EngineChecksumTask() = default;

Status EngineChecksumTask::execute() {
SCOPED_ATTACH_TASK(_mem_tracker);
return _compute_checksum();
} // execute

Expand Down
2 changes: 0 additions & 2 deletions be/src/olap/task/engine_checksum_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include "olap/task/engine_task.h"

namespace doris {
class MemTrackerLimiter;
class StorageEngine;

// base class for storage engine
Expand All @@ -48,7 +47,6 @@ class EngineChecksumTask final : public EngineTask {
TSchemaHash _schema_hash;
TVersion _version;
uint32_t* _checksum;
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
}; // EngineTask

} // namespace doris
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/task/engine_clone_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ EngineCloneTask::EngineCloneTask(StorageEngine& engine, const TCloneReq& clone_r

Status EngineCloneTask::execute() {
// register the tablet to avoid it is deleted by gc thread during clone process
SCOPED_ATTACH_TASK(_mem_tracker);
if (!_engine.tablet_manager()->register_clone_tablet(_clone_req.tablet_id)) {
return Status::InternalError("tablet {} is under clone", _clone_req.tablet_id);
}
Expand Down
2 changes: 0 additions & 2 deletions be/src/olap/task/engine_clone_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

namespace doris {
class DataDir;
class MemTrackerLimiter;
class TCloneReq;
class TMasterInfo;
class TTabletInfo;
Expand Down Expand Up @@ -95,7 +94,6 @@ class EngineCloneTask final : public EngineTask {
const TMasterInfo& _master_info;
int64_t _copy_size;
int64_t _copy_time_ms;
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
std::vector<PendingRowsetGuard> _pending_rs_guards;
}; // EngineTask

Expand Down
17 changes: 8 additions & 9 deletions be/src/olap/task/engine_index_change_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,25 @@
#include "olap/task/engine_index_change_task.h"

#include "olap/storage_engine.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/thread_context.h"
#include "util/doris_metrics.h"

namespace doris {

EngineIndexChangeTask::EngineIndexChangeTask(
StorageEngine& engine, const TAlterInvertedIndexReq& alter_inverted_index_request)
: _engine(engine),
_alter_inverted_index_req(alter_inverted_index_request),
_mem_tracker(std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::SCHEMA_CHANGE,
fmt::format("EngineIndexChangeTask#tabletId={}",
std::to_string(_alter_inverted_index_req.tablet_id)),
config::memory_limitation_per_thread_for_schema_change_bytes)) {}
: _engine(engine), _alter_inverted_index_req(alter_inverted_index_request) {
_mem_tracker = std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::SCHEMA_CHANGE,
fmt::format("EngineIndexChangeTask#tabletId={}",
std::to_string(_alter_inverted_index_req.tablet_id)),
config::memory_limitation_per_thread_for_schema_change_bytes);
}

EngineIndexChangeTask::~EngineIndexChangeTask() = default;

Status EngineIndexChangeTask::execute() {
SCOPED_ATTACH_TASK(_mem_tracker);
DorisMetrics::instance()->alter_inverted_index_requests_total->increment(1);
uint64_t start = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
Expand Down
3 changes: 0 additions & 3 deletions be/src/olap/task/engine_index_change_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
namespace doris {
class StorageEngine;
class TAlterInvertedIndexReq;
class MemTrackerLimiter;

// base class for storage engine
// add "Engine" as task prefix to prevent duplicate name with agent task
Expand All @@ -37,8 +36,6 @@ class EngineIndexChangeTask final : public EngineTask {
private:
StorageEngine& _engine;
const TAlterInvertedIndexReq& _alter_inverted_index_req;

std::shared_ptr<MemTrackerLimiter> _mem_tracker;
}; // EngineTask

} // namespace doris
5 changes: 5 additions & 0 deletions be/src/olap/task/engine_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,17 @@

namespace doris {

class MemTrackerLimiter;

// base class for storage engine
// add "Engine" as task prefix to prevent duplicate name with agent task
class EngineTask {
public:
virtual ~EngineTask() = default;
virtual Status execute() = 0;
std::shared_ptr<MemTrackerLimiter> mem_tracker() const { return _mem_tracker; }

std::shared_ptr<MemTrackerLimiter> _mem_tracker;
};

} // end namespace doris
1 change: 1 addition & 0 deletions be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ class ThreadContext {
DCHECK(mem_tracker);
// Orphan is thread default tracker.
DCHECK(thread_mem_tracker()->label() == "Orphan")
<< ", thread mem tracker label: " << thread_mem_tracker()->label()
<< ", attach mem tracker label: " << mem_tracker->label();
#endif
_task_id = task_id;
Expand Down

0 comments on commit e4fb60c

Please sign in to comment.