Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Plasma: Introduce PlasmaKVStore #3

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 29 additions & 5 deletions engines/ep/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ INCLUDE(CheckTypeSize)
INCLUDE(CMakeDependentOption)
INCLUDE(CTest)

OPTION(EP_USE_PLASMA "Enable support for Plasma" ON)

CMAKE_DEPENDENT_OPTION(EP_USE_ROCKSDB "Enable support for RocksDB" ON
"ROCKSDB_INCLUDE_DIR;ROCKSDB_LIBRARIES" OFF)


INCLUDE_DIRECTORIES(BEFORE ${CMAKE_INSTALL_PREFIX}/include
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/src
Expand Down Expand Up @@ -48,6 +51,19 @@ IF (EP_USE_ROCKSDB)
MESSAGE(STATUS "ep-engine: Using RocksDB")
ENDIF (EP_USE_ROCKSDB)

IF (EP_USE_PLASMA)
INCLUDE_DIRECTORIES(AFTER ${PLASMA_INCLUDE_DIR})
SET(PLASMA_KVSTORE_SOURCE src/plasma-kvstore/plasma-wrapper.c
src/plasma-kvstore/plasma-kvstore.cc
src/plasma-kvstore/plasma-kvstore_config.cc)
LIST(APPEND EP_STORAGE_LIBS plasma-core)
ADD_DEFINITIONS(-DEP_USE_PLASMA=1)
MESSAGE(STATUS "ep-engine: Using Plasma libs at " ${EP_STORAGE_LIBS})
ELSE (EP_USE_PLASMA)
MESSAGE(STATUS "ep-engine: NOT using plasma!!!" ${EP_USE_PLASMA}
" dir=" ${PLASMA_CORE_LIB} "include=" ${PLASMA_INCLUDE_DIR})
ENDIF (EP_USE_PLASMA)

INCLUDE_DIRECTORIES(AFTER SYSTEM
${gtest_SOURCE_DIR}/include
${gmock_SOURCE_DIR}/include)
Expand Down Expand Up @@ -248,6 +264,7 @@ ADD_LIBRARY(ep_objs OBJECT
${CONFIG_SOURCE}
${COUCH_KVSTORE_SOURCE}
${ROCKSDB_KVSTORE_SOURCE}
${PLASMA_KVSTORE_SOURCE}
${COLLECTIONS_SOURCE})
SET_PROPERTY(TARGET ep_objs PROPERTY POSITION_INDEPENDENT_CODE 1)
add_sanitizers(ep_objs)
Expand All @@ -257,7 +274,7 @@ ADD_LIBRARY(ep SHARED $<TARGET_OBJECTS:ep_objs>)
SET_TARGET_PROPERTIES(ep PROPERTIES PREFIX "")
TARGET_LINK_LIBRARIES(ep cJSON JSON_checker ${EP_STORAGE_LIBS}
engine_utilities dirutils cbcompress hdr_histogram_static
mcd_util platform phosphor xattr mcd_tracing
mcd_util platform phosphor xattr mcd_tracing sigar
${LIBEVENT_LIBRARIES})
add_sanitizers(ep)

Expand Down Expand Up @@ -334,7 +351,7 @@ if (COUCHBASE_KV_BUILD_UNIT_TESTS)
TARGET_LINK_LIBRARIES(ep-engine_ep_unit_tests ${EP_STORAGE_LIBS} cJSON
dirutils engine_utilities gtest gmock hdr_histogram_static
JSON_checker memcached_logger mcd_util mcd_tracing platform
phosphor xattr cbcompress ${MALLOC_LIBRARIES})
phosphor sigar xattr cbcompress ${MALLOC_LIBRARIES})
add_sanitizers(ep-engine_ep_unit_tests)

ADD_EXECUTABLE(ep-engine_atomic_ptr_test
Expand Down Expand Up @@ -396,6 +413,7 @@ if (COUCHBASE_KV_BUILD_UNIT_TESTS)
mcd_tracing
memcached_logger
phosphor
sigar
platform
xattr
${MALLOC_LIBRARIES}
Expand All @@ -420,7 +438,7 @@ if (COUCHBASE_KV_BUILD_UNIT_TESTS)
$<TARGET_OBJECTS:ep_objs>)
TARGET_LINK_LIBRARIES(ep-engine_sizes cJSON JSON_checker hdr_histogram_static
engine_utilities ${EP_STORAGE_LIBS} dirutils cbcompress platform mcd_util
mcd_tracing phosphor xattr ${LIBEVENT_LIBRARIES})
mcd_tracing phosphor sigar xattr ${LIBEVENT_LIBRARIES})
add_sanitizers(ep-engine_sizes)

