From ef6befa3fd0de1b36a70213a260d9deacf91a5cb Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Sat, 18 Nov 2023 16:16:52 -0700 Subject: [PATCH] Use shared_ptr properly to allow for shared backing stores for ArrayBuffers that share the same memory --- src/cursor.cpp | 14 +++++++++----- src/dbi.cpp | 18 ++++++++++------- src/env.cpp | 50 +++++++++++++++++++++++++++++++++++++++--------- src/lmdb-js.h | 7 ++++++- test/threads.cjs | 8 ++++++-- 5 files changed, 73 insertions(+), 24 deletions(-) diff --git a/src/cursor.cpp b/src/cursor.cpp index cdebef13d3..fabcc391d0 100644 --- a/src/cursor.cpp +++ b/src/cursor.cpp @@ -103,12 +103,16 @@ int CursorWrap::returnEntry(int lastRC, MDB_val &key, MDB_val &data) { if (result) { fits = valToBinaryFast(data, dw); // it fit in the global/compression-target buffer } - //if (fits || result == 2 || data.mv_size < SHARED_BUFFER_THRESHOLD) {// if it was decompressed - *((uint32_t*)keyBuffer) = data.mv_size; - *((uint32_t*)(keyBuffer + 4)) = 0; // buffer id of 0 -/* } else { +#if ENABLE_V8_API + if (fits || result == 2 || data.mv_size < SHARED_BUFFER_THRESHOLD) {// if it was decompressed +#endif + *((uint32_t*)keyBuffer) = data.mv_size; + *((uint32_t*)(keyBuffer + 4)) = 0; // buffer id of 0 +#if ENABLE_V8_API + } else { EnvWrap::toSharedBuffer(dw->ew->env, (uint32_t*) dw->ew->keyBuffer, data); - }*/ + } +#endif } if (!(flags & VALUES_FOR_KEY)) { memcpy(keyBuffer + 32, key.mv_data, key.mv_size); diff --git a/src/dbi.cpp b/src/dbi.cpp index d2ca143c91..33243a3774 100644 --- a/src/dbi.cpp +++ b/src/dbi.cpp @@ -164,14 +164,18 @@ int32_t DbiWrap::doGetByBinary(uint32_t keySize, uint32_t ifNotTxnId, int64_t tx if (result) { fits = valToBinaryFast(data, this); // it fits in the global/compression-target buffer } - //if (fits || result == 2 || data.mv_size < SHARED_BUFFER_THRESHOLD) {// result = 2 if it was decompressed - if (data.mv_size < 0x80000000) - return data.mv_size; - *((uint32_t*)keyBuffer) = data.mv_size; - return -30000; - /*} else { +#if ENABLE_V8_API + if (fits || result == 2 || data.mv_size < SHARED_BUFFER_THRESHOLD) {// result = 2 if it was decompressed +#endif + if (data.mv_size < 0x80000000) + return data.mv_size; + *((uint32_t*)keyBuffer) = data.mv_size; + return -30000; +#if ENABLE_V8_API + } else { return EnvWrap::toSharedBuffer(ew->env, (uint32_t*) ew->keyBuffer, data); - }*/ + } +#endif } NAPI_FUNCTION(directWrite) { diff --git a/src/env.cpp b/src/env.cpp index afee5b3e54..376c4db765 100644 --- a/src/env.cpp +++ b/src/env.cpp @@ -8,13 +8,16 @@ using namespace Napi; #define IGNORE_NOTFOUND (1) - +#if ENABLE_V8_API +#include +#endif MDB_txn* ExtendedEnv::prefetchTxns[20]; pthread_mutex_t* ExtendedEnv::prefetchTxnsLock; env_tracking_t* EnvWrap::envTracking = EnvWrap::initTracking(); thread_local std::vector* EnvWrap::openEnvWraps = nullptr; thread_local js_buffers_t* EnvWrap::sharedBuffers = nullptr; +std::unordered_map> EnvWrap::backingStores; //thread_local std::unordered_map* EnvWrap::sharedBuffers = nullptr; void* getSharedBuffers() { return (void*) EnvWrap::sharedBuffers; @@ -444,11 +447,20 @@ NAPI_FUNCTION(setEnvsPointer) { RETURN_UNDEFINED; } -napi_finalize cleanupSharedExternal = [](napi_env env, void* data, void* buffer_info) { - // Data belongs to LMDB, we shouldn't free it here +napi_finalize cleanupLMDB = [](napi_env env, void* data, void* buffer_info) { + fprintf(stderr,"Cleanup called on LMDB chunk %p\n", data); +}; +void cleanupSharedMap(void* data, size_t length, void* deleter_data) { + // Data belongs to LMDB, we shouldn't free it here, but we do need to remove the reference + // to the backing store, since it longer exists + EnvWrap::backingStores.erase(data); + fprintf(stderr,"Cleanup called on LMDB chunk %p\n", data); +}; +napi_finalize cleanupExternal = [](napi_env env, void* data, void* buffer_info) { + fprintf(stderr,"Cleanup called on external chunk %p\n", data); }; -napi_finalize cleanupAllocatedExternal = [](napi_env env, void* data, void* buffer_info) { +void cleanupAllocatedExternal(void* data, size_t length, void* buffer_info) { int32_t id = ((buffer_info_t*) buffer_info)->id; pthread_mutex_lock(&EnvWrap::sharedBuffers->modification_lock); for (auto bufferRef = EnvWrap::sharedBuffers->buffers.begin(); bufferRef != EnvWrap::sharedBuffers->buffers.end();) { @@ -467,6 +479,7 @@ napi_finalize cleanupAllocatedExternal = [](napi_env env, void* data, void* buff NAPI_FUNCTION(getSharedBuffer) { ARGS(2) +#if ENABLE_V8_API int32_t bufferId; GET_UINT32_ARG(bufferId, 0); GET_INT64_ARG(1); @@ -477,7 +490,7 @@ NAPI_FUNCTION(getSharedBuffer) { char* start = bufferRef->first; buffer_info_t* buffer = &bufferRef->second; if (buffer->env == ew->env) { - //fprintf(stderr, "found exiting buffer for %u\n", bufferId); + fprintf(stderr, "found existing buffer for %u\n", bufferId); napi_get_reference_value(env, buffer->ref, &returnValue); pthread_mutex_unlock(&EnvWrap::sharedBuffers->modification_lock); return returnValue; @@ -485,6 +498,7 @@ NAPI_FUNCTION(getSharedBuffer) { if (buffer->env) { // if for some reason it is different env that didn't get cleaned up napi_value arrayBuffer; + fprintf(stderr, "Changing the env for %u\n", bufferId); napi_get_reference_value(env, buffer->ref, &arrayBuffer); napi_detach_arraybuffer(env, arrayBuffer); napi_delete_reference(env, buffer->ref); @@ -495,8 +509,25 @@ NAPI_FUNCTION(getSharedBuffer) { size_t size = end - start; if (size > 0x100000000) fprintf(stderr, "Getting invalid shared buffer size %llu from start: %llu to %end: %llu", size, start, end); - napi_create_external_arraybuffer(env, start, size, - buffer->isSharedMap ? cleanupSharedExternal : cleanupAllocatedExternal, (void*) buffer, &returnValue); + auto store_ref = EnvWrap::backingStores.find(start); + std::shared_ptr bs; + if (store_ref == EnvWrap::backingStores.end()) { + bs = v8::ArrayBuffer::NewBackingStore(start, size, buffer->isSharedMap ? cleanupSharedMap : cleanupAllocatedExternal, (void*) buffer); + // this is the most mysterious part, if we don't create an extra shared pointer to the backing store, it gets deleted + // even though the unordered_map is supposed to preserve a reference to it + auto permanent_pointer = new std::shared_ptr(bs); + EnvWrap::backingStores.emplace(start, bs); + //fprintf(stderr, "Creating a new backing (shared %u) store for %p %p\n", buffer->isSharedMap, start, bs.get()); + } else { + bs = store_ref->second; + //fprintf(stderr, "Reusing existing backing store for %p %p\n", start, bs.get()); + } + v8::Local ab = v8::ArrayBuffer::New(v8::Isolate::GetCurrent(), bs); + //fprintf(stderr, "Use count for backing store after %u\n", bs.use_count()); + returnValue = reinterpret_cast(*ab); + // TODO: We may want to enable this for Bun, it probably doesn't have the problems with shared external buffers that V8 does + /*napi_create_external_arraybuffer(env, start, size, + buffer->isSharedMap ? cleanupLMDB : cleanupExternal, (void*) buffer, &returnValue);*/ int64_t result; napi_create_reference(env, returnValue, 1, &buffer->ref); if (buffer->isSharedMap) { @@ -510,6 +541,9 @@ NAPI_FUNCTION(getSharedBuffer) { } } pthread_mutex_unlock(&EnvWrap::sharedBuffers->modification_lock); +#else + return throwError(env, "Can use shared buffers without V8 linking"); +#endif RETURN_UNDEFINED; } NAPI_FUNCTION(setTestRef) { @@ -668,8 +702,6 @@ void EnvWrap::closeEnv(bool hasLock) { for (auto bufferRef = EnvWrap::sharedBuffers->buffers.begin(); bufferRef != EnvWrap::sharedBuffers->buffers.end();) { if (bufferRef->second.env == env) { napi_value arrayBuffer; - napi_get_reference_value(napiEnv, bufferRef->second.ref, &arrayBuffer); - napi_detach_arraybuffer(napiEnv, arrayBuffer); napi_delete_reference(napiEnv, bufferRef->second.ref); int64_t result; if (bufferRef->second.id >= 0) diff --git a/src/lmdb-js.h b/src/lmdb-js.h index 3dbe920393..1d05a00b70 100644 --- a/src/lmdb-js.h +++ b/src/lmdb-js.h @@ -7,6 +7,9 @@ #include #include #include +#if ENABLE_V8_API +#include +#endif #include "lmdb.h" #include "lz4.h" @@ -329,7 +332,9 @@ class EnvWrap : public ObjectWrap { pthread_mutex_t* writingLock; pthread_cond_t* writingCond; std::vector workers; - +#if ENABLE_V8_API + static std::unordered_map> backingStores; +#endif MDB_txn* currentReadTxn; WriteWorker* writeWorker; bool readTxnRenewed; diff --git a/test/threads.cjs b/test/threads.cjs index 98d43c5ed2..1ad93f6c2b 100644 --- a/test/threads.cjs +++ b/test/threads.cjs @@ -21,7 +21,11 @@ if (isMainThread) { var workerCount = Math.min(numCPUs * 2, 20); var value = {test: '48656c6c6f2c20776f726c6421'}; - + var str = 'this is supposed to be bigger than 16KB threshold for shared memory buffers'; + for (let i = 0; i < 9; i++) { + str += str; + } + var bigValue = {test: str}; // This will start as many workers as there are CPUs available. var workers = []; for (var i = 0; i < workerCount; i++) { @@ -53,7 +57,7 @@ if (isMainThread) { let last for (var i = 0; i < workers.length; i++) { - last = db.put('key' + i, value); + last = db.put('key' + i, i % 2 === 1 ? bigValue : value); } last.then(() => {