Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Dec 20, 2024
1 parent db3aff9 commit 75d0874
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 44 deletions.
136 changes: 104 additions & 32 deletions be/src/olap/lru_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ uint32_t HandleTable::element_count() const {
return _elems;
}

LRUCache::LRUCache(LRUCacheType type) : _type(type) {
LRUCache::LRUCache(LRUCacheType type, bool is_lru_k) : _type(type), _is_lru_k(is_lru_k) {
// Make empty circular linked list
_lru_normal.next = &_lru_normal;
_lru_normal.prev = &_lru_normal;
Expand Down Expand Up @@ -289,21 +289,35 @@ void LRUCache::_lru_append(LRUHandle* list, LRUHandle* e) {
}

Cache::Handle* LRUCache::lookup(const CacheKey& key, uint32_t hash) {
std::lock_guard l(_mutex);
++_lookup_count;
LRUHandle* e = _table.lookup(key, hash);
if (e != nullptr) {
// we get it from _table, so in_cache must be true
DCHECK(e->in_cache);
if (e->refs == 1) {
// only in LRU free list, remove it from list
_lru_remove(e);
LRUHandle* e;
{
std::lock_guard l(_mutex);
++_lookup_count;
e = _table.lookup(key, hash);
if (e != nullptr) {
// we get it from _table, so in_cache must be true
DCHECK(e->in_cache);
if (e->refs == 1) {
// only in LRU free list, remove it from list
_lru_remove(e);
}
e->refs++;
++_hit_count;
e->last_visit_time = UnixMillis();
} else {
++_miss_count;
}
}
// If key not exist in cache, and is lru k cache, and key in visits list,
// then move the key to beginning of the visits list.
// key in visits list indicates that the key has been inserted once after the cache is full.
if (e == nullptr && _is_lru_k) {
std::lock_guard l(_visits_lru_cache_mutex);
auto it = _visits_lru_cache_map.find(hash);
if (it != _visits_lru_cache_map.end()) {
_visits_lru_cache_list.splice(_visits_lru_cache_list.begin(), _visits_lru_cache_list,
it->second);
}
e->refs++;
++_hit_count;
e->last_visit_time = UnixMillis();
} else {
++_miss_count;
}
return reinterpret_cast<Cache::Handle*>(e);
}
Expand All @@ -316,17 +330,19 @@ void LRUCache::release(Cache::Handle* handle) {
bool last_ref = false;
{
std::lock_guard l(_mutex);
// if last_ref is true, key may have been evict from the cache,
// or if it is lru k, first insert of key may have failed.
last_ref = _unref(e);
if (last_ref) {
_usage -= e->total_size;
} else if (e->in_cache && e->refs == 1) {
if (e->in_cache && e->refs == 1) {
// only exists in cache
if (_usage > _capacity) {
// take this opportunity and remove the item
bool removed = _table.remove(e);
DCHECK(removed);
e->in_cache = false;
_unref(e);
// `entry->in_cache = false` and `_usage -= entry->total_size;` and `_unref(entry)` should appear together.
// see the comment for old entry in `LRUCache::insert`.
_usage -= e->total_size;
last_ref = true;
} else {
Expand Down Expand Up @@ -401,13 +417,52 @@ void LRUCache::_evict_one_entry(LRUHandle* e) {
DCHECK(removed);
e->in_cache = false;
_unref(e);
// `entry->in_cache = false` and `_usage -= entry->total_size;` and `_unref(entry)` should appear together.
// see the comment for old entry in `LRUCache::insert`.
_usage -= e->total_size;
}

bool LRUCache::_check_element_count_limit() {
return _element_count_capacity != 0 && _table.element_count() >= _element_count_capacity;
}

// After cache is full,
// 1.Return false. If key has been inserted into the visits list before,
// key is allowed to be inserted into cache this time (this will trigger cache evict),
// and key is removed from the visits list.
// 2. Return true. If key not in visits list, insert it into visits list.
bool LRUCache::_lru_k_insert_visits_list(size_t total_size, visits_lru_cache_key visits_key) {
if (_usage + total_size > _capacity ||
_check_element_count_limit()) { // this line no lock required
std::lock_guard l(_visits_lru_cache_mutex);
auto it = _visits_lru_cache_map.find(visits_key);
if (it != _visits_lru_cache_map.end()) {
_visits_lru_cache_usage -= it->second->second;
_visits_lru_cache_list.erase(it->second);
_visits_lru_cache_map.erase(it);
} else {
// _visits_lru_cache_list capacity is same as the cache itself.
// If _visits_lru_cache_list is full, some keys will also be evict.
while (_visits_lru_cache_usage + total_size > _capacity &&
_visits_lru_cache_usage != 0) {
DCHECK(!_visits_lru_cache_map.empty());
_visits_lru_cache_usage -= _visits_lru_cache_list.back().second;
_visits_lru_cache_map.erase(_visits_lru_cache_list.back().first);
_visits_lru_cache_list.pop_back();
}
// 1. If true, insert key at the beginning of _visits_lru_cache_list.
// 2. If false, it means total_size > cache _capacity, preventing this insert.
if (_visits_lru_cache_usage + total_size <= _capacity) {
_visits_lru_cache_list.emplace_front(visits_key, total_size);
_visits_lru_cache_map[visits_key] = _visits_lru_cache_list.begin();
_visits_lru_cache_usage += total_size;
}
return true;
}
}
return false;
}

Cache::Handle* LRUCache::insert(const CacheKey& key, uint32_t hash, void* value, size_t charge,
CachePriority priority) {
size_t handle_size = sizeof(LRUHandle) - 1 + key.size();
Expand All @@ -419,13 +474,18 @@ Cache::Handle* LRUCache::insert(const CacheKey& key, uint32_t hash, void* value,
// because charge at this time is no longer the memory size, but an weight.
e->total_size = (_type == LRUCacheType::SIZE ? handle_size + charge : charge);
e->hash = hash;
e->refs = 2; // one for the returned handle, one for LRUCache.
e->refs = 1; // only one for the returned handle.
e->next = e->prev = nullptr;
e->in_cache = true;
e->in_cache = false;
e->priority = priority;
e->type = _type;
memcpy(e->key_data, key.data(), key.size());
e->last_visit_time = UnixMillis();

if (_is_lru_k && _lru_k_insert_visits_list(e->total_size, hash)) {
return reinterpret_cast<Cache::Handle*>(e);
}

LRUHandle* to_remove_head = nullptr;
{
std::lock_guard l(_mutex);
Expand All @@ -441,13 +501,22 @@ Cache::Handle* LRUCache::insert(const CacheKey& key, uint32_t hash, void* value,
// insert into the cache
// note that the cache might get larger than its capacity if not enough
// space was freed
auto old = _table.insert(e);
auto* old = _table.insert(e);
e->in_cache = true;
_usage += e->total_size;
e->refs++; // one for the returned handle, one for LRUCache.
if (old != nullptr) {
_stampede_count++;
old->in_cache = false;
// `entry->in_cache = false` and `_usage -= entry->total_size;` and `_unref(entry)` should appear together.
// Whether the reference of the old entry is 0, the cache usage is subtracted here,
// because the old entry has been removed from the cache and should not be counted in the cache capacity,
// but the memory of the old entry is still tracked by the cache memory_tracker.
// After all the old handles are released, the old entry will be freed and the memory of the old entry
// will be released from the cache memory_tracker.
_usage -= old->total_size;
// if false, old entry is being used externally, just ref-- and sub _usage,
if (_unref(old)) {
_usage -= old->total_size;
// old is on LRU because it's in cache and its reference count
// was just 1 (Unref returned 0)
_lru_remove(old);
Expand Down Expand Up @@ -476,14 +545,15 @@ void LRUCache::erase(const CacheKey& key, uint32_t hash) {
e = _table.remove(key, hash);
if (e != nullptr) {
last_ref = _unref(e);
if (last_ref) {
_usage -= e->total_size;
if (e->in_cache) {
// locate in free list
_lru_remove(e);
}
// if last_ref is false or in_cache is false, e must not be in lru
if (last_ref && e->in_cache) {
// locate in free list
_lru_remove(e);
}
e->in_cache = false;
// `entry->in_cache = false` and `_usage -= entry->total_size;` and `_unref(entry)` should appear together.
// see the comment for old entry in `LRUCache::insert`.
_usage -= e->total_size;
}
}
// free handle out of mutex, when last_ref is true, e must not be nullptr
Expand Down Expand Up @@ -576,7 +646,8 @@ inline uint32_t ShardedLRUCache::_hash_slice(const CacheKey& s) {
}

ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t capacity, LRUCacheType type,
uint32_t num_shards, uint32_t total_element_count_capacity)
uint32_t num_shards, uint32_t total_element_count_capacity,
bool is_lru_k)
: _name(name),
_num_shard_bits(Bits::FindLSBSetNonZero(num_shards)),
_num_shards(num_shards),
Expand All @@ -592,7 +663,7 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t capacity, LRUCa
(total_element_count_capacity + (_num_shards - 1)) / _num_shards;
LRUCache** shards = new (std::nothrow) LRUCache*[_num_shards];
for (int s = 0; s < _num_shards; s++) {
shards[s] = new LRUCache(type);
shards[s] = new LRUCache(type, is_lru_k);
shards[s]->set_capacity(per_shard);
shards[s]->set_element_count_capacity(per_shard_element_count_capacity);
}
Expand Down Expand Up @@ -623,8 +694,9 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t capacity, LRUCa
uint32_t num_shards,
CacheValueTimeExtractor cache_value_time_extractor,
bool cache_value_check_timestamp,
uint32_t total_element_count_capacity)
: ShardedLRUCache(name, capacity, type, num_shards, total_element_count_capacity) {
uint32_t total_element_count_capacity, bool is_lru_k)
: ShardedLRUCache(name, capacity, type, num_shards, total_element_count_capacity,
is_lru_k) {
for (int s = 0; s < _num_shards; s++) {
_shards[s]->set_cache_value_time_extractor(cache_value_time_extractor);
_shards[s]->set_cache_value_check_timestamp(cache_value_check_timestamp);
Expand Down
23 changes: 20 additions & 3 deletions be/src/olap/lru_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ enum LRUCacheType {
static constexpr LRUCacheType DEFAULT_LRU_CACHE_TYPE = LRUCacheType::SIZE;
static constexpr uint32_t DEFAULT_LRU_CACHE_NUM_SHARDS = 32;
static constexpr size_t DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY = 0;
static constexpr bool DEFAULT_LRU_CACHE_IS_LRU_K = false;

class CacheKey {
public:
Expand Down Expand Up @@ -180,6 +181,10 @@ class Cache {
//
// When the inserted entry is no longer needed, the key and
// value will be passed to "deleter".
//
// if cache is lru k and cache is full, first insert of key will not succeed.
//
// Note: if is ShardedLRUCache, cache capacity = ShardedLRUCache_capacity / num_shards.
virtual Handle* insert(const CacheKey& key, void* value, size_t charge,
CachePriority priority = CachePriority::NORMAL) = 0;

Expand Down Expand Up @@ -326,9 +331,12 @@ using LRUHandleSortedSet = std::set<std::pair<int64_t, LRUHandle*>>;
// A single shard of sharded cache.
class LRUCache {
public:
LRUCache(LRUCacheType type);
LRUCache(LRUCacheType type, bool is_lru_k = DEFAULT_LRU_CACHE_IS_LRU_K);
~LRUCache();

using visits_lru_cache_key = uint32_t;
using visits_lru_cache_pair = std::pair<visits_lru_cache_key, size_t>;

// Separate from constructor so caller can easily make an array of LRUCache
PrunedInfo set_capacity(size_t capacity);
void set_element_count_capacity(uint32_t element_count_capacity) {
Expand Down Expand Up @@ -365,6 +373,7 @@ class LRUCache {
void _evict_from_lru_with_time(size_t total_size, LRUHandle** to_remove_head);
void _evict_one_entry(LRUHandle* e);
bool _check_element_count_limit();
bool _lru_k_insert_visits_list(size_t total_size, visits_lru_cache_key visits_key);

private:
LRUCacheType _type;
Expand Down Expand Up @@ -396,6 +405,13 @@ class LRUCache {
LRUHandleSortedSet _sorted_durable_entries_with_timestamp;

uint32_t _element_count_capacity = 0;

bool _is_lru_k = false; // LRU-K algorithm, K=2
std::list<visits_lru_cache_pair> _visits_lru_cache_list;
std::unordered_map<visits_lru_cache_key, std::list<visits_lru_cache_pair>::iterator>
_visits_lru_cache_map;
size_t _visits_lru_cache_usage = 0;
std::mutex _visits_lru_cache_mutex;
};

class ShardedLRUCache : public Cache {
Expand All @@ -420,11 +436,12 @@ class ShardedLRUCache : public Cache {
friend class LRUCachePolicy;

explicit ShardedLRUCache(const std::string& name, size_t capacity, LRUCacheType type,
uint32_t num_shards, uint32_t element_count_capacity);
uint32_t num_shards, uint32_t element_count_capacity, bool is_lru_k);
explicit ShardedLRUCache(const std::string& name, size_t capacity, LRUCacheType type,
uint32_t num_shards,
CacheValueTimeExtractor cache_value_time_extractor,
bool cache_value_check_timestamp, uint32_t element_count_capacity);
bool cache_value_check_timestamp, uint32_t element_count_capacity,
bool is_lru_k);

void update_cache_metrics() const;

Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/page_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ class StoragePageCache {
DataPageCache(size_t capacity, uint32_t num_shards)
: LRUCachePolicy(CachePolicy::CacheType::DATA_PAGE_CACHE, capacity,
LRUCacheType::SIZE, config::data_page_cache_stale_sweep_time_sec,
num_shards) {}
num_shards, DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY, true, true) {
}
};

class IndexPageCache : public LRUCachePolicy {
Expand Down
9 changes: 5 additions & 4 deletions be/src/runtime/memory/lru_cache_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ class LRUCachePolicy : public CachePolicy {
LRUCachePolicy(CacheType type, size_t capacity, LRUCacheType lru_cache_type,
uint32_t stale_sweep_time_s, uint32_t num_shards = DEFAULT_LRU_CACHE_NUM_SHARDS,
uint32_t element_count_capacity = DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY,
bool enable_prune = true)
bool enable_prune = true, bool is_lru_k = DEFAULT_LRU_CACHE_IS_LRU_K)
: CachePolicy(type, capacity, stale_sweep_time_s, enable_prune),
_lru_cache_type(lru_cache_type) {
if (check_capacity(capacity, num_shards)) {
_cache = std::shared_ptr<ShardedLRUCache>(
new ShardedLRUCache(type_string(type), capacity, lru_cache_type, num_shards,
element_count_capacity));
element_count_capacity, is_lru_k));
} else {
CHECK(ExecEnv::GetInstance()->get_dummy_lru_cache());
_cache = ExecEnv::GetInstance()->get_dummy_lru_cache();
Expand All @@ -54,14 +54,15 @@ class LRUCachePolicy : public CachePolicy {
uint32_t stale_sweep_time_s, uint32_t num_shards,
uint32_t element_count_capacity,
CacheValueTimeExtractor cache_value_time_extractor,
bool cache_value_check_timestamp, bool enable_prune = true)
bool cache_value_check_timestamp, bool enable_prune = true,
bool is_lru_k = DEFAULT_LRU_CACHE_IS_LRU_K)
: CachePolicy(type, capacity, stale_sweep_time_s, enable_prune),
_lru_cache_type(lru_cache_type) {
if (check_capacity(capacity, num_shards)) {
_cache = std::shared_ptr<ShardedLRUCache>(
new ShardedLRUCache(type_string(type), capacity, lru_cache_type, num_shards,
cache_value_time_extractor, cache_value_check_timestamp,
element_count_capacity));
element_count_capacity, is_lru_k));
} else {
CHECK(ExecEnv::GetInstance()->get_dummy_lru_cache());
_cache = ExecEnv::GetInstance()->get_dummy_lru_cache();
Expand Down
Loading

0 comments on commit 75d0874

Please sign in to comment.