Skip to content

Commit

Permalink
f
Browse files Browse the repository at this point in the history
  • Loading branch information
yiguolei committed Dec 27, 2024
1 parent 442a274 commit 856c981
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 13 deletions.
8 changes: 5 additions & 3 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ class ExecEnv {
UserFunctionCache* user_function_cache() { return _user_function_cache; }
FragmentMgr* fragment_mgr() { return _fragment_mgr; }
ResultCache* result_cache() { return _result_cache; }
ClusterInfo* cluster_info() { return _cluster_info; }
ClusterInfo* cluster_info() { return _cluster_info.get(); }
LoadPathMgr* load_path_mgr() { return _load_path_mgr; }
BfdParser* bfd_parser() const { return _bfd_parser; }
BrokerMgr* broker_mgr() const { return _broker_mgr; }
Expand Down Expand Up @@ -274,7 +274,9 @@ class ExecEnv {
void set_memtable_memory_limiter(MemTableMemoryLimiter* limiter) {
_memtable_memory_limiter.reset(limiter);
}
void set_cluster_info(ClusterInfo* cluster_info) { this->_cluster_info = cluster_info; }
void set_cluster_info(std::unique_ptr<ClusterInfo>&& cluster_info) {
this->_cluster_info = std::move(cluster_info);
}
void set_new_load_stream_mgr(std::unique_ptr<NewLoadStreamMgr>&& new_load_stream_mgr);
void set_stream_load_executor(std::unique_ptr<StreamLoadExecutor>&& stream_load_executor);

Expand Down Expand Up @@ -418,7 +420,7 @@ class ExecEnv {
WorkloadGroupMgr* _workload_group_manager = nullptr;

ResultCache* _result_cache = nullptr;
ClusterInfo* _cluster_info = nullptr;
std::unique_ptr<ClusterInfo> _cluster_info = nullptr;
LoadPathMgr* _load_path_mgr = nullptr;

BfdParser* _bfd_parser = nullptr;
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
_fragment_mgr = new FragmentMgr(this);
_result_cache = new ResultCache(config::query_cache_max_size_mb,
config::query_cache_elasticity_size_mb);
_cluster_info = new ClusterInfo();
_cluster_info = std::make_unique<ClusterInfo>();
_load_path_mgr = new LoadPathMgr(this);
_bfd_parser = BfdParser::create();
_broker_mgr = new BrokerMgr(this);
Expand Down Expand Up @@ -829,7 +829,7 @@ void ExecEnv::destroy() {
// Master info should be deconstruct later than fragment manager, because fragment will
// access cluster_info.backend_id to access some info. If there is a running query and master
// info is deconstructed then BE process will core at coordinator back method in fragment mgr.
SAFE_DELETE(_cluster_info);
_cluster_info.reset();

// NOTE: runtime query statistics mgr could be visited by query and daemon thread
// so it should be created before all query begin and deleted after all query and daemon thread stoppped
Expand Down
2 changes: 1 addition & 1 deletion be/test/olap/wal/wal_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class WalManagerTest : public testing::Test {
_env->_cluster_info->master_fe_addr.hostname = "host name";
_env->_cluster_info->master_fe_addr.port = 1234;
_env->_cluster_info->backend_id = 1001;
// _env->set_new_load_stream_mgr(NewLoadStreamMgr::create_unique());
_env->set_new_load_stream_mgr(NewLoadStreamMgr::create_unique());
_env->_internal_client_cache = new BrpcClientCache<PBackendService_Stub>();
_env->_function_client_cache = new BrpcClientCache<PFunctionService_Stub>();
_env->_stream_load_executor = StreamLoadExecutor::create_unique(_env);
Expand Down
16 changes: 9 additions & 7 deletions be/test/runtime/routine_load_task_executor_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,27 +45,29 @@ extern TLoadTxnRollbackResult k_stream_load_rollback_result;
extern TStreamLoadPutResult k_stream_load_put_result;

class RoutineLoadTaskExecutorTest : public testing::Test {
public:
ExecEnv* _env;

public:
RoutineLoadTaskExecutorTest() = default;
~RoutineLoadTaskExecutorTest() override = default;

void SetUp() override {
_env = ExecEnv::GetInstance();
k_stream_load_begin_result = TLoadTxnBeginResult();
k_stream_load_commit_result = TLoadTxnCommitResult();
k_stream_load_rollback_result = TLoadTxnRollbackResult();
k_stream_load_put_result = TStreamLoadPutResult();

_env.set_cluster_info(new ClusterInfo());
//_env.set_new_load_stream_mgr(NewLoadStreamMgr::create_unique());
_env.set_stream_load_executor(StreamLoadExecutor::create_unique(&_env));
_env->set_cluster_info(std::make_unique<ClusterInfo>());
_env->set_new_load_stream_mgr(NewLoadStreamMgr::create_unique());
_env->set_stream_load_executor(StreamLoadExecutor::create_unique(_env));

config::max_routine_load_thread_pool_size = 1024;
config::max_consumer_num_per_group = 3;
}

void TearDown() override { delete _env.cluster_info(); }

ExecEnv _env;
//void TearDown() override { delete _env.cluster_info(); }
};

TEST_F(RoutineLoadTaskExecutorTest, exec_task) {
Expand All @@ -92,7 +94,7 @@ TEST_F(RoutineLoadTaskExecutorTest, exec_task) {

task.__set_kafka_load_info(k_info);

RoutineLoadTaskExecutor executor(&_env);
RoutineLoadTaskExecutor executor(_env);
Status st;
st = executor.init(1024 * 1024);
EXPECT_TRUE(st.ok());
Expand Down

0 comments on commit 856c981

Please sign in to comment.