Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

commitChanges async api #727

Merged
merged 11 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/IStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@
#define METADATA_DB_IDENTIFIER "c299fde0-6d42-4ec4-b939-34f680ffe39f"

struct StorageToken {
enum class TokenType {
SingleRead,
SingleWrite,
Delete,
BatchWrite,
};
TokenType type;
std::unordered_set<struct client *> setc;
struct redisDbPersistentData *db;
virtual ~StorageToken() {}
Expand Down Expand Up @@ -46,6 +53,9 @@ class IStorage
virtual StorageToken *begin_retrieve(struct aeEventLoop *, aePostFunctionTokenProc, sds *, size_t) {return nullptr;};
virtual void complete_retrieve(StorageToken * /*tok*/, callbackSingle /*fn*/) {};

virtual StorageToken* begin_endWriteBatch(struct aeEventLoop *, aePostFunctionTokenProc*) {} // NOP
virtual void complete_endWriteBatch(StorageToken * /*tok*/) {};

virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) {
beginWriteBatch();
for (size_t ielem = 0; ielem < celem; ++ielem) {
Expand Down
3 changes: 2 additions & 1 deletion src/StorageCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,4 +233,5 @@ void StorageCache::emergencyFreeCache() {
dictRelease(d);
});
}
}
}

2 changes: 2 additions & 0 deletions src/StorageCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class StorageCache
void retrieve(sds key, IStorage::callbackSingle fn) const;
StorageToken *begin_retrieve(struct aeEventLoop *el, aePostFunctionTokenProc proc, sds *rgkey, size_t ckey);
void complete_retrieve(StorageToken *tok, IStorage::callbackSingle fn);
StorageToken* begin_endWriteBatch(struct aeEventLoop *el, aePostFunctionTokenProc* proc) {return m_spstorage->begin_endWriteBatch(el,proc);} // NOP
void complete_endWriteBatch(StorageToken *tok) {m_spstorage->complete_endWriteBatch(tok);};
bool erase(sds key);
void emergencyFreeCache();
bool keycacheIsEnabled() const { return m_pdict != nullptr; }
Expand Down
37 changes: 31 additions & 6 deletions src/db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3087,7 +3087,14 @@ void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **
m_pdbSnapshotStorageFlush = nullptr;
}
if (m_spstorage != nullptr)
m_spstorage->endWriteBatch();
{
auto *tok = m_spstorage->begin_endWriteBatch(serverTL->el, storageLoadCallback);
if (tok != nullptr)
{
tok->db = this;
tok->type = StorageToken::TokenType::BatchWrite;
}
}
}

redisDbPersistentData::~redisDbPersistentData()
Expand Down Expand Up @@ -3420,7 +3427,13 @@ void redisDbPersistentData::prefetchKeysFlash(std::unordered_set<client*> &setc)

void redisDbPersistentData::processStorageToken(StorageToken *tok) {
auto setc = std::move(tok->setc);
tok->db->m_spstorage->complete_retrieve(tok, [&](const char *szKey, size_t cbKey, const void *data, size_t cb) {
switch (tok->type)
{

case StorageToken::TokenType::SingleRead:
{
tok->db->m_spstorage->complete_retrieve(tok, [&](const char *szKey, size_t cbKey, const void *data, size_t cb)
{
auto *db = tok->db;
size_t offset = 0;
sds key = sdsnewlen(szKey, -((ssize_t)cbKey));
Expand Down Expand Up @@ -3451,11 +3464,23 @@ void redisDbPersistentData::processStorageToken(StorageToken *tok) {
serverAssert(db->m_setexpire->find(key) != db->m_setexpire->end());
}
serverAssert(o->FExpires() == (db->m_setexpire->find(key) != db->m_setexpire->end()));
}
});
tok = nullptr; // Invalid past this point
} });
break;
}
case StorageToken::TokenType::BatchWrite:
{
tok->db->m_spstorage->complete_endWriteBatch(tok);
break;
}
default:
serverAssert((tok->type == StorageToken::TokenType::SingleRead) || (tok->type == StorageToken::TokenType::BatchWrite));
break;
AnkitaSuman07 marked this conversation as resolved.
Show resolved Hide resolved
} //switch end

for (client *c : setc) {
tok = nullptr; // Invalid past this point

for (client *c : setc)
{
std::unique_lock<fastlock> ul(c->lock);
if (c->flags & CLIENT_BLOCKED)
unblockClient(c);
Expand Down
27 changes: 26 additions & 1 deletion src/storage/rocksdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,31 @@ void RocksDBStorageProvider::endWriteBatch()
m_lock.unlock();
}

struct BatchStorageToken : public StorageToken {
std::shared_ptr<rocksdb::DB> tspdb; // Note: This must be first so it is deleted last
std::unique_ptr<rocksdb::WriteBatchWithIndex> tspbatch;
};

StorageToken* RocksDBStorageProvider::begin_endWriteBatch(struct aeEventLoop *el, aePostFunctionTokenProc* callback){
BatchStorageToken *tok = new BatchStorageToken();
tok->tspbatch = std::move(m_spbatch);
tok->tspdb = m_spdb;
m_spbatch = nullptr;
m_lock.unlock();
(*m_pfactory->m_wqueue)->AddWorkFunction([this, el,callback,tok]{
tok->tspdb->Write(WriteOptions(),tok->tspbatch.get()->GetWriteBatch());
aePostFunction(el,callback,tok);
});

return tok;
}

void RocksDBStorageProvider::complete_endWriteBatch(StorageToken* tok){
delete tok;
tok = nullptr;
}


void RocksDBStorageProvider::batch_lock()
{
m_lock.lock();
Expand Down Expand Up @@ -330,4 +355,4 @@ void RocksDBStorageProvider::complete_retrieve(StorageToken *tok, callbackSingle
}
}
delete rtok;
}
}
2 changes: 2 additions & 0 deletions src/storage/rocksdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ class RocksDBStorageProvider : public IStorage

virtual void beginWriteBatch() override;
virtual void endWriteBatch() override;
virtual StorageToken* begin_endWriteBatch(struct aeEventLoop *el, aePostFunctionTokenProc* proc);
virtual void complete_endWriteBatch(StorageToken *tok);

virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) override;

Expand Down
Loading