Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

a new dedicated workqueue only for async write commit, and block clients while committing changes #741

Open
wants to merge 1 commit into
base: async_flash
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ bool initializeStorageProvider(const char **err)
// Create The Storage Factory (if necessary)
serverLog(LL_NOTICE, "Initializing FLASH storage provider (this may take a long time)");
adjustOpenFilesLimit();
g_pserver->m_pstorageFactory = CreateRocksDBStorageFactory(g_sdsArgs, cserver.dbnum, cserver.storage_conf, cserver.storage_conf ? strlen(cserver.storage_conf) : 0, &g_pserver->asyncworkqueue);
g_pserver->m_pstorageFactory = CreateRocksDBStorageFactory(g_sdsArgs, cserver.dbnum, cserver.storage_conf, cserver.storage_conf ? strlen(cserver.storage_conf) : 0, &g_pserver->asyncworkqueue, &g_pserver->asyncwriteworkqueue);
#else
serverLog(LL_WARNING, "To use the flash storage provider please compile KeyDB with ENABLE_FLASH=yes");
serverLog(LL_WARNING, "Exiting due to the use of an unsupported storage provider");
Expand Down
17 changes: 17 additions & 0 deletions src/db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,13 @@ void genericSetKey(client *c, redisDb *db, robj *key, robj *val, int keepttl, in
}
incrRefCount(val);
if (signal) signalModifiedKey(c,db,key);

if(g_pserver->m_pstorageFactory != nullptr) {
if (!(c->flags & CLIENT_BLOCKED)) {
blockClient(c, BLOCKED_STORAGE);
}
serverTL->setclientsCommit.insert(c);
}
}

/* Common case for genericSetKey() where the TTL is not retained. */
Expand Down Expand Up @@ -3091,9 +3098,18 @@ void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **
auto *tok = m_spstorage->begin_endWriteBatch(serverTL->el, storageLoadCallback);
if (tok != nullptr)
{
for (client *c : serverTL->setclientsCommit)
{
/* Remove from the list of pending writes if needed. */
if (c->flags & CLIENT_PENDING_WRITE) {
c->flags &= ~CLIENT_PENDING_WRITE;
}
}
tok->setc = std::move(serverTL->setclientsCommit);
tok->db = this;
tok->type = StorageToken::TokenType::BatchWrite;
}
serverTL->setclientsCommit.clear();
}
}

Expand Down Expand Up @@ -3416,6 +3432,7 @@ void redisDbPersistentData::prefetchKeysFlash(std::unordered_set<client*> &setc)
blockClient(c, BLOCKED_STORAGE);
}
tok->setc = std::move(setcBlocked);
tok->type = StorageToken::TokenType::SingleRead;
tok->db = this;
}
return;
Expand Down
3 changes: 3 additions & 0 deletions src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4160,6 +4160,9 @@ void InitServerLast() {

g_pserver->asyncworkqueue = new (MALLOC_LOCAL) AsyncWorkQueue(cserver.cthreads*10);

//Process one write/commit at a time to ensure consistency
g_pserver->asyncwriteworkqueue = new (MALLOC_LOCAL) AsyncWorkQueue(1);

// Allocate the repl backlog

}
Expand Down
2 changes: 2 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2218,6 +2218,7 @@ struct redisServerThreadVars {
int client_pause_in_transaction = 0; /* Was a client pause executed during this Exec? */
std::unordered_set<client*> setclientsProcess;
std::unordered_set<client*> setclientsPrefetch;
std::unordered_set<client*> setclientsCommit;
std::unordered_set<StorageToken*> setStorageTokensProcess;
dictAsyncRehashCtl *rehashCtl = nullptr;

Expand Down Expand Up @@ -2705,6 +2706,7 @@ struct redisServer {
uint64_t mvcc_tstamp;

AsyncWorkQueue *asyncworkqueue;
AsyncWorkQueue *asyncwriteworkqueue;

/* System hardware info */
size_t system_memory_size; /* Total memory in system as reported by OS */
Expand Down
2 changes: 1 addition & 1 deletion src/storage/rocksdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ StorageToken* RocksDBStorageProvider::begin_endWriteBatch(struct aeEventLoop *el
tok->tspdb = m_spdb;
m_spbatch = nullptr;
m_lock.unlock();
(*m_pfactory->m_wqueue)->AddWorkFunction([this, el,callback,tok]{
(*m_pfactory->m_wworkqueue)->AddWorkFunction([this, el,callback,tok]{
tok->tspdb->Write(WriteOptions(),tok->tspbatch.get()->GetWriteBatch());
aePostFunction(el,callback,tok);
});
Expand Down
3 changes: 2 additions & 1 deletion src/storage/rocksdbfactor_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ class RocksDBStorageFactory : public IStorageFactory

public:
AsyncWorkQueue **m_wqueue;
AsyncWorkQueue **m_wworkqueue;

RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue);
RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue, AsyncWorkQueue **wworkqueue);
~RocksDBStorageFactory();

virtual IStorage *create(int db, key_load_iterator iter, void *privdata) override;
Expand Down
9 changes: 4 additions & 5 deletions src/storage/rocksdbfactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@ rocksdb::Options DefaultRocksDBOptions() {
return options;
}

IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue)
{
return new RocksDBStorageFactory(path, dbnum, rgchConfig, cchConfig, wqueue);
IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue, AsyncWorkQueue **wworkqueue)
return new RocksDBStorageFactory(path, dbnum, rgchConfig, cchConfig, wqueue, wworkqueue);
}

rocksdb::Options RocksDBStorageFactory::RocksDbOptions()
Expand All @@ -52,8 +51,8 @@ rocksdb::Options RocksDBStorageFactory::RocksDbOptions()
return options;
}

RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue)
: m_path(dbfile), m_wqueue(wqueue)
RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue, AsyncWorkQueue **wworkqueue)
: m_path(dbfile), m_wqueue(wqueue), m_wworkqueue(wworkqueue)
{
dbnum++; // create an extra db for metadata
// Get the count of column families in the actual database
Expand Down
2 changes: 1 addition & 1 deletion src/storage/rocksdbfactory.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#pragma once

class IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue);
class IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue, AsyncWorkQueue **wworkqueue);