Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

commitChanges async api #727

Merged
merged 11 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/IStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@
#define METADATA_DB_IDENTIFIER "c299fde0-6d42-4ec4-b939-34f680ffe39f"

struct StorageToken {
enum class TokenType {
SingleRead,
SingleWrite,
Delete,
BatchWrite,
};
TokenType type;
std::unordered_set<struct client *> setc;
struct redisDbPersistentData *db;
virtual ~StorageToken() {}
Expand Down Expand Up @@ -46,6 +53,9 @@ class IStorage
virtual StorageToken *begin_retrieve(struct aeEventLoop *, aePostFunctionTokenProc, sds *, size_t) {return nullptr;};
virtual void complete_retrieve(StorageToken * /*tok*/, callbackSingle /*fn*/) {};

virtual StorageToken* begin_endWriteBatch(struct aeEventLoop *, aePostFunctionTokenProc*) {} // NOP
virtual void complete_endWriteBatch(StorageToken * /*tok*/) {};

virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) {
beginWriteBatch();
for (size_t ielem = 0; ielem < celem; ++ielem) {
Expand Down
3 changes: 2 additions & 1 deletion src/StorageCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,4 +233,5 @@ void StorageCache::emergencyFreeCache() {
dictRelease(d);
});
}
}
}

2 changes: 2 additions & 0 deletions src/StorageCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class StorageCache
void retrieve(sds key, IStorage::callbackSingle fn) const;
StorageToken *begin_retrieve(struct aeEventLoop *el, aePostFunctionTokenProc proc, sds *rgkey, size_t ckey);
void complete_retrieve(StorageToken *tok, IStorage::callbackSingle fn);
StorageToken* begin_endWriteBatch(struct aeEventLoop *el, aePostFunctionTokenProc* proc) {return m_spstorage->begin_endWriteBatch(el,proc);} // NOP
void complete_endWriteBatch(StorageToken *tok) {m_spstorage->complete_endWriteBatch(tok);};
bool erase(sds key);
void emergencyFreeCache();
bool keycacheIsEnabled() const { return m_pdict != nullptr; }
Expand Down
49 changes: 43 additions & 6 deletions src/db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3072,6 +3072,8 @@ void redisDbPersistentData::bulkDirectStorageInsert(char **rgKeys, size_t *rgcbK

void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree)
{

std::unordered_set<client *> setcBlocked;
if (m_pdbSnapshotStorageFlush)
{
dictIterator *di = dictGetIterator(m_dictChangedStorageFlush);
Expand All @@ -3086,8 +3088,22 @@ void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **
*psnapshotFree = m_pdbSnapshotStorageFlush;
m_pdbSnapshotStorageFlush = nullptr;
}

if (m_spstorage != nullptr)
m_spstorage->endWriteBatch();
{
auto *tok = m_spstorage->begin_endWriteBatch(serverTL->el, storageLoadCallback);
if (tok != nullptr)
{
for (client *c : setcBlocked) //need to check how to push client to blocked list
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this code doesn't work lets remove it.

{
if (!(c->flags & CLIENT_BLOCKED))
blockClient(c, BLOCKED_STORAGE);
}
// tok->setc = std::move(setcBlocked);
AnkitaSuman07 marked this conversation as resolved.
Show resolved Hide resolved
tok->db = this;
tok->type = StorageToken::TokenType::BatchWrite;
}
}
}

redisDbPersistentData::~redisDbPersistentData()
Expand Down Expand Up @@ -3420,7 +3436,13 @@ void redisDbPersistentData::prefetchKeysFlash(std::unordered_set<client*> &setc)

void redisDbPersistentData::processStorageToken(StorageToken *tok) {
auto setc = std::move(tok->setc);
tok->db->m_spstorage->complete_retrieve(tok, [&](const char *szKey, size_t cbKey, const void *data, size_t cb) {
switch (tok->type)
{

case StorageToken::TokenType::SingleRead:
{
tok->db->m_spstorage->complete_retrieve(tok, [&](const char *szKey, size_t cbKey, const void *data, size_t cb)
{
auto *db = tok->db;
size_t offset = 0;
sds key = sdsnewlen(szKey, -((ssize_t)cbKey));
Expand Down Expand Up @@ -3451,11 +3473,26 @@ void redisDbPersistentData::processStorageToken(StorageToken *tok) {
serverAssert(db->m_setexpire->find(key) != db->m_setexpire->end());
}
serverAssert(o->FExpires() == (db->m_setexpire->find(key) != db->m_setexpire->end()));
}
});
tok = nullptr; // Invalid past this point
} });
break;
}
case StorageToken::TokenType::BatchWrite:
{
tok->db->m_spstorage->complete_endWriteBatch(tok);
break;
}
case StorageToken::TokenType::SingleWrite:
{
break;
AnkitaSuman07 marked this conversation as resolved.
Show resolved Hide resolved
}
default:
break;
AnkitaSuman07 marked this conversation as resolved.
Show resolved Hide resolved
} //switch end

for (client *c : setc) {
tok = nullptr; // Invalid past this point

for (client *c : setc)
{
std::unique_lock<fastlock> ul(c->lock);
if (c->flags & CLIENT_BLOCKED)
unblockClient(c);
Expand Down
33 changes: 33 additions & 0 deletions src/storage/rocksdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,39 @@ void RocksDBStorageProvider::endWriteBatch()
m_lock.unlock();
}

struct BatchStorageToken : public StorageToken {
std::shared_ptr<rocksdb::DB> tspdb; // Note: This must be first so it is deleted last
rocksdb::WriteBatch* tspbatch;
AnkitaSuman07 marked this conversation as resolved.
Show resolved Hide resolved
~BatchStorageToken(){
tspdb.reset();
AnkitaSuman07 marked this conversation as resolved.
Show resolved Hide resolved
tspdb = nullptr;
tspbatch = nullptr;
}
};

StorageToken* RocksEncoderStorageProvider::begin_endWriteBatch(struct aeEventLoop *el, aePostFunctionTokenProc* callback)
{
// serverLog(LL_WARNING, "RocksEncoderStorageProvider::begin_endWriteBatch");
BatchStorageToken *tok = new BatchStorageToken();
tok->tspbatch = m_spbatch.get();
tok->tspdb = m_spdb;
(*m_pfactory->m_wqueue)->AddWorkFunction([this, el,callback,tok]{
// serverAssert(db);
serverAssert(tok->tspdb);
tok->tspdb->Write(WriteOptions(),tok->tspbatch);
aePostFunction(el,callback,tok);
});
return tok;
}

void RocksEncoderStorageProvider::complete_endWriteBatch(StorageToken* tok){
// serverLog(LL_WARNING, "RocksEncoderStorageProvider::complete_endWriteBatch");
m_lock.unlock();
delete tok;
tok = nullptr;
}


void RocksDBStorageProvider::batch_lock()
{
m_lock.lock();
Expand Down
2 changes: 2 additions & 0 deletions src/storage/rocksdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ class RocksDBStorageProvider : public IStorage

virtual void beginWriteBatch() override;
virtual void endWriteBatch() override;
virtual StorageToken* begin_endWriteBatch(struct aeEventLoop *el, aePostFunctionTokenProc* proc);
virtual void complete_endWriteBatch(StorageToken *tok);

virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) override;

Expand Down