Skip to content

Commit

Permalink
Merge branch 'master' into 20241021_fix_scratch_sink
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz authored Oct 22, 2024
2 parents 71fe098 + ea7544c commit 43ca732
Show file tree
Hide file tree
Showing 49 changed files with 1,031 additions and 411 deletions.
2 changes: 0 additions & 2 deletions be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,6 @@ if (COMPILER_CLANG)
-Wno-implicit-float-conversion
-Wno-implicit-int-conversion
-Wno-sign-conversion
-Wno-missing-field-initializers
-Wno-unused-const-variable
-Wno-shorten-64-to-32)
if (USE_LIBCPP)
add_compile_options($<$<COMPILE_LANGUAGE:CXX>:-stdlib=libc++>)
Expand Down
25 changes: 10 additions & 15 deletions be/src/io/cache/block_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,15 +244,12 @@ void BlockFileCache::use_cell(const FileBlockCell& cell, FileBlocks* result, boo
result->push_back(cell.file_block);
}

if (cell.file_block->cache_type() != FileCacheType::TTL ||
config::enable_ttl_cache_evict_using_lru) {
auto& queue = get_queue(cell.file_block->cache_type());
DCHECK(cell.queue_iterator) << "impossible";
/// Move to the end of the queue. The iterator remains valid.
if (move_iter_flag) {
queue.move_to_end(*cell.queue_iterator, cache_lock);
}
auto& queue = get_queue(cell.file_block->cache_type());
/// Move to the end of the queue. The iterator remains valid.
if (cell.queue_iterator && move_iter_flag) {
queue.move_to_end(*cell.queue_iterator, cache_lock);
}

cell.update_atime();
cell.is_deleted = false;
}
Expand Down Expand Up @@ -358,7 +355,7 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte
auto st = cell.file_block->change_cache_type_between_ttl_and_others(
FileCacheType::NORMAL);
if (st.ok()) {
if (config::enable_ttl_cache_evict_using_lru) {
if (cell.queue_iterator) {
auto& ttl_queue = get_queue(FileCacheType::TTL);
ttl_queue.remove(cell.queue_iterator.value(), cache_lock);
}
Expand Down Expand Up @@ -1056,7 +1053,7 @@ bool BlockFileCache::remove_if_ttl_file_unlock(const UInt128Wrapper& file_key, b
auto st = cell.file_block->change_cache_type_between_ttl_and_others(
FileCacheType::NORMAL);
if (st.ok()) {
if (config::enable_ttl_cache_evict_using_lru) {
if (cell.queue_iterator) {
ttl_queue.remove(cell.queue_iterator.value(), cache_lock);
}
auto& queue = get_queue(FileCacheType::NORMAL);
Expand Down Expand Up @@ -1133,7 +1130,7 @@ void BlockFileCache::reset_range(const UInt128Wrapper& hash, size_t offset, size
_files.find(hash)->second.find(offset) != _files.find(hash)->second.end());
FileBlockCell* cell = get_cell(hash, offset, cache_lock);
DCHECK(cell != nullptr);
if (cell->file_block->cache_type() != FileCacheType::TTL) {
if (cell->queue_iterator) {
auto& queue = get_queue(cell->file_block->cache_type());
DCHECK(queue.contains(hash, offset, cache_lock));
auto iter = queue.get(hash, offset, cache_lock);
Expand Down Expand Up @@ -1809,10 +1806,8 @@ std::string BlockFileCache::clear_file_cache_directly() {
<< " time_elapsed=" << duration_cast<milliseconds>(steady_clock::now() - start).count()
<< " num_files=" << num_files << " cache_size=" << cache_size
<< " index_queue_size=" << index_queue_size << " normal_queue_size=" << normal_queue_size
<< " disposible_queue_size=" << disposible_queue_size;
if (config::enable_ttl_cache_evict_using_lru) {
ss << "ttl_queue_size=" << ttl_queue_size;
}
<< " disposible_queue_size=" << disposible_queue_size << "ttl_queue_size=" << ttl_queue_size;

auto msg = ss.str();
LOG(INFO) << msg;
return msg;
Expand Down
18 changes: 15 additions & 3 deletions be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,26 @@ Result<DorisFSDirectory*> InvertedIndexFileWriter::open(const TabletIndex* index

if (exists) {
LOG(ERROR) << "try to init a directory:" << local_fs_index_path << " already exists";
return ResultError(Status::InternalError("init_fulltext_index directory already exists"));
return ResultError(
Status::InternalError("InvertedIndexFileWriter::open directory already exists"));
}

bool can_use_ram_dir = true;
auto* dir = DorisFSDirectoryFactory::getDirectory(local_fs, local_fs_index_path.c_str(),
can_use_ram_dir);
_indices_dirs.emplace(std::make_pair(index_meta->index_id(), index_meta->get_index_suffix()),
std::unique_ptr<DorisFSDirectory>(dir));
auto key = std::make_pair(index_meta->index_id(), index_meta->get_index_suffix());
auto [it, inserted] = _indices_dirs.emplace(key, std::unique_ptr<DorisFSDirectory>(dir));
if (!inserted) {
LOG(ERROR) << "InvertedIndexFileWriter::open attempted to insert a duplicate key: ("
<< key.first << ", " << key.second << ")";
LOG(ERROR) << "Directories already in map: ";
for (const auto& entry : _indices_dirs) {
LOG(ERROR) << "Key: (" << entry.first.first << ", " << entry.first.second << ")";
}
return ResultError(Status::InternalError(
"InvertedIndexFileWriter::open attempted to insert a duplicate dir"));
}

return dir;
}

Expand Down
8 changes: 4 additions & 4 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ StorageEngine::StorageEngine(const EngineOptions& options)
_txn_manager(new TxnManager(*this, config::txn_map_shard_size, config::txn_shard_size)),
_default_rowset_type(BETA_ROWSET),
_create_tablet_idx_lru_cache(
new CreateTabletIdxCache(config::partition_disk_index_lru_size)),
new CreateTabletRRIdxCache(config::partition_disk_index_lru_size)),
_snapshot_mgr(std::make_unique<SnapshotManager>(*this)) {
REGISTER_HOOK_METRIC(unused_rowsets_count, [this]() {
// std::lock_guard<std::mutex> lock(_gc_mutex);
Expand Down Expand Up @@ -515,7 +515,7 @@ Status StorageEngine::set_cluster_id(int32_t cluster_id) {

int StorageEngine::_get_and_set_next_disk_index(int64 partition_id,
TStorageMedium::type storage_medium) {
auto key = CreateTabletIdxCache::get_key(partition_id, storage_medium);
auto key = CreateTabletRRIdxCache::get_key(partition_id, storage_medium);
int curr_index = _create_tablet_idx_lru_cache->get_index(key);
// -1, lru can't find key
if (curr_index == -1) {
Expand Down Expand Up @@ -1511,7 +1511,7 @@ Status StorageEngine::_persist_broken_paths() {
return Status::OK();
}

int CreateTabletIdxCache::get_index(const std::string& key) {
int CreateTabletRRIdxCache::get_index(const std::string& key) {
auto* lru_handle = lookup(key);
if (lru_handle) {
Defer release([cache = this, lru_handle] { cache->release(lru_handle); });
Expand All @@ -1522,7 +1522,7 @@ int CreateTabletIdxCache::get_index(const std::string& key) {
return -1;
}

void CreateTabletIdxCache::set_index(const std::string& key, int next_idx) {
void CreateTabletRRIdxCache::set_index(const std::string& key, int next_idx) {
assert(next_idx >= 0);
auto* value = new CacheValue;
value->idx = next_idx;
Expand Down
10 changes: 5 additions & 5 deletions be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class Thread;
class ThreadPool;
class TxnManager;
class ReportWorker;
class CreateTabletIdxCache;
class CreateTabletRRIdxCache;
struct DirInfo;
class SnapshotManager;

Expand Down Expand Up @@ -532,15 +532,15 @@ class StorageEngine final : public BaseStorageEngine {
// next index for create tablet
std::map<TStorageMedium::type, int> _last_use_index;

std::unique_ptr<CreateTabletIdxCache> _create_tablet_idx_lru_cache;
std::unique_ptr<CreateTabletRRIdxCache> _create_tablet_idx_lru_cache;

std::unique_ptr<SnapshotManager> _snapshot_mgr;
};

// lru cache for create tabelt round robin in disks
// key: partitionId_medium
// value: index
class CreateTabletIdxCache : public LRUCachePolicy {
class CreateTabletRRIdxCache : public LRUCachePolicy {
public:
// get key, delimiter with DELIMITER '-'
static std::string get_key(int64_t partition_id, TStorageMedium::type medium) {
Expand All @@ -557,10 +557,10 @@ class CreateTabletIdxCache : public LRUCachePolicy {
int idx = 0;
};

CreateTabletIdxCache(size_t capacity)
CreateTabletRRIdxCache(size_t capacity)
: LRUCachePolicy(CachePolicy::CacheType::CREATE_TABLET_RR_IDX_CACHE, capacity,
LRUCacheType::NUMBER,
/*stale_sweep_time_s*/ 30 * 60) {}
/*stale_sweep_time_s*/ 30 * 60, 1) {}
};

struct DirInfo {
Expand Down
Loading

0 comments on commit 43ca732

Please sign in to comment.