Skip to content

Commit

Permalink
Use shared_ptr properly to allow for shared backing stores for ArrayB…
Browse files Browse the repository at this point in the history
…uffers that share the same memory
  • Loading branch information
kriszyp committed Nov 18, 2023
1 parent 3e29586 commit ef6befa
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 24 deletions.
14 changes: 9 additions & 5 deletions src/cursor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,16 @@ int CursorWrap::returnEntry(int lastRC, MDB_val &key, MDB_val &data) {
if (result) {
fits = valToBinaryFast(data, dw); // it fit in the global/compression-target buffer
}
//if (fits || result == 2 || data.mv_size < SHARED_BUFFER_THRESHOLD) {// if it was decompressed
*((uint32_t*)keyBuffer) = data.mv_size;
*((uint32_t*)(keyBuffer + 4)) = 0; // buffer id of 0
/* } else {
#if ENABLE_V8_API
if (fits || result == 2 || data.mv_size < SHARED_BUFFER_THRESHOLD) {// if it was decompressed
#endif
*((uint32_t*)keyBuffer) = data.mv_size;
*((uint32_t*)(keyBuffer + 4)) = 0; // buffer id of 0
#if ENABLE_V8_API
} else {
EnvWrap::toSharedBuffer(dw->ew->env, (uint32_t*) dw->ew->keyBuffer, data);
}*/
}
#endif
}
if (!(flags & VALUES_FOR_KEY)) {
memcpy(keyBuffer + 32, key.mv_data, key.mv_size);
Expand Down
18 changes: 11 additions & 7 deletions src/dbi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,14 +164,18 @@ int32_t DbiWrap::doGetByBinary(uint32_t keySize, uint32_t ifNotTxnId, int64_t tx
if (result) {
fits = valToBinaryFast(data, this); // it fits in the global/compression-target buffer
}
//if (fits || result == 2 || data.mv_size < SHARED_BUFFER_THRESHOLD) {// result = 2 if it was decompressed
if (data.mv_size < 0x80000000)
return data.mv_size;
*((uint32_t*)keyBuffer) = data.mv_size;
return -30000;
/*} else {
#if ENABLE_V8_API
if (fits || result == 2 || data.mv_size < SHARED_BUFFER_THRESHOLD) {// result = 2 if it was decompressed
#endif
if (data.mv_size < 0x80000000)
return data.mv_size;
*((uint32_t*)keyBuffer) = data.mv_size;
return -30000;
#if ENABLE_V8_API
} else {
return EnvWrap::toSharedBuffer(ew->env, (uint32_t*) ew->keyBuffer, data);
}*/
}
#endif
}

