Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Feb 20, 2024
1 parent 94c9f37 commit 849b910
Show file tree
Hide file tree
Showing 15 changed files with 114 additions and 104 deletions.
31 changes: 22 additions & 9 deletions be/src/olap/lru_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "gutil/bits.h"
#include "runtime/thread_context.h"
#include "util/doris_metrics.h"
#include "util/time.h"

using std::string;
using std::stringstream;
Expand Down Expand Up @@ -173,7 +174,7 @@ LRUCache::LRUCache(LRUCacheType type) : _type(type) {
}

LRUCache::~LRUCache() {
prune();
prune(nullptr);
}

bool LRUCache::_unref(LRUHandle* e) {
Expand Down Expand Up @@ -243,6 +244,7 @@ Cache::Handle* LRUCache::lookup(const CacheKey& key, uint32_t hash) {
}
e->refs++;
++_hit_count;
e->last_visit_time = UnixMillis();
}
return reinterpret_cast<Cache::Handle*>(e);
}
Expand Down Expand Up @@ -371,6 +373,7 @@ Cache::Handle* LRUCache::insert(const CacheKey& key, uint32_t hash, void* value,
e->mem_tracker = tracker;
e->type = _type;
memcpy(e->key_data, key.data(), key.size());
e->last_visit_time = UnixMillis();
// The memory of the parameter value should be recorded in the tls mem tracker,
// transfer the memory ownership of the value to ShardedLRUCache::_mem_tracker.
THREAD_MEM_TRACKER_TRANSFER_TO(e->bytes, tracker);
Expand Down Expand Up @@ -440,7 +443,7 @@ void LRUCache::erase(const CacheKey& key, uint32_t hash) {
}
}

