Skip to content

Commit

Permalink
FIX
Browse files Browse the repository at this point in the history
  • Loading branch information
zhiqiang-hhhh committed Dec 26, 2024
1 parent e82bc25 commit 080155e
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 11 deletions.
6 changes: 3 additions & 3 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ class ExecEnv {
vectorized::ScannerScheduler* scanner_scheduler() { return _scanner_scheduler; }
FileMetaCache* file_meta_cache() { return _file_meta_cache; }
MemTableMemoryLimiter* memtable_memory_limiter() { return _memtable_memory_limiter.get(); }
WalManager* wal_mgr() { return _wal_manager.get(); }
WalManager* wal_mgr() { return _wal_manager; }
DNSCache* dns_cache() { return _dns_cache; }
WriteCooldownMetaExecutors* write_cooldown_meta_executors() {
return _write_cooldown_meta_executors.get();
Expand Down Expand Up @@ -295,7 +295,7 @@ class ExecEnv {
void set_routine_load_task_executor(RoutineLoadTaskExecutor* r) {
this->_routine_load_task_executor = r;
}
void set_wal_mgr(std::shared_ptr<WalManager> wm) { this->_wal_manager = wm; }
void set_wal_mgr(WalManager* wm) { this->_wal_manager = wm; }
void set_dummy_lru_cache(std::shared_ptr<DummyLRUCache> dummy_lru_cache) {
this->_dummy_lru_cache = dummy_lru_cache;
}
Expand Down Expand Up @@ -446,7 +446,7 @@ class ExecEnv {
std::unique_ptr<MemTableMemoryLimiter> _memtable_memory_limiter;
std::unique_ptr<LoadStreamMapPool> _load_stream_map_pool;
std::unique_ptr<vectorized::DeltaWriterV2Pool> _delta_writer_v2_pool;
std::shared_ptr<WalManager> _wal_manager;
WalManager* _wal_manager;
DNSCache* _dns_cache = nullptr;
std::unique_ptr<WriteCooldownMetaExecutors> _write_cooldown_meta_executors;

Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
_load_stream_map_pool = std::make_unique<LoadStreamMapPool>();
_delta_writer_v2_pool = std::make_unique<vectorized::DeltaWriterV2Pool>();
_file_cache_open_fd_cache = std::make_unique<io::FDCache>();
_wal_manager = WalManager::create_shared(this, config::group_commit_wal_path);
_wal_manager = WalManager::create_unique(this, config::group_commit_wal_path).release();
_dns_cache = new DNSCache();
_write_cooldown_meta_executors = std::make_unique<WriteCooldownMetaExecutors>();
_spill_stream_mgr = new vectorized::SpillStreamManager(std::move(spill_store_map));
Expand Down Expand Up @@ -674,7 +674,6 @@ void ExecEnv::destroy() {
_s_ready = false;

SAFE_STOP(_wal_manager);
_wal_manager.reset();
SAFE_STOP(_load_channel_mgr);
SAFE_STOP(_scanner_scheduler);
SAFE_STOP(_broker_mgr);
Expand Down Expand Up @@ -717,6 +716,7 @@ void ExecEnv::destroy() {
SAFE_SHUTDOWN(_s3_file_system_thread_pool);
SAFE_SHUTDOWN(_send_batch_thread_pool);

SAFE_DELETE(_wal_manager);
SAFE_DELETE(_load_channel_mgr);

SAFE_DELETE(_spill_stream_mgr);
Expand Down
1 change: 1 addition & 0 deletions be/src/util/threadpool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ ThreadPool::~ThreadPool() {
// There should only be one live token: the one used in tokenless submission.
CHECK_EQ(1, _tokens.size()) << strings::Substitute(
"Threadpool $0 destroyed with $1 allocated tokens", _name, _tokens.size());
shutdown();
}

Status ThreadPool::init() {
Expand Down
4 changes: 2 additions & 2 deletions be/test/http/stream_load_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ void http_request_done_cb(struct evhttp_request* req, void* arg) {

TEST_F(StreamLoadTest, TestHeader) {
// 1G
auto wal_mgr = WalManager::create_shared(ExecEnv::GetInstance(), config::group_commit_wal_path);
auto wal_mgr = WalManager::create_unique(ExecEnv::GetInstance(), config::group_commit_wal_path);
static_cast<void>(wal_mgr->_wal_dirs_info->add("test_path_1", 1000, 0, 0));
static_cast<void>(wal_mgr->_wal_dirs_info->add("test_path_2", 10000, 0, 0));
static_cast<void>(wal_mgr->_wal_dirs_info->add("test_path_3", 100000, 0, 0));
ExecEnv::GetInstance()->set_wal_mgr(wal_mgr);
ExecEnv::GetInstance()->set_wal_mgr(wal_mgr.release());
// 1. empty info
{
auto* evhttp_req = evhttp_request_new(nullptr, nullptr);
Expand Down
6 changes: 3 additions & 3 deletions be/test/olap/wal/wal_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class WalManagerTest : public testing::Test {
_env->_function_client_cache = new BrpcClientCache<PFunctionService_Stub>();
_env->_stream_load_executor = StreamLoadExecutor::create_shared(_env);
_env->_store_paths = {StorePath(std::filesystem::current_path(), 0)};
_env->_wal_manager = WalManager::create_shared(_env, wal_dir.string());
_env->_wal_manager = WalManager::create_unique(_env, wal_dir.string()).release();
k_stream_load_begin_result = TLoadTxnBeginResult();
}
void TearDown() override {
Expand Down Expand Up @@ -155,9 +155,9 @@ TEST_F(WalManagerTest, recovery_normal) {
}

TEST_F(WalManagerTest, TestDynamicWalSpaceLimt) {
auto wal_mgr = WalManager::create_shared(_env, config::group_commit_wal_path);
auto wal_mgr = WalManager::create_unique(_env, config::group_commit_wal_path);
static_cast<void>(wal_mgr->init());
_env->set_wal_mgr(wal_mgr);
_env->set_wal_mgr(wal_mgr.release());

// 1T
size_t available_bytes = 1099511627776;
Expand Down
2 changes: 1 addition & 1 deletion be/test/vec/exec/vwal_scanner_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ void VWalScannerTest::init() {
_env->_cluster_info->master_fe_addr.hostname = "host name";
_env->_cluster_info->master_fe_addr.port = _backend_id;
_env->_cluster_info->backend_id = 1001;
_env->_wal_manager = WalManager::create_shared(_env, _wal_dir);
_env->_wal_manager = WalManager::create_unique(_env, _wal_dir).release();
std::string base_path;
auto st = _env->_wal_manager->_init_wal_dirs_info();
st = _env->_wal_manager->create_wal_path(_db_id, _tb_id, _txn_id_1, _label_1, base_path,
Expand Down

0 comments on commit 080155e

Please sign in to comment.