From 8b242444476098caa76b6985261c1b5a3aefd5e1 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 22 Feb 2023 18:34:52 +0000 Subject: [PATCH 1/3] Use bulk insert when updating the storage provider --- src/IStorage.h | 5 ++-- src/StorageCache.cpp | 4 ++-- src/StorageCache.h | 2 +- src/db.cpp | 52 ++++++++++++++++++++++++++++++++++------- src/server.h | 2 +- src/storage/rocksdb.cpp | 12 ++++++++-- src/storage/rocksdb.h | 2 +- 7 files changed, 61 insertions(+), 18 deletions(-) diff --git a/src/IStorage.h b/src/IStorage.h index ad956beb6..e53078fdb 100644 --- a/src/IStorage.h +++ b/src/IStorage.h @@ -33,10 +33,11 @@ class IStorage virtual bool enumerate(callback fn) const = 0; virtual size_t count() const = 0; - virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) { + virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, char *rgfOverwrite, size_t celem) { beginWriteBatch(); for (size_t ielem = 0; ielem < celem; ++ielem) { - insert(rgkeys[ielem], rgcbkeys[ielem], rgvals[ielem], rgcbvals[ielem], false); + bool fOverwrite = (rgfOverwrite != nullptr) ? 
rgfOverwrite[ielem] : false; + insert(rgkeys[ielem], rgcbkeys[ielem], rgvals[ielem], rgcbvals[ielem], fOverwrite); } endWriteBatch(); } diff --git a/src/StorageCache.cpp b/src/StorageCache.cpp index 9b89023aa..f627cd332 100644 --- a/src/StorageCache.cpp +++ b/src/StorageCache.cpp @@ -114,7 +114,7 @@ void StorageCache::insert(sds key, const void *data, size_t cbdata, bool fOverwr } long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing); -void StorageCache::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) +void StorageCache::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, char *rgfOverwrite, size_t celem) { std::vector vechashes; if (m_pdict != nullptr) { @@ -152,7 +152,7 @@ void StorageCache::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, si } ul.unlock(); - m_spstorage->bulkInsert(rgkeys, rgcbkeys, rgvals, rgcbvals, celem); + m_spstorage->bulkInsert(rgkeys, rgcbkeys, rgvals, rgcbvals, rgfOverwrite, celem); bulkInsertsInProgress--; } diff --git a/src/StorageCache.h b/src/StorageCache.h index 3c38450fb..6682e969c 100644 --- a/src/StorageCache.h +++ b/src/StorageCache.h @@ -41,7 +41,7 @@ class StorageCache void clear(void(callback)(void*)); void clearAsync(); void insert(sds key, const void *data, size_t cbdata, bool fOverwrite); - void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem); + void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, char *rgfOverwrite, size_t celem); void retrieve(sds key, IStorage::callbackSingle fn) const; bool erase(sds key); void emergencyFreeCache(); diff --git a/src/db.cpp b/src/db.cpp index e0f4c862c..8c377b63f 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -2905,15 +2905,13 @@ void redisDbPersistentData::storeDatabase() dictReleaseIterator(di); } -/* static */ void redisDbPersistentData::serializeAndStoreChange(StorageCache *storage, redisDbPersistentData *db, 
const char *key, bool fUpdate) +/* static */ sds redisDbPersistentData::serializeChange(redisDbPersistentData *db, const char *key) { auto itr = db->find_cached_threadsafe(key); if (itr == nullptr) - return; + return nullptr; robj *o = itr.val(); - sds temp = serializeStoredObjectAndExpire(db, (const char*) itr.key(), o); - storage->insert((sds)key, temp, sdslen(temp), fUpdate); - sdsfree(temp); + return serializeStoredObjectAndExpire(db, (const char*) itr.key(), o); } bool redisDbPersistentData::processChanges(bool fSnapshot) @@ -2956,10 +2954,28 @@ bool redisDbPersistentData::processChanges(bool fSnapshot) { dictIterator *di = dictGetIterator(m_dictChanged); dictEntry *de; + std::vector<char*> veckeys; + std::vector<size_t> veccbkeys; + std::vector<char*> vecvals; + std::vector<size_t> veccbvals; + std::vector<char> vecoverwrite; /* char, not bool: data() is handed to bulkInsert's char *rgfOverwrite */ + veckeys.reserve(dictSize(m_dictChanged)); + veccbkeys.reserve(dictSize(m_dictChanged)); + vecvals.reserve(dictSize(m_dictChanged)); + veccbvals.reserve(dictSize(m_dictChanged)); + vecoverwrite.reserve(dictSize(m_dictChanged)); while ((de = dictNext(di)) != nullptr) { - serializeAndStoreChange(m_spstorage.get(), this, (const char*)dictGetKey(de), (bool)dictGetVal(de)); + sds val = serializeChange(this, (const char*)dictGetKey(de)); + if (val != nullptr) { + veckeys.push_back((char*)dictGetKey(de)); + veccbkeys.push_back(sdslen((sds)dictGetKey(de))); + vecvals.push_back(val); + veccbvals.push_back(sdslen(val)); + vecoverwrite.push_back((bool)dictGetVal(de)); + } } + m_spstorage->bulkInsert(veckeys.data(), veccbkeys.data(), vecvals.data(), veccbvals.data(), vecoverwrite.data(), veckeys.size()); dictReleaseIterator(di); } } @@ -2993,7 +3009,7 @@ void redisDbPersistentData::processChangesAsync(std::atomic &pendingJobs) vecvals.push_back(temp); veccbvals.push_back(sdslen(temp)); } - m_spstorage->bulkInsert(veckeys.data(), veccbkeys.data(), vecvals.data(), veccbvals.data(), veckeys.size()); + m_spstorage->bulkInsert(veckeys.data(), veccbkeys.data(), vecvals.data(),
veccbvals.data(), nullptr, veckeys.size()); for (auto val : vecvals) sdsfree(val); dictReleaseIterator(di); @@ -3004,7 +3020,7 @@ void redisDbPersistentData::processChangesAsync(std::atomic &pendingJobs) void redisDbPersistentData::bulkStorageInsert(char **rgKeys, size_t *rgcbKeys, char **rgVals, size_t *rgcbVals, size_t celem) { - m_spstorage->bulkInsert(rgKeys, rgcbKeys, rgVals, rgcbVals, celem); + m_spstorage->bulkInsert(rgKeys, rgcbKeys, rgVals, rgcbVals, nullptr, celem); } void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree) @@ -3013,10 +3029,28 @@ void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot ** { dictIterator *di = dictGetIterator(m_dictChangedStorageFlush); dictEntry *de; + std::vector<char*> veckeys; + std::vector<size_t> veccbkeys; + std::vector<char*> vecvals; + std::vector<size_t> veccbvals; + std::vector<char> vecoverwrite; + veckeys.reserve(dictSize(m_dictChangedStorageFlush)); + veccbkeys.reserve(dictSize(m_dictChangedStorageFlush)); + vecvals.reserve(dictSize(m_dictChangedStorageFlush)); + veccbvals.reserve(dictSize(m_dictChangedStorageFlush)); + vecoverwrite.reserve(dictSize(m_dictChangedStorageFlush)); /* reserve, not resize: push_back must fill from index 0 so each flag lines up with veckeys[i] */ while ((de = dictNext(di)) != nullptr) { - serializeAndStoreChange(m_spstorage.get(), (redisDbPersistentData*)m_pdbSnapshotStorageFlush, (const char*)dictGetKey(de), (bool)dictGetVal(de)); + sds val = serializeChange((redisDbPersistentData*)m_pdbSnapshotStorageFlush, (const char*)dictGetKey(de)); + if (val != nullptr) { + veckeys.push_back((char*)dictGetKey(de)); + veccbkeys.push_back(sdslen((sds)dictGetKey(de))); + vecvals.push_back(val); + veccbvals.push_back(sdslen(val)); + vecoverwrite.push_back((bool)dictGetVal(de)); + } } + m_spstorage->bulkInsert(veckeys.data(), veccbkeys.data(), vecvals.data(), veccbvals.data(), vecoverwrite.data(), veckeys.size()); dictReleaseIterator(di); dictRelease(m_dictChangedStorageFlush); m_dictChangedStorageFlush = nullptr; diff --git a/src/server.h b/src/server.h index f052d0595..b89ea01de 100644 --- a/src/server.h +++ b/src/server.h @@
-1217,7 +1217,7 @@ class redisDbPersistentData uint64_t m_mvccCheckpoint = 0; private: - static void serializeAndStoreChange(StorageCache *storage, redisDbPersistentData *db, const char *key, bool fUpdate); + static sds serializeChange(redisDbPersistentData *db, const char *key); void ensure(const char *key); void ensure(const char *key, dictEntry **de); diff --git a/src/storage/rocksdb.cpp b/src/storage/rocksdb.cpp index e592c74c5..230825eb7 100644 --- a/src/storage/rocksdb.cpp +++ b/src/storage/rocksdb.cpp @@ -43,8 +43,9 @@ void RocksDBStorageProvider::insert(const char *key, size_t cchKey, void *data, ++m_count; } -void RocksDBStorageProvider::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) +void RocksDBStorageProvider::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, char *rgfOverwrite, size_t celem) { + size_t coverwrites = 0; if (celem >= 16384) { rocksdb::Options options = DefaultRocksDBOptions(); rocksdb::SstFileWriter sst_file_writer(rocksdb::EnvOptions(), options, options.comparator); @@ -92,8 +93,15 @@ void RocksDBStorageProvider::bulkInsert(char **rgkeys, size_t *rgcbkeys, char ** m_spdb->Write(WriteOptions(), spbatch.get()); } + if (rgfOverwrite != nullptr) { + for (size_t ielem = 0; ielem < celem; ++ielem) { + if (rgfOverwrite[ielem]) + ++coverwrites; + } + } + std::unique_lock l(m_lock); - m_count += celem; + m_count += celem - coverwrites; } bool RocksDBStorageProvider::erase(const char *key, size_t cchKey) diff --git a/src/storage/rocksdb.h b/src/storage/rocksdb.h index 10c54606c..f0c20dee8 100644 --- a/src/storage/rocksdb.h +++ b/src/storage/rocksdb.h @@ -38,7 +38,7 @@ class RocksDBStorageProvider : public IStorage virtual void beginWriteBatch() override; virtual void endWriteBatch() override; - virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) override; + virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char 
**rgvals, size_t *rgcbvals, char *rgfOverwrite, size_t celem) override; virtual void batch_lock() override; virtual void batch_unlock() override; From 0d64f32ca1840bb41b71f5bc469903e14de47f15 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 22 Feb 2023 18:36:33 +0000 Subject: [PATCH 2/3] Fix memory leak --- src/db.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/db.cpp b/src/db.cpp index 8c377b63f..7f3db7a21 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -2976,6 +2976,8 @@ bool redisDbPersistentData::processChanges(bool fSnapshot) } } m_spstorage->bulkInsert(veckeys.data(), veccbkeys.data(), vecvals.data(), veccbvals.data(), vecoverwrite.data(), veckeys.size()); + for (auto val : vecvals) + sdsfree(val); dictReleaseIterator(di); } } @@ -3051,6 +3053,8 @@ void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot ** } } m_spstorage->bulkInsert(veckeys.data(), veccbkeys.data(), vecvals.data(), veccbvals.data(), vecoverwrite.data(), veckeys.size()); + for (auto val : vecvals) + sdsfree(val); dictReleaseIterator(di); dictRelease(m_dictChangedStorageFlush); m_dictChangedStorageFlush = nullptr; From 25c89a631cf43c794f57675452bae7749c123f3d Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 8 Mar 2023 18:47:46 +0000 Subject: [PATCH 3/3] Fix assert due to incorrect count --- src/StorageCache.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/StorageCache.cpp b/src/StorageCache.cpp index f627cd332..9c1dab937 100644 --- a/src/StorageCache.cpp +++ b/src/StorageCache.cpp @@ -121,6 +121,7 @@ void StorageCache::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, si vechashes.reserve(celem); for (size_t ielem = 0; ielem < celem; ++ielem) { + if (rgfOverwrite != nullptr && rgfOverwrite[ielem]) continue; dictEntry *de = (dictEntry*)zmalloc(sizeof(dictEntry)); de->key = (void*)dictGenHashFunction(rgkeys[ielem], (int)rgcbkeys[ielem]); de->v.u64 = 1;