NAPI_FUNCTION(directWrite) {
Expand Down
50 changes: 41 additions & 9 deletions src/env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@
using namespace Napi;

#define IGNORE_NOTFOUND (1)

#if ENABLE_V8_API
#include <v8.h>
#endif

MDB_txn* ExtendedEnv::prefetchTxns[20];
pthread_mutex_t* ExtendedEnv::prefetchTxnsLock;
env_tracking_t* EnvWrap::envTracking = EnvWrap::initTracking();
thread_local std::vector<EnvWrap*>* EnvWrap::openEnvWraps = nullptr;
thread_local js_buffers_t* EnvWrap::sharedBuffers = nullptr;
std::unordered_map<void*, std::shared_ptr<v8::BackingStore>> EnvWrap::backingStores;
//thread_local std::unordered_map<void*, buffer_info_t>* EnvWrap::sharedBuffers = nullptr;
void* getSharedBuffers() {
return (void*) EnvWrap::sharedBuffers;
Expand Down Expand Up @@ -444,11 +447,20 @@ NAPI_FUNCTION(setEnvsPointer) {
RETURN_UNDEFINED;
}

napi_finalize cleanupSharedExternal = [](napi_env env, void* data, void* buffer_info) {
// Data belongs to LMDB, we shouldn't free it here
napi_finalize cleanupLMDB = [](napi_env env, void* data, void* buffer_info) {
fprintf(stderr,"Cleanup called on LMDB chunk %p\n", data);
};
void cleanupSharedMap(void* data, size_t length, void* deleter_data) {
// Data belongs to LMDB, we shouldn't free it here, but we do need to remove the reference
// to the backing store, since it longer exists
EnvWrap::backingStores.erase(data);
fprintf(stderr,"Cleanup called on LMDB chunk %p\n", data);
};
napi_finalize cleanupExternal = [](napi_env env, void* data, void* buffer_info) {
fprintf(stderr,"Cleanup called on external chunk %p\n", data);
};

napi_finalize cleanupAllocatedExternal = [](napi_env env, void* data, void* buffer_info) {
void cleanupAllocatedExternal(void* data, size_t length, void* buffer_info) {
int32_t id = ((buffer_info_t*) buffer_info)->id;
pthread_mutex_lock(&EnvWrap::sharedBuffers->modification_lock);
for (auto bufferRef = EnvWrap::sharedBuffers->buffers.begin(); bufferRef != EnvWrap::sharedBuffers->buffers.end();) {
Expand All @@ -467,6 +479,7 @@ napi_finalize cleanupAllocatedExternal = [](napi_env env, void* data, void* buff

NAPI_FUNCTION(getSharedBuffer) {
ARGS(2)
#if ENABLE_V8_API
int32_t bufferId;
GET_UINT32_ARG(bufferId, 0);
GET_INT64_ARG(1);
Expand All @@ -477,14 +490,15 @@ NAPI_FUNCTION(getSharedBuffer) {
char* start = bufferRef->first;
buffer_info_t* buffer = &bufferRef->second;
if (buffer->env == ew->env) {
//fprintf(stderr, "found exiting buffer for %u\n", bufferId);
fprintf(stderr, "found existing buffer for %u\n", bufferId);
napi_get_reference_value(env, buffer->ref, &returnValue);
pthread_mutex_unlock(&EnvWrap::sharedBuffers->modification_lock);
return returnValue;
}
if (buffer->env) {
// if for some reason it is different env that didn't get cleaned up
napi_value arrayBuffer;
fprintf(stderr, "Changing the env for %u\n", bufferId);
napi_get_reference_value(env, buffer->ref, &arrayBuffer);
napi_detach_arraybuffer(env, arrayBuffer);
napi_delete_reference(env, buffer->ref);
Expand All @@ -495,8 +509,25 @@ NAPI_FUNCTION(getSharedBuffer) {
size_t size = end - start;
if (size > 0x100000000)
fprintf(stderr, "Getting invalid shared buffer size %llu from start: %llu to %end: %llu", size, start, end);
napi_create_external_arraybuffer(env, start, size,
buffer->isSharedMap ? cleanupSharedExternal : cleanupAllocatedExternal, (void*) buffer, &returnValue);
auto store_ref = EnvWrap::backingStores.find(start);
std::shared_ptr<v8::BackingStore> bs;
if (store_ref == EnvWrap::backingStores.end()) {
bs = v8::ArrayBuffer::NewBackingStore(start, size, buffer->isSharedMap ? cleanupSharedMap : cleanupAllocatedExternal, (void*) buffer);
// this is the most mysterious part, if we don't create an extra shared pointer to the backing store, it gets deleted
// even though the unordered_map is supposed to preserve a reference to it
auto permanent_pointer = new std::shared_ptr<v8::BackingStore>(bs);
EnvWrap::backingStores.emplace(start, bs);
//fprintf(stderr, "Creating a new backing (shared %u) store for %p %p\n", buffer->isSharedMap, start, bs.get());
} else {
bs = store_ref->second;
//fprintf(stderr, "Reusing existing backing store for %p %p\n", start, bs.get());
}
v8::Local<v8::ArrayBuffer> ab = v8::ArrayBuffer::New(v8::Isolate::GetCurrent(), bs);
//fprintf(stderr, "Use count for backing store after %u\n", bs.use_count());
returnValue = reinterpret_cast<napi_value>(*ab);
// TODO: We may want to enable this for Bun, it probably doesn't have the problems with shared external buffers that V8 does
/*napi_create_external_arraybuffer(env, start, size,
buffer->isSharedMap ? cleanupLMDB : cleanupExternal, (void*) buffer, &returnValue);*/
int64_t result;
napi_create_reference(env, returnValue, 1, &buffer->ref);
if (buffer->isSharedMap) {
Expand All @@ -510,6 +541,9 @@ NAPI_FUNCTION(getSharedBuffer) {
}
}
pthread_mutex_unlock(&EnvWrap::sharedBuffers->modification_lock);
#else
return throwError(env, "Can use shared buffers without V8 linking");
#endif
RETURN_UNDEFINED;
}
NAPI_FUNCTION(setTestRef) {
Expand Down Expand Up @@ -668,8 +702,6 @@ void EnvWrap::closeEnv(bool hasLock) {
for (auto bufferRef = EnvWrap::sharedBuffers->buffers.begin(); bufferRef != EnvWrap::sharedBuffers->buffers.end();) {
if (bufferRef->second.env == env) {
napi_value arrayBuffer;
napi_get_reference_value(napiEnv, bufferRef->second.ref, &arrayBuffer);
napi_detach_arraybuffer(napiEnv, arrayBuffer);
napi_delete_reference(napiEnv, bufferRef->second.ref);
int64_t result;
if (bufferRef->second.id >= 0)
Expand Down
7 changes: 6 additions & 1 deletion src/lmdb-js.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
#include <ctime>
#include <napi.h>
#include <node_api.h>
#if ENABLE_V8_API
#include <v8.h>
#endif

#include "lmdb.h"
#include "lz4.h"
Expand Down Expand Up @@ -329,7 +332,9 @@ class EnvWrap : public ObjectWrap<EnvWrap> {
pthread_mutex_t* writingLock;
pthread_cond_t* writingCond;
std::vector<AsyncWorker*> workers;

#if ENABLE_V8_API
static std::unordered_map<void*, std::shared_ptr<v8::BackingStore>> backingStores;
#endif
MDB_txn* currentReadTxn;
WriteWorker* writeWorker;
bool readTxnRenewed;
Expand Down
8 changes: 6 additions & 2 deletions test/threads.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ if (isMainThread) {

var workerCount = Math.min(numCPUs * 2, 20);
var value = {test: '48656c6c6f2c20776f726c6421'};

var str = 'this is supposed to be bigger than 16KB threshold for shared memory buffers';
for (let i = 0; i < 9; i++) {
str += str;
}
var bigValue = {test: str};
// This will start as many workers as there are CPUs available.
var workers = [];
for (var i = 0; i < workerCount; i++) {
Expand Down Expand Up @@ -53,7 +57,7 @@ if (isMainThread) {

let last
for (var i = 0; i < workers.length; i++) {
last = db.put('key' + i, value);
last = db.put('key' + i, i % 2 === 1 ? bigValue : value);
}

last.then(() => {
Expand Down

0 comments on commit ef6befa

Please sign in to comment.