diff --git a/engines/ep/CMakeLists.txt b/engines/ep/CMakeLists.txt index cd67184418..a714ef1784 100644 --- a/engines/ep/CMakeLists.txt +++ b/engines/ep/CMakeLists.txt @@ -10,9 +10,12 @@ INCLUDE(CheckTypeSize) INCLUDE(CMakeDependentOption) INCLUDE(CTest) +OPTION(EP_USE_PLASMA "Enable support for Plasma" ON) + CMAKE_DEPENDENT_OPTION(EP_USE_ROCKSDB "Enable support for RocksDB" ON "ROCKSDB_INCLUDE_DIR;ROCKSDB_LIBRARIES" OFF) + INCLUDE_DIRECTORIES(BEFORE ${CMAKE_INSTALL_PREFIX}/include ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/src @@ -48,6 +51,19 @@ IF (EP_USE_ROCKSDB) MESSAGE(STATUS "ep-engine: Using RocksDB") ENDIF (EP_USE_ROCKSDB) +IF (EP_USE_PLASMA) + INCLUDE_DIRECTORIES(AFTER ${PLASMA_INCLUDE_DIR}) + SET(PLASMA_KVSTORE_SOURCE src/plasma-kvstore/plasma-wrapper.c + src/plasma-kvstore/plasma-kvstore.cc + src/plasma-kvstore/plasma-kvstore_config.cc) + LIST(APPEND EP_STORAGE_LIBS plasma-core) + ADD_DEFINITIONS(-DEP_USE_PLASMA=1) + MESSAGE(STATUS "ep-engine: Using Plasma libs at " ${EP_STORAGE_LIBS}) +ELSE (EP_USE_PLASMA) + MESSAGE(STATUS "ep-engine: NOT using plasma!!!" ${EP_USE_PLASMA} + " dir=" ${PLASMA_CORE_LIB} "include=" ${PLASMA_INCLUDE_DIR}) +ENDIF (EP_USE_PLASMA) + INCLUDE_DIRECTORIES(AFTER SYSTEM ${gtest_SOURCE_DIR}/include ${gmock_SOURCE_DIR}/include) @@ -248,6 +264,7 @@ ADD_LIBRARY(ep_objs OBJECT ${CONFIG_SOURCE} ${COUCH_KVSTORE_SOURCE} ${ROCKSDB_KVSTORE_SOURCE} + ${PLASMA_KVSTORE_SOURCE} ${COLLECTIONS_SOURCE}) SET_PROPERTY(TARGET ep_objs PROPERTY POSITION_INDEPENDENT_CODE 1) add_sanitizers(ep_objs) @@ -257,7 +274,7 @@ ADD_LIBRARY(ep SHARED $) SET_TARGET_PROPERTIES(ep PROPERTIES PREFIX "") TARGET_LINK_LIBRARIES(ep cJSON JSON_checker ${EP_STORAGE_LIBS} engine_utilities dirutils cbcompress hdr_histogram_static - mcd_util platform phosphor xattr mcd_tracing + mcd_util platform phosphor xattr mcd_tracing sigar ${LIBEVENT_LIBRARIES}) add_sanitizers(ep) @@ -334,7 +351,7 @@ if (COUCHBASE_KV_BUILD_UNIT_TESTS) TARGET_LINK_LIBRARIES(ep-engine_ep_unit_tests ${EP_STORAGE_LIBS} cJSON dirutils engine_utilities gtest gmock hdr_histogram_static JSON_checker memcached_logger mcd_util mcd_tracing platform - phosphor xattr cbcompress ${MALLOC_LIBRARIES}) + phosphor sigar xattr cbcompress ${MALLOC_LIBRARIES}) add_sanitizers(ep-engine_ep_unit_tests) ADD_EXECUTABLE(ep-engine_atomic_ptr_test @@ -396,6 +413,7 @@ if (COUCHBASE_KV_BUILD_UNIT_TESTS) mcd_tracing memcached_logger phosphor + sigar platform xattr ${MALLOC_LIBRARIES} @@ -420,7 +438,7 @@ if (COUCHBASE_KV_BUILD_UNIT_TESTS) $) TARGET_LINK_LIBRARIES(ep-engine_sizes cJSON JSON_checker hdr_histogram_static engine_utilities ${EP_STORAGE_LIBS} dirutils cbcompress platform mcd_util - mcd_tracing phosphor xattr ${LIBEVENT_LIBRARIES}) + mcd_tracing phosphor sigar xattr ${LIBEVENT_LIBRARIES}) add_sanitizers(ep-engine_sizes) ADD_LIBRARY(ep_testsuite SHARED @@ -433,6 +451,7 @@ if (COUCHBASE_KV_BUILD_UNIT_TESTS) TARGET_LINK_LIBRARIES(ep_testsuite engine_utilities mcd_util ${EP_STORAGE_LIBS} dirutils JSON_checker platform xattr + sigar ${LIBEVENT_LIBRARIES}) ADD_DEPENDENCIES(ep_testsuite engine_testapp) @@ -445,6 +464,7 @@ if (COUCHBASE_KV_BUILD_UNIT_TESTS) ) SET_TARGET_PROPERTIES(ep_testsuite_basic PROPERTIES PREFIX "") TARGET_LINK_LIBRARIES(ep_testsuite_basic engine_utilities JSON_checker dirutils + sigar platform ${LIBEVENT_LIBRARIES} ${SNAPPY_LIBRARIES}) ADD_DEPENDENCIES(ep_testsuite engine_testapp) @@ -456,7 +476,9 @@ if (COUCHBASE_KV_BUILD_UNIT_TESTS) tests/mock/mock_dcp.cc ) SET_TARGET_PROPERTIES(ep_testsuite_dcp PROPERTIES PREFIX "") - TARGET_LINK_LIBRARIES(ep_testsuite_dcp engine_utilities cbcompress JSON_checker dirutils platform ${LIBEVENT_LIBRARIES} ${SNAPPY_LIBRARIES}) + TARGET_LINK_LIBRARIES(ep_testsuite_dcp engine_utilities cbcompress JSON_checker dirutils platform + sigar + ${LIBEVENT_LIBRARIES} ${SNAPPY_LIBRARIES}) ADD_DEPENDENCIES(ep_testsuite_dcp engine_testapp) ADD_LIBRARY(ep_testsuite_checkpoint SHARED @@ -465,7 +487,9 @@ if (COUCHBASE_KV_BUILD_UNIT_TESTS) tests/ep_test_apis.cc src/ext_meta_parser.cc) SET_TARGET_PROPERTIES(ep_testsuite_checkpoint PROPERTIES PREFIX "") - TARGET_LINK_LIBRARIES(ep_testsuite_checkpoint engine_utilities JSON_checker dirutils platform ${LIBEVENT_LIBRARIES} ${SNAPPY_LIBRARIES}) + TARGET_LINK_LIBRARIES(ep_testsuite_checkpoint engine_utilities JSON_checker dirutils + sigar + platform ${LIBEVENT_LIBRARIES} ${SNAPPY_LIBRARIES}) ADD_DEPENDENCIES(ep_testsuite_checkpoint engine_testapp) ADD_LIBRARY(ep_testsuite_xdcr SHARED diff --git a/engines/ep/configuration.json b/engines/ep/configuration.json index 20a1ed45f0..3559f0e85d 100644 --- a/engines/ep/configuration.json +++ b/engines/ep/configuration.json @@ -84,13 +84,14 @@ } }, "backend": { - "default": "couchdb", + "default": "plasma", "dynamic": false, "type": "std::string", "validator": { "enum": [ "couchdb", - "rocksdb" + "rocksdb", + "plasma" ] } }, @@ -547,7 +548,7 @@ "type": "size_t" }, "max_num_shards": { - "default": "4", + "default": "16", "descr": "Maximum number of shards", "dynamic": false, "type": "size_t" @@ -962,6 +963,56 @@ "descr": "RocksDB Universal-Compaction 'max_size_amplification_percent' option. The default value is the RocksDB internal default (200).", "type": "size_t" }, + "plasma_mem_quota": { + "default": "2048", + "descr": "Plasma memory quota (in MB).", + "type": "size_t" + }, + "plasma_enable_directio": { + "default": "true", + "descr": "Bypass OS page cache and do direct I/O.", + "type": "bool" + }, + "plasma_kv_separation": { + "default": "true", + "descr": "Separate key and value blobs in plasma.", + "type": "bool" + }, + "plasma_lss_clean_threshold": { + "default": "98", + "descr": "Log cleaning starts at this fragmentation threshold.", + "type": "size_t" + }, + "plasma_lss_clean_max": { + "default": "99", + "descr": "Throttle incoming writes when fragmentation hits this threshold.", + "type": "size_t" + }, + "plasma_delta_chain_len": { + "default": "50", + "descr": "Max number of items allowed in delta chain before page is compacted.", + "type": "size_t" + }, + "plasma_base_page_items": { + "default": "50", + "descr": "Max number of items allowed in base page.", + "type": "size_t" + }, + "plasma_lss_num_segments": { + "default": "4", + "descr": "Max number of segments a plasma page can be split in.", + "type": "size_t" + }, + "plasma_sync_at": { + "default": "500", + "descr": "Time in milliseconds at which sync is forced (0 disables sync).", + "type": "size_t" + }, + "plasma_enable_upsert": { + "default": "true", + "descr": "Allow updates to existing keys without a lookup & delete.", + "type": "bool" + }, "time_synchronization": { "default": "disabled", "descr": "No longer supported. This config parameter has no effect.", diff --git a/engines/ep/src/executorpool.cc b/engines/ep/src/executorpool.cc index 0264334758..f08ff7e78b 100644 --- a/engines/ep/src/executorpool.cc +++ b/engines/ep/src/executorpool.cc @@ -121,6 +121,8 @@ size_t ExecutorPool::getNumReaders(void) { return count; } +size_t numShards; + ExecutorPool *ExecutorPool::get(void) { auto* tmp = instance.load(); if (tmp == nullptr) { @@ -140,6 +142,7 @@ ExecutorPool *ExecutorPool::get(void) { config.getNumWriterThreads(), config.getNumAuxioThreads(), config.getNumNonioThreads()); + numShards = config.getMaxNumShards(); ObjectRegistry::onSwitchThread(epe); instance.store(tmp); } @@ -604,7 +607,8 @@ bool ExecutorPool::_startWorkers(void) { if (!numWorkers[WRITER_TASK_IDX]) { // MB-12279: Limit writers to 4 for faster bgfetches in DGM by default - numWriters = 4; + numWriters = numShards; + numReaders = numShards; } _adjustWorkers(READER_TASK_IDX, numReaders); diff --git a/engines/ep/src/kvshard.cc b/engines/ep/src/kvshard.cc index fa76403f05..2970316955 100644 --- a/engines/ep/src/kvshard.cc +++ b/engines/ep/src/kvshard.cc @@ -29,6 +29,9 @@ #ifdef EP_USE_ROCKSDB #include "rocksdb-kvstore/rocksdb-kvstore_config.h" #endif +#ifdef EP_USE_PLASMA +#include "plasma-kvstore/plasma-kvstore_config.h" +#endif /* [EPHE TODO]: Consider not using KVShard for ephemeral bucket */ KVShard::KVShard(uint16_t id, Configuration& config) @@ -46,6 +49,13 @@ KVShard::KVShard(uint16_t id, Configuration& config) auto stores = KVStoreFactory::create(*kvConfig); rwStore = std::move(stores.rw); } +#endif +#ifdef EP_USE_PLASMA + else if (backend == "plasma") { + kvConfig = std::make_unique(config, id); + auto stores = KVStoreFactory::create(*kvConfig); + rwStore = std::move(stores.rw); + } #endif else { throw std::logic_error( diff --git a/engines/ep/src/kvstore.cc b/engines/ep/src/kvstore.cc index 171adc6748..4642af4e15 100644 --- a/engines/ep/src/kvstore.cc +++ b/engines/ep/src/kvstore.cc @@ -27,6 +27,10 @@ #include "rocksdb-kvstore/rocksdb-kvstore.h" #include "rocksdb-kvstore/rocksdb-kvstore_config.h" #endif +#ifdef EP_USE_PLASMA +#include "plasma-kvstore/plasma-kvstore.h" +#include "plasma-kvstore/plasma-kvstore_config.h" +#endif #include "kvstore.h" #include "kvstore_config.h" #include "persistence_callback.h" @@ -87,6 +91,13 @@ KVStoreRWRO KVStoreFactory::create(KVStoreConfig& config) { dynamic_cast(config)); return {rw.release(), nullptr}; } +#endif +#ifdef EP_USE_PLASMA + else if (backend == "plasma") { + auto rw = std::make_unique( + dynamic_cast(config)); + return {rw.release(), nullptr}; + } #endif else { throw std::invalid_argument("KVStoreFactory::create unknown backend:" + diff --git a/engines/ep/src/kvstore_config.h b/engines/ep/src/kvstore_config.h index eebd4e12c8..c17428b47a 100644 --- a/engines/ep/src/kvstore_config.h +++ b/engines/ep/src/kvstore_config.h @@ -121,10 +121,9 @@ class KVStoreConfig { Logger* logger; bool buffered; bool persistDocNamespace; - - /** - * If non-zero, tell storage layer to issue a sync() operation after every - * N bytes written. - */ - uint64_t periodicSyncBytes; + /** + * If non-zero, tell storage layer to issue a sync() operation after every + * N bytes written. + */ + uint64_t periodicSyncBytes; }; diff --git a/engines/ep/src/plasma-kvstore/plasma-kvstore.cc b/engines/ep/src/plasma-kvstore/plasma-kvstore.cc new file mode 100644 index 0000000000..f24bdbcda8 --- /dev/null +++ b/engines/ep/src/plasma-kvstore/plasma-kvstore.cc @@ -0,0 +1,822 @@ +/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * Copyright 2017 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "config.h" + +#include "plasma-kvstore.h" +#include "plasma-wrapper.h" + +#include "ep_time.h" + +#include "plasma-kvstore_config.h" +#include "kvstore_priv.h" + +#include +#include +#include +#include + +#include "vbucket.h" + +namespace plasmakv { +// MetaData is used to serialize and de-serialize metadata respectively when +// writing a Document mutation request to Plasma and when reading a Document +// from Plasma. +class MetaData { +public: + MetaData() + : deleted(0), + version(0), + datatype(0), + flags(0), + valueSize(0), + exptime(0), + cas(0), + revSeqno(0), + bySeqno(0){}; + MetaData(bool deleted, + uint8_t version, + uint8_t datatype, + uint32_t flags, + uint32_t valueSize, + time_t exptime, + uint64_t cas, + uint64_t revSeqno, + int64_t bySeqno) + : deleted(deleted), + version(version), + datatype(datatype), + flags(flags), + valueSize(valueSize), + exptime(exptime), + cas(cas), + revSeqno(revSeqno), + bySeqno(bySeqno){}; + +// The `#pragma pack(1)` directive and the order of members are to keep +// the size of MetaData as small as possible and uniform across different +// platforms. +#pragma pack(1) + uint8_t deleted : 1; + uint8_t version : 7; + uint8_t datatype; + uint32_t flags; + uint32_t valueSize; + time_t exptime; + uint64_t cas; + uint64_t revSeqno; + int64_t bySeqno; +#pragma pack() +}; +} // namespace plasmakv + +/** + * Class representing a document to be persisted in Plasma. + */ +class PlasmaRequest : public IORequest { +public: + /** + * Constructor + * + * @param item Item instance to be persisted + * @param callback Persistence Callback + * @param del Flag indicating if it is an item deletion or not + */ + PlasmaRequest(const Item& item, MutationRequestCallback& callback) + : IORequest(item.getVBucketId(), + callback, + item.isDeleted(), + item.getKey()), + docBody(item.getValue()), + updatedExistingItem(false) { + docMeta = plasmakv::MetaData( + item.isDeleted(), + 0, + item.getDataType(), + item.getFlags(), + item.getNBytes(), + item.isDeleted() ? ep_real_time() : item.getExptime(), + item.getCas(), + item.getRevSeqno(), + item.getBySeqno()); + } + + const plasmakv::MetaData& getDocMeta() { + return docMeta; + } + + const int64_t getBySeqno() { + return docMeta.bySeqno; + } + + const size_t getKeyLen() { + return getKey().size(); + } + + const char *getKeyData() { + return getKey().c_str(); + } + const size_t getBodySize() { + return docBody ? docBody->valueSize() : 0; + } + + const void *getBodyData() { + return docBody ? docBody->getData() : nullptr; + } + + const bool wasCreate() { return !updatedExistingItem; } + void markAsUpdated() { updatedExistingItem = true; } + +private: + plasmakv::MetaData docMeta; + value_t docBody; + bool updatedExistingItem; +}; + +// using PlsmPtr = std::unique_ptr; + +class KVPlasma { +public: + KVPlasma(const uint16_t vbid, const std::string path) : + vbid(vbid) { + plasmaHandleId = open_plasma(path.c_str(), vbid); + if (plasmaHandleId < 0) { + fprintf(stderr, "FATAL: Unable to open plasma %s, vb %d\n", + path.c_str(), vbid); + throw std::logic_error("PlasmaKVStore::openDB: can't open[" + + std::to_string(vbid) + "] in " + path.c_str()); + } + } + + ~KVPlasma() { + uint64_t persistedSeqno; + close_plasma(vbid, plasmaHandleId, &persistedSeqno); + } + + int SetOrDel(const std::unique_ptr& req) { + if (req->isDelete()) { + //fprintf(stderr, "Deleting a key %s, len %zu from plasma", + // req->getKeyData(), req->getKeyLen()); + return delete_kv(Plasma_KVengine, vbid, plasmaHandleId, + req->getKeyData(), req->getKeyLen()); + } + // fprintf(stderr, "Inserting a key %s, len %zu into plasma", + // req->getKeyData(), req->getKeyLen()); + + /* TODO: Send in the slices of plasma meta & value to avoid memcpy */ + std::memcpy(&big_bad_buf[0], &req->getDocMeta(), + sizeof(plasmakv::MetaData)); + auto valSz = req->getBodySize(); + if (valSz > 3000) { + fprintf(stderr, "FATAL-TOO-BIG-VALUE: val size = %zu\n", + req->getBodySize()); + valSz = 3000; + } + if (req->getBySeqno() == 0) { + fprintf(stderr, "FATAL-ZERO-SEQNUM-IN-INSERT: val size = %zu\n", + req->getBodySize()); + throw std::logic_error("ZERO SEQNUM SHOULD NOT EXIST!!"); + } + std::memcpy(&big_bad_buf[sizeof(plasmakv::MetaData)], req->getBodyData(), + valSz); + int ret = insert_kv(Plasma_KVengine, vbid, plasmaHandleId, + req->getKeyData(), req->getKeyLen(), + &big_bad_buf[0], valSz + sizeof(plasmakv::MetaData), + req->getBySeqno()); + if (ret < 0) { + return ret; + } + if (ret == 1) { // Item previously existing in plasma + req->markAsUpdated(); + } + return 0; + } + + int Get(const StoredDocKey &key, void **value, int *valueLen) { + *value = &big_bad_buf; + *valueLen = sizeof(big_bad_buf); + int ret = lookup_kv(Plasma_KVengine, vbid, plasmaHandleId, + key.data(), key.size(), value, valueLen); + if (ret) { + fprintf(stderr, "FATAL-PLASMA-LOOKUP-ERROR: %d\n", ret); + } + return ret; + } + + uint16_t vbid; + int plasmaHandleId; + char big_bad_buf[3072]; +}; + +static std::mutex initGuard; +static bool plasmaInited; + +PlasmaKVStore::PlasmaKVStore(PlasmaKVStoreConfig& configuration) + : KVStore(configuration), + vbDB(configuration.getMaxVBuckets()), + in_transaction(false), + plasmaPath(configuration.getDBName()+ "/plasma"), + scanCounter(0), + logger(configuration.getLogger()) { + + { + LockHolder lh(initGuard); + if (!plasmaInited) { + uint64_t memQuota = uint64_t(configuration.getPlasmaMemQuota()); + memQuota *= (1024 * 1024); // Input is in MB, convert to bytes + bool directIo = configuration.isPlasmaEnableDirectio(); + bool kvSeparate = configuration.isPlasmaKvSeparation(); + int lssCleanAtFrag = configuration.getPlasmaLssCleanThreshold(); + int lssCleanMax = configuration.getPlasmaLssCleanMax(); + int deltaChainLen = configuration.getPlasmaDeltaChainLen(); + int basePageLen = configuration.getPlasmaBasePageItems(); + int lssNumSegs = configuration.getPlasmaLssNumSegments(); + int syncAt = configuration.getPlasmaSyncAt(); + bool upsert = configuration.isPlasmaEnableUpsert(); + + init_plasma(memQuota, + directIo, + kvSeparate, + lssCleanAtFrag, + lssCleanMax, + deltaChainLen, + basePageLen, + lssNumSegs, + syncAt, + upsert); + plasmaInited = true; + fprintf(stderr, "Initialized plasma kvstore..\n"); + fprintf(stderr, "MemQuota = %zu\n", memQuota); + fprintf(stderr, "DirectIO (%s)\n", directIo ? "yes" : "no"); + fprintf(stderr, "KV Separation (%s)\n", kvSeparate ? "yes" : "no"); + fprintf(stderr, "LSS clean at %d\n", lssCleanAtFrag); + fprintf(stderr, "LSS throttle at %d\n", lssCleanMax); + fprintf(stderr, "Delta Chain Len %d\n", deltaChainLen); + fprintf(stderr, "Base Page Len %d\n", basePageLen); + fprintf(stderr, "LSS Num Segments %d\n", lssNumSegs); + fprintf(stderr, "Sync at %d milliseconds\n", syncAt); + fprintf(stderr, "Upsert (%s)\n", upsert ? "yes" : "no"); + } + } + cachedVBStates.resize(configuration.getMaxVBuckets()); + + createDataDir(configuration.getDBName()); + + // Read persisted VBs state + auto vbids = discoverVBuckets(); + for (auto vbid : vbids) { + KVPlasma db(vbid, plasmaPath); + //readVBState(db); + // Update stats + ++st.numLoadedVb; + } +} + +PlasmaKVStore::~PlasmaKVStore() { + in_transaction = false; +} + +std::string PlasmaKVStore::getVBDBSubdir(uint16_t vbid) { + return configuration.getDBName() + "/plasma." + std::to_string(vbid); +} + +std::vector PlasmaKVStore::discoverVBuckets() { + std::vector vbids; + auto vbDirs = + cb::io::findFilesContaining(configuration.getDBName(), "plasma."); + for (auto& dir : vbDirs) { + size_t lastDotIndex = dir.rfind("."); + size_t vbidLength = dir.size() - lastDotIndex - 1; + std::string vbidStr = dir.substr(lastDotIndex + 1, vbidLength); + uint16_t vbid = atoi(vbidStr.c_str()); + // Take in account only VBuckets managed by this Shard + if ((vbid % configuration.getMaxShards()) == + configuration.getShardId()) { + vbids.push_back(vbid); + } + } + return vbids; +} + +bool PlasmaKVStore::begin(std::unique_ptr txCtx) { + in_transaction = true; + transactionCtx = std::move(txCtx); + return in_transaction; +} + +bool PlasmaKVStore::commit(const Item* collectionsManifest) { + // This behaviour is to replicate the one in Couchstore. + // If `commit` is called when not in transaction, just return true. + if (!in_transaction) { + return true; + } + + if (pendingReqs.size() == 0) { + in_transaction = false; + return true; + } + + // Swap `pendingReqs` with the temporary `commitBatch` so that we can + // shorten the scope of the lock. + std::vector> commitBatch; + { + std::lock_guard lock(writeLock); + std::swap(pendingReqs, commitBatch); + } + + bool success = true; + auto vbid = commitBatch[0]->getVBucketId(); + + // Flush all documents to disk + auto status = saveDocs(vbid, collectionsManifest, commitBatch); + if (status) { + logger.log(EXTENSION_LOG_WARNING, + "PlasmaKVStore::commit: saveDocs error:%d, " + "vb:%" PRIu16, + status, + vbid); + success = false; + } + + commitCallback(status, commitBatch); + + // This behaviour is to replicate the one in Couchstore. + // Set `in_transanction = false` only if `commit` is successful. + if (success) { + in_transaction = false; + transactionCtx.reset(); + } + + return success; +} + +void PlasmaKVStore::commitCallback( + int status, + const std::vector>& commitBatch) { + for (const auto& req : commitBatch) { + if (!status) { + ++st.numSetFailure; + } else { + st.writeTimeHisto.add(req->getDelta() / 1000); + st.writeSizeHisto.add(req->getKeyLen() + req->getBodySize()); + } + // TODO: Should set `mr.second` to true or false depending on if + // this is an insertion (true) or an update of an existing item + // (false). However, to achieve this we would need to perform a lookuup + // to RocksDB which is costly. For now just assume that the item + // did not exist. + mutation_result mr = std::make_pair(1, req->wasCreate()); + req->getSetCallback()->callback(*transactionCtx, mr); + } +} + +void PlasmaKVStore::rollback() { + if (in_transaction) { + in_transaction = false; + transactionCtx.reset(); + } +} + +StorageProperties PlasmaKVStore::getStorageProperties(void) { + StorageProperties rv(StorageProperties::EfficientVBDump::Yes, + StorageProperties::EfficientVBDeletion::Yes, + StorageProperties::PersistedDeletion::No, + StorageProperties::EfficientGet::Yes, + StorageProperties::ConcurrentWriteCompact::Yes); + return rv; +} + +std::vector PlasmaKVStore::listPersistedVbuckets() { + std::vector result; + for (const auto& vb : cachedVBStates) { + result.emplace_back(vb.get()); + } + return result; +} + +void PlasmaKVStore::set(const Item& item, + Callback& cb) { + if (!in_transaction) { + throw std::logic_error( + "PlasmaKVStore::set: in_transaction must be true to perform a " + "set operation."); + } + MutationRequestCallback callback; + callback.setCb = &cb; + pendingReqs.push_back(std::make_unique(item, callback)); +} + +GetValue PlasmaKVStore::get(const StoredDocKey& key, uint16_t vb, bool fetchDelete) { + return getWithHeader(nullptr, key, vb, GetMetaOnly::No, fetchDelete); +} + +GetValue PlasmaKVStore::getWithHeader(void* dbHandle, + const StoredDocKey& key, + uint16_t vb, + GetMetaOnly getMetaOnly, + bool fetchDelete) { + void *value; + int valueLen; + KVPlasma db(vb, plasmaPath); + int status = db.Get(key, &value, &valueLen); + if (status < 0) { + logger.log(EXTENSION_LOG_WARNING, + "PlasmaKVStore::getWithHeader: plasma::DB::Lookup error:%d, " + "vb:%" PRIu16, + status, + vb); + } + std::string valStr(reinterpret_cast(value), valueLen); + return makeGetValue(vb, key, valStr, getMetaOnly); +} + +void PlasmaKVStore::getMulti(uint16_t vb, vb_bgfetch_queue_t& itms) { + KVPlasma db(vb, plasmaPath); + for (auto& it : itms) { + auto &key = it.first; + void *value; + int valueLen; + int status = db.Get(key, &value, &valueLen); + if (status < 0) { + logger.log(EXTENSION_LOG_WARNING, + "PlasmaKVStore::getMulti: plasma::DB::Lookup error:%d, " + "vb:%" PRIu16, + status, + vb); + for (auto &fetch : it.second.bgfetched_list) { + fetch->value->setStatus(ENGINE_KEY_ENOENT); + } + continue; + } + std::string valStr(reinterpret_cast(value), valueLen); + it.second.value = makeGetValue(vb, key, valStr, it.second.isMetaOnly); + GetValue *rv = &it.second.value; + for (auto &fetch : it.second.bgfetched_list) { + fetch->value = rv; + } + } +} + +void PlasmaKVStore::reset(uint16_t vbucketId) { + // TODO Plsm: Implement. +} + +void PlasmaKVStore::del(const Item& item, + Callback& cb) { + if (!in_transaction) { + throw std::logic_error( + "PlasmaKVStore::del: in_transaction must be true to perform a " + "delete operation."); + } + // TODO: Deleted items remain as tombstones, but are not yet expired, + // they will accumuate forever. + MutationRequestCallback callback; + callback.delCb = &cb; + pendingReqs.push_back(std::make_unique(item, callback)); +} + +void PlasmaKVStore::delVBucket(uint16_t vbid, uint64_t vb_version) { + std::lock_guard lg(writeLock); + // TODO: check if needs lock on `openDBMutex`. We should not need (e.g., + // there was no synchonization between this and `commit`), but we could + // have an error if we destroy `vbDB[vbid]` while the same DB is used + // somewhere else. Also, from Plasma docs: + // "Calling DestroyDB() on a live DB is an undefined behavior." + vbDB[vbid].reset(); + // Just destroy the DB in the sub-folder for vbid + auto dbname = getVBDBSubdir(vbid); + // DESTROY DB... +} + +bool PlasmaKVStore::snapshotVBucket(uint16_t vbucketId, + const vbucket_state& vbstate, + VBStatePersist options) { + // TODO Plsm: Refactor out behaviour common to this and CouchKVStore + auto start = ProcessClock::now(); + + if (updateCachedVBState(vbucketId, vbstate) && + (options == VBStatePersist::VBSTATE_PERSIST_WITHOUT_COMMIT || + options == VBStatePersist::VBSTATE_PERSIST_WITH_COMMIT)) { + int handleId = open_plasma(plasmaPath.c_str(), vbucketId); + uint64_t persistedSeqno; + close_plasma(vbucketId, handleId, &persistedSeqno); + /* + auto& db = openDB(vbucketId); + if (!saveVBState(db, vbstate).ok()) { + logger.log(EXTENSION_LOG_WARNING, + "PlasmaKVStore::snapshotVBucket: saveVBState failed " + "state:%s, vb:%" PRIu16, + VBucket::toString(vbstate.state), + vbucketId); + return false; + } + */ + } + + LOG(EXTENSION_LOG_DEBUG, + "PlasmaKVStore::snapshotVBucket: Snapshotted vbucket:%" PRIu16 + " state:%s", + vbucketId, + vbstate.toJSON().c_str()); + + st.snapshotHisto.add(std::chrono::duration_cast( + ProcessClock::now() - start)); + + return true; +} + +bool PlasmaKVStore::snapshotStats(const std::map&) { + // TODO Plsm: Implement + return true; +} + +void PlasmaKVStore::destroyInvalidVBuckets(bool) { + // TODO Plsm: implement +} + +size_t PlasmaKVStore::getNumShards() { + return configuration.getMaxShards(); +} + +std::unique_ptr PlasmaKVStore::makeItem(uint16_t vb, + const DocKey& key, + const std::string& value, + GetMetaOnly getMetaOnly) { + const char* data = value.c_str(); + + plasmakv::MetaData meta; + std::memcpy(&meta, data, sizeof(meta)); + data += sizeof(meta); + + bool includeValue = getMetaOnly == GetMetaOnly::No && meta.valueSize; + + auto item = std::make_unique(key, + meta.flags, + meta.exptime, + includeValue ? data : nullptr, + includeValue ? meta.valueSize : 0, + meta.datatype, + meta.cas, + meta.bySeqno, + vb, + meta.revSeqno); + + if (meta.deleted) { + item->setDeleted(); + } + + return item; +} + +GetValue PlasmaKVStore::makeGetValue(uint16_t vb, + const DocKey& key, + const std::string & value, + GetMetaOnly getMetaOnly) { + return GetValue( + makeItem(vb, key, value, getMetaOnly), ENGINE_SUCCESS, -1, 0); +} + +void PlasmaKVStore::readVBState(const KVPlasma& db) { + // Largely copied from CouchKVStore + // TODO Plsm: refactor out sections common to CouchKVStore + vbucket_state_t state = vbucket_state_dead; + uint64_t checkpointId = 0; + uint64_t maxDeletedSeqno = 0; + int64_t highSeqno = readHighSeqnoFromDisk(db); + std::string failovers; + uint64_t purgeSeqno = 0; + uint64_t lastSnapStart = 0; + uint64_t lastSnapEnd = 0; + uint64_t maxCas = 0; + int64_t hlcCasEpochSeqno = HlcCasSeqnoUninitialised; + bool mightContainXattrs = false; + + auto key = getVbstateKey(); + std::string vbstate; + auto vbid = db.vbid; + cachedVBStates[vbid] = std::make_unique(state, + checkpointId, + maxDeletedSeqno, + highSeqno, + purgeSeqno, + lastSnapStart, + lastSnapEnd, + maxCas, + hlcCasEpochSeqno, + mightContainXattrs, + failovers); +} + +int PlasmaKVStore::saveDocs( + uint16_t vbid, + const Item* collectionsManifest, + const std::vector>& commitBatch) { + auto reqsSize = commitBatch.size(); + if (reqsSize == 0) { + st.docsCommitted = 0; + return 0; + } + + auto& vbstate = cachedVBStates[vbid]; + if (vbstate == nullptr) { + throw std::logic_error("PlasmaKVStore::saveDocs: cachedVBStates[" + + std::to_string(vbid) + "] is NULL"); + } + + int64_t lastSeqno = 0; + int status = 0; + + auto begin = ProcessClock::now(); + { + KVPlasma db(vbid, plasmaPath); + + for (const auto& request : commitBatch) { + status = db.SetOrDel(request); + if (status < 0) { + logger.log(EXTENSION_LOG_WARNING, + "PlasmaKVStore::saveDocs: plasma::DB::Insert error:%d, " + "vb:%" PRIu16, + status, + vbid); + } + if (request->getBySeqno() > lastSeqno) { + lastSeqno = request->getBySeqno(); + } + } + } + + st.commitHisto.add(std::chrono::duration_cast( + ProcessClock::now() - begin)); + if (status) { + logger.log(EXTENSION_LOG_WARNING, + "PlasmaKVStore::saveDocs: plasma::DB::Write error:%d, " + "vb:%" PRIu16, + status, + vbid); + return status; + } + + vbstate->highSeqno = lastSeqno; + + return status; +} + +int64_t PlasmaKVStore::readHighSeqnoFromDisk(const KVPlasma& db) { + return 0; +} + +std::string PlasmaKVStore::getVbstateKey() { + return "vbstate"; +} + +ScanContext* PlasmaKVStore::initScanContext( + std::shared_ptr > cb, + std::shared_ptr > cl, + uint16_t vbid, + uint64_t startSeqno, + DocumentFilter options, + ValueFilter valOptions) { + size_t scanId = scanCounter++; + + // As we cannot efficiently determine how many documents this scan will + // find, we approximate this value with the seqno difference + 1 + // as scan is supposed to be inclusive at both ends, + // seqnos 2 to 4 covers 3 docs not 4 - 2 = 2 + + uint64_t endSeqno = cachedVBStates[vbid]->highSeqno; + return new ScanContext(cb, + cl, + vbid, + scanId, + startSeqno, + endSeqno, + options, + valOptions, + /* documentCount */ endSeqno - startSeqno + 1, + configuration); +} + +scan_error_t PlasmaKVStore::scan(ScanContext* ctx) { + if (!ctx) { + return scan_failed; + } + + if (ctx->lastReadSeqno == ctx->maxSeqno) { + return scan_success; + } + + auto startSeqno = ctx->startSeqno; + if (ctx->lastReadSeqno != 0) { + startSeqno = ctx->lastReadSeqno + 1; + } + + GetMetaOnly isMetaOnly = ctx->valFilter == ValueFilter::KEYS_ONLY + ? GetMetaOnly::Yes + : GetMetaOnly::No; + + logger.log(EXTENSION_LOG_WARNING, + "PlasmaKVStore::scan from start seqno %zu to %zu on vb %d", + startSeqno, ctx->maxSeqno, ctx->vbid); + + int bfillHandle = open_backfill_query(ctx->vbid, startSeqno); + + if (bfillHandle < 0) { + char errbf[256]; + sprintf(errbf, "PlasmaKVStore::scan: plasma backfill query fail! err=%d vbid=%d, startseqno=%zu", bfillHandle, ctx->vbid, startSeqno); + throw std::logic_error(errbf); + } + + char keyBuf[200]; // TODO: Find a way to have Plasma allocate memory + void *Key = &keyBuf; + int keyLen = sizeof(keyBuf); + char valueBuf[3072]; // TODO: Find a way to have Plasma to allocate memory + void *value = &valueBuf; + int valueLen = sizeof(valueBuf); + uint64_t seqNo; + + while (true) { + keyLen = sizeof(keyBuf); // reset back for every query call + valueLen = sizeof(valueBuf); + int err = next_backfill_query(ctx->vbid, bfillHandle, &Key, &keyLen, + &value, &valueLen, &seqNo); + if (err) { + if (err == ErrBackfillQueryEOF) { + logger.log(EXTENSION_LOG_WARNING, + "BACKFILL complete for vb %d: max seqno %zu\n", + ctx->vbid, ctx->maxSeqno); + break; + } + fprintf(stderr, "FATAL-PLASMA-BACKFILL-ERROR: %d\n", err); + throw std::logic_error( + "PlasmaKVStore::scan: plasma backfill query next fail!"); + } + if (int64_t(seqNo) > ctx->maxSeqno) { // don't return sequence numbers out of snapshot + continue; + } + DocKey key(reinterpret_cast(Key), keyLen, + DocNamespace::DefaultCollection); + + std::string valStr(reinterpret_cast(value), valueLen); + std::unique_ptr itm = + makeItem(ctx->vbid, key, valStr, isMetaOnly); + bool includeDeletes = + (ctx->docFilter == DocumentFilter::NO_DELETES) ? false : true; + bool onlyKeys = + (ctx->valFilter == ValueFilter::KEYS_ONLY) ? true : false; + + if (!includeDeletes && itm->isDeleted()) { + continue; + } + int64_t byseqno = seqNo; + CacheLookup lookup(key, byseqno, ctx->vbid, + ctx->collectionsContext.getSeparator()); + ctx->lookup->callback(lookup); + + int status = ctx->lookup->getStatus(); + + if (status == ENGINE_KEY_EEXISTS) { + ctx->lastReadSeqno = byseqno; + continue; + } else if (status == ENGINE_ENOMEM) { + logger.log(EXTENSION_LOG_WARNING, + "BACKFILL scan-again: cache lookup ENOMEM: %zu %zu %d\n", + startSeqno, ctx->maxSeqno, ctx->vbid); + return scan_again; + } + + GetValue rv(std::move(itm), ENGINE_SUCCESS, -1, onlyKeys); + ctx->callback->callback(rv); + status = ctx->callback->getStatus(); + + if (status == ENGINE_ENOMEM) { + logger.log(EXTENSION_LOG_WARNING, + "BACKFILL scan-again: value callback ENOMEM: %zu %zu %d\n", + startSeqno, ctx->maxSeqno, ctx->vbid); + return scan_again; + } + + ctx->lastReadSeqno = byseqno; + } + close_backfill_query(ctx->vbid, bfillHandle); + + return scan_success; +} + +void PlasmaKVStore::destroyScanContext(ScanContext* ctx) { + // TODO Plsm: Might be nice to have the snapshot in the ctx and + // release it on destruction +} diff --git a/engines/ep/src/plasma-kvstore/plasma-kvstore.h b/engines/ep/src/plasma-kvstore/plasma-kvstore.h new file mode 100644 index 0000000000..0a633f7be6 --- /dev/null +++ b/engines/ep/src/plasma-kvstore/plasma-kvstore.h @@ -0,0 +1,313 @@ +/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * Copyright 2017 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Experimental Plasma KVStore implementation + * + */ + +#pragma once + +#include +#include +#include + +#include + +#include + +#include "../objectregistry.h" +#include "vbucket_bgfetch_item.h" + +class PlasmaRequest; +class PlasmaKVStoreConfig; +class KVPlasma; +struct KVStatsCtx; + +/** + * A persistence store based on plasma. + */ +class PlasmaKVStore : public KVStore { +public: + /** + * Constructor + * + * @param config Configuration information + */ + PlasmaKVStore(PlasmaKVStoreConfig& config); + + ~PlasmaKVStore(); + + void operator=(PlasmaKVStore& from) = delete; + + /** + * Reset database to a clean state. + */ + void reset(uint16_t vbucketId) override; + + /** + * Begin a transaction (if not already in one). + */ + bool begin(std::unique_ptr txCtx) override; + + /** + * Commit a transaction (unless not currently in one). + * + * Returns false if the commit fails. + */ + bool commit(const Item* collectionsManifest) override; + + /** + * Rollback a transaction (unless not currently in one). + */ + void rollback() override; + + /** + * Query the properties of the underlying storage. + */ + StorageProperties getStorageProperties() override; + + /** + * Overrides set(). + */ + void set(const Item& item, + Callback& cb) override; + + /** + * Overrides get(). + */ + GetValue get(const StoredDocKey& key, + uint16_t vb, + bool fetchDelete = false) override; + + GetValue getWithHeader(void* dbHandle, + const StoredDocKey& key, + uint16_t vb, + GetMetaOnly getMetaOnly, + bool fetchDelete = false) override; + + void getMulti(uint16_t vb, vb_bgfetch_queue_t& itms) override; + + /** + * Overrides del(). + */ + void del(const Item& itm, Callback& cb) override; + + void delVBucket(uint16_t vbucket, uint64_t vb_version) override; + + std::vector listPersistedVbuckets(void) override; + + /** + * Take a snapshot of the stats in the main DB. + */ + bool snapshotStats(const std::map& m); + /** + * Take a snapshot of the vbucket states in the main DB. + */ + bool snapshotVBucket(uint16_t vbucketId, + const vbucket_state& vbstate, + VBStatePersist options) override; + + void destroyInvalidVBuckets(bool); + + size_t getNumShards(); + + void optimizeWrites(std::vector&) { + } + + uint16_t getNumVbsPerFile(void) override { + // TODO vmx 2016-10-29: return the actual value + return 1024; + } + + bool compactDB(compaction_ctx*) override { + // Explicit compaction is not needed. + // Compaction is continuously occurring in separate threads + // under Plasma's control + return true; + } + + uint16_t getDBFileId(const protocol_binary_request_compact_db&) override { + // Not needed if there is no explicit compaction + return 0; + } + + vbucket_state* getVBucketState(uint16_t vbucketId) override { + return cachedVBStates[vbucketId].get(); + } + + size_t getNumPersistedDeletes(uint16_t vbid) override { + // TODO vmx 2016-10-29: implement + return 0; + } + + DBFileInfo getDbFileInfo(uint16_t vbid) override { + // TODO vmx 2016-10-29: implement + DBFileInfo vbinfo; + return vbinfo; + } + + DBFileInfo getAggrDbFileInfo() override { + // TODO vmx 2016-10-29: implement + DBFileInfo vbinfo; + return vbinfo; + } + + size_t getItemCount(uint16_t vbid) override { + // TODO vmx 2016-10-29: implement + return 0; + } + + RollbackResult rollback(uint16_t vbid, + uint64_t rollbackSeqno, + std::shared_ptr cb) override { + // TODO vmx 2016-10-29: implement + // NOTE vmx 2016-10-29: For LevelDB/Plasma it will probably + // always be a full rollback as it doesn't support Couchstore + // like rollback semantics + return RollbackResult(false, 0, 0, 0); + } + + void pendingTasks() override { + // NOTE vmx 2016-10-29: Intentionally left empty; + } + + ENGINE_ERROR_CODE getAllKeys( + uint16_t vbid, + const DocKey start_key, + uint32_t count, + std::shared_ptr> cb) override { + // TODO vmx 2016-10-29: implement + return ENGINE_SUCCESS; + } + + ScanContext* initScanContext(std::shared_ptr> cb, + std::shared_ptr> cl, + uint16_t vbid, + uint64_t startSeqno, + DocumentFilter options, + ValueFilter valOptions) override; + + scan_error_t scan(ScanContext* sctx) override; + + void destroyScanContext(ScanContext* ctx) override; + + std::string getCollectionsManifest(uint16_t vbid) override { + // TODO DJR 2017-05-19 implement this. + return ""; + } + + void incrementRevision(uint16_t vbid) override { + // TODO DJR 2017-05-19 implement this. + } + + uint64_t prepareToDelete(uint16_t vbid) override { + // TODO DJR 2017-05-19 implement this. + return 0; + } + + /* + std::unique_ptr makeReadOnlyStore() { + // Not using make_unique due to the private constructor we're calling + return std::unique_ptr( + new PlasmaKVStore(configuration)); + } + */ + +private: + // This is used for synchonization in `openDB` to avoid that we open two + // instances on the same DB (e.g., this would be possible + // when we `Flush` and `Warmup` run in parallel). + std::mutex openDBMutex; + // Thus, we put an entry in this vector at position `vbid` when we `openDB` + // for a VBucket for the first time. Then, further calls to `openDB(vbid)` + // return the pointer stored in this vector. An entry is removed only when + // `delVBucket(vbid)`. + std::vector> vbDB; + + /* + * This function returns an instance of `KVPlasma` for the given `vbid`. + * The DB for `vbid` is created if it does not exist. + * + * @param vbid vbucket id for the vbucket DB to open + */ + const KVPlasma& openDB(uint16_t vbid); + + /* + * The DB for each VBucket is created in a separated subfolder of + * `configuration.getDBName()`. This function returns the path of the DB + * subfolder for the given `vbid`. + * + * @param vbid vbucket id for the vbucket DB subfolder to return + */ + std::string getVBDBSubdir(uint16_t vbid); + + /* + * This function returns a vector of Vbucket IDs that already exist on + * disk. The function considers only the Vbuckets managed by the current + * Shard. + */ + std::vector discoverVBuckets(); + + std::unique_ptr makeItem(uint16_t vb, + const DocKey& key, + const std::string& value, + GetMetaOnly getMetaOnly); + + GetValue makeGetValue(uint16_t vb, + const DocKey& key, + const std::string& value, + GetMetaOnly getMetaOnly = GetMetaOnly::No); + + void readVBState(const KVPlasma& db); + + int saveDocs( + uint16_t vbid, + const Item* collectionsManifest, + const std::vector>& commitBatch); + + void commitCallback( + int status, + const std::vector>& commitBatch); + + int64_t readHighSeqnoFromDisk(const KVPlasma& db); + + std::string getVbstateKey(); + + // Used for queueing mutation requests (in `set` and `del`) and flushing + // them to disk (in `commit`). + std::vector> pendingReqs; + + // Plasma does *not* need additional synchronisation around + // db->Write, but we need to prevent delVBucket racing with + // commit, potentially losing data. + std::mutex writeLock; + + // This variable is used to verify that the KVStore API is used correctly + // when Plasma is used as store. "Correctly" means that the caller must + // use the API in the following way: + // - begin() x1 + // - set() / del() xN + // - commit() + bool in_transaction; + std::unique_ptr transactionCtx; + const std::string plasmaPath; + + std::atomic scanCounter; // atomic counter for generating scan id + + Logger& logger; +}; diff --git a/engines/ep/src/plasma-kvstore/plasma-kvstore_config.cc b/engines/ep/src/plasma-kvstore/plasma-kvstore_config.cc new file mode 100644 index 0000000000..6801653761 --- /dev/null +++ b/engines/ep/src/plasma-kvstore/plasma-kvstore_config.cc @@ -0,0 +1,33 @@ +/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * Copyright 2018 Couchbase, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "plasma-kvstore_config.h" + +PlasmaKVStoreConfig::PlasmaKVStoreConfig(Configuration& config, + uint16_t shardid) + : KVStoreConfig(config, shardid) { + plasmaMemQuota = uint64_t(config.getPlasmaMemQuota()); + plasmaEnableDirectio = config.isPlasmaEnableDirectio(); + plasmaKvSeparation = config.isPlasmaKvSeparation(); + plasmaLssCleanThreshold = config.getPlasmaLssCleanThreshold(); + plasmaLssCleanMax = config.getPlasmaLssCleanMax(); + plasmaDeltaChainLen = config.getPlasmaDeltaChainLen(); + plasmaBasePageItems = config.getPlasmaBasePageItems(); + plasmaLssNumSegments = config.getPlasmaLssNumSegments(); + plasmaSyncAt = config.getPlasmaSyncAt(); + plasmaEnableUpsert = config.isPlasmaEnableUpsert(); +} diff --git a/engines/ep/src/plasma-kvstore/plasma-kvstore_config.h b/engines/ep/src/plasma-kvstore/plasma-kvstore_config.h new file mode 100644 index 0000000000..4eda87695a --- /dev/null +++ b/engines/ep/src/plasma-kvstore/plasma-kvstore_config.h @@ -0,0 +1,96 @@ +/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * Copyright 2018 Couchbase, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "kvstore_config.h" + +#include + +class Configuration; + +// This class represents the PlasmaKVStore specific configuration. +// PlasmaKVStore uses this in place of the KVStoreConfig base class. +class PlasmaKVStoreConfig : public KVStoreConfig { +public: + // Initialize the object from the central EPEngine Configuration + PlasmaKVStoreConfig(Configuration& config, uint16_t shardid); + + uint64_t getPlasmaMemQuota() { + return plasmaMemQuota; + } + bool isPlasmaEnableDirectio() { + return plasmaEnableDirectio; + } + bool isPlasmaKvSeparation() { + return plasmaKvSeparation; + } + int getPlasmaLssCleanThreshold() { + return plasmaLssCleanThreshold; + } + int getPlasmaLssCleanMax() { + return plasmaLssCleanMax; + } + int getPlasmaDeltaChainLen() { + return plasmaDeltaChainLen; + } + int getPlasmaBasePageItems() { + return plasmaBasePageItems; + } + int getPlasmaLssNumSegments() { + return plasmaLssNumSegments; + } + int getPlasmaSyncAt() { + return plasmaSyncAt; + } + bool isPlasmaEnableUpsert() { + return plasmaEnableUpsert; + } + + +private: + // Plasma Memory Quota + size_t plasmaMemQuota; + + // Plasma Enable Direct I/O + bool plasmaEnableDirectio; + + // Plasma Enable Key Value Separation + bool plasmaKvSeparation; + + // Plasma LSS Clean Fragmentation + size_t plasmaLssCleanThreshold; + + // Plasma LSS Clean Throtle + size_t plasmaLssCleanMax; + + // Plasma delta chain len + size_t plasmaDeltaChainLen; + + // Plasma base page len + size_t plasmaBasePageItems; + + // Plasma LSS Num Segments + size_t plasmaLssNumSegments; + + // Plasma Sync at ms + size_t plasmaSyncAt; + + // Plasma enable upsert + bool plasmaEnableUpsert; + +}; diff --git a/engines/ep/src/plasma-kvstore/plasma-wrapper.c b/engines/ep/src/plasma-kvstore/plasma-wrapper.c new file mode 100644 index 0000000000..97aa6eaa3d --- /dev/null +++ b/engines/ep/src/plasma-kvstore/plasma-wrapper.c @@ -0,0 +1,270 @@ +#include +#include +#include +#include +#include + +#include "plasma-wrapper.h" +#include "libplasma-core.h" + +#define DEFAULT_VALUE_SIZE (2048) + +const bool debug = false; + +void +init_plasma(const uint64_t memQuota, + const bool dio, + const bool kv, + const int cleaner, + const int cleanermax, + const int delta, + const int items, + const int segments, + const int sync, + const bool upsert) +{ + GoUint64 mq = memQuota; + GoUint8 di = dio; + GoUint8 kvsep = kv; + GoInt32 cl = cleaner; + GoInt32 clmax = cleanermax; + GoInt32 dl = delta; + GoInt32 it = items; + GoInt32 seg = segments; + GoInt32 s = sync; + GoUint8 u = upsert; + + InitPlasma(mq, di, kvsep, cl, clmax, dl, it, seg, s, u); +} + +int +shutdown_plasma() +{ + GoInt perr; + + perr = ShutdownPlasma(); + return (int)perr; +} + +int +open_plasma(const char *dbPath, const int vbid) +{ + GoString path; + GoInt vBucketId; + GoInt plasma_handle; + + path.p = dbPath; + path.n = strlen(dbPath); + vBucketId = (GoInt)vbid; + + plasma_handle = OpenPlasma(path, vBucketId); + if (debug) { + fprintf(stderr, "OpenPlasma(%s, %d) %d\n", + (char *)path.p, (int)vBucketId, (int)plasma_handle); + } + return (int)plasma_handle; +} + +int +close_plasma(const int vbid, const int handle_id, uint64_t *ret_seq_num) +{ + GoInt vBucketId = (GoInt)vbid; + GoInt plasma_handle = (GoInt)handle_id; + GoInt perr; + GoUint64 retSeqNum; + + perr = ClosePlasma(vBucketId, plasma_handle, &retSeqNum); + if (debug) { + fprintf(stderr, "ClosePlasma(%d, %d) %d\n", + (int)vBucketId, (int)plasma_handle, (int)perr); + } + + *ret_seq_num = (uint64_t)retSeqNum; + + return (int)perr; +} + +int +insert_kv(const int db, + const int vbid, + const int handle_id, + const void *key, + const int keylen, + const void *value, + const int valuelen, + const uint64_t seq_num) +{ + GoInt plasmaDb = (GoInt)db; + GoInt vBucketId = (GoInt)vbid; + GoInt plasma_handle = (GoInt)handle_id; + GoString gokey, govalue; + GoUint64 goseq; + GoInt perr; + + gokey.p = key; + gokey.n = keylen; + govalue.p = value; + govalue.n = valuelen; + goseq = (GoUint64)seq_num; + + perr = InsertKV(plasmaDb, vBucketId, plasma_handle, gokey, govalue, goseq); + + if (debug) { + fprintf(stderr, "InsertKV(%d, %d, %d, %s, %d, %20.20s, %d, %lu) %d\n", + db, vbid, handle_id, (char *)key, + keylen, (char *)value, valuelen, seq_num, (int)perr); + } + return (int)perr; +} + +int +delete_kv(const int db, + const int vbid, + const int handle_id, + const void *key, + const int keylen) +{ + GoInt plasmaDb = (GoInt)db; + GoInt vBucketId = (GoInt)vbid; + GoInt plasma_handle = (GoInt)handle_id; + GoString gokey; + GoInt perr; + + gokey.p = key; + gokey.n = keylen; + + perr = DeleteKV(plasmaDb, vBucketId, plasma_handle, gokey); + if (debug) { + fprintf(stderr, "DeleteKV(%d, %d, %d, %s, %d) %d\n", + db, vbid, handle_id, (char *)key, + keylen, (int)perr); + } + return (int)perr; +} + +int +lookup_kv(const int db, + const int vbid, + const int handle_id, + const void *key, + const int keylen, + void **value, + int *valuelen) +{ + GoInt plasmaDb = (GoInt)db; + GoInt vBucketId = (GoInt)vbid; + GoInt plasma_handle = (GoInt)handle_id; + GoString gokey; + GoInt ret; + GoInt govaluelen; + + gokey.p = key; + gokey.n = keylen; + govaluelen = (GoInt)*valuelen; + + ret = LookupKV(plasmaDb, vBucketId, plasma_handle, gokey, value, &govaluelen); + + *valuelen = (int)govaluelen; + + if (debug) { + fprintf(stderr, "LookupKV(%d, %d, %d, %s, %d) %30.30s %d %d\n", + db, vbid, handle_id, (char *)key, + keylen, (char *)*value, *valuelen, (int)ret); + } + + return (int)ret; +} + +void +get_stats(const int vbid, + uint64_t *di_memsz, + uint64_t *di_memszidx, + uint64_t *di_numpages, + uint64_t *di_itemscount, + uint64_t *di_lssfrag, + uint64_t *di_lssdatasize, + uint64_t *di_lssusedspace, + uint64_t *di_reclaimpending, + uint64_t *st_memsz, + uint64_t *st_memszidx, + uint64_t *st_reclaimpending) +{ + struct PlasmaStats_return psr; + GoInt vBucketId = (GoInt)vbid; + + psr = PlasmaStats(vBucketId); + + *di_memsz = (uint64_t)psr.r0; + *di_memszidx = (uint64_t)psr.r1; + *di_numpages = (uint64_t)psr.r2; + *di_itemscount = (uint64_t)psr.r3; + *di_lssfrag = (uint64_t)psr.r4; + *di_lssdatasize = (uint64_t)psr.r5; + *di_lssusedspace = (uint64_t)psr.r6; + *di_reclaimpending = (uint64_t)psr.r7; + *st_memsz = (uint64_t)psr.r8; + *st_memszidx = (uint64_t)psr.r9; + *st_reclaimpending = (uint64_t)psr.r10; + + return; +} + +int +open_backfill_query(const int vbid, const uint64_t seq_num) +{ + GoInt vBucketId; + GoInt plasma_handle; + GoUint64 goseq; + + vBucketId = (GoInt)vbid; + goseq = (GoUint64)seq_num; + + plasma_handle = OpenBackfillQuery(vBucketId, goseq); + if (debug) { + fprintf(stderr, "OpenPlasma(%d) %d\n", (int)vBucketId, (int)plasma_handle); + } + return (int)plasma_handle; +} + +int +close_backfill_query(const int vbid, const int handle_id) +{ + GoInt vBucketId = (GoInt)vbid; + GoInt plasma_handle = (GoInt)handle_id; + GoInt perr; + + perr = CloseBackfillQuery(vBucketId, plasma_handle); + if (debug) { + fprintf(stderr, "CloseBackfillQuery(%d, %d) %d\n", + (int)vBucketId, (int)plasma_handle, (int)perr); + } + return (int)perr; +} + +int +next_backfill_query( + const int vbid, + const int handle_id, + void **retkey, + int *retkeylen, + void **retval, + int *retvallen, + uint64_t *ret_seq_num) +{ + GoInt vBucketId = (GoInt)vbid; + GoInt plasma_handle = (GoInt)handle_id; + GoInt keyLen, valueLen; + GoUint64 seqNum; + GoInt ret; + + keyLen = (GoInt)*retkeylen; + valueLen = (GoInt)*retvallen; + + ret = NextBackfillQuery(vBucketId, plasma_handle, retkey, &keyLen, retval, &valueLen, &seqNum); + + *retkeylen = (int)keyLen; + *retvallen = (int)valueLen; + *ret_seq_num = (uint64_t)seqNum; + + return (int)ret; +} diff --git a/engines/ep/src/plasma-kvstore/plasma-wrapper.h b/engines/ep/src/plasma-kvstore/plasma-wrapper.h new file mode 100644 index 0000000000..ebc6b14266 --- /dev/null +++ b/engines/ep/src/plasma-kvstore/plasma-wrapper.h @@ -0,0 +1,121 @@ +/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * Copyright 2017 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _PLASMA_WRAPPER_H +#define _PLASMA_WRAPPER_H + +#include +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +typedef enum { + SuccessUpdate = 1, + SuccessInsert = 0, + Success = 0, + SuccessSync = 1, + ErrInitPlasmaNotCalled = -1, + ErrDbOpen = -2, + ErrDbNotOpen = -3, + ErrHandleNotInUse = -4, + ErrInsertValue = -5, + ErrItemNotFound = -6, + ErrInternal = -7, + ErrValueBufTooSmall = -8, + ErrBackfillQueryNotOpen = -9, + ErrBackfillQueryEOF = -10 +} wrapper_err_codes_t; + +enum { + Plasma_KVengine = 1, + Plasma_LocalDb = 2 +}; + +void init_plasma( + const uint64_t memQuota, + const bool dio, + const bool kv, + const int cleaner, + const int cleanermax, + const int delta, + const int items, + const int segments, + const int sync, + const bool upsert); +int shutdown_plasma(); +int open_plasma( + const char *dbPath, + const int vbid); +int close_plasma( + const int vbid, + const int handle_id, + uint64_t *ret_seq_num); +int insert_kv( + const int db, + const int vbid, + const int handle_id, + const void *key, + const int keylen, + const void *value, + const int valuelen, + const uint64_t seq_num); +int delete_kv( + const int db, + const int vbid, + const int handle_id, + const void *key, + const int keylen); +int lookup_kv( + const int db, + const int vbid, + const int handle_id, + const void *key, + const int keylen, + void **value, + int *valuelen); +int open_backfill_query(const int vbid, const uint64_t seq_num); +int close_backfill_query(const int vbid, const int handle_id); +int next_backfill_query( + const int vbid, + const int handle_id, + void **retkey, + int *retkeylen, + void **retval, + int *retvallen, + uint64_t *ret_seq_num); +void get_stats( + const int vbid, + uint64_t *di_memsz, + uint64_t *di_memszidx, + uint64_t *di_numpages, + uint64_t *di_itemscount, + uint64_t *di_lssfrag, + uint64_t *di_lssdatasize, + uint64_t *di_lssusedspace, + uint64_t *di_reclaimpending, + uint64_t *st_memsz, + uint64_t *st_memszidx, + uint64_t *st_reclaimpending); + +#ifdef __cplusplus +} +#endif +#endif /* _PLASMA_WRAPPER_H */