ADD_LIBRARY(ep_testsuite SHARED
Expand All @@ -433,6 +451,7 @@ if (COUCHBASE_KV_BUILD_UNIT_TESTS)
TARGET_LINK_LIBRARIES(ep_testsuite engine_utilities mcd_util
${EP_STORAGE_LIBS} dirutils JSON_checker platform
xattr
sigar
${LIBEVENT_LIBRARIES})
ADD_DEPENDENCIES(ep_testsuite engine_testapp)

Expand All @@ -445,6 +464,7 @@ if (COUCHBASE_KV_BUILD_UNIT_TESTS)
)
SET_TARGET_PROPERTIES(ep_testsuite_basic PROPERTIES PREFIX "")
TARGET_LINK_LIBRARIES(ep_testsuite_basic engine_utilities JSON_checker dirutils
sigar
platform ${LIBEVENT_LIBRARIES} ${SNAPPY_LIBRARIES})
ADD_DEPENDENCIES(ep_testsuite engine_testapp)

Expand All @@ -456,7 +476,9 @@ if (COUCHBASE_KV_BUILD_UNIT_TESTS)
tests/mock/mock_dcp.cc
)
SET_TARGET_PROPERTIES(ep_testsuite_dcp PROPERTIES PREFIX "")
TARGET_LINK_LIBRARIES(ep_testsuite_dcp engine_utilities cbcompress JSON_checker dirutils platform ${LIBEVENT_LIBRARIES} ${SNAPPY_LIBRARIES})
TARGET_LINK_LIBRARIES(ep_testsuite_dcp engine_utilities cbcompress JSON_checker dirutils platform
sigar
${LIBEVENT_LIBRARIES} ${SNAPPY_LIBRARIES})
ADD_DEPENDENCIES(ep_testsuite_dcp engine_testapp)

ADD_LIBRARY(ep_testsuite_checkpoint SHARED
Expand All @@ -465,7 +487,9 @@ if (COUCHBASE_KV_BUILD_UNIT_TESTS)
tests/ep_test_apis.cc
src/ext_meta_parser.cc)
SET_TARGET_PROPERTIES(ep_testsuite_checkpoint PROPERTIES PREFIX "")
TARGET_LINK_LIBRARIES(ep_testsuite_checkpoint engine_utilities JSON_checker dirutils platform ${LIBEVENT_LIBRARIES} ${SNAPPY_LIBRARIES})
TARGET_LINK_LIBRARIES(ep_testsuite_checkpoint engine_utilities JSON_checker dirutils
sigar
platform ${LIBEVENT_LIBRARIES} ${SNAPPY_LIBRARIES})
ADD_DEPENDENCIES(ep_testsuite_checkpoint engine_testapp)