int64_t LRUCache::prune() {
int64_t LRUCache::prune(int64_t* freed_size) {
LRUHandle* to_remove_head = nullptr;
{
std::lock_guard l(_mutex);
Expand All @@ -458,23 +461,28 @@ int64_t LRUCache::prune() {
}
}
int64_t pruned_count = 0;
int64_t size = 0;
while (to_remove_head != nullptr) {
++pruned_count;
size += to_remove_head->bytes;
LRUHandle* next = to_remove_head->next;
to_remove_head->free();
to_remove_head = next;
}
if (freed_size) {
*freed_size = size;
}
return pruned_count;
}

int64_t LRUCache::prune_if(CacheValuePredicate pred, bool lazy_mode) {
int64_t LRUCache::prune_if(CachePrunePredicate pred, int64_t* freed_size, bool lazy_mode) {
LRUHandle* to_remove_head = nullptr;
{
std::lock_guard l(_mutex);
LRUHandle* p = _lru_normal.next;
while (p != &_lru_normal) {
LRUHandle* next = p->next;
if (pred(p->value)) {
if (pred(p)) {
_evict_one_entry(p);
p->next = to_remove_head;
to_remove_head = p;
Expand All @@ -487,7 +495,7 @@ int64_t LRUCache::prune_if(CacheValuePredicate pred, bool lazy_mode) {
p = _lru_durable.next;
while (p != &_lru_durable) {
LRUHandle* next = p->next;
if (pred(p->value)) {
if (pred(p)) {
_evict_one_entry(p);
p->next = to_remove_head;
to_remove_head = p;
Expand All @@ -498,12 +506,17 @@ int64_t LRUCache::prune_if(CacheValuePredicate pred, bool lazy_mode) {
}
}
int64_t pruned_count = 0;
int64_t size = 0;
while (to_remove_head != nullptr) {
++pruned_count;
size += to_remove_head->bytes;
LRUHandle* next = to_remove_head->next;
to_remove_head->free();
to_remove_head = next;
}
if (freed_size) {
*freed_size = size;
}
return pruned_count;
}

Expand Down Expand Up @@ -622,18 +635,18 @@ uint64_t ShardedLRUCache::new_id() {
return _last_id.fetch_add(1, std::memory_order_relaxed);
}

int64_t ShardedLRUCache::prune() {
int64_t ShardedLRUCache::prune(int64_t* freed_size) {
int64_t num_prune = 0;
for (int s = 0; s < _num_shards; s++) {
num_prune += _shards[s]->prune();
num_prune += _shards[s]->prune(freed_size);
}
return num_prune;
}

int64_t ShardedLRUCache::prune_if(CacheValuePredicate pred, bool lazy_mode) {
int64_t ShardedLRUCache::prune_if(CachePrunePredicate pred, int64_t* freed_size, bool lazy_mode) {
int64_t num_prune = 0;
for (int s = 0; s < _num_shards; s++) {
num_prune += _shards[s]->prune_if(pred, lazy_mode);
num_prune += _shards[s]->prune_if(pred, freed_size, lazy_mode);
}
return num_prune;
}
Expand Down
41 changes: 27 additions & 14 deletions be/src/olap/lru_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ namespace doris {

class Cache;
class LRUCachePolicy;
struct LRUHandle;

enum LRUCacheType {
SIZE, // The capacity of cache is based on the memory size of cache entry, memory size = handle size + charge.
Expand Down Expand Up @@ -150,7 +151,7 @@ class CacheKey {
// The entry with smaller CachePriority will evict firstly
enum class CachePriority { NORMAL = 0, DURABLE = 1 };

using CacheValuePredicate = std::function<bool(const void*)>;
using CachePrunePredicate = std::function<bool(const LRUHandle*)>;
// CacheValueTimeExtractor can extract timestamp
// in cache value through the specified function,
// such as last_visit_time in InvertedIndexSearcherCache::CacheValue
Expand Down Expand Up @@ -219,12 +220,15 @@ class Cache {
// encouraged to override the default implementation. A future release of
// leveldb may change prune() to a pure abstract method.
// return num of entries being pruned.
virtual int64_t prune() { return 0; }
virtual int64_t prune(int64_t* freed_size = nullptr) { return 0; }

// Same as prune(), but the entry will only be pruned if the predicate matched.
// NOTICE: the predicate should be simple enough, or the prune_if() function
// may hold lock for a long time to execute predicate.
virtual int64_t prune_if(CacheValuePredicate pred, bool lazy_mode = false) { return 0; }
virtual int64_t prune_if(CachePrunePredicate pred, int64_t* freed_size = nullptr,
bool lazy_mode = false) {
return 0;
}

virtual int64_t mem_consumption() = 0;

Expand All @@ -246,15 +250,19 @@ struct LRUHandle {
struct LRUHandle* prev = nullptr; // previous entry in lru list
size_t charge;
size_t key_length;
size_t total_size; // including key length
size_t bytes; // Used by LRUCacheType::NUMBER, LRUCacheType::SIZE equal to total_size.
bool in_cache; // Whether entry is in the cache.
size_t total_size; // Entry charge, used to limit cache capacity, LRUCacheType::SIZE including key length.
size_t bytes; // Used by LRUCacheType::NUMBER, LRUCacheType::SIZE equal to total_size.
bool in_cache; // Whether entry is in the cache.
uint32_t refs;
uint32_t hash; // Hash of key(); used for fast sharding and comparisons
CachePriority priority = CachePriority::NORMAL;
MemTrackerLimiter* mem_tracker;
LRUCacheType type;
// Save the last visit time of this cache entry.
// Use atomic because it may be modified by multi threads.
int64_t last_visit_time;
char key_data[1]; // Beginning of key
// Note! key_data must be at the end.

CacheKey key() const {
// For cheaper lookups, we allow a temporary Handle object
Expand Down Expand Up @@ -345,8 +353,9 @@ class LRUCache {
Cache::Handle* lookup(const CacheKey& key, uint32_t hash);
void release(Cache::Handle* handle);
void erase(const CacheKey& key, uint32_t hash);
int64_t prune();
int64_t prune_if(CacheValuePredicate pred, bool lazy_mode = false);
int64_t prune(int64_t* freed_size = nullptr);
int64_t prune_if(CachePrunePredicate pred, int64_t* freed_size = nullptr,
bool lazy_mode = false);

void set_cache_value_time_extractor(CacheValueTimeExtractor cache_value_time_extractor);
void set_cache_value_check_timestamp(bool cache_value_check_timestamp);
Expand Down Expand Up @@ -384,8 +393,8 @@ class LRUCache {

HandleTable _table;

uint64_t _lookup_count = 0; // cache查找总次数
uint64_t _hit_count = 0; // 命中cache的总次数
uint64_t _lookup_count = 0; // number of cache lookups
uint64_t _hit_count = 0; // number of cache hits

CacheValueTimeExtractor _cache_value_time_extractor;
bool _cache_value_check_timestamp = false;
Expand All @@ -408,8 +417,9 @@ class ShardedLRUCache : public Cache {
virtual void* value(Handle* handle) override;
Slice value_slice(Handle* handle) override;
virtual uint64_t new_id() override;
virtual int64_t prune() override;
int64_t prune_if(CacheValuePredicate pred, bool lazy_mode = false) override;
virtual int64_t prune(int64_t* freed_size = nullptr) override;
int64_t prune_if(CachePrunePredicate pred, int64_t* freed_size = nullptr,
bool lazy_mode = false) override;
int64_t mem_consumption() override;
int64_t get_usage() override;
size_t get_total_capacity() override { return _total_capacity; };
Expand Down Expand Up @@ -479,8 +489,11 @@ class DummyLRUCache : public Cache {
void* value(Handle* handle) override;
Slice value_slice(Handle* handle) override;
uint64_t new_id() override { return 0; };
int64_t prune() override { return 0; };
int64_t prune_if(CacheValuePredicate pred, bool lazy_mode = false) override { return 0; };
int64_t prune(int64_t* freed_size = nullptr) override { return 0; };
int64_t prune_if(CachePrunePredicate pred, int64_t* freed_size = nullptr,
bool lazy_mode = false) override {
return 0;
};
int64_t mem_consumption() override { return 0; };
int64_t get_usage() override { return 0; };
size_t get_total_capacity() override { return 0; };
Expand Down
2 changes: 0 additions & 2 deletions be/src/olap/page_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ bool StoragePageCache::lookup(const CacheKey& key, PageCacheHandle* handle,
return false;
}
*handle = PageCacheHandle(cache, lru_handle);
handle->update_last_visit_time();
return true;
}

Expand All @@ -79,7 +78,6 @@ void StoragePageCache::insert(const CacheKey& key, DataPage* data, PageCacheHand
auto cache = _get_page_cache(page_type);
auto lru_handle = cache->insert(key.encode(), data, data->capacity(), deleter, priority);
*handle = PageCacheHandle(cache, lru_handle);
handle->update_last_visit_time();
}

} // namespace doris
7 changes: 1 addition & 6 deletions be/src/olap/page_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ namespace doris {
class PageCacheHandle;

template <typename TAllocator>
class PageBase : private TAllocator, public LRUCacheValueBase {
class PageBase : private TAllocator {
public:
PageBase() : _data(nullptr), _size(0), _capacity(0) {}

Expand Down Expand Up @@ -218,11 +218,6 @@ class PageCacheHandle {
return Slice(cache_value->data(), cache_value->size());
}

void update_last_visit_time() {
DataPage* cache_value = (DataPage*)_cache->value(_handle);
cache_value->last_visit_time = UnixMillis();
}

private:
Cache* _cache = nullptr;
Cache::Handle* _handle = nullptr;
Expand Down
2 changes: 0 additions & 2 deletions be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,7 @@ void InvertedIndexQueryCache::insert(const CacheKey& key, std::shared_ptr<roarin

std::unique_ptr<InvertedIndexQueryCache::CacheValue> cache_value_ptr =
std::make_unique<InvertedIndexQueryCache::CacheValue>();
cache_value_ptr->last_visit_time = UnixMillis();
cache_value_ptr->bitmap = bitmap;
cache_value_ptr->size = bitmap->getSizeInBytes();
if (key.encode().empty()) {
return;
}
Expand Down
6 changes: 4 additions & 2 deletions be/src/olap/rowset/segment_v2/inverted_index_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@ class InvertedIndexSearcherCache {

// The cache value of index_searcher lru cache.
// Holding an opened index_searcher.
struct CacheValue : public LRUCacheValueBase {
struct CacheValue {
IndexSearcherPtr index_searcher;
size_t size = 0;
int64_t last_visit_time;

CacheValue() = default;
explicit CacheValue(IndexSearcherPtr searcher, size_t mem_size, int64_t visit_time)
Expand Down Expand Up @@ -230,7 +232,7 @@ class InvertedIndexQueryCache : public LRUCachePolicy {
}
};

struct CacheValue : public LRUCacheValueBase {
struct CacheValue {
std::shared_ptr<roaring::Roaring> bitmap;
};

Expand Down
4 changes: 1 addition & 3 deletions be/src/olap/schema_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ class SchemaCache : public LRUCachePolicy {
if (lru_handle) {
Defer release([cache = cache(), lru_handle] { cache->release(lru_handle); });
auto value = (CacheValue*)cache()->value(lru_handle);
value->last_visit_time = UnixMillis();
VLOG_DEBUG << "use cache schema";
if constexpr (std::is_same_v<SchemaType, TabletSchemaSPtr>) {
return value->tablet_schema;
Expand All @@ -88,7 +87,6 @@ class SchemaCache : public LRUCachePolicy {
return;
}
CacheValue* value = new CacheValue;
value->last_visit_time = UnixMillis();
if constexpr (std::is_same_v<SchemaType, TabletSchemaSPtr>) {
value->type = Type::TABLET_SCHEMA;
value->tablet_schema = schema;
Expand All @@ -108,7 +106,7 @@ class SchemaCache : public LRUCachePolicy {
// Try to prune the cache if expired.
Status prune();

struct CacheValue : public LRUCacheValueBase {
struct CacheValue {
Type type;
// either tablet_schema or schema
TabletSchemaSPtr tablet_schema = nullptr;
Expand Down
3 changes: 1 addition & 2 deletions be/src/olap/segment_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class SegmentCache : public LRUCachePolicy {

// The cache value of segment lru cache.
// Holding all opened segments of a rowset.
struct CacheValue : public LRUCacheValueBase {
struct CacheValue {
segment_v2::SegmentSharedPtr segment;
};

Expand Down Expand Up @@ -133,7 +133,6 @@ class SegmentCacheHandle {

void push_segment(Cache* cache, Cache::Handle* handle) {
segments.push_back(((SegmentCache::CacheValue*)cache->value(handle))->segment);
((SegmentCache::CacheValue*)cache->value(handle))->last_visit_time = UnixMillis();
cache->release(handle);
}

Expand Down
2 changes: 0 additions & 2 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1467,7 +1467,6 @@ int CreateTabletIdxCache::get_index(const std::string& key) {
if (lru_handle) {
Defer release([cache = cache(), lru_handle] { cache->release(lru_handle); });
auto value = (CacheValue*)cache()->value(lru_handle);
value->last_visit_time = UnixMillis();
VLOG_DEBUG << "use create tablet idx cache key=" << key << " value=" << value->idx;
return value->idx;
}
Expand All @@ -1477,7 +1476,6 @@ int CreateTabletIdxCache::get_index(const std::string& key) {
void CreateTabletIdxCache::set_index(const std::string& key, int next_idx) {
assert(next_idx >= 0);
CacheValue* value = new CacheValue;
value->last_visit_time = UnixMillis();
value->idx = next_idx;
auto deleter = [](const doris::CacheKey& key, void* value) {
CacheValue* cache_value = (CacheValue*)value;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ class CreateTabletIdxCache : public LRUCachePolicy {

void set_index(const std::string& key, int next_idx);

struct CacheValue : public LRUCacheValueBase {
struct CacheValue {
int idx = 0;
};

Expand Down
2 changes: 0 additions & 2 deletions be/src/olap/tablet_schema_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,9 @@ std::pair<Cache::Handle*, TabletSchemaSPtr> TabletSchemaCache::insert(const std:
TabletSchemaSPtr tablet_schema_ptr;
if (lru_handle) {
auto* value = (CacheValue*)cache()->value(lru_handle);
value->last_visit_time = UnixMillis();
tablet_schema_ptr = value->tablet_schema;
} else {
auto* value = new CacheValue;
value->last_visit_time = UnixMillis();
tablet_schema_ptr = std::make_shared<TabletSchema>();
TabletSchemaPB pb;
pb.ParseFromString(key);
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/tablet_schema_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class TabletSchemaCache : public LRUCachePolicy {
void release(Cache::Handle*);

private:
struct CacheValue : public LRUCacheValueBase {
struct CacheValue {
TabletSchemaSPtr tablet_schema;
};
};
Expand Down
Loading

0 comments on commit 849b910

Please sign in to comment.