ADD_LIBRARY(ep_testsuite_xdcr SHARED
Expand Down
57 changes: 54 additions & 3 deletions engines/ep/configuration.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,14 @@
}
},
"backend": {
"default": "couchdb",
"default": "plasma",
"dynamic": false,
"type": "std::string",
"validator": {
"enum": [
"couchdb",
"rocksdb"
"rocksdb",
"plasma"
]
}
},
Expand Down Expand Up @@ -547,7 +548,7 @@
"type": "size_t"
},
"max_num_shards": {
"default": "4",
"default": "16",
"descr": "Maximum number of shards",
"dynamic": false,
"type": "size_t"
Expand Down Expand Up @@ -962,6 +963,56 @@
"descr": "RocksDB Universal-Compaction 'max_size_amplification_percent' option. The default value is the RocksDB internal default (200).",
"type": "size_t"
},
"plasma_mem_quota": {
"default": "2048",
"descr": "Plasma memory quota (in MB).",
"type": "size_t"
},
"plasma_enable_directio": {
"default": "true",
"descr": "Bypass OS page cache and do direct I/O.",
"type": "bool"
},
"plasma_kv_separation": {
"default": "true",
"descr": "Separate key and value blobs in plasma.",
"type": "bool"
},
"plasma_lss_clean_threshold": {
"default": "98",
"descr": "Log cleaning starts at this fragmentation threshold.",
"type": "size_t"
},
"plasma_lss_clean_max": {
"default": "99",
"descr": "Throttle incoming writes when fragmentation hits this threshold.",
"type": "size_t"
},
"plasma_delta_chain_len": {
"default": "50",
"descr": "Max number of items allowed in delta chain before page is compacted.",
"type": "size_t"
},
"plasma_base_page_items": {
"default": "50",
"descr": "Max number of items allowed in base page.",
"type": "size_t"
},
"plasma_lss_num_segments": {
"default": "4",
"descr": "Max number of segments a plasma page can be split in.",
"type": "size_t"
},
"plasma_sync_at": {
"default": "500",
"descr": "Time in milliseconds at which sync is forced (0 disables sync).",
"type": "size_t"
},
"plasma_enable_upsert": {
"default": "true",
"descr": "Allow updates to existing keys without a lookup & delete.",
"type": "bool"
},
"time_synchronization": {
"default": "disabled",
"descr": "No longer supported. This config parameter has no effect.",
Expand Down
6 changes: 5 additions & 1 deletion engines/ep/src/executorpool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ size_t ExecutorPool::getNumReaders(void) {
return count;
}

size_t numShards;

ExecutorPool *ExecutorPool::get(void) {
auto* tmp = instance.load();
if (tmp == nullptr) {
Expand All @@ -140,6 +142,7 @@ ExecutorPool *ExecutorPool::get(void) {
config.getNumWriterThreads(),
config.getNumAuxioThreads(),
config.getNumNonioThreads());
numShards = config.getMaxNumShards();
ObjectRegistry::onSwitchThread(epe);
instance.store(tmp);
}
Expand Down Expand Up @@ -604,7 +607,8 @@ bool ExecutorPool::_startWorkers(void) {

if (!numWorkers[WRITER_TASK_IDX]) {
// MB-12279: Limit writers to 4 for faster bgfetches in DGM by default
numWriters = 4;
numWriters = numShards;
numReaders = numShards;
}

_adjustWorkers(READER_TASK_IDX, numReaders);
Expand Down
10 changes: 10 additions & 0 deletions engines/ep/src/kvshard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
#ifdef EP_USE_ROCKSDB
#include "rocksdb-kvstore/rocksdb-kvstore_config.h"
#endif
#ifdef EP_USE_PLASMA
#include "plasma-kvstore/plasma-kvstore_config.h"
#endif

/* [EPHE TODO]: Consider not using KVShard for ephemeral bucket */
KVShard::KVShard(uint16_t id, Configuration& config)
Expand All @@ -46,6 +49,13 @@ KVShard::KVShard(uint16_t id, Configuration& config)
auto stores = KVStoreFactory::create(*kvConfig);
rwStore = std::move(stores.rw);
}
#endif
#ifdef EP_USE_PLASMA
else if (backend == "plasma") {
kvConfig = std::make_unique<PlasmaKVStoreConfig>(config, id);
auto stores = KVStoreFactory::create(*kvConfig);
rwStore = std::move(stores.rw);
}
#endif
else {
throw std::logic_error(
Expand Down
11 changes: 11 additions & 0 deletions engines/ep/src/kvstore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
#include "rocksdb-kvstore/rocksdb-kvstore.h"
#include "rocksdb-kvstore/rocksdb-kvstore_config.h"
#endif
#ifdef EP_USE_PLASMA
#include "plasma-kvstore/plasma-kvstore.h"
#include "plasma-kvstore/plasma-kvstore_config.h"
#endif
#include "kvstore.h"
#include "kvstore_config.h"
#include "persistence_callback.h"
Expand Down Expand Up @@ -87,6 +91,13 @@ KVStoreRWRO KVStoreFactory::create(KVStoreConfig& config) {
dynamic_cast<RocksDBKVStoreConfig&>(config));
return {rw.release(), nullptr};
}
#endif
#ifdef EP_USE_PLASMA
else if (backend == "plasma") {
auto rw = std::make_unique<PlasmaKVStore>(
dynamic_cast<PlasmaKVStoreConfig&>(config));
return {rw.release(), nullptr};
}
#endif
else {
throw std::invalid_argument("KVStoreFactory::create unknown backend:" +
Expand Down
11 changes: 5 additions & 6 deletions engines/ep/src/kvstore_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,9 @@ class KVStoreConfig {
Logger* logger;
bool buffered;
bool persistDocNamespace;

/**
* If non-zero, tell storage layer to issue a sync() operation after every
* N bytes written.
*/
uint64_t periodicSyncBytes;
/**
* If non-zero, tell storage layer to issue a sync() operation after every
* N bytes written.
*/
uint64_t periodicSyncBytes;
};
Loading