From 258e4db96c17007dbbb74b2f854340dee8f38764 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Tue, 22 Aug 2023 22:14:45 -0600 Subject: [PATCH 01/33] Broader check for closed env --- read.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/read.js b/read.js index cefabefe4..af8820049 100644 --- a/read.js +++ b/read.js @@ -700,14 +700,14 @@ export function addReadMethods(LMDBStore, { return new Uint8Array(buffer, offset, size); } function renewReadTxn(store) { + if (!env.address) { + throw new Error('Can not renew a transaction from a closed database'); + } if (!readTxn) { let retries = 0; let waitArray; do { try { - if (!env.address) { - throw new Error('Can not renew a transaction from a closed database'); - } let lastReadTxn = lastReadTxnRef && lastReadTxnRef.deref(); readTxn = new Txn(env, 0x20000, lastReadTxn && !lastReadTxn.isDone && lastReadTxn); if (readTxn.address == 0) { From 54a3bce70e075f50fe792ba12f09419d2088406b Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Tue, 22 Aug 2023 22:15:24 -0600 Subject: [PATCH 02/33] Add support for inserting timestamps --- src/lmdb-js.h | 1 + src/misc.cpp | 17 +++++++++++++++++ src/writer.cpp | 9 +++++++++ 3 files changed, 27 insertions(+) diff --git a/src/lmdb-js.h b/src/lmdb-js.h index 40a14e8ce..342086633 100644 --- a/src/lmdb-js.h +++ b/src/lmdb-js.h @@ -18,6 +18,7 @@ using namespace Napi; // set the threshold of when to use shared buffers (for uncompressed entries larger than this value) const size_t SHARED_BUFFER_THRESHOLD = 0x4000; +const uint64_t REPLACE_WITH_TIMESTAMP = htonll(0x00f140a979fd0932); #ifndef __CPTHREAD_H__ #define __CPTHREAD_H__ diff --git a/src/misc.cpp b/src/misc.cpp index 35a65d513..947bf6737 100644 --- a/src/misc.cpp +++ b/src/misc.cpp @@ -436,6 +436,11 @@ int putWithVersion(MDB_txn * txn, int rc = mdb_put(txn, dbi, key, data, flags | MDB_RESERVE); if (rc == 0) { // if put is successful, data->mv_data will point into the database where we copy the data to + if (*(uint64_t*)key 
& 0xffffffffffffff === REPLACE_WITH_TIMESTAMP) { + *(uint64_t*)key = key & 0x100000000000000 ? last_time_double() : next_time_double(); + } + + if (*(uint64_t*)sourceData === REPLACE_WITH_TXN_ID) *(uint64_t*)sourceData = htonll(mdb_txn_id(txn)); memcpy((char*) data->mv_data + 8, sourceData, size); memcpy(data->mv_data, &version, 8); //*((double*) data->mv_data) = version; // this doesn't work on ARM v7 because it is not (guaranteed) memory-aligned @@ -566,6 +571,18 @@ uint64_t get_time64() { clock_gettime(CLOCK_MONOTONIC, &time); return time.tv_sec * 1000000000ll + time.tv_nsec; } +static double last_time; +uint64_t next_time_double() { + struct timespec time; + clock_gettime(CLOCK_MONOTONIC, &time); + double next_time = (double)time.tv_sec * 1000 + time.tv_nsec / 1000000; + if (next_time == last_time) + *((*uint64_t)&next_time)++; + return htonll(last_time = next_time); +} +uint64_t last_time_double() { + return htonll(last_time); +} #endif // This file contains code from the node-lmdb project diff --git a/src/writer.cpp b/src/writer.cpp index b8ce807a7..fb63dd57d 100644 --- a/src/writer.cpp +++ b/src/writer.cpp @@ -17,6 +17,12 @@ inline value? #ifndef _WIN32 #include #endif +#ifdef _WIN32 +#define ntohl _byteswap_ulong +#define htonl _byteswap_ulong +#else +#include +#endif // flags: const uint32_t NO_INSTRUCTION_YET = 0; @@ -281,6 +287,9 @@ next_inst: start = instruction++; } goto next_inst; case PUT: + if (*(uint64_t*)key & 0xffffffffffffff === REPLACE_WITH_TIMESTAMP) { + *(uint64_t*)key = key & 0x100000000000000 ? 
last_time_double() : next_time_double(); + } if (flags & SET_VERSION) rc = putWithVersion(txn, dbi, &key, &value, flags & (MDB_NOOVERWRITE | MDB_NODUPDATA | MDB_APPEND | MDB_APPENDDUP), setVersion); else From 581fc6405295baf31a03d3872927f587e614a556 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Sun, 27 Aug 2023 16:33:20 -0600 Subject: [PATCH 03/33] Add ability to do generated timestamp insertion --- src/lmdb-js.h | 4 +++- src/misc.cpp | 36 +++++++++++++++++++++--------------- src/writer.cpp | 21 ++++++++++++++++++--- 3 files changed, 42 insertions(+), 19 deletions(-) diff --git a/src/lmdb-js.h b/src/lmdb-js.h index 342086633..d6efbd887 100644 --- a/src/lmdb-js.h +++ b/src/lmdb-js.h @@ -18,7 +18,7 @@ using namespace Napi; // set the threshold of when to use shared buffers (for uncompressed entries larger than this value) const size_t SHARED_BUFFER_THRESHOLD = 0x4000; -const uint64_t REPLACE_WITH_TIMESTAMP = htonll(0x00f140a979fd0932); +const uint64_t REPLACE_WITH_TIMESTAMP = 0x3209fd79a940f100ull; #ifndef __CPTHREAD_H__ #define __CPTHREAD_H__ @@ -79,6 +79,8 @@ int pthread_cond_broadcast(pthread_cond_t *cond); const uint64_t TICKS_PER_SECOND = 1000000000; #endif uint64_t get_time64(); +uint64_t next_time_double(); +uint64_t last_time_double(); int cond_init(pthread_cond_t *cond); int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, uint64_t ns); diff --git a/src/misc.cpp b/src/misc.cpp index 947bf6737..47379f6dc 100644 --- a/src/misc.cpp +++ b/src/misc.cpp @@ -5,6 +5,13 @@ #include #include +#ifdef _WIN32 + #define bswap_64(x) _byteswap_uint64(x) + // may need to add apple: https://stackoverflow.com/questions/41770887/cross-platform-definition-of-byteswap-uint64-and-byteswap-ulong/46137633#46137633 +#else + #include // bswap_64 +#endif + using namespace Napi; static thread_local char* globalUnsafePtr; @@ -424,28 +431,27 @@ Napi::Value throwError(Napi::Env env, const char* message) { return env.Undefined(); } +const int ASSIGN_NEXT_TIMESTAMP = 0; +const 
int ASSGIN_LAST_TIMESTAMP = 1; +const int ASSGIN_NEXT_TIMESTAMP_AND_RECORD_PREVIOUS = 2; +const int ASSGIN_PREVIOUS_TIMESTAMP = 3; int putWithVersion(MDB_txn * txn, MDB_dbi dbi, MDB_val * key, MDB_val * data, unsigned int flags, double version) { // leave 8 header bytes available for version and copy in with reserved memory - char* sourceData = (char*) data->mv_data; + char* source_data = (char*) data->mv_data; int size = data->mv_size; data->mv_size = size + 8; int rc = mdb_put(txn, dbi, key, data, flags | MDB_RESERVE); if (rc == 0) { // if put is successful, data->mv_data will point into the database where we copy the data to - if (*(uint64_t*)key & 0xffffffffffffff === REPLACE_WITH_TIMESTAMP) { - *(uint64_t*)key = key & 0x100000000000000 ? last_time_double() : next_time_double(); - } - - if (*(uint64_t*)sourceData === REPLACE_WITH_TXN_ID) *(uint64_t*)sourceData = htonll(mdb_txn_id(txn)); - memcpy((char*) data->mv_data + 8, sourceData, size); + memcpy((char*) data->mv_data + 8, source_data, size); memcpy(data->mv_data, &version, 8); //*((double*) data->mv_data) = version; // this doesn't work on ARM v7 because it is not (guaranteed) memory-aligned } - data->mv_data = sourceData; // restore this so that if it points to data that needs to be freed, it points to the right place + data->mv_data = source_data; // restore this so that if it points to data that needs to be freed, it points to the right place return rc; } @@ -571,17 +577,17 @@ uint64_t get_time64() { clock_gettime(CLOCK_MONOTONIC, &time); return time.tv_sec * 1000000000ll + time.tv_nsec; } -static double last_time; +static uint64_t last_time; // actually encoded as double uint64_t next_time_double() { struct timespec time; - clock_gettime(CLOCK_MONOTONIC, &time); - double next_time = (double)time.tv_sec * 1000 + time.tv_nsec / 1000000; - if (next_time == last_time) - *((*uint64_t)&next_time)++; - return htonll(last_time = next_time); + clock_gettime(CLOCK_REALTIME, &time); + double next_time = 
(double)time.tv_sec * 1000 + (double)time.tv_nsec / 1000000; + uint64_t next_time_int = *((uint64_t*)&next_time); + if (next_time_int == last_time) next_time_int++; + return bswap_64(last_time = next_time_int); } uint64_t last_time_double() { - return htonll(last_time); + return bswap_64(last_time); } #endif diff --git a/src/writer.cpp b/src/writer.cpp index fb63dd57d..0d95976ae 100644 --- a/src/writer.cpp +++ b/src/writer.cpp @@ -57,7 +57,7 @@ const int IF_NO_EXISTS = MDB_NOOVERWRITE; //0x10; const int FAILED_CONDITION = 0x4000000; const int FINISHED_OPERATION = 0x1000000; const double ANY_VERSION = 3.542694326329068e-103; // special marker for any version - +static uint64_t previous_time; WriteWorker::~WriteWorker() { // TODO: Make sure this runs on the JS main thread, or we need to move it @@ -287,9 +287,24 @@ next_inst: start = instruction++; } goto next_inst; case PUT: - if (*(uint64_t*)key & 0xffffffffffffff === REPLACE_WITH_TIMESTAMP) { - *(uint64_t*)key = key & 0x100000000000000 ? last_time_double() : next_time_double(); + if ((*(uint64_t*)key.mv_data & 0xffffffffffffff00ull) == REPLACE_WITH_TIMESTAMP) { + *(uint64_t*)key.mv_data = (*(uint64_t*)key.mv_data & 0x1) ? last_time_double() : next_time_double(); + } + { + uint64_t first_word = *(uint64_t*)value.mv_data; + if ((first_word & 0xffffffffffffff00ull) == REPLACE_WITH_TIMESTAMP) { + fprintf(stdout, "replace with timestamp %u \n", first_word & 0xff); + if ((first_word & 3) == 2) { + // preserve last timestamp + MDB_val last_data; + mdb_get(txn, dbi, &key, &last_data); + previous_time = *(uint64_t*) last_data.mv_data; + } + uint64_t timestamp = (first_word & 1) ? (first_word & 2) ? 
previous_time : last_time_double() : next_time_double(); + *(uint64_t*)value.mv_data = timestamp ^ (first_word & 0xf8); + } } + if (flags & SET_VERSION) rc = putWithVersion(txn, dbi, &key, &value, flags & (MDB_NOOVERWRITE | MDB_NODUPDATA | MDB_APPEND | MDB_APPENDDUP), setVersion); else From 515eeceef6ba5730f605d4b7dfb52b679bfb0783 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Tue, 29 Aug 2023 10:30:39 -0600 Subject: [PATCH 04/33] Make timestamp assignment be explicit --- src/writer.cpp | 23 ++++++++++++++++------- write.js | 2 ++ 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/src/writer.cpp b/src/writer.cpp index 0d95976ae..bd6917bad 100644 --- a/src/writer.cpp +++ b/src/writer.cpp @@ -43,6 +43,7 @@ const int CONDITIONAL = 8; const int CONDITIONAL_VERSION = 0x100; const int CONDITIONAL_VERSION_LESS_THAN = 0x800; const int CONDITIONAL_ALLOW_NOTFOUND = 0x1000; +const int ASSIGN_TIMESTAMP = 0x2000; const int SET_VERSION = 0x200; //const int HAS_INLINE_VALUE = 0x400; const int COMPRESSIBLE = 0x100000; @@ -183,7 +184,7 @@ next_inst: start = instruction++; MDB_dbi dbi = 0; //fprintf(stderr, "do %u %u\n", flags, get_time64()); bool validated = conditionDepth == validatedDepth; - if (flags & 0xf0c0) { + if (flags & 0xc0c0) { fprintf(stderr, "Unknown flag bits %u %p\n", flags, start); fprintf(stderr, "flags after message %u\n", *start); worker->resultCode = 22; @@ -287,21 +288,29 @@ next_inst: start = instruction++; } goto next_inst; case PUT: - if ((*(uint64_t*)key.mv_data & 0xffffffffffffff00ull) == REPLACE_WITH_TIMESTAMP) { - *(uint64_t*)key.mv_data = (*(uint64_t*)key.mv_data & 0x1) ? last_time_double() : next_time_double(); - } - { + if (flags & ASSIGN_TIMESTAMP) { + if ((*(uint64_t*)key.mv_data & 0xffffffffffffff00ull) == REPLACE_WITH_TIMESTAMP) { + *(uint64_t*)key.mv_data = (*(uint64_t*)key.mv_data & 0x1) ? 
last_time_double() : next_time_double(); + } uint64_t first_word = *(uint64_t*)value.mv_data; + // 0 assign new time + // 1 assign last assigned time + // 3 assign last recorded previous time + // 4 record previous time if ((first_word & 0xffffffffffffff00ull) == REPLACE_WITH_TIMESTAMP) { - fprintf(stdout, "replace with timestamp %u \n", first_word & 0xff); - if ((first_word & 3) == 2) { + fprintf(stderr, "timestamping\n"); + if (first_word & 4) { // preserve last timestamp MDB_val last_data; mdb_get(txn, dbi, &key, &last_data); + if (flags & SET_VERSION) last_data.mv_data = (char*)last_data.mv_data + 8; previous_time = *(uint64_t*) last_data.mv_data; + fprintf(stderr, "previous time %llx \n", previous_time); } uint64_t timestamp = (first_word & 1) ? (first_word & 2) ? previous_time : last_time_double() : next_time_double(); + fprintf(stderr, "setting timestamp %llx\n", timestamp ^ (first_word & 0xf8)); *(uint64_t*)value.mv_data = timestamp ^ (first_word & 0xf8); + fprintf(stderr, "set time %llx \n", timestamp); } } diff --git a/write.js b/write.js index 5406e0687..5a13a36a6 100644 --- a/write.js +++ b/write.js @@ -597,6 +597,8 @@ export function addWriteMethods(LMDBStore, { env, fixedBuffer, resetReadTxn, use flags |= 0x10; if (versionOrOptions.noDupData) flags |= 0x20; + if (versionOrOptions.assignTimestamp) + flags |= 0x2000; if (versionOrOptions.append) flags |= 0x20000; if (versionOrOptions.ifVersion != undefined) From 672ba8d6d597c4237f8a28c1f70b566249b22009 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Tue, 29 Aug 2023 10:30:53 -0600 Subject: [PATCH 05/33] Basic lmdb level support for direct writes --- dependencies/lmdb/libraries/liblmdb/lmdb.h | 1 + dependencies/lmdb/libraries/liblmdb/mdb.c | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+) diff --git a/dependencies/lmdb/libraries/liblmdb/lmdb.h b/dependencies/lmdb/libraries/liblmdb/lmdb.h index 217acb619..66bafde28 100644 --- a/dependencies/lmdb/libraries/liblmdb/lmdb.h +++ 
b/dependencies/lmdb/libraries/liblmdb/lmdb.h @@ -1447,6 +1447,7 @@ int mdb_set_relctx(MDB_txn *txn, MDB_dbi dbi, void *ctx); * */ int mdb_get_with_txn(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, MDB_val *data, mdb_size_t *txn_id); +int mdb_direct_write(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, MDB_val *data); int mdb_get(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, MDB_val *data); diff --git a/dependencies/lmdb/libraries/liblmdb/mdb.c b/dependencies/lmdb/libraries/liblmdb/mdb.c index 7c2e7b002..68fa6d8a4 100644 --- a/dependencies/lmdb/libraries/liblmdb/mdb.c +++ b/dependencies/lmdb/libraries/liblmdb/mdb.c @@ -7922,6 +7922,25 @@ mdb_get_with_txn(MDB_txn *txn, MDB_dbi dbi, MDB_CURSOR_UNREF(&mc, 1); return rc; } +int +mdb_direct_write(MDB_txn *txn, MDB_dbi dbi, + MDB_val *key, MDB_val *data) +{ + MDB_val existing_data; + int rc = mdb_get_with_txn(txn, dbi, key, &existing_data, NULL); + if (rc == 0) { + if (data->mv_size > existing_data.mv_size) { + last_error = malloc(100); + sprintf(last_error, "Attempt to direct write beyond the size of the value"); + return EINVAL; + } + MDB_env* env = txn->mt_env; + mdb_size_t file_offset = (char*)existing_data.mv_data - env->me_map; + int written = pwrite(env->me_fd, data->mv_data, data->mv_size, file_offset); + if (written < 0) rc = written; + } + return rc; +} /** Find a sibling for a page. 
* Replaces the page at the top of the cursor's stack with the From c80aabefb68936c43dfaf2cff0adb7617031163f Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Sun, 3 Sep 2023 21:41:58 -0600 Subject: [PATCH 06/33] Take out logging --- src/writer.cpp | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/writer.cpp b/src/writer.cpp index bd6917bad..1a9fad03c 100644 --- a/src/writer.cpp +++ b/src/writer.cpp @@ -298,19 +298,17 @@ next_inst: start = instruction++; // 3 assign last recorded previous time // 4 record previous time if ((first_word & 0xffffffffffffff00ull) == REPLACE_WITH_TIMESTAMP) { - fprintf(stderr, "timestamping\n"); if (first_word & 4) { // preserve last timestamp MDB_val last_data; mdb_get(txn, dbi, &key, &last_data); if (flags & SET_VERSION) last_data.mv_data = (char*)last_data.mv_data + 8; previous_time = *(uint64_t*) last_data.mv_data; - fprintf(stderr, "previous time %llx \n", previous_time); + //fprintf(stderr, "previous time %llx \n", previous_time); } uint64_t timestamp = (first_word & 1) ? (first_word & 2) ? 
previous_time : last_time_double() : next_time_double(); - fprintf(stderr, "setting timestamp %llx\n", timestamp ^ (first_word & 0xf8)); *(uint64_t*)value.mv_data = timestamp ^ (first_word & 0xf8); - fprintf(stderr, "set time %llx \n", timestamp); + //fprintf(stderr, "set time %llx \n", timestamp); } } From 99fc429c02d433ee07331655ab4867770a7c9d88 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Sun, 3 Sep 2023 22:24:57 -0600 Subject: [PATCH 07/33] Update version --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 837954cdd..9e1a12ca7 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "lmdb", "author": "Kris Zyp", - "version": "2.8.4", + "version": "2.9.0.timestamp-1", "description": "Simple, efficient, scalable, high-performance LMDB interface", "license": "MIT", "repository": { From baa1ac116236c64675a8e15f37616ef7a98ef884 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Mon, 4 Sep 2023 08:16:21 -0600 Subject: [PATCH 08/33] Cross platform timestamp assignment and tests and plans for compression --- index.js | 1 + src/compression.cpp | 2 ++ src/lmdb-js.h | 1 + src/misc.cpp | 30 ++++++++++++++++++++++++++++-- test/index.test.js | 34 +++++++++++++++++++++++++++++++++- 5 files changed, 65 insertions(+), 3 deletions(-) diff --git a/index.js b/index.js index f181c14dd..a12872aa1 100644 --- a/index.js +++ b/index.js @@ -25,6 +25,7 @@ import { levelup } from './level.js'; export { clearKeptObjects } from './native.js'; import { nativeAddon } from './native.js'; export let { noop } = nativeAddon; +export const TIMESTAMP_PLACEHOLDER = new Uint8Array([0x00, 0xf1, 0x40, 0xa9, 0x79, 0xfd, 0x09, 0x32]); export { open, openAsClass, getLastVersion, allDbs, getLastTxnId } from './open.js'; import { toBufferKey as keyValueToBuffer, compareKeys as compareKey, fromBufferKey as bufferToKeyValue } from 'ordered-binary'; import { open, openAsClass, getLastVersion } from './open.js'; diff --git 
a/src/compression.cpp b/src/compression.cpp index 1fcd41a87..285c1449c 100644 --- a/src/compression.cpp +++ b/src/compression.cpp @@ -48,6 +48,7 @@ void Compression::decompress(MDB_val& data, bool &isValid, bool canAllocate) { int compressionHeaderSize; uint32_t compressedLength = data.mv_size; unsigned char* charData = (unsigned char*) data.mv_data; + // TODO: Use offset here if (charData[0] == 254) { uncompressedLength = ((uint32_t)charData[1] << 16) | ((uint32_t)charData[2] << 8) | (uint32_t)charData[3]; @@ -126,6 +127,7 @@ argtokey_callback_t Compression::compress(MDB_val* value, void (*freeValue)(MDB_ if (!stream) stream = LZ4_createStream(); LZ4_loadDict(stream, compressDictionary, dictionarySize); + // TODO: Add in offset here int compressedSize = LZ4_compress_fast_continue(stream, data, compressed + prefixSize, dataLength, maxCompressedSize, acceleration); if (compressedSize > 0) { if (freeValue) diff --git a/src/lmdb-js.h b/src/lmdb-js.h index d6efbd887..2a804bc6b 100644 --- a/src/lmdb-js.h +++ b/src/lmdb-js.h @@ -544,6 +544,7 @@ class Compression : public ObjectWrap { char* decompressTarget; unsigned int decompressSize; unsigned int compressionThreshold; + unsigned int startingOffset; // compression can be configured to start compression at a certain offset, so header bytes are left uncompressed. 
// compression acceleration (defaults to 1) int acceleration; static thread_local LZ4_stream_t* stream; diff --git a/src/misc.cpp b/src/misc.cpp index 47379f6dc..5ded61d7a 100644 --- a/src/misc.cpp +++ b/src/misc.cpp @@ -5,9 +5,12 @@ #include #include +// if we need to add others: https://stackoverflow.com/questions/41770887/cross-platform-definition-of-byteswap-uint64-and-byteswap-ulong/46137633#46137633 #ifdef _WIN32 #define bswap_64(x) _byteswap_uint64(x) - // may need to add apple: https://stackoverflow.com/questions/41770887/cross-platform-definition-of-byteswap-uint64-and-byteswap-ulong/46137633#46137633 +#elif defined(__APPLE__) + #include + #define bswap_64(x) OSSwapInt64(x) #else #include // bswap_64 #endif @@ -455,6 +458,7 @@ int putWithVersion(MDB_txn * txn, return rc; } +static uint64_t last_time; // actually encoded as double #ifdef _WIN32 @@ -545,6 +549,29 @@ int pthread_cond_broadcast(pthread_cond_t *cond) uint64_t get_time64() { return GetTickCount64(); } +// from: https://github.com/wadey/node-microtime/blob/master/src/microtime.cc#L19 +// Pick GetSystemTimePreciseAsFileTime or GetSystemTimeAsFileTime depending +// on which is available at runtime. 
+typedef VOID(WINAPI *WinGetSystemTime)(LPFILETIME); +static WinGetSystemTime getSystemTime = NULL; + +uint64_t next_time_double() { + FILETIME ft; + (*getSystemTime)(&ft); + unsigned long long t = ft.dwHighDateTime; + t <<= 32; + t |= ft.dwLowDateTime; + t /= 10; + t -= 11644473600000000ULL; + double next_time = (double)t/ 1000; + uint64_t next_time_int = *((uint64_t*)&next_time); + if (next_time_int == last_time) next_time_int++; + return bswap_64 (last_time = next_time_int); +} +uint64_t last_time_double() { + return _byteswap_uint64 (last_time); +} + #else int cond_init(pthread_cond_t *cond) { @@ -577,7 +604,6 @@ uint64_t get_time64() { clock_gettime(CLOCK_MONOTONIC, &time); return time.tv_sec * 1000000000ll + time.tv_nsec; } -static uint64_t last_time; // actually encoded as double uint64_t next_time_double() { struct timespec time; clock_gettime(CLOCK_REALTIME, &time); diff --git a/test/index.test.js b/test/index.test.js index d5012214c..27b7410b5 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -12,7 +12,7 @@ import inspector from 'inspector' //inspector.open(9229, null, true); debugger let nativeMethods, dirName = dirname(fileURLToPath(import.meta.url)) -import { open, levelup, bufferToKeyValue, keyValueToBuffer, asBinary, ABORT, IF_EXISTS } from '../node-index.js'; +import { open, levelup, bufferToKeyValue, keyValueToBuffer, asBinary, ABORT, IF_EXISTS, TIMESTAMP_PLACEHOLDER } from '../node-index.js'; import { createRequire } from 'module'; const require = createRequire(import.meta.url); // we don't always test CJS because it messes up debugging in webstorm (and I am not about to give the awesomeness @@ -1308,6 +1308,38 @@ describe('lmdb-js', function() { }) should.equal(dbRAS.get(3).name, 'three'); }) + + it('assign timestamps', async function() { + let dbBinary = db.openDB(Object.assign({ + name: 'mydb-timestamp', + encoding: 'binary' + })); + let value = Buffer.alloc(16, 3); + TIMESTAMP_PLACEHOLDER[0] = 0; + value.set(TIMESTAMP_PLACEHOLDER); + 
await dbBinary.put(1, value, { + assignTimestamp: true, + }); + let returnedValue = dbBinary.get(1); + let dataView = new DataView(returnedValue.buffer, 0, 16); + let assignedTimestamp = dataView.getFloat64(0); + should.equal(assignedTimestamp + 100000 > Date.now(), true); + should.equal(assignedTimestamp - 100000 < Date.now(), true); + should.equal(returnedValue[9], 3); + + value = Buffer.alloc(16, 3); + TIMESTAMP_PLACEHOLDER[0] = 1; // assign previous + value.set(TIMESTAMP_PLACEHOLDER); + + await dbBinary.put(1, value, { + assignTimestamp: true, + }); + returnedValue = dbBinary.get(1); + dataView = new DataView(returnedValue.buffer, 0, 16); + should.equal(assignedTimestamp, dataView.getFloat64(0)); + should.equal(returnedValue[9], 3); + }) + it('can backup and use backup', async function() { if (options.encryptionKey) // it won't match the environment return; From 8169ef31830acea316a692ee05acf389a9767680 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Mon, 4 Sep 2023 08:17:55 -0600 Subject: [PATCH 09/33] Fix write file for Windows --- dependencies/lmdb/libraries/liblmdb/mdb.c | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/dependencies/lmdb/libraries/liblmdb/mdb.c b/dependencies/lmdb/libraries/liblmdb/mdb.c index 68fa6d8a4..d251f885a 100644 --- a/dependencies/lmdb/libraries/liblmdb/mdb.c +++ b/dependencies/lmdb/libraries/liblmdb/mdb.c @@ -7936,7 +7936,16 @@ mdb_direct_write(MDB_txn *txn, MDB_dbi dbi, } MDB_env* env = txn->mt_env; mdb_size_t file_offset = (char*)existing_data.mv_data - env->me_map; + +#ifdef _WIN32 + DWORD written; + OVERLAPPED ov; + memset(&ov, 0, sizeof(ov)); + ov.Offset = file_offset; + rc = WriteFile(env->me_fd, data->mv_data, data->mv_size, &written, &ov); +#else int written = pwrite(env->me_fd, data->mv_data, data->mv_size, file_offset); +#endif; if (written < 0) rc = written; } return rc; From ea84f1b3d7ae583809ccfb6df51b51501e6b8692 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Mon, 4 Sep 2023 08:30:10 -0600 Subject: [PATCH 10/33] 
Just use newer Windows API --- src/misc.cpp | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/misc.cpp b/src/misc.cpp index 5ded61d7a..eccc9dcc7 100644 --- a/src/misc.cpp +++ b/src/misc.cpp @@ -550,14 +550,10 @@ uint64_t get_time64() { return GetTickCount64(); } // from: https://github.com/wadey/node-microtime/blob/master/src/microtime.cc#L19 -// Pick GetSystemTimePreciseAsFileTime or GetSystemTimeAsFileTime depending -// on which is available at runtime. -typedef VOID(WINAPI *WinGetSystemTime)(LPFILETIME); -static WinGetSystemTime getSystemTime = NULL; uint64_t next_time_double() { FILETIME ft; - (*getSystemTime)(&ft); + GetSystemTimePreciseAsFileTime(&ft); unsigned long long t = ft.dwHighDateTime; t <<= 32; t |= ft.dwLowDateTime; From a1d5dcdf9b4ecea20f26afd1db87304f1f5cec5a Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Mon, 4 Sep 2023 11:01:36 -0600 Subject: [PATCH 11/33] Try separate builds --- .github/workflows/prebuild.yml | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/.github/workflows/prebuild.yml b/.github/workflows/prebuild.yml index fc9a29166..bd8cf3098 100644 --- a/.github/workflows/prebuild.yml +++ b/.github/workflows/prebuild.yml @@ -56,7 +56,7 @@ jobs: if: startsWith(github.ref, 'refs/tags/') with: files: prebuild-win32.tar - build-centos-7: + build-centos-7-musl: if: startsWith(github.ref, 'refs/tags/') env: LMDB_DATA_V1: ${{ contains(github.ref, '-v1') }} @@ -98,6 +98,30 @@ jobs: PREBUILD_ARCH: x64 CC: ${PWD}/x86_64-linux-musl-native/bin/x86_64-linux-musl-gcc CXX: ${PWD}/x86_64-linux-musl-native/bin/x86_64-linux-musl-g++ + - run: npm run build-js + - run: chmod 777 test + - run: npm test + if: ${{ !contains(github.ref, '-v1') }} + - run: tar --create --verbose --file=prebuild-linux.tar -C prebuilds . 
+ - name: Prebuild + if: startsWith(github.ref, 'refs/tags/') + uses: softprops/action-gh-release@v1 + with: + files: prebuild-linux-musl.tar + build-centos-7: + if: startsWith(github.ref, 'refs/tags/') + env: + LMDB_DATA_V1: ${{ contains(github.ref, '-v1') }} + runs-on: ubuntu-20.04 + container: quay.io/pypa/manylinux2014_x86_64 + steps: + - uses: actions/checkout@v3 + - name: Setup node + uses: actions/setup-node@v3 + with: + node-version: 16 + - run: yum update -y && yum install -y python3 + - run: npm install - run: npm run prebuild-libc - run: ls prebuilds/linux-x64 #- run: cp prebuilds/linux-x64/node.abi93.glibc.node prebuilds/linux-x64/node.abi92.glibc.node From 63aa8eed9075ed4839cb72ad5248b407a19aa590 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Mon, 4 Sep 2023 18:17:27 -0600 Subject: [PATCH 12/33] Try prebuild docker image --- .github/workflows/prebuild.yml | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/.github/workflows/prebuild.yml b/.github/workflows/prebuild.yml index bd8cf3098..601812308 100644 --- a/.github/workflows/prebuild.yml +++ b/.github/workflows/prebuild.yml @@ -70,7 +70,7 @@ jobs: node-version: 16 - run: yum update -y && yum install -y python3 - run: curl https://raw.githubusercontent.com/kriszyp/musl-bins/main/aarch64-linux-musl-cross.tgz --insecure --output aarch64-linux-musl-cross.tgz - - run: tar -xf aarch64-linux-musl-cross.tgz && pwd && ls + - run: tar -xf aarch64-linux-musl-cross.tgz || cat aarch64-linux-musl-cross.tgz - run: curl https://raw.githubusercontent.com/kriszyp/musl-bins/main/armv7l-linux-musleabihf-cross.tgz --insecure --output armv7l-linux-musleabihf-cross.tgz - run: tar -xf armv7l-linux-musleabihf-cross.tgz && pwd && ls - run: curl https://raw.githubusercontent.com/kriszyp/musl-bins/main/x86_64-linux-musl-native.tgz --insecure --output x86_64-linux-musl-native.tgz @@ -108,6 +108,30 @@ jobs: uses: softprops/action-gh-release@v1 with: files: prebuild-linux-musl.tar + 
build-alpine: + #if: startsWith(github.ref, 'refs/tags/') + env: + LMDB_DATA_V1: ${{ contains(github.ref, '-v1') }} + runs-on: ubuntu-20.04 + container: ghcr.io/prebuild/alpine + steps: + - run: npm install + - run: npm run prebuild-libc-musl + - run: ls prebuilds/linux-x64 + #- run: cp prebuilds/linux-x64/node.abi93.glibc.node prebuilds/linux-x64/node.abi92.glibc.node + #- run: npm run prebuildify + # env: + # ENABLE_FAST_API_CALLS: true + - run: npm run build-js + - run: chmod 777 test + - run: npm test + if: ${{ !contains(github.ref, '-v1') }} + - run: tar --create --verbose --file=prebuild-linux.tar -C prebuilds . + - name: Prebuild + if: startsWith(github.ref, 'refs/tags/') + uses: softprops/action-gh-release@v1 + with: + files: prebuild-linux-alpine.tar build-centos-7: if: startsWith(github.ref, 'refs/tags/') env: From 910c058ca55afc668d52c800a8f27a8e052f8d0e Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Tue, 5 Sep 2023 09:27:29 -0600 Subject: [PATCH 13/33] Allow offset for compression --- src/compression.cpp | 28 ++++++++++++++++++---------- src/misc.cpp | 8 ++++---- 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/src/compression.cpp b/src/compression.cpp index 285c1449c..7d3ca1892 100644 --- a/src/compression.cpp +++ b/src/compression.cpp @@ -9,6 +9,7 @@ Compression::Compression(const CallbackInfo& info) : ObjectWrap(inf unsigned int compressionThreshold = 1000; char* dictionary = nullptr; size_t dictSize = 0; + unsigned int startingOffset = 0; if (info[0].IsObject()) { auto dictionaryOption = info[0].As().Get("dictionary"); if (!dictionaryOption.IsUndefined()) { @@ -20,10 +21,13 @@ Compression::Compression(const CallbackInfo& info) : ObjectWrap(inf dictSize = (dictSize >> 3) << 3; // make sure it is word-aligned } auto thresholdOption = info[0].As().Get("threshold"); - if (thresholdOption.IsNumber()) { + if (thresholdOption.IsNumber()) compressionThreshold = thresholdOption.As(); - } + auto offsetOption = info[0].As().Get("startingOffset"); 
+ if (offsetOption.IsNumber()) + startingOffset = offsetOption.As(); } + this->startingOffset = startingOffset; this->dictionary = this->compressDictionary = dictionary; this->dictionarySize = dictSize; this->decompressTarget = dictionary + dictSize; @@ -47,8 +51,8 @@ void Compression::decompress(MDB_val& data, bool &isValid, bool canAllocate) { uint32_t uncompressedLength; int compressionHeaderSize; uint32_t compressedLength = data.mv_size; - unsigned char* charData = (unsigned char*) data.mv_data; - // TODO: Use offset here + void* originalData = data.mv_data; + unsigned char* charData = (unsigned char*) data.mv_data + startingOffset; if (charData[0] == 254) { uncompressedLength = ((uint32_t)charData[1] << 16) | ((uint32_t)charData[2] << 8) | (uint32_t)charData[3]; @@ -66,7 +70,7 @@ void Compression::decompress(MDB_val& data, bool &isValid, bool canAllocate) { return; } data.mv_data = decompressTarget; - data.mv_size = uncompressedLength; + data.mv_size = uncompressedLength + startingOffset; //TODO: For larger blocks with known encoding, it might make sense to allocate space for it and use an ExternalString //fprintf(stdout, "compressed size %u uncompressedLength %u, first byte %u\n", data.mv_size, uncompressedLength, charData[compressionHeaderSize]); if (uncompressedLength > decompressSize) { @@ -74,8 +78,8 @@ void Compression::decompress(MDB_val& data, bool &isValid, bool canAllocate) { return; } int written = LZ4_decompress_safe_usingDict( - (char*)charData + compressionHeaderSize, decompressTarget, - compressedLength - compressionHeaderSize, decompressSize, + (char*)charData + compressionHeaderSize, decompressTarget + startingOffset, + compressedLength - compressionHeaderSize - startingOffset, decompressSize, dictionary, dictionarySize); //fprintf(stdout, "first uncompressed byte %X %X %X %X %X %X\n", uncompressedData[0], uncompressedData[1], uncompressedData[2], uncompressedData[3], uncompressedData[4], uncompressedData[5]); if (written < 0) { @@ -88,6 +92,8 
@@ void Compression::decompress(MDB_val& data, bool &isValid, bool canAllocate) { isValid = false; return; } + if (startingOffset) + memcpy(decompressTarget, originalData, startingOffset); isValid = true; } @@ -115,12 +121,12 @@ int Compression::compressInstruction(EnvWrap* env, double* compressionAddress) { } argtokey_callback_t Compression::compress(MDB_val* value, void (*freeValue)(MDB_val&)) { - size_t dataLength = value->mv_size; + size_t dataLength = value->mv_size - startingOffset; char* data = (char*)value->mv_data; if (value->mv_size < compressionThreshold && !(value->mv_size > 0 && ((uint8_t*)data)[0] >= 250)) return freeValue; // don't compress if less than threshold (but we must compress if the first byte is the compression indicator) bool longSize = dataLength >= 0x1000000; - int prefixSize = (longSize ? 8 : 4); + int prefixSize = (longSize ? 8 : 4) + startingOffset; int maxCompressedSize = LZ4_COMPRESSBOUND(dataLength); char* compressed = new char[maxCompressedSize + prefixSize]; //fprintf(stdout, "compressing %u\n", dataLength); @@ -130,9 +136,11 @@ argtokey_callback_t Compression::compress(MDB_val* value, void (*freeValue)(MDB_ // TODO: Add in offset here int compressedSize = LZ4_compress_fast_continue(stream, data, compressed + prefixSize, dataLength, maxCompressedSize, acceleration); if (compressedSize > 0) { + if (startingOffset > 0) // copy the uncompressed prefix + memcpy(compressed, data, startingOffset); if (freeValue) freeValue(*value); - uint8_t* compressedData = (uint8_t*)compressed; + uint8_t* compressedData = (uint8_t*)compressed + startingOffset; if (longSize) { compressedData[0] = 255; compressedData[2] = (uint8_t)(dataLength >> 40u); diff --git a/src/misc.cpp b/src/misc.cpp index eccc9dcc7..c124077a0 100644 --- a/src/misc.cpp +++ b/src/misc.cpp @@ -123,7 +123,7 @@ int getVersionAndUncompress(MDB_val &data, DbiWrap* dw) { if (data.mv_size == 0) { return 1;// successFunc(data); } - unsigned char statusByte = dw->compression ? 
charData[0] : 0; + unsigned char statusByte = dw->compression ? charData[dw->compression->startingOffset] : 0; //fprintf(stdout, "uncompressing status %X\n", statusByte); if (statusByte >= 250) { bool isValid; @@ -435,9 +435,9 @@ Napi::Value throwError(Napi::Env env, const char* message) { } const int ASSIGN_NEXT_TIMESTAMP = 0; -const int ASSGIN_LAST_TIMESTAMP = 1; -const int ASSGIN_NEXT_TIMESTAMP_AND_RECORD_PREVIOUS = 2; -const int ASSGIN_PREVIOUS_TIMESTAMP = 3; +const int ASSIGN_LAST_TIMESTAMP = 1; +const int ASSIGN_NEXT_TIMESTAMP_AND_RECORD_PREVIOUS = 2; +const int ASSIGN_PREVIOUS_TIMESTAMP = 3; int putWithVersion(MDB_txn * txn, MDB_dbi dbi, MDB_val * key, From 50d942c1a5addeb376d0e270529fe790586c2b89 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Tue, 5 Sep 2023 09:28:10 -0600 Subject: [PATCH 14/33] Preliminary support for direct write --- dependencies/lmdb/libraries/liblmdb/lmdb.h | 2 +- dependencies/lmdb/libraries/liblmdb/mdb.c | 9 ++-- index.js | 4 +- src/lmdb-js.h | 4 +- src/writer.cpp | 59 +++++++++++++++++----- test/index.test.js | 56 +++++++++++++++++--- write.js | 2 +- 7 files changed, 111 insertions(+), 25 deletions(-) diff --git a/dependencies/lmdb/libraries/liblmdb/lmdb.h b/dependencies/lmdb/libraries/liblmdb/lmdb.h index 66bafde28..d1218de3a 100644 --- a/dependencies/lmdb/libraries/liblmdb/lmdb.h +++ b/dependencies/lmdb/libraries/liblmdb/lmdb.h @@ -1447,7 +1447,7 @@ int mdb_set_relctx(MDB_txn *txn, MDB_dbi dbi, void *ctx); * */ int mdb_get_with_txn(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, MDB_val *data, mdb_size_t *txn_id); -int mdb_direct_write(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, MDB_val *data); +int mdb_direct_write(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, unsigned int offset, MDB_val *data); int mdb_get(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, MDB_val *data); diff --git a/dependencies/lmdb/libraries/liblmdb/mdb.c b/dependencies/lmdb/libraries/liblmdb/mdb.c index d251f885a..aa7029cce 100644 --- a/dependencies/lmdb/libraries/liblmdb/mdb.c +++ 
b/dependencies/lmdb/libraries/liblmdb/mdb.c @@ -7924,7 +7924,7 @@ mdb_get_with_txn(MDB_txn *txn, MDB_dbi dbi, } int mdb_direct_write(MDB_txn *txn, MDB_dbi dbi, - MDB_val *key, MDB_val *data) + MDB_val *key, unsigned int offset, MDB_val *data) { MDB_val existing_data; int rc = mdb_get_with_txn(txn, dbi, key, &existing_data, NULL); @@ -7935,8 +7935,11 @@ mdb_direct_write(MDB_txn *txn, MDB_dbi dbi, return EINVAL; } MDB_env* env = txn->mt_env; - mdb_size_t file_offset = (char*)existing_data.mv_data - env->me_map; - + mdb_size_t file_offset = (char*)existing_data.mv_data - env->me_map + offset; + // a direct write can only be safely atomically applied to a memory map if it fits into + // single word, verify that here: + if (file_offset >> 3 != (file_offset + data->mv_size - 1) >> 3) + return -1; #ifdef _WIN32 DWORD written; OVERLAPPED ov; diff --git a/index.js b/index.js index a12872aa1..69b957233 100644 --- a/index.js +++ b/index.js @@ -25,13 +25,15 @@ import { levelup } from './level.js'; export { clearKeptObjects } from './native.js'; import { nativeAddon } from './native.js'; export let { noop } = nativeAddon; -export const TIMESTAMP_PLACEHOLDER = new Uint8Array([0x00, 0xf1, 0x40, 0xa9, 0x79, 0xfd, 0x09, 0x32]); +export const TIMESTAMP_PLACEHOLDER = new Uint8Array([1,1,1,1,0,0,0,0]); +export const DIRECT_WRITE_PLACEHOLDER = new Uint8Array([1,1,1,2,0,0,0,0]); export { open, openAsClass, getLastVersion, allDbs, getLastTxnId } from './open.js'; import { toBufferKey as keyValueToBuffer, compareKeys as compareKey, fromBufferKey as bufferToKeyValue } from 'ordered-binary'; import { open, openAsClass, getLastVersion } from './open.js'; export const TransactionFlags = { ABORTABLE: 1, SYNCHRONOUS_COMMIT: 2, + NO_SYNC_FLUSH: 0x10000, }; export default { diff --git a/src/lmdb-js.h b/src/lmdb-js.h index 2a804bc6b..97d69451b 100644 --- a/src/lmdb-js.h +++ b/src/lmdb-js.h @@ -18,7 +18,9 @@ using namespace Napi; // set the threshold of when to use shared buffers (for uncompressed 
entries larger than this value) const size_t SHARED_BUFFER_THRESHOLD = 0x4000; -const uint64_t REPLACE_WITH_TIMESTAMP = 0x3209fd79a940f100ull; +const uint32_t SPECIAL_WRITE = 0x10101; +const uint32_t REPLACE_WITH_TIMESTAMP = 0x1000000; +const uint32_t DIRECT_WRITE = 0x2000000; #ifndef __CPTHREAD_H__ #define __CPTHREAD_H__ diff --git a/src/writer.cpp b/src/writer.cpp index 1a9fad03c..d0efcedd9 100644 --- a/src/writer.cpp +++ b/src/writer.cpp @@ -289,26 +289,61 @@ next_inst: start = instruction++; goto next_inst; case PUT: if (flags & ASSIGN_TIMESTAMP) { - if ((*(uint64_t*)key.mv_data & 0xffffffffffffff00ull) == REPLACE_WITH_TIMESTAMP) { - *(uint64_t*)key.mv_data = (*(uint64_t*)key.mv_data & 0x1) ? last_time_double() : next_time_double(); + if ((*(uint64_t*)key.mv_data & 0xfffffffful) == REPLACE_WITH_TIMESTAMP) { + *(uint64_t*)key.mv_data = ((*(uint64_t*)key.mv_data >> 32) & 0x1) ? last_time_double() : next_time_double(); } uint64_t first_word = *(uint64_t*)value.mv_data; // 0 assign new time // 1 assign last assigned time // 3 assign last recorded previous time // 4 record previous time - if ((first_word & 0xffffffffffffff00ull) == REPLACE_WITH_TIMESTAMP) { - if (first_word & 4) { - // preserve last timestamp + if ((first_word & 0xffffff) == SPECIAL_WRITE) { + if (first_word & REPLACE_WITH_TIMESTAMP) { + uint32_t next_32 = first_word >> 32; + if (next_32 & 4) { + // preserve last timestamp + MDB_val last_data; + mdb_get(txn, dbi, &key, &last_data); + if (flags & SET_VERSION) last_data.mv_data = (char *) last_data.mv_data + 8; + previous_time = *(uint64_t *) last_data.mv_data; + fprintf(stderr, "previous time %llx \n", previous_time); + } + uint64_t timestamp = (next_32 & 1) ? (next_32 & 2) ? 
previous_time : last_time_double() + : next_time_double(); + if (first_word & DIRECT_WRITE) { + // write to second word, which is used by the direct write + *((uint64_t *) value.mv_data + 1) = timestamp ^ (next_32 >> 8); + first_word = first_word & 0xffffffff; // clear out the offset so it is just zero (always must be at the beginning) + } else + *(uint64_t *) value.mv_data = timestamp ^ (next_32 >> 8); + fprintf(stderr, "set time %llx \n", timestamp); + } + if (first_word & DIRECT_WRITE) { + // direct in-place write + unsigned int offset = first_word >> 32; + if (flags & SET_VERSION) + offset += 8; + MDB_val bytes_to_write; + bytes_to_write.mv_data = (char*)value.mv_data + 8; + bytes_to_write.mv_size = value.mv_size - 8; + rc = mdb_direct_write(txn, dbi, &key, offset, &bytes_to_write); + if (!rc) break; // success + // if no success, this means we probably weren't able to write to a single + // word safely, so we need to do a real put MDB_val last_data; - mdb_get(txn, dbi, &key, &last_data); - if (flags & SET_VERSION) last_data.mv_data = (char*)last_data.mv_data + 8; - previous_time = *(uint64_t*) last_data.mv_data; - //fprintf(stderr, "previous time %llx \n", previous_time); + rc = mdb_get(txn, dbi, &key, &last_data); + if (rc) break; // failed to get + bytes_to_write.mv_size = last_data.mv_size; + // attempt a put, using reserve (so we can efficiently copy data in) + rc = mdb_put(txn, dbi, &key, &bytes_to_write, (flags & (MDB_NOOVERWRITE | MDB_NODUPDATA | MDB_APPEND | MDB_APPENDDUP)) | MDB_RESERVE); + if (!rc) { + // copy the existing data + memcpy(bytes_to_write.mv_data, last_data.mv_data, last_data.mv_size); + // copy the changes + memcpy((char*)bytes_to_write.mv_data + offset, (char*)value.mv_data + 8, value.mv_size - 8); + } + break; // done } - uint64_t timestamp = (first_word & 1) ? (first_word & 2) ? 
previous_time : last_time_double() : next_time_double(); - *(uint64_t*)value.mv_data = timestamp ^ (first_word & 0xf8); - //fprintf(stderr, "set time %llx \n", timestamp); } } diff --git a/test/index.test.js b/test/index.test.js index 27b7410b5..b2d49dd78 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -12,7 +12,7 @@ import inspector from 'inspector' //inspector.open(9229, null, true); debugger let nativeMethods, dirName = dirname(fileURLToPath(import.meta.url)) -import { open, levelup, bufferToKeyValue, keyValueToBuffer, asBinary, ABORT, IF_EXISTS, TIMESTAMP_PLACEHOLDER } from '../node-index.js'; +import { open, levelup, bufferToKeyValue, keyValueToBuffer, asBinary, ABORT, IF_EXISTS, TIMESTAMP_PLACEHOLDER, DIRECT_WRITE_PLACEHOLDER } from '../node-index.js'; import { createRequire } from 'module'; const require = createRequire(import.meta.url); // we don't always test CJS because it messes up debugging in webstorm (and I am not about to give the awesomeness @@ -1315,10 +1315,10 @@ describe('lmdb-js', function() { encoding: 'binary' })); let value = Buffer.alloc(16, 3); - TIMESTAMP_PLACEHOLDER[0] = 0; value.set(TIMESTAMP_PLACEHOLDER); + value[4] = 0; await dbBinary.put(1, value, { - assignTimestamp: true, + instructedWrite: true, }); let returnedValue = dbBinary.get(1); let dataView = new DataView(returnedValue.buffer, 0, 16); @@ -1328,17 +1328,61 @@ describe('lmdb-js', function() { should.equal(returnedValue[9], 3); value = Buffer.alloc(16, 3); - TIMESTAMP_PLACEHOLDER[0] = 1; // assign previous value.set(TIMESTAMP_PLACEHOLDER); + value[4] = 1; // assign previous await dbBinary.put(1, value, { - assignTimestamp: true, + instructedWrite: true, }); returnedValue = dbBinary.get(1); dataView = new DataView(returnedValue.buffer, 0, 16); should.equal(assignedTimestamp, dataView.getFloat64(0)); should.equal(returnedValue[9], 3); - }) + }); + + it('direct write', async function() { + let dbBinary = db.openDB(Object.assign({ + name: 'mydb-direct', + encoding: 
'binary', + compression: { // options.trackMetrics: true, + threshold: 40, + startingOffset: 16, + } + })); + let value = Buffer.alloc(100, 4); + await dbBinary.put(1, value, { + instructedWrite: true, + }); + + // this should usually accomplish in-place write + let returnedValue = dbBinary.get(1); + should.equal(returnedValue[2], 4); + value = Buffer.alloc(12, 3); + value.set(DIRECT_WRITE_PLACEHOLDER); + value[4] = 2; + value.set([1,2,3,4], 8); + + await dbBinary.put(1, value, { + instructedWrite: true, + }); + returnedValue = dbBinary.get(1); + const expected = Buffer.alloc(100, 4); + expected.set([1,2,3,4], 2); + returnedValue.should.deep.equal(expected); + + // this should always trigger the full put operation + value = Buffer.alloc(18, 3); + value.set(DIRECT_WRITE_PLACEHOLDER); + value[4] = 2; + value.set([1,2,3,4,5,6,7,8,9,10], 8); + + await dbBinary.put(1, value, { + instructedWrite: true, + }); + returnedValue = dbBinary.get(1); + expected.set([1,2,3,4,5,6,7,8,9,10], 2); + returnedValue.should.deep.equal(expected); + }); it('can backup and use backup', async function() { if (options.encryptionKey) // it won't match the environment diff --git a/write.js b/write.js index 5a13a36a6..b53dca252 100644 --- a/write.js +++ b/write.js @@ -597,7 +597,7 @@ export function addWriteMethods(LMDBStore, { env, fixedBuffer, resetReadTxn, use flags |= 0x10; if (versionOrOptions.noDupData) flags |= 0x20; - if (versionOrOptions.assignTimestamp) + if (versionOrOptions.instructedWrite) flags |= 0x2000; if (versionOrOptions.append) flags |= 0x20000; From d877e7f01873f02493adf367d401cdd7d3bb6484 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Tue, 5 Sep 2023 09:58:36 -0600 Subject: [PATCH 15/33] Remove failed prebuild attempt --- .github/workflows/prebuild.yml | 24 ------------------------ package.json | 6 +++--- 2 files changed, 3 insertions(+), 27 deletions(-) diff --git a/.github/workflows/prebuild.yml b/.github/workflows/prebuild.yml index 601812308..f0628066c 100644 --- 
a/.github/workflows/prebuild.yml +++ b/.github/workflows/prebuild.yml @@ -108,30 +108,6 @@ jobs: uses: softprops/action-gh-release@v1 with: files: prebuild-linux-musl.tar - build-alpine: - #if: startsWith(github.ref, 'refs/tags/') - env: - LMDB_DATA_V1: ${{ contains(github.ref, '-v1') }} - runs-on: ubuntu-20.04 - container: ghcr.io/prebuild/alpine - steps: - - run: npm install - - run: npm run prebuild-libc-musl - - run: ls prebuilds/linux-x64 - #- run: cp prebuilds/linux-x64/node.abi93.glibc.node prebuilds/linux-x64/node.abi92.glibc.node - #- run: npm run prebuildify - # env: - # ENABLE_FAST_API_CALLS: true - - run: npm run build-js - - run: chmod 777 test - - run: npm test - if: ${{ !contains(github.ref, '-v1') }} - - run: tar --create --verbose --file=prebuild-linux.tar -C prebuilds . - - name: Prebuild - if: startsWith(github.ref, 'refs/tags/') - uses: softprops/action-gh-release@v1 - with: - files: prebuild-linux-alpine.tar build-centos-7: if: startsWith(github.ref, 'refs/tags/') env: diff --git a/package.json b/package.json index 9e1a12ca7..897238136 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "lmdb", "author": "Kris Zyp", - "version": "2.9.0.timestamp-1", + "version": "2.9.0-timestamp.1", "description": "Simple, efficient, scalable, high-performance LMDB interface", "license": "MIT", "repository": { @@ -68,7 +68,7 @@ "prebuild-win32": "prebuildify-platform-packages --target 20.0.0 && prebuildify-platform-packages --target 18.15.0 && prebuildify-platform-packages --target 16.18.0 && set ENABLE_V8_FUNCTIONS=false&& prebuildify-platform-packages --napi --platform-packages --target 16.18.0", "prebuild-libc-arm7": "ENABLE_V8_FUNCTIONS=false prebuildify-platform-packages --napi --platform-packages --tag-libc --target 16.18.0", "prebuildify": "prebuildify-platform-packages --napi --target 18.15.0", - "full-publish": "cd prebuilds/win32-x64 && npm publish --access public && cd ../darwin-x64 && npm publish --access public && cd 
../darwin-arm64 && npm publish --access public && cd ../linux-x64 && npm publish --access public && cd ../linux-arm64 && npm publish --access public && cd ../linux-arm && npm publish --access public && cd ../.. && npm publish", + "full-publish": "cd prebuilds/win32-x64 && npm publish --tag dev --access public && cd ../darwin-x64 && npm publish --tag dev --access public && cd ../darwin-arm64 && npm publish --tag dev --access public && cd ../linux-x64 && npm publish --tag dev --access public && cd ../linux-arm64 && npm publish --tag dev --access public && cd ../linux-arm && npm publish --tag dev --access public && cd ../.. && npm publish --tag dev", "recompile": "node-gyp clean && node-gyp configure && node-gyp build", "test": "mocha test/**.test.js --expose-gc --recursive", "deno-test": "deno run --allow-ffi --allow-write --allow-read --allow-env --allow-net --unstable test/deno.ts", @@ -105,5 +105,5 @@ "test": "tests" }, "optionalDependencies": { - } + } } \ No newline at end of file From ab2dc3d67db7572800c0e93856f859fc8e1c721c Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Tue, 5 Sep 2023 20:09:40 -0600 Subject: [PATCH 16/33] Fix test for key timestamp assignment --- src/lmdb-js.h | 3 ++- src/writer.cpp | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/lmdb-js.h b/src/lmdb-js.h index 97d69451b..da7a58c5e 100644 --- a/src/lmdb-js.h +++ b/src/lmdb-js.h @@ -19,7 +19,8 @@ using namespace Napi; // set the threshold of when to use shared buffers (for uncompressed entries larger than this value) const size_t SHARED_BUFFER_THRESHOLD = 0x4000; const uint32_t SPECIAL_WRITE = 0x10101; -const uint32_t REPLACE_WITH_TIMESTAMP = 0x1000000; +const uint32_t REPLACE_WITH_TIMESTAMP_FLAG = 0x1000000; +const uint32_t REPLACE_WITH_TIMESTAMP = 0x1010101; const uint32_t DIRECT_WRITE = 0x2000000; #ifndef __CPTHREAD_H__ diff --git a/src/writer.cpp b/src/writer.cpp index d0efcedd9..7561b4180 100644 --- a/src/writer.cpp +++ b/src/writer.cpp @@ -298,7 +298,7 @@ 
next_inst: start = instruction++; // 3 assign last recorded previous time // 4 record previous time if ((first_word & 0xffffff) == SPECIAL_WRITE) { - if (first_word & REPLACE_WITH_TIMESTAMP) { + if (first_word & REPLACE_WITH_TIMESTAMP_FLAG) { uint32_t next_32 = first_word >> 32; if (next_32 & 4) { // preserve last timestamp @@ -306,7 +306,7 @@ next_inst: start = instruction++; mdb_get(txn, dbi, &key, &last_data); if (flags & SET_VERSION) last_data.mv_data = (char *) last_data.mv_data + 8; previous_time = *(uint64_t *) last_data.mv_data; - fprintf(stderr, "previous time %llx \n", previous_time); + //fprintf(stderr, "previous time %llx \n", previous_time); } uint64_t timestamp = (next_32 & 1) ? (next_32 & 2) ? previous_time : last_time_double() : next_time_double(); @@ -316,7 +316,7 @@ next_inst: start = instruction++; first_word = first_word & 0xffffffff; // clear out the offset so it is just zero (always must be at the beginning) } else *(uint64_t *) value.mv_data = timestamp ^ (next_32 >> 8); - fprintf(stderr, "set time %llx \n", timestamp); + //fprintf(stderr, "set time %llx \n", timestamp); } if (first_word & DIRECT_WRITE) { // direct in-place write From dbdf2185e19bdf74fecb64a763bf711a4692169f Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Wed, 6 Sep 2023 21:06:10 -0600 Subject: [PATCH 17/33] Update version --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 897238136..305420ef7 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "lmdb", "author": "Kris Zyp", - "version": "2.9.0-timestamp.1", + "version": "2.9.0-timestamp.2", "description": "Simple, efficient, scalable, high-performance LMDB interface", "license": "MIT", "repository": { From cac72e16cfc91502c92c39801616e2693cf5e999 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Thu, 7 Sep 2023 13:46:15 -0600 Subject: [PATCH 18/33] Make sure we don't attempt direct writes on encrypted stores, and more trust in atomic file writing 
--- dependencies/lmdb/libraries/liblmdb/mdb.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/dependencies/lmdb/libraries/liblmdb/mdb.c b/dependencies/lmdb/libraries/liblmdb/mdb.c index aa7029cce..d85c5512e 100644 --- a/dependencies/lmdb/libraries/liblmdb/mdb.c +++ b/dependencies/lmdb/libraries/liblmdb/mdb.c @@ -7926,6 +7926,7 @@ int mdb_direct_write(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, unsigned int offset, MDB_val *data) { + if (txn->mt_env->me_flags & MDB_REMAP_CHUNKS) return -1; MDB_val existing_data; int rc = mdb_get_with_txn(txn, dbi, key, &existing_data, NULL); if (rc == 0) { @@ -7936,10 +7937,10 @@ mdb_direct_write(MDB_txn *txn, MDB_dbi dbi, } MDB_env* env = txn->mt_env; mdb_size_t file_offset = (char*)existing_data.mv_data - env->me_map + offset; - // a direct write can only be safely atomically applied to a memory map if it fits into - // single word, verify that here: - if (file_offset >> 3 != (file_offset + data->mv_size - 1) >> 3) - return -1; + // if we discover that, on some OSes, a direct write can only be safely and atomically applied to a memory map + // when it fits into a single word, we can enforce that restriction with the check below: + //if (file_offset >> 3 != (file_offset + data->mv_size - 1) >> 3) + // return -1; #ifdef _WIN32 DWORD written; OVERLAPPED ov; From 56aa5b2d2670d06f84bdcbc5bc1536afe154f27e Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Thu, 7 Sep 2023 13:49:54 -0600 Subject: [PATCH 19/33] Add tearing tests --- src/writer.cpp | 4 +- test/index.test.js | 95 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+), 2 deletions(-) diff --git a/src/writer.cpp b/src/writer.cpp index 7561b4180..7e3f90272 100644 --- a/src/writer.cpp +++ b/src/writer.cpp @@ -326,8 +326,10 @@ next_inst: start = instruction++; MDB_val bytes_to_write; bytes_to_write.mv_data = (char*)value.mv_data + 8; bytes_to_write.mv_size = value.mv_size - 8; +#ifdef MDB_RPAGE_CACHE rc = mdb_direct_write(txn, dbi, &key, offset, 
&bytes_to_write); if (!rc) break; // success +#endif // if no success, this means we probably weren't able to write to a single // word safely, so we need to do a real put MDB_val last_data; @@ -346,14 +348,12 @@ next_inst: start = instruction++; } } } - if (flags & SET_VERSION) rc = putWithVersion(txn, dbi, &key, &value, flags & (MDB_NOOVERWRITE | MDB_NODUPDATA | MDB_APPEND | MDB_APPENDDUP), setVersion); else rc = mdb_put(txn, dbi, &key, &value, flags & (MDB_NOOVERWRITE | MDB_NODUPDATA | MDB_APPEND | MDB_APPENDDUP)); if (flags & COMPRESSIBLE) delete value.mv_data; - //fprintf(stdout, "put %u \n", key.mv_size); break; case DEL: rc = mdb_del(txn, dbi, &key, nullptr); diff --git a/test/index.test.js b/test/index.test.js index b2d49dd78..6983dba34 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -1384,6 +1384,101 @@ describe('lmdb-js', function() { returnedValue.should.deep.equal(expected); }); + it.skip('large direct write tearing', async function() { + // this test is for checking whether direct reads and writes cause memory "tearing" + let dbBinary = db.openDB(Object.assign({ + name: 'mydb-direct-big', + encoding: 'binary', + compression: false, + })); + let value = Buffer.alloc(0x5000, 4); + await dbBinary.put(1, value); + let f64 = new Float64Array(1); + let u8 = new Uint8Array(f64.buffer, 0, 8); + for (let i = 0; i < 10000; i++) { + // this should usually accomplish in-place write + let returnedValue = dbBinary.get(1); + let updated_byte = i % 200; + value = Buffer.alloc(32, updated_byte); + value.set(DIRECT_WRITE_PLACEHOLDER); + value[4] = 2; + let promise = dbBinary.put(1, value, { + instructedWrite: true, + }); + await new Promise(resolve => setImmediate(resolve)); + returnedValue = dbBinary.get(1); + let dataView = new DataView(returnedValue.buffer, returnedValue.byteOffset, returnedValue.byteLength); + //let livef64 = new Float64Array(returnedValue.buffer, returnedValue.byteOffset, + // returnedValue.byteLength/8); + let j = 0; + let k = 0; + 
detect_change: do { + j++; + while(true) { + let a = dataView.getFloat64(6); + let b = dataView.getFloat64(6); + if (a === b) { + f64[0] = a; + break; + } + } + + for (k = 0; k < 8; k++) { + if (u8[k] === updated_byte) + break detect_change; + } + }while(j < 1000); + if (u8[0] !== u8[7]) + console.log(j, k, u8); + } + }); + + it.skip('small direct write tearing', async function() { + // this test is for checking whether direct reads and writes cause memory "tearing" + let dbBinary = db.openDB(Object.assign({ + name: 'mydb-direct-small', + encoding: 'binary', + compression: false, + })); + let f64 = new Float64Array(1); + let u8 = new Uint8Array(f64.buffer, 0, 8); + for (let i = 0; i < 100000; i++) { + /*for (let j = 0; j < 100;j++) { + dbBinary.put(Math.random(), Buffer.alloc(Math.random() * 10)); // keep the offset random + }*/ + let value = Buffer.alloc(16, 4); + await dbBinary.put(1, value); + + // this should usually accomplish in-place write + let returnedValue = dbBinary.get(1); + let updated_byte = i % 200; + value = Buffer.alloc(16, updated_byte); + value.set(DIRECT_WRITE_PLACEHOLDER); + value[4] = 2; + let promise = dbBinary.put(1, value, { + instructedWrite: true, + }); + await new Promise(resolve => setImmediate(resolve)); + let j = 0; + let k = 0; + returnedValue = dbBinary.getBinaryFast(1); + let dataView = new DataView(returnedValue.buffer, returnedValue.byteOffset, returnedValue.byteLength); + detect_change: do { + returnedValue = dbBinary.getBinaryFast(1); + //let livef64 = new Float64Array(returnedValue.buffer, returnedValue.byteOffset, + // returnedValue.byteLength/8); + j++; + for (k = 2; k < 10; k++) { + if (returnedValue[k] === updated_byte) + break detect_change; + } + }while(j < 1000); + if (returnedValue[2] !== returnedValue[9]) + console.log(j, k, returnedValue); + } + }); + + it('can backup and use backup', async function() { if (options.encryptionKey) // it won't match the environment return; From d202101a5c0139319727870b3c106b83e56ba340 
Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Sat, 9 Sep 2023 07:18:27 -0600 Subject: [PATCH 20/33] Add direct write function --- native.js | 3 ++- package.json | 4 ++-- read.js | 13 ++++++++++++- src/dbi.cpp | 30 ++++++++++++++++++++++++++++++ src/env.cpp | 4 ++-- src/writer.cpp | 3 ++- 6 files changed, 50 insertions(+), 7 deletions(-) diff --git a/native.js b/native.js index fe462e7e5..16a51054f 100644 --- a/native.js +++ b/native.js @@ -1,7 +1,7 @@ import { dirname, join, default as pathModule } from 'path'; import { fileURLToPath } from 'url'; import loadNAPI from 'node-gyp-build-optional-packages'; -export let Env, Txn, Dbi, Compression, Cursor, getAddress, getBufferAddress, createBufferForAddress, clearKeptObjects, globalBuffer, setGlobalBuffer, arch, fs, os, onExit, tmpdir, lmdbError, path, EventEmitter, orderedBinary, MsgpackrEncoder, WeakLRUCache, setEnvMap, getEnvMap, getByBinary, detachBuffer, startRead, setReadCallback, write, position, iterate, prefetch, resetTxn, getCurrentValue, getCurrentShared, getStringByBinary, getSharedByBinary, getSharedBuffer, compress; +export let Env, Txn, Dbi, Compression, Cursor, getAddress, getBufferAddress, createBufferForAddress, clearKeptObjects, globalBuffer, setGlobalBuffer, arch, fs, os, onExit, tmpdir, lmdbError, path, EventEmitter, orderedBinary, MsgpackrEncoder, WeakLRUCache, setEnvMap, getEnvMap, getByBinary, detachBuffer, startRead, setReadCallback, write, position, iterate, prefetch, resetTxn, getCurrentValue, getCurrentShared, getStringByBinary, getSharedByBinary, getSharedBuffer, compress, directWrite; path = pathModule; let dirName = dirname(fileURLToPath(import.meta.url)).replace(/dist$/, ''); export let nativeAddon = loadNAPI(dirName); @@ -61,6 +61,7 @@ export function setNativeFunctions(externals) { iterate = externals.iterate; position = externals.position; resetTxn = externals.resetTxn; + directWrite = externals.directWrite; getCurrentValue = externals.getCurrentValue; getCurrentShared = 
externals.getCurrentShared; getStringByBinary = externals.getStringByBinary; diff --git a/package.json b/package.json index 305420ef7..641c8d10f 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "lmdb", "author": "Kris Zyp", - "version": "2.9.0-timestamp.2", + "version": "2.9.0-timestamp.3", "description": "Simple, efficient, scalable, high-performance LMDB interface", "license": "MIT", "repository": { @@ -78,7 +78,7 @@ }, "gypfile": true, "dependencies": { - "msgpackr": "^1.9.5", + "msgpackr": "^1.9.9", "node-addon-api": "^6.1.0", "node-gyp-build-optional-packages": "5.1.1", "ordered-binary": "^1.4.1", diff --git a/read.js b/read.js index af8820049..5e893dd9e 100644 --- a/read.js +++ b/read.js @@ -1,5 +1,5 @@ import { RangeIterable } from './util/RangeIterable.js'; -import { getAddress, Cursor, Txn, orderedBinary, lmdbError, getByBinary, setGlobalBuffer, prefetch, iterate, position as doPosition, resetTxn, getCurrentValue, getCurrentShared, getStringByBinary, globalBuffer, getSharedBuffer, startRead, setReadCallback } from './native.js'; +import { getAddress, Cursor, Txn, orderedBinary, lmdbError, getByBinary, setGlobalBuffer, prefetch, iterate, position as doPosition, resetTxn, getCurrentValue, getCurrentShared, getStringByBinary, globalBuffer, getSharedBuffer, startRead, setReadCallback, directWrite } from './native.js'; import { saveKey } from './keys.js'; const IF_EXISTS = 3.542694326329068e-103; const ITERATOR_DONE = { done: true, value: undefined }; @@ -259,6 +259,17 @@ export function addReadMethods(LMDBStore, { }; } }, + + directWrite(id, options) { + let rc; + let txn = env.writeTxn || (options && options.transaction) || (readTxnRenewed ? 
readTxn : renewReadTxn(this)); + let keySize = this.writeKey(id, keyBytes, 0); + let dataOffset = (((keySize >> 3) + 1) << 3); + keyBytes.set(options.bytes, dataOffset) + rc = directWrite(this.dbAddress, keySize, options.offset, options.bytes.length, txn.address || 0); + if (rc < 0) lmdbError(rc); + }, + resetReadTxn() { resetReadTxn(); }, diff --git a/src/dbi.cpp b/src/dbi.cpp index 65b5cf910..13b455591 100644 --- a/src/dbi.cpp +++ b/src/dbi.cpp @@ -174,6 +174,35 @@ int32_t DbiWrap::doGetByBinary(uint32_t keySize, uint32_t ifNotTxnId, int64_t tx } } +NAPI_FUNCTION(directWrite) { + ARGS(5) + GET_INT64_ARG(0); + DbiWrap* dw = (DbiWrap*) i64; + uint32_t keySize; + GET_UINT32_ARG(keySize, 1); + uint32_t offset; + GET_UINT32_ARG(offset, 2); + uint32_t dataSize; + GET_UINT32_ARG(dataSize, 3); + int64_t txnAddress = 0; + napi_status status = napi_get_value_int64(env, args[4], &txnAddress); + if (dw->hasVersions) offset += 8; + EnvWrap* ew = dw->ew; + char* keyBuffer = ew->keyBuffer; + MDB_txn* txn = ew->getReadTxn(txnAddress); + MDB_val key, data; + key.mv_size = keySize; + key.mv_data = (void*) keyBuffer; + data.mv_size = dataSize; + data.mv_data = (void*) (keyBuffer + (((keySize >> 3) + 1) << 3)); +#ifdef MDB_RPAGE_CACHE + int result = mdb_direct_write(txn, dw->dbi, &key, offset, &data); +#else + int result = -1; +#endif + RETURN_INT32(result); +} + NAPI_FUNCTION(getByBinary) { ARGS(4) GET_INT64_ARG(0); @@ -346,6 +375,7 @@ void DbiWrap::setupExports(Napi::Env env, Object exports) { DbiWrap::InstanceMethod("stat", &DbiWrap::stat), }); exports.Set("Dbi", DbiClass); + EXPORT_NAPI_FUNCTION("directWrite", directWrite); EXPORT_NAPI_FUNCTION("getByBinary", getByBinary); EXPORT_NAPI_FUNCTION("prefetch", prefetchNapi); EXPORT_NAPI_FUNCTION("getStringByBinary", getStringByBinary); diff --git a/src/env.cpp b/src/env.cpp index c4d9e083c..8459b094b 100644 --- a/src/env.cpp +++ b/src/env.cpp @@ -529,7 +529,7 @@ NAPI_FUNCTION(getTestRef) { return returnValue; } 
-NAPI_FUNCTION(directWrite) { +/*NAPI_FUNCTION(directWrite) { ARGS(4) GET_INT64_ARG(0); EnvWrap* ew = (EnvWrap*) i64; @@ -556,7 +556,7 @@ NAPI_FUNCTION(directWrite) { } RETURN_UNDEFINED; } - +*/ int32_t EnvWrap::toSharedBuffer(MDB_env* env, uint32_t* keyBuffer, MDB_val data) { unsigned int flags; mdb_env_get_flags(env, (unsigned int*) &flags); diff --git a/src/writer.cpp b/src/writer.cpp index 7e3f90272..f6ce4ad3b 100644 --- a/src/writer.cpp +++ b/src/writer.cpp @@ -303,7 +303,8 @@ next_inst: start = instruction++; if (next_32 & 4) { // preserve last timestamp MDB_val last_data; - mdb_get(txn, dbi, &key, &last_data); + rc = mdb_get(txn, dbi, &key, &last_data); + if (rc) break; if (flags & SET_VERSION) last_data.mv_data = (char *) last_data.mv_data + 8; previous_time = *(uint64_t *) last_data.mv_data; //fprintf(stderr, "previous time %llx \n", previous_time); From 94bf5ddb803b6b5f29b155e7f0acdd30f1326cf7 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Sun, 10 Sep 2023 20:23:52 -0600 Subject: [PATCH 21/33] Add support record-level locks --- dependencies/lmdb/libraries/liblmdb/lmdb.h | 20 +++- dependencies/lmdb/libraries/liblmdb/mdb.c | 32 ++++-- native.js | 4 +- package.json | 6 +- read.js | 43 +++++++- src/env.cpp | 122 ++++++++++++++++++--- src/lmdb-js.h | 14 +++ src/misc.cpp | 4 +- test/index.test.js | 84 ++++++++++++++ test/lock-test.js | 21 ++++ 10 files changed, 311 insertions(+), 39 deletions(-) create mode 100644 test/lock-test.js diff --git a/dependencies/lmdb/libraries/liblmdb/lmdb.h b/dependencies/lmdb/libraries/liblmdb/lmdb.h index d1218de3a..0acd5c594 100644 --- a/dependencies/lmdb/libraries/liblmdb/lmdb.h +++ b/dependencies/lmdb/libraries/liblmdb/lmdb.h @@ -1040,12 +1040,20 @@ int mdb_env_set_userctx(MDB_env *env, void *ctx); */ void *mdb_env_get_userctx(MDB_env *env); - /** @brief A callback function for most LMDB assert() failures, - * called before printing the message and aborting. 
- * - * @param[in] env An environment handle returned by #mdb_env_create(). - * @param[in] msg The assertion message, not including newline. - */ +/** @brief Get the metrics information associated with the #MDB_env. + * + * @param[in] env An environment handle returned by #mdb_env_create() + * @return A pointer to the #MDB_metrics structure embedded in the environment, or NULL if env is NULL. + */ +MDB_metrics *mdb_env_get_metrics(MDB_env *env); + +/** @brief A callback function for most LMDB assert() failures, + * called before printing the message and aborting. + * + * @param[in] env An environment handle returned by #mdb_env_create(). + * @param[in] msg The assertion message, not including newline. + */ + typedef void MDB_assert_func(MDB_env *env, const char *msg); /** Set or reset the assert() callback of the environment. diff --git a/dependencies/lmdb/libraries/liblmdb/mdb.c b/dependencies/lmdb/libraries/liblmdb/mdb.c index d85c5512e..bbf38f223 100644 --- a/dependencies/lmdb/libraries/liblmdb/mdb.c +++ b/dependencies/lmdb/libraries/liblmdb/mdb.c @@ -1691,6 +1691,7 @@ struct MDB_env { MDB_val me_enckey; /**< key for env encryption */ #endif void *me_userctx; /**< User-settable context */ + MDB_metrics me_metrics; /**< Metrics tracking */ MDB_assert_func *me_assert_func; /**< Callback for assertion failures */ void *me_callback; /**< General callback */ int64_t boot_id; @@ -3120,7 +3121,7 @@ mdb_env_sync0(MDB_env *env, int force, pgno_t numpgs) } } if (env->me_flags & MDB_TRACK_METRICS) { - ((MDB_metrics*) env->me_userctx)->time_sync += get_time64() - start; + env->me_metrics.time_sync += get_time64() - start; } return rc; } @@ -3390,7 +3391,7 @@ mdb_txn_renew0(MDB_txn *txn) } else { /* Not yet touching txn == env->me_txn0, it may be active */ if (env->me_flags & MDB_TRACK_METRICS) { - ((MDB_metrics*) env->me_userctx)->clock_txn = get_time64(); + env->me_metrics.clock_txn = get_time64(); } if (ti) { if (LOCK_MUTEX(rc, env, env->me_wmutex)) @@ -3403,8 +3404,8 @@ mdb_txn_renew0(MDB_txn *txn) } if (env->me_flags &
MDB_TRACK_METRICS) { uint64_t now = get_time64(); - ((MDB_metrics*) env->me_userctx)->time_start_txns += now - ((MDB_metrics*) env->me_userctx)->clock_txn; - ((MDB_metrics*) env->me_userctx)->clock_txn = now; + env->me_metrics.time_start_txns += now - env->me_metrics.clock_txn; + env->me_metrics.clock_txn = now; } txn->mt_txnid++; #if MDB_DEBUG @@ -4347,9 +4348,9 @@ mdb_page_flush(MDB_txn *txn, int keep) */ CACHEFLUSH(env->me_map, txn->mt_next_pgno * env->me_psize, DCACHE); if (env->me_flags & MDB_TRACK_METRICS) { - ((MDB_metrics*) env->me_userctx)->writes += write_i; - ((MDB_metrics*) env->me_userctx)->page_flushes++; - ((MDB_metrics*) env->me_userctx)->pages_written += pagecount - keep; + env->me_metrics.writes += write_i; + env->me_metrics.page_flushes++; + env->me_metrics.pages_written += pagecount - keep; } #ifdef _WIN32 @@ -4393,7 +4394,7 @@ mdb_page_flush(MDB_txn *txn, int keep) txn->mt_dirty_room += i - j; dl[0].mid = j; if (env->me_flags & MDB_TRACK_METRICS) { - ((MDB_metrics*) env->me_userctx)->time_page_flushes += get_time64() - start; + env->me_metrics.time_page_flushes += get_time64() - start; } return MDB_SUCCESS; } @@ -4663,8 +4664,8 @@ mdb_txn_commit(MDB_txn *txn) done: // if (env->me_flags & MDB_TRACK_METRICS) { - ((MDB_metrics*) env->me_userctx)->time_during_txns += get_time64() - ((MDB_metrics*) env->me_userctx)->clock_txn; - ((MDB_metrics*) env->me_userctx)->txns++; + env->me_metrics.time_during_txns += get_time64() - env->me_metrics.clock_txn; + env->me_metrics.txns++; } if ((txn->mt_flags & MDB_NOSYNC) && (env->me_flags & MDB_OVERLAPPINGSYNC)) { MDB_txn sync_txn; @@ -10739,7 +10740,7 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi, } MDB_env* env = txn->mt_env; if (env->me_flags & MDB_TRACK_METRICS) { - ((MDB_metrics*) env->me_userctx)->deletes++; + env->me_metrics.deletes++; } return mdb_del0(txn, dbi, key, data, 0); } @@ -11234,7 +11235,7 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi, return (txn->mt_flags & MDB_TXN_RDONLY) ? 
EACCES : MDB_BAD_TXN; MDB_env* env = txn->mt_env; if (env->me_flags & MDB_TRACK_METRICS) { - ((MDB_metrics*) env->me_userctx)->puts++; + env->me_metrics.puts++; } mdb_cursor_init(&mc, txn, dbi, &mx); @@ -11835,12 +11836,19 @@ mdb_env_set_userctx(MDB_env *env, void *ctx) return MDB_SUCCESS; } + void * ESECT mdb_env_get_userctx(MDB_env *env) { return env ? env->me_userctx : NULL; } +MDB_metrics * ESECT +mdb_env_get_metrics(MDB_env *env) +{ + return env ? &env->me_metrics : NULL; +} + int ESECT mdb_env_set_assert(MDB_env *env, MDB_assert_func *func) { diff --git a/native.js b/native.js index 16a51054f..65d93e81b 100644 --- a/native.js +++ b/native.js @@ -1,7 +1,7 @@ import { dirname, join, default as pathModule } from 'path'; import { fileURLToPath } from 'url'; import loadNAPI from 'node-gyp-build-optional-packages'; -export let Env, Txn, Dbi, Compression, Cursor, getAddress, getBufferAddress, createBufferForAddress, clearKeptObjects, globalBuffer, setGlobalBuffer, arch, fs, os, onExit, tmpdir, lmdbError, path, EventEmitter, orderedBinary, MsgpackrEncoder, WeakLRUCache, setEnvMap, getEnvMap, getByBinary, detachBuffer, startRead, setReadCallback, write, position, iterate, prefetch, resetTxn, getCurrentValue, getCurrentShared, getStringByBinary, getSharedByBinary, getSharedBuffer, compress, directWrite; +export let Env, Txn, Dbi, Compression, Cursor, getAddress, getBufferAddress, createBufferForAddress, clearKeptObjects, globalBuffer, setGlobalBuffer, arch, fs, os, onExit, tmpdir, lmdbError, path, EventEmitter, orderedBinary, MsgpackrEncoder, WeakLRUCache, setEnvMap, getEnvMap, getByBinary, detachBuffer, startRead, setReadCallback, write, position, iterate, prefetch, resetTxn, getCurrentValue, getCurrentShared, getStringByBinary, getSharedByBinary, getSharedBuffer, compress, directWrite, attemptLock, unlock; path = pathModule; let dirName = dirname(fileURLToPath(import.meta.url)).replace(/dist$/, ''); export let nativeAddon = loadNAPI(dirName); @@ -62,6 +62,8 @@ export 
function setNativeFunctions(externals) { position = externals.position; resetTxn = externals.resetTxn; directWrite = externals.directWrite; + attemptLock = externals.attemptLock; + unlock = externals.unlock; getCurrentValue = externals.getCurrentValue; getCurrentShared = externals.getCurrentShared; getStringByBinary = externals.getStringByBinary; diff --git a/package.json b/package.json index 641c8d10f..6174b6487 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "lmdb", "author": "Kris Zyp", - "version": "2.9.0-timestamp.3", + "version": "2.9.0-locks.1", "description": "Simple, efficient, scalable, high-performance LMDB interface", "license": "MIT", "repository": { @@ -105,5 +105,5 @@ "test": "tests" }, "optionalDependencies": { - } -} \ No newline at end of file + } +} diff --git a/read.js b/read.js index 5e893dd9e..69d28ad94 100644 --- a/read.js +++ b/read.js @@ -1,5 +1,27 @@ import { RangeIterable } from './util/RangeIterable.js'; -import { getAddress, Cursor, Txn, orderedBinary, lmdbError, getByBinary, setGlobalBuffer, prefetch, iterate, position as doPosition, resetTxn, getCurrentValue, getCurrentShared, getStringByBinary, globalBuffer, getSharedBuffer, startRead, setReadCallback, directWrite } from './native.js'; +import { + getAddress, + Cursor, + Txn, + orderedBinary, + lmdbError, + getByBinary, + setGlobalBuffer, + prefetch, + iterate, + position as doPosition, + resetTxn, + getCurrentValue, + getCurrentShared, + getStringByBinary, + globalBuffer, + getSharedBuffer, + startRead, + setReadCallback, + directWrite, + attemptLock, + unlock +} from './native.js'; import { saveKey } from './keys.js'; const IF_EXISTS = 3.542694326329068e-103; const ITERATOR_DONE = { done: true, value: undefined }; @@ -270,6 +292,25 @@ export function addReadMethods(LMDBStore, { if (rc < 0) lmdbError(rc); }, + attemptLock(id, version, callback) { + keyBytes.dataView.setUint32(0, this.db.dbi); + keyBytes.dataView.setFloat64(4, version); + let keySize = 
this.writeKey(id, keyBytes, 12); + return attemptLock(env.address, keySize, callback); + }, + + unlock(id, version, onlyCheck) { + keyBytes.dataView.setUint32(0, this.db.dbi); + keyBytes.dataView.setFloat64(4, version); + let keySize = this.writeKey(id, keyBytes, 12); + return unlock(env.address, keySize, onlyCheck); + }, + hasLock(id, version) { + return this.unlock(id, version, true); + }, + + + resetReadTxn() { resetReadTxn(); }, diff --git a/src/env.cpp b/src/env.cpp index 8459b094b..25f0feaf5 100644 --- a/src/env.cpp +++ b/src/env.cpp @@ -293,7 +293,7 @@ int EnvWrap::openEnv(int flags, int jsFlags, const char* path, char* keyBuffer, this->compression = compression; this->jsFlags = jsFlags; #ifdef MDB_OVERLAPPINGSYNC - MDB_metrics* metrics; + RecordLocks* resolutions; #endif int rc; rc = mdb_env_set_maxdbs(env, maxDbs); @@ -327,13 +327,8 @@ int EnvWrap::openEnv(int flags, int jsFlags, const char* path, char* keyBuffer, flags |= MDB_PREVSNAPSHOT; } mdb_env_set_callback(env, checkExistingEnvs); - trackMetrics = !!(flags & MDB_TRACK_METRICS); - if (trackMetrics) { - metrics = new MDB_metrics; - memset(metrics, 0, sizeof(MDB_metrics)); - rc = mdb_env_set_userctx(env, (void*) metrics); - if (rc) goto fail; - } + resolutions = new RecordLocks(); + mdb_env_set_userctx(env, resolutions); #endif timeTxnWaiting = 0; @@ -346,9 +341,7 @@ int EnvWrap::openEnv(int flags, int jsFlags, const char* path, char* keyBuffer, if (rc != 0) { #ifdef MDB_OVERLAPPINGSYNC - if (trackMetrics) { - delete metrics; - } + delete resolutions; #endif if (rc == EXISTING_ENV_FOUND) { mdb_env_close(env); @@ -615,9 +608,24 @@ int32_t EnvWrap::toSharedBuffer(MDB_env* env, uint32_t* keyBuffer, MDB_val data return -30001; } +void notifyCallbacks(std::vector callbacks); + void EnvWrap::closeEnv(bool hasLock) { if (!env) return; + // unlock any record locks held by this thread/EnvWrap + RecordLocks* record_locks = (RecordLocks*) mdb_env_get_userctx(env); + 
pthread_mutex_lock(&record_locks->modification_lock); + auto it = record_locks->lock_callbacks.begin(); + while (it != record_locks->lock_callbacks.end()) + { + if (it->second.ew == this) { + notifyCallbacks(it->second.callbacks); + it = record_locks->lock_callbacks.erase(it); + } else ++it; + } + pthread_mutex_unlock(&record_locks->modification_lock); + if (openEnvWraps) { for (auto ewRef = openEnvWraps->begin(); ewRef != openEnvWraps->end(); ) { if (*ewRef == this) { @@ -644,9 +652,7 @@ void EnvWrap::closeEnv(bool hasLock) { if ((envFlags & MDB_OVERLAPPINGSYNC) && envPath->hasWrites) { mdb_env_sync(env, 1); } - if (envFlags & MDB_TRACK_METRICS) { - delete (MDB_metrics*) mdb_env_get_userctx(env); - } + delete (RecordLocks*) mdb_env_get_userctx(env); #endif char* path; mdb_env_get_path(env, (const char**)&path); @@ -751,7 +757,7 @@ Napi::Value EnvWrap::info(const CallbackInfo& info) { stats.Set("numReaders", Number::New(info.Env(), envinfo.me_numreaders)); #ifdef MDB_OVERLAPPINGSYNC if (this->trackMetrics) { - MDB_metrics* metrics = (MDB_metrics*) mdb_env_get_userctx(this->env); + MDB_metrics* metrics = (MDB_metrics*) mdb_env_get_metrics(this->env); stats.Set("timeStartTxns", Number::New(info.Env(), (double) metrics->time_start_txns / TICKS_PER_SECOND)); stats.Set("timeDuringTxns", Number::New(info.Env(), (double) metrics->time_during_txns / TICKS_PER_SECOND)); stats.Set("timePageFlushes", Number::New(info.Env(), (double) metrics->time_page_flushes / TICKS_PER_SECOND)); @@ -969,6 +975,90 @@ int32_t writeFFI(double ewPointer, uint64_t instructionAddress) { return rc; } +RecordLocks::RecordLocks() { + pthread_mutex_init(&modification_lock, nullptr); +} +bool RecordLocks::attemptLock(std::string key, napi_env env, napi_value func, bool has_callback, EnvWrap* ew) { + pthread_mutex_lock(&modification_lock); + lock_callbacks.find(key); + auto resolution = lock_callbacks.find(key); + bool found; + if (resolution == lock_callbacks.end()) { + callback_holder_t callbacks; + 
callbacks.ew = ew; + lock_callbacks.emplace(key, callbacks); + found = true; + } else { + if (has_callback) { + napi_threadsafe_function callback; + napi_value resource; + napi_status status; + status = napi_create_object(env, &resource); + napi_value resource_name; + status = napi_create_string_latin1(env, "lock", NAPI_AUTO_LENGTH, &resource_name); + napi_create_threadsafe_function(env, func, resource, resource_name, 0, 1, nullptr, nullptr, nullptr, nullptr, + &callback); + napi_unref_threadsafe_function(env, callback); + resolution->second.callbacks.push_back(callback); + } + found = false; + } + pthread_mutex_unlock(&modification_lock); + return found; +} +NAPI_FUNCTION(attemptLock) { + ARGS(3) + GET_INT64_ARG(0) + EnvWrap* ew = (EnvWrap*) i64; + uint32_t size; + GET_UINT32_ARG(size, 1); + napi_value as_bool; + napi_coerce_to_bool(env, args[2], &as_bool); + bool has_callback; + napi_get_value_bool(env, as_bool, &has_callback); + RecordLocks* lock_callbacks = (RecordLocks*) mdb_env_get_userctx(ew->env); + std::string key(ew->keyBuffer, size); + bool result = lock_callbacks->attemptLock(key, env, args[2], has_callback, ew); + napi_value return_value; + napi_get_boolean(env, result, &return_value); + return return_value; +} +bool RecordLocks::unlock(std::string key, bool only_check) { + pthread_mutex_lock(&modification_lock); + auto resolution = lock_callbacks.find(key); + if (resolution == lock_callbacks.end()) { + pthread_mutex_unlock(&modification_lock); + return false; + } + if (!only_check) { + notifyCallbacks(resolution->second.callbacks); + lock_callbacks.erase(resolution); + } + pthread_mutex_unlock(&modification_lock); + return true; +} +void notifyCallbacks(std::vector callbacks) { + for (auto callback = callbacks.begin(); callback != callbacks.end();) { + napi_call_threadsafe_function(*callback, nullptr, napi_tsfn_blocking); + napi_release_threadsafe_function(*callback, napi_tsfn_release); + callback++; + } +} +NAPI_FUNCTION(unlock) { + ARGS(3) + 
GET_INT64_ARG(0); + EnvWrap* ew = (EnvWrap*) i64; + uint32_t size; + GET_UINT32_ARG(size, 1); + bool only_check = false; + napi_get_value_bool(env, args[2], &only_check); + RecordLocks* lock_callbacks = (RecordLocks*) mdb_env_get_userctx(ew->env); + std::string key(ew->keyBuffer, size); + bool result = lock_callbacks->unlock(key, only_check); + napi_value return_value; + napi_get_boolean(env, result, &return_value); + return return_value; +} void EnvWrap::setupExports(Napi::Env env, Object exports) { // EnvWrap: Prepare constructor template @@ -1000,6 +1090,8 @@ void EnvWrap::setupExports(Napi::Env env, Object exports) { EXPORT_NAPI_FUNCTION("getSharedBuffer", getSharedBuffer); EXPORT_NAPI_FUNCTION("setTestRef", setTestRef); EXPORT_NAPI_FUNCTION("getTestRef", getTestRef); + EXPORT_NAPI_FUNCTION("attemptLock", attemptLock); + EXPORT_NAPI_FUNCTION("unlock", unlock); EXPORT_FUNCTION_ADDRESS("writePtr", writeFFI); //envTpl->InstanceTemplate()->SetInternalFieldCount(1); exports.Set("Env", EnvClass); diff --git a/src/lmdb-js.h b/src/lmdb-js.h index da7a58c5e..f25e78dbd 100644 --- a/src/lmdb-js.h +++ b/src/lmdb-js.h @@ -264,6 +264,20 @@ typedef struct js_buffers_t { // there is one instance of this for each JS (work pthread_mutex_t modification_lock; } js_buffers_t; + +typedef struct callback_holder_t { + EnvWrap* ew; + std::vector callbacks; +} callback_holder_t; +class RecordLocks { +public: + RecordLocks(); + std::unordered_map lock_callbacks; + pthread_mutex_t modification_lock; + bool attemptLock(std::string key, napi_env env, napi_value func, bool has_callback, EnvWrap* ew); + bool unlock(std::string key, bool only_check); +}; + class EnvWrap : public ObjectWrap { private: // List of open read transactions diff --git a/src/misc.cpp b/src/misc.cpp index c124077a0..ce0ff7ea2 100644 --- a/src/misc.cpp +++ b/src/misc.cpp @@ -123,10 +123,12 @@ int getVersionAndUncompress(MDB_val &data, DbiWrap* dw) { if (data.mv_size == 0) { return 1;// successFunc(data); } - unsigned 
char statusByte = dw->compression ? charData[dw->compression->startingOffset] : 0; + unsigned char statusByte = (dw->compression && dw->compression->startingOffset < data.mv_size) + ? charData[dw->compression->startingOffset] : 0; //fprintf(stdout, "uncompressing status %X\n", statusByte); if (statusByte >= 250) { bool isValid; + dw->compression->decompress(data, isValid, !dw->getFast); return isValid ? 2 : 0; } diff --git a/test/index.test.js b/test/index.test.js index 6983dba34..b357fc059 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -7,7 +7,9 @@ import { spawn } from 'child_process'; import { unlinkSync } from 'fs' import { fileURLToPath } from 'url' import { dirname } from 'path' +import { Worker } from 'worker_threads'; import { encoder as orderedBinaryEncoder } from 'ordered-binary/index.js' +import why from 'why-is-node-still-running'; import inspector from 'inspector' //inspector.open(9229, null, true); debugger let nativeMethods, dirName = dirname(fileURLToPath(import.meta.url)) @@ -1340,6 +1342,88 @@ describe('lmdb-js', function() { should.equal(returnedValue[9], 3); }); + it.only('lock/unlock notifications', async function() { + let listener_called = 0; + should.equal(db.attemptLock(3.2, 55555, () => { + listener_called++; + }), true); + should.equal(db.attemptLock(3.2, 55555, () => { + listener_called++; + }), false); + let finished_locks = new Promise((resolve) => { + should.equal(db.attemptLock(3.2, 55555, () => { + listener_called++; + resolve(); + }), false); + }); + should.equal(db.hasLock('hi', 55555), false); + should.equal(db.hasLock(3.2, 3), false); + should.equal(db.hasLock(3.2, 55555), true); + should.equal(db.hasLock(3.2, 55555), true); + should.equal(db.unlock(3.2, 55555), true); + should.equal(db.hasLock(3.2, 55555), false); + await finished_locks; + should.equal(listener_called, 2); + should.equal(db.hasLock(3.2, 55555), false); + }); + + it.only('lock/unlock with worker', async function() { + let listener_called = 0; + 
should.equal(db.attemptLock(4, 1, () => { + listener_called++; + }), true); + let worker = new Worker('./test/lock-test.js', { + workerData: { + path: db.path, + } + }); + let onworkerlock, onworkerunlock; + worker.on('error', (error) => { + console.log(error); + }) + await new Promise((resolve, reject) => { + worker.on('error', (error) => { + reject(error); + }) + worker.on('message', (event) => { + if (event.started) { + should.equal(event.hasLock, true); + resolve(); + } + if (event.locked) onworkerlock(); + //if (event.unlocked) onworkerunlock(); + }); + }); + db.unlock(4, 1); + await new Promise(resolve => { + onworkerlock = resolve; + }); + should.equal(db.attemptLock(4, 1, () => { + listener_called++; + onworkerunlock(); + }), false); + worker.postMessage({unlock: true}); + await new Promise(resolve => { + onworkerunlock = resolve; + }); + should.equal(listener_called, 1); + worker.postMessage({lock: true}); + await new Promise(resolve => { + onworkerlock = resolve; + }); + await new Promise(resolve => { + should.equal(db.attemptLock(4, 1, () => { + listener_called++; + should.equal(listener_called, 2); + resolve(); + }), false); + worker.terminate(); + }); + setTimeout(() => { + why.whyIsNodeStillRunning(); + }, 10000).unref(); + }); + it('direct write', async function() { let dbBinary = db.openDB(Object.assign({ name: 'mydb-direct', diff --git a/test/lock-test.js b/test/lock-test.js new file mode 100644 index 000000000..561ffc5ea --- /dev/null +++ b/test/lock-test.js @@ -0,0 +1,21 @@ +import { open } from '../node-index.js'; +import { parentPort, workerData } from 'worker_threads'; +let db = open({ + name: 'mydb1', + useVersions: true, + path: workerData.path, +}); +function getLock() { + if (db.attemptLock(4, 1, getLock)) + parentPort.postMessage({ locked: true }); +} +getLock(); + +parentPort.on('message', (event) => { + if (event.unlock) { + db.unlock(4, 1); + parentPort.postMessage({ unlocked: true }); + } + if (event.lock) getLock(); +}); 
+parentPort.postMessage({ started: true, hasLock: db.hasLock(4, 1) }); From b492fbdb0d6c723f34b8b98cbe60734db2fbc0ab Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Sun, 10 Sep 2023 20:49:46 -0600 Subject: [PATCH 22/33] Fix track metrics flag and some tests --- read.js | 2 -- src/env.cpp | 4 +++- src/lmdb-js.h | 1 - src/writer.cpp | 10 +++++----- test/index.test.js | 8 ++------ 5 files changed, 10 insertions(+), 15 deletions(-) diff --git a/read.js b/read.js index 69d28ad94..b0f245de0 100644 --- a/read.js +++ b/read.js @@ -309,8 +309,6 @@ export function addReadMethods(LMDBStore, { return this.unlock(id, version, true); }, - - resetReadTxn() { resetReadTxn(); }, diff --git a/src/env.cpp b/src/env.cpp index 25f0feaf5..3c2f2a80a 100644 --- a/src/env.cpp +++ b/src/env.cpp @@ -756,7 +756,9 @@ Napi::Value EnvWrap::info(const CallbackInfo& info) { stats.Set("maxReaders", Number::New(info.Env(), envinfo.me_maxreaders)); stats.Set("numReaders", Number::New(info.Env(), envinfo.me_numreaders)); #ifdef MDB_OVERLAPPINGSYNC - if (this->trackMetrics) { + unsigned int envFlags; + mdb_env_get_flags(env, &envFlags); + if (envFlags & MDB_TRACK_METRICS) { MDB_metrics* metrics = (MDB_metrics*) mdb_env_get_metrics(this->env); stats.Set("timeStartTxns", Number::New(info.Env(), (double) metrics->time_start_txns / TICKS_PER_SECOND)); stats.Set("timeDuringTxns", Number::New(info.Env(), (double) metrics->time_during_txns / TICKS_PER_SECOND)); diff --git a/src/lmdb-js.h b/src/lmdb-js.h index f25e78dbd..015cbbe9e 100644 --- a/src/lmdb-js.h +++ b/src/lmdb-js.h @@ -314,7 +314,6 @@ class EnvWrap : public ObjectWrap { WriteWorker* writeWorker; bool readTxnRenewed; bool hasWrites; - bool trackMetrics; uint64_t timeTxnWaiting; unsigned int jsFlags; char* keyBuffer; diff --git a/src/writer.cpp b/src/writer.cpp index f6ce4ad3b..c148efe97 100644 --- a/src/writer.cpp +++ b/src/writer.cpp @@ -122,7 +122,9 @@ int WriteWorker::WaitForCallbacks(MDB_txn** txn, bool allowCommit, uint32_t* tar 
pthread_cond_signal(envForTxn->writingCond); interruptionStatus = WORKER_WAITING; uint64_t start; - if (envForTxn->trackMetrics) + unsigned int envFlags; + mdb_env_get_flags(env, &envFlags); + if (envFlags & MDB_TRACK_METRICS) start = get_time64(); if (target) { uint64_t delay = 1; @@ -131,9 +133,8 @@ int WriteWorker::WaitForCallbacks(MDB_txn** txn, bool allowCommit, uint32_t* tar delay = delay << 1ll; if ((*target & 0xf) || (allowCommit && finishedProgress)) { // we are in position to continue writing or commit, so forward progress can be made without interrupting yet - if (envForTxn->trackMetrics) { + if (envFlags & MDB_TRACK_METRICS) envForTxn->timeTxnWaiting += get_time64() - start; - } interruptionStatus = 0; return 0; } @@ -141,9 +142,8 @@ int WriteWorker::WaitForCallbacks(MDB_txn** txn, bool allowCommit, uint32_t* tar } else { pthread_cond_wait(envForTxn->writingCond, envForTxn->writingLock); } - if (envForTxn->trackMetrics) { + if (envFlags & MDB_TRACK_METRICS) envForTxn->timeTxnWaiting += get_time64() - start; - } if (interruptionStatus == INTERRUPT_BATCH) { // interrupted by JS code that wants to run a synchronous transaction interruptionStatus = RESTART_WORKER_TXN; rc = mdb_txn_commit(*txn); diff --git a/test/index.test.js b/test/index.test.js index b357fc059..110dafece 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -9,7 +9,6 @@ import { fileURLToPath } from 'url' import { dirname } from 'path' import { Worker } from 'worker_threads'; import { encoder as orderedBinaryEncoder } from 'ordered-binary/index.js' -import why from 'why-is-node-still-running'; import inspector from 'inspector' //inspector.open(9229, null, true); debugger let nativeMethods, dirName = dirname(fileURLToPath(import.meta.url)) @@ -1342,7 +1341,7 @@ describe('lmdb-js', function() { should.equal(returnedValue[9], 3); }); - it.only('lock/unlock notifications', async function() { + it('lock/unlock notifications', async function() { let listener_called = 0; 
should.equal(db.attemptLock(3.2, 55555, () => { listener_called++; @@ -1367,7 +1366,7 @@ describe('lmdb-js', function() { should.equal(db.hasLock(3.2, 55555), false); }); - it.only('lock/unlock with worker', async function() { + it('lock/unlock with worker', async function() { let listener_called = 0; should.equal(db.attemptLock(4, 1, () => { listener_called++; @@ -1419,9 +1418,6 @@ describe('lmdb-js', function() { }), false); worker.terminate(); }); - setTimeout(() => { - why.whyIsNodeStillRunning(); - }, 10000).unref(); }); it('direct write', async function() { From d0bdca2a45fbf4ca7b367d478287b8f18099da83 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Mon, 11 Sep 2023 14:54:14 -0600 Subject: [PATCH 23/33] Treat puts with flags as conditional puts --- write.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/write.js b/write.js index b53dca252..c3a419d17 100644 --- a/write.js +++ b/write.js @@ -337,7 +337,8 @@ export function addWriteMethods(LMDBStore, { env, fixedBuffer, resetReadTxn, use return; } } - if (ifVersion === undefined) { + // if it is not conditional because of ifVersion or has any flags that can make the write conditional + if (ifVersion === undefined && !(flags & 0x22030)) { if (writtenBatchDepth > 1) { if (!resolution.flag && !store.cache) resolution.flag = NO_RESOLVE; From 6ccd407ee1bc97ac394c4e4e33cced315a1ccd5b Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Mon, 11 Sep 2023 16:31:48 -0600 Subject: [PATCH 24/33] Revert to using CI for musl builds --- .github/workflows/prebuild.yml | 28 ++-------------------------- package.json | 4 ++-- 2 files changed, 4 insertions(+), 28 deletions(-) diff --git a/.github/workflows/prebuild.yml b/.github/workflows/prebuild.yml index f0628066c..fc9a29166 100644 --- a/.github/workflows/prebuild.yml +++ b/.github/workflows/prebuild.yml @@ -56,7 +56,7 @@ jobs: if: startsWith(github.ref, 'refs/tags/') with: files: prebuild-win32.tar - build-centos-7-musl: + build-centos-7: if: startsWith(github.ref, 
'refs/tags/') env: LMDB_DATA_V1: ${{ contains(github.ref, '-v1') }} @@ -70,7 +70,7 @@ jobs: node-version: 16 - run: yum update -y && yum install -y python3 - run: curl https://raw.githubusercontent.com/kriszyp/musl-bins/main/aarch64-linux-musl-cross.tgz --insecure --output aarch64-linux-musl-cross.tgz - - run: tar -xf aarch64-linux-musl-cross.tgz || cat aarch64-linux-musl-cross.tgz + - run: tar -xf aarch64-linux-musl-cross.tgz && pwd && ls - run: curl https://raw.githubusercontent.com/kriszyp/musl-bins/main/armv7l-linux-musleabihf-cross.tgz --insecure --output armv7l-linux-musleabihf-cross.tgz - run: tar -xf armv7l-linux-musleabihf-cross.tgz && pwd && ls - run: curl https://raw.githubusercontent.com/kriszyp/musl-bins/main/x86_64-linux-musl-native.tgz --insecure --output x86_64-linux-musl-native.tgz @@ -98,30 +98,6 @@ jobs: PREBUILD_ARCH: x64 CC: ${PWD}/x86_64-linux-musl-native/bin/x86_64-linux-musl-gcc CXX: ${PWD}/x86_64-linux-musl-native/bin/x86_64-linux-musl-g++ - - run: npm run build-js - - run: chmod 777 test - - run: npm test - if: ${{ !contains(github.ref, '-v1') }} - - run: tar --create --verbose --file=prebuild-linux.tar -C prebuilds . 
- - name: Prebuild - if: startsWith(github.ref, 'refs/tags/') - uses: softprops/action-gh-release@v1 - with: - files: prebuild-linux-musl.tar - build-centos-7: - if: startsWith(github.ref, 'refs/tags/') - env: - LMDB_DATA_V1: ${{ contains(github.ref, '-v1') }} - runs-on: ubuntu-20.04 - container: quay.io/pypa/manylinux2014_x86_64 - steps: - - uses: actions/checkout@v3 - - name: Setup node - uses: actions/setup-node@v3 - with: - node-version: 16 - - run: yum update -y && yum install -y python3 - - run: npm install - run: npm run prebuild-libc - run: ls prebuilds/linux-x64 #- run: cp prebuilds/linux-x64/node.abi93.glibc.node prebuilds/linux-x64/node.abi92.glibc.node diff --git a/package.json b/package.json index 6174b6487..1d98269f1 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "lmdb", "author": "Kris Zyp", - "version": "2.9.0-locks.1", + "version": "2.9.0-locks.2", "description": "Simple, efficient, scalable, high-performance LMDB interface", "license": "MIT", "repository": { @@ -106,4 +106,4 @@ }, "optionalDependencies": { } -} +} \ No newline at end of file From 7aeac5a57672c1e56e936902e9a4ccd6bf48796f Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Thu, 14 Sep 2023 14:31:05 -0600 Subject: [PATCH 25/33] Remove node v16 builds --- package.json | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/package.json b/package.json index 1d98269f1..6c3e896ce 100644 --- a/package.json +++ b/package.json @@ -62,12 +62,12 @@ "build-js": "rollup -c", "prepare": "rollup -c", "before-publish": "rollup -c && prebuildify-ci download && node util/set-optional-deps.cjs && npm run test", - "prebuild-libc-musl": "ENABLE_V8_FUNCTIONS=false prebuildify-platform-packages --tag-libc --napi --platform-packages --target 16.18.0", - "prebuild-libc": "prebuildify-platform-packages --tag-libc --target 20.0.0 || true && prebuildify-platform-packages --tag-libc --target 18.15.0 && prebuildify-platform-packages --platform-packages --tag-libc 
--target 16.18.0 && ENABLE_V8_FUNCTIONS=false prebuildify-platform-packages --napi --platform-packages --tag-libc --target 16.18.0", - "prebuild-macos": "prebuildify-platform-packages --target 20.0.0 && prebuildify-platform-packages --target 18.15.0 && prebuildify-platform-packages --platform-packages --target 16.18.0 && ENABLE_V8_FUNCTIONS=false prebuildify-platform-packages --napi --platform-packages --target 16.18.0", - "prebuild-win32": "prebuildify-platform-packages --target 20.0.0 && prebuildify-platform-packages --target 18.15.0 && prebuildify-platform-packages --target 16.18.0 && set ENABLE_V8_FUNCTIONS=false&& prebuildify-platform-packages --napi --platform-packages --target 16.18.0", - "prebuild-libc-arm7": "ENABLE_V8_FUNCTIONS=false prebuildify-platform-packages --napi --platform-packages --tag-libc --target 16.18.0", - "prebuildify": "prebuildify-platform-packages --napi --target 18.15.0", + "prebuild-libc-musl": "ENABLE_V8_FUNCTIONS=false prebuildify-platform-packages --tag-libc --napi --platform-packages --target 18.17.1", + "prebuild-libc": "prebuildify-platform-packages --tag-libc --target 20.0.0 || true && prebuildify-platform-packages --platform-packages --tag-libc --target 18.17.1 && ENABLE_V8_FUNCTIONS=false prebuildify-platform-packages --napi --platform-packages --tag-libc --target 18.17.1", + "prebuild-macos": "prebuildify-platform-packages --target 20.0.0 && prebuildify-platform-packages --platform-packages --target 18.17.1 && ENABLE_V8_FUNCTIONS=false prebuildify-platform-packages --napi --platform-packages --target 18.17.1", + "prebuild-win32": "prebuildify-platform-packages --target 20.0.0 && prebuildify-platform-packages --target 18.17.1 && set ENABLE_V8_FUNCTIONS=false&& prebuildify-platform-packages --napi --platform-packages --target 18.17.1", + "prebuild-libc-arm7": "ENABLE_V8_FUNCTIONS=false prebuildify-platform-packages --napi --platform-packages --tag-libc --target 18.17.1", + "prebuildify": "prebuildify-platform-packages --napi 
--target 18.17.1", "full-publish": "cd prebuilds/win32-x64 && npm publish --tag dev --access public && cd ../darwin-x64 && npm publish --tag dev --access public && cd ../darwin-arm64 && npm publish --tag dev --access public && cd ../linux-x64 && npm publish --tag dev --access public && cd ../linux-arm64 && npm publish --tag dev --access public && cd ../linux-arm && npm publish --tag dev --access public && cd ../.. && npm publish --tag dev", "recompile": "node-gyp clean && node-gyp configure && node-gyp build", "test": "mocha test/**.test.js --expose-gc --recursive", @@ -105,5 +105,11 @@ "test": "tests" }, "optionalDependencies": { + "@lmdb/lmdb-darwin-arm64": "2.9.0-locks.2", + "@lmdb/lmdb-darwin-x64": "2.9.0-locks.2", + "@lmdb/lmdb-linux-arm": "2.9.0-locks.2", + "@lmdb/lmdb-linux-arm64": "2.9.0-locks.2", + "@lmdb/lmdb-linux-x64": "2.9.0-locks.2", + "@lmdb/lmdb-win32-x64": "2.9.0-locks.2" } } \ No newline at end of file From a0bae536a45ce60add69cfbe560ff79eaf5d9034 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Thu, 14 Sep 2023 14:31:29 -0600 Subject: [PATCH 26/33] Timestamps should be scope to environments to allow parallel environment execution --- src/env.cpp | 45 ++++++++++++++++++++++++++++----------------- src/lmdb-js.h | 21 ++++++++++++++++++--- src/misc.cpp | 25 ++----------------------- src/writer.cpp | 12 +++++++----- 4 files changed, 55 insertions(+), 48 deletions(-) diff --git a/src/env.cpp b/src/env.cpp index 3c2f2a80a..954e19fd9 100644 --- a/src/env.cpp +++ b/src/env.cpp @@ -293,7 +293,7 @@ int EnvWrap::openEnv(int flags, int jsFlags, const char* path, char* keyBuffer, this->compression = compression; this->jsFlags = jsFlags; #ifdef MDB_OVERLAPPINGSYNC - RecordLocks* resolutions; + ExtendedEnv* resolutions; #endif int rc; rc = mdb_env_set_maxdbs(env, maxDbs); @@ -327,7 +327,7 @@ int EnvWrap::openEnv(int flags, int jsFlags, const char* path, char* keyBuffer, flags |= MDB_PREVSNAPSHOT; } mdb_env_set_callback(env, checkExistingEnvs); - resolutions = new 
RecordLocks(); + resolutions = new ExtendedEnv(); mdb_env_set_userctx(env, resolutions); #endif @@ -614,8 +614,8 @@ void EnvWrap::closeEnv(bool hasLock) { if (!env) return; // unlock any record locks held by this thread/EnvWrap - RecordLocks* record_locks = (RecordLocks*) mdb_env_get_userctx(env); - pthread_mutex_lock(&record_locks->modification_lock); + ExtendedEnv* record_locks = (ExtendedEnv*) mdb_env_get_userctx(env); + pthread_mutex_lock(&record_locks->locksModificationLock); auto it = record_locks->lock_callbacks.begin(); while (it != record_locks->lock_callbacks.end()) { @@ -624,7 +624,7 @@ void EnvWrap::closeEnv(bool hasLock) { it = record_locks->lock_callbacks.erase(it); } else ++it; } - pthread_mutex_unlock(&record_locks->modification_lock); + pthread_mutex_unlock(&record_locks->locksModificationLock); if (openEnvWraps) { for (auto ewRef = openEnvWraps->begin(); ewRef != openEnvWraps->end(); ) { @@ -652,7 +652,7 @@ void EnvWrap::closeEnv(bool hasLock) { if ((envFlags & MDB_OVERLAPPINGSYNC) && envPath->hasWrites) { mdb_env_sync(env, 1); } - delete (RecordLocks*) mdb_env_get_userctx(env); + delete (ExtendedEnv*) mdb_env_get_userctx(env); #endif char* path; mdb_env_get_path(env, (const char**)&path); @@ -977,11 +977,22 @@ int32_t writeFFI(double ewPointer, uint64_t instructionAddress) { return rc; } -RecordLocks::RecordLocks() { - pthread_mutex_init(&modification_lock, nullptr); +ExtendedEnv::ExtendedEnv() { + pthread_mutex_init(&locksModificationLock, nullptr); } -bool RecordLocks::attemptLock(std::string key, napi_env env, napi_value func, bool has_callback, EnvWrap* ew) { - pthread_mutex_lock(&modification_lock); +ExtendedEnv::~ExtendedEnv() { + pthread_mutex_destroy(&locksModificationLock); +} +uint64_t ExtendedEnv::getNextTime() { + uint64_t next_time_int = next_time_double(); + if (next_time_int == lastTime) next_time_int++; + return bswap_64(lastTime = next_time_int); +} +uint64_t ExtendedEnv::getLastTime() { + return bswap_64(lastTime); +} +bool 
ExtendedEnv::attemptLock(std::string key, napi_env env, napi_value func, bool has_callback, EnvWrap* ew) { + pthread_mutex_lock(&locksModificationLock); lock_callbacks.find(key); auto resolution = lock_callbacks.find(key); bool found; @@ -1005,7 +1016,7 @@ bool RecordLocks::attemptLock(std::string key, napi_env env, napi_value func, bo } found = false; } - pthread_mutex_unlock(&modification_lock); + pthread_mutex_unlock(&locksModificationLock); return found; } NAPI_FUNCTION(attemptLock) { @@ -1018,25 +1029,25 @@ NAPI_FUNCTION(attemptLock) { napi_coerce_to_bool(env, args[2], &as_bool); bool has_callback; napi_get_value_bool(env, as_bool, &has_callback); - RecordLocks* lock_callbacks = (RecordLocks*) mdb_env_get_userctx(ew->env); + ExtendedEnv* lock_callbacks = (ExtendedEnv*) mdb_env_get_userctx(ew->env); std::string key(ew->keyBuffer, size); bool result = lock_callbacks->attemptLock(key, env, args[2], has_callback, ew); napi_value return_value; napi_get_boolean(env, result, &return_value); return return_value; } -bool RecordLocks::unlock(std::string key, bool only_check) { - pthread_mutex_lock(&modification_lock); +bool ExtendedEnv::unlock(std::string key, bool only_check) { + pthread_mutex_lock(&locksModificationLock); auto resolution = lock_callbacks.find(key); if (resolution == lock_callbacks.end()) { - pthread_mutex_unlock(&modification_lock); + pthread_mutex_unlock(&locksModificationLock); return false; } if (!only_check) { notifyCallbacks(resolution->second.callbacks); lock_callbacks.erase(resolution); } - pthread_mutex_unlock(&modification_lock); + pthread_mutex_unlock(&locksModificationLock); return true; } void notifyCallbacks(std::vector callbacks) { @@ -1054,7 +1065,7 @@ NAPI_FUNCTION(unlock) { GET_UINT32_ARG(size, 1); bool only_check = false; napi_get_value_bool(env, args[2], &only_check); - RecordLocks* lock_callbacks = (RecordLocks*) mdb_env_get_userctx(ew->env); + ExtendedEnv* lock_callbacks = (ExtendedEnv*) mdb_env_get_userctx(ew->env); std::string 
key(ew->keyBuffer, size); bool result = lock_callbacks->unlock(key, only_check); napi_value return_value; diff --git a/src/lmdb-js.h b/src/lmdb-js.h index 015cbbe9e..5e436674c 100644 --- a/src/lmdb-js.h +++ b/src/lmdb-js.h @@ -88,6 +88,16 @@ uint64_t last_time_double(); int cond_init(pthread_cond_t *cond); int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, uint64_t ns); +// if we need to add others: https://stackoverflow.com/questions/41770887/cross-platform-definition-of-byteswap-uint64-and-byteswap-ulong/46137633#46137633 +#ifdef _WIN32 + #define bswap_64(x) _byteswap_uint64(x) +#elif defined(__APPLE__) + #include + #define bswap_64(x) OSSwapInt64(x) +#else + #include // bswap_64 +#endif + #endif /* __CPTHREAD_H__ */ class Logging { @@ -269,13 +279,18 @@ typedef struct callback_holder_t { EnvWrap* ew; std::vector callbacks; } callback_holder_t; -class RecordLocks { +class ExtendedEnv { public: - RecordLocks(); + ExtendedEnv(); + ~ExtendedEnv(); std::unordered_map lock_callbacks; - pthread_mutex_t modification_lock; + pthread_mutex_t locksModificationLock; + uint64_t lastTime; // actually encoded as double + uint64_t previousTime; // actually encoded as double bool attemptLock(std::string key, napi_env env, napi_value func, bool has_callback, EnvWrap* ew); bool unlock(std::string key, bool only_check); + uint64_t getNextTime(); + uint64_t getLastTime(); }; class EnvWrap : public ObjectWrap { diff --git a/src/misc.cpp b/src/misc.cpp index ce0ff7ea2..14102cf8c 100644 --- a/src/misc.cpp +++ b/src/misc.cpp @@ -5,16 +5,6 @@ #include #include -// if we need to add others: https://stackoverflow.com/questions/41770887/cross-platform-definition-of-byteswap-uint64-and-byteswap-ulong/46137633#46137633 -#ifdef _WIN32 - #define bswap_64(x) _byteswap_uint64(x) -#elif defined(__APPLE__) - #include - #define bswap_64(x) OSSwapInt64(x) -#else - #include // bswap_64 -#endif - using namespace Napi; static thread_local char* globalUnsafePtr; @@ -562,14 +552,8 @@ 
uint64_t next_time_double() { t /= 10; t -= 11644473600000000ULL; double next_time = (double)t/ 1000; - uint64_t next_time_int = *((uint64_t*)&next_time); - if (next_time_int == last_time) next_time_int++; - return bswap_64 (last_time = next_time_int); + return *((uint64_t*)&next_time); } -uint64_t last_time_double() { - return _byteswap_uint64 (last_time); -} - #else int cond_init(pthread_cond_t *cond) { @@ -606,12 +590,7 @@ uint64_t next_time_double() { struct timespec time; clock_gettime(CLOCK_REALTIME, &time); double next_time = (double)time.tv_sec * 1000 + (double)time.tv_nsec / 1000000; - uint64_t next_time_int = *((uint64_t*)&next_time); - if (next_time_int == last_time) next_time_int++; - return bswap_64(last_time = next_time_int); -} -uint64_t last_time_double() { - return bswap_64(last_time); + return *((uint64_t*)&next_time); } #endif diff --git a/src/writer.cpp b/src/writer.cpp index c148efe97..c440b0ac5 100644 --- a/src/writer.cpp +++ b/src/writer.cpp @@ -58,7 +58,6 @@ const int IF_NO_EXISTS = MDB_NOOVERWRITE; //0x10; const int FAILED_CONDITION = 0x4000000; const int FINISHED_OPERATION = 0x1000000; const double ANY_VERSION = 3.542694326329068e-103; // special marker for any version -static uint64_t previous_time; WriteWorker::~WriteWorker() { // TODO: Make sure this runs on the JS main thread, or we need to move it @@ -290,7 +289,9 @@ next_inst: start = instruction++; case PUT: if (flags & ASSIGN_TIMESTAMP) { if ((*(uint64_t*)key.mv_data & 0xfffffffful) == REPLACE_WITH_TIMESTAMP) { - *(uint64_t*)key.mv_data = ((*(uint64_t*)key.mv_data >> 32) & 0x1) ? last_time_double() : next_time_double(); + ExtendedEnv* extended_env = (ExtendedEnv*) mdb_env_get_userctx(envForTxn->env); + *(uint64_t*)key.mv_data = ((*(uint64_t*)key.mv_data >> 32) & 0x1) ? 
+ extended_env->getLastTime() : extended_env->getNextTime(); } uint64_t first_word = *(uint64_t*)value.mv_data; // 0 assign new time @@ -299,6 +300,7 @@ next_inst: start = instruction++; // 4 record previous time if ((first_word & 0xffffff) == SPECIAL_WRITE) { if (first_word & REPLACE_WITH_TIMESTAMP_FLAG) { + ExtendedEnv* extended_env = (ExtendedEnv*) mdb_env_get_userctx(envForTxn->env); uint32_t next_32 = first_word >> 32; if (next_32 & 4) { // preserve last timestamp @@ -306,11 +308,11 @@ next_inst: start = instruction++; rc = mdb_get(txn, dbi, &key, &last_data); if (rc) break; if (flags & SET_VERSION) last_data.mv_data = (char *) last_data.mv_data + 8; - previous_time = *(uint64_t *) last_data.mv_data; + extended_env->previousTime = *(uint64_t *) last_data.mv_data; //fprintf(stderr, "previous time %llx \n", previous_time); } - uint64_t timestamp = (next_32 & 1) ? (next_32 & 2) ? previous_time : last_time_double() - : next_time_double(); + uint64_t timestamp = (next_32 & 1) ? (next_32 & 2) ? 
extended_env->previousTime : extended_env->getLastTime() + : extended_env->getNextTime(); if (first_word & DIRECT_WRITE) { // write to second word, which is used by the direct write *((uint64_t *) value.mv_data + 1) = timestamp ^ (next_32 >> 8); From b27f30da2ede3eac9e9ac0f55f2c077c6f69f197 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Fri, 15 Sep 2023 09:04:35 -0600 Subject: [PATCH 27/33] Reuse thread local read transactions for prefetches --- src/dbi.cpp | 10 ++++++---- src/env.cpp | 38 ++++++++++++++++++++++++++++++-------- src/lmdb-js.h | 2 ++ 3 files changed, 38 insertions(+), 12 deletions(-) diff --git a/src/dbi.cpp b/src/dbi.cpp index 13b455591..b139c8737 100644 --- a/src/dbi.cpp +++ b/src/dbi.cpp @@ -276,8 +276,8 @@ NAPI_FUNCTION(getStringByBinary) { } int DbiWrap::prefetch(uint32_t* keys) { - MDB_txn* txn; - mdb_txn_begin(ew->env, nullptr, MDB_RDONLY, &txn); + MDB_txn* txn = ExtendedEnv::getReadTxn(ew->env, true); + mdb_txn_renew(txn); MDB_val key; MDB_val data; unsigned int flags; @@ -287,8 +287,10 @@ int DbiWrap::prefetch(uint32_t* keys) { bool findDataValue = false; MDB_cursor *cursor; int rc = mdb_cursor_open(txn, dbi, &cursor); - if (rc) + if (rc) { + mdb_txn_reset(txn); return rc; + } while((key.mv_size = *keys++) > 0) { if (key.mv_size == 0xffffffff) { @@ -327,7 +329,7 @@ int DbiWrap::prefetch(uint32_t* keys) { } } mdb_cursor_close(cursor); - mdb_txn_abort(txn); + mdb_txn_reset(txn); return effected; } diff --git a/src/env.cpp b/src/env.cpp index 954e19fd9..6a51a43c4 100644 --- a/src/env.cpp +++ b/src/env.cpp @@ -614,17 +614,21 @@ void EnvWrap::closeEnv(bool hasLock) { if (!env) return; // unlock any record locks held by this thread/EnvWrap - ExtendedEnv* record_locks = (ExtendedEnv*) mdb_env_get_userctx(env); - pthread_mutex_lock(&record_locks->locksModificationLock); - auto it = record_locks->lock_callbacks.begin(); - while (it != record_locks->lock_callbacks.end()) + ExtendedEnv* extended_env = (ExtendedEnv*) mdb_env_get_userctx(env); + + 
pthread_mutex_lock(&extended_env->locksModificationLock); + auto it = extended_env->lock_callbacks.begin(); + while (it != extended_env->lock_callbacks.end()) { if (it->second.ew == this) { notifyCallbacks(it->second.callbacks); - it = record_locks->lock_callbacks.erase(it); + it = extended_env->lock_callbacks.erase(it); } else ++it; } - pthread_mutex_unlock(&record_locks->locksModificationLock); + pthread_mutex_unlock(&extended_env->locksModificationLock); + MDB_txn* txn = ExtendedEnv::getReadTxn(env, false); + if (txn) + mdb_txn_abort(txn); if (openEnvWraps) { for (auto ewRef = openEnvWraps->begin(); ewRef != openEnvWraps->end(); ) { @@ -976,7 +980,7 @@ int32_t writeFFI(double ewPointer, uint64_t instructionAddress) { } return rc; } - +thread_local std::unordered_map* ExtendedEnv::prefetchTxns = nullptr; ExtendedEnv::ExtendedEnv() { pthread_mutex_init(&locksModificationLock, nullptr); } @@ -993,7 +997,6 @@ uint64_t ExtendedEnv::getLastTime() { } bool ExtendedEnv::attemptLock(std::string key, napi_env env, napi_value func, bool has_callback, EnvWrap* ew) { pthread_mutex_lock(&locksModificationLock); - lock_callbacks.find(key); auto resolution = lock_callbacks.find(key); bool found; if (resolution == lock_callbacks.end()) { @@ -1073,6 +1076,25 @@ NAPI_FUNCTION(unlock) { return return_value; } +MDB_txn* ExtendedEnv::getReadTxn(MDB_env* env, bool begin_if_none) { + auto extended_env = (ExtendedEnv*) mdb_env_get_userctx(env); + if (!prefetchTxns) { + if (begin_if_none) + prefetchTxns = new std::unordered_map(); + else return nullptr; + } + auto txn_ref = prefetchTxns->find(extended_env); + if (txn_ref == prefetchTxns->end()) { + if (begin_if_none) { + MDB_txn *txn; + mdb_txn_begin(env, nullptr, MDB_RDONLY, &txn); + prefetchTxns->emplace(extended_env, txn); + return txn; + } else return nullptr; + } + return txn_ref->second; +} + void EnvWrap::setupExports(Napi::Env env, Object exports) { // EnvWrap: Prepare constructor template Function EnvClass = 
ObjectWrap::DefineClass(env, "Env", { diff --git a/src/lmdb-js.h b/src/lmdb-js.h index 5e436674c..9ffcf5551 100644 --- a/src/lmdb-js.h +++ b/src/lmdb-js.h @@ -283,6 +283,7 @@ class ExtendedEnv { public: ExtendedEnv(); ~ExtendedEnv(); + static thread_local std::unordered_map* prefetchTxns; std::unordered_map lock_callbacks; pthread_mutex_t locksModificationLock; uint64_t lastTime; // actually encoded as double @@ -291,6 +292,7 @@ class ExtendedEnv { bool unlock(std::string key, bool only_check); uint64_t getNextTime(); uint64_t getLastTime(); + static MDB_txn* getReadTxn(MDB_env* env, bool begin_if_none); }; class EnvWrap : public ObjectWrap { From b6040d7b2d9e2649c79fccb913ec470ed41a96dc Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Fri, 15 Sep 2023 09:21:04 -0600 Subject: [PATCH 28/33] Update version --- package.json | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/package.json b/package.json index 6c3e896ce..4852ac2c1 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "lmdb", "author": "Kris Zyp", - "version": "2.9.0-locks.2", + "version": "2.9.0-alpha.1", "description": "Simple, efficient, scalable, high-performance LMDB interface", "license": "MIT", "repository": { @@ -105,11 +105,5 @@ "test": "tests" }, "optionalDependencies": { - "@lmdb/lmdb-darwin-arm64": "2.9.0-locks.2", - "@lmdb/lmdb-darwin-x64": "2.9.0-locks.2", - "@lmdb/lmdb-linux-arm": "2.9.0-locks.2", - "@lmdb/lmdb-linux-arm64": "2.9.0-locks.2", - "@lmdb/lmdb-linux-x64": "2.9.0-locks.2", - "@lmdb/lmdb-win32-x64": "2.9.0-locks.2" } } \ No newline at end of file From 172b24b6f05691a3f91d11006e56860afb1b7cea Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Fri, 15 Sep 2023 14:37:41 -0600 Subject: [PATCH 29/33] Redo the prefetch transactions as a fixed cache --- src/dbi.cpp | 7 ++--- src/env.cpp | 77 +++++++++++++++++++++++++++++++++++---------------- src/lmdb-js.h | 7 +++-- 3 files changed, 61 insertions(+), 30 deletions(-) diff --git a/src/dbi.cpp b/src/dbi.cpp 
index b139c8737..a0730c0d5 100644 --- a/src/dbi.cpp +++ b/src/dbi.cpp @@ -276,8 +276,7 @@ NAPI_FUNCTION(getStringByBinary) { } int DbiWrap::prefetch(uint32_t* keys) { - MDB_txn* txn = ExtendedEnv::getReadTxn(ew->env, true); - mdb_txn_renew(txn); + MDB_txn* txn = ExtendedEnv::getPrefetchReadTxn(ew->env); MDB_val key; MDB_val data; unsigned int flags; @@ -288,7 +287,7 @@ int DbiWrap::prefetch(uint32_t* keys) { MDB_cursor *cursor; int rc = mdb_cursor_open(txn, dbi, &cursor); if (rc) { - mdb_txn_reset(txn); + ExtendedEnv::donePrefetchReadTxn(txn); return rc; } @@ -329,7 +328,7 @@ int DbiWrap::prefetch(uint32_t* keys) { } } mdb_cursor_close(cursor); - mdb_txn_reset(txn); + ExtendedEnv::donePrefetchReadTxn(txn); return effected; } diff --git a/src/env.cpp b/src/env.cpp index 6a51a43c4..c213971e9 100644 --- a/src/env.cpp +++ b/src/env.cpp @@ -9,6 +9,9 @@ using namespace Napi; #define IGNORE_NOTFOUND (1) + +MDB_txn* ExtendedEnv::prefetchTxns[20]; +pthread_mutex_t* ExtendedEnv::prefetchTxnsLock; env_tracking_t* EnvWrap::envTracking = EnvWrap::initTracking(); thread_local std::vector* EnvWrap::openEnvWraps = nullptr; thread_local js_buffers_t* EnvWrap::sharedBuffers = nullptr; @@ -18,6 +21,8 @@ void* getSharedBuffers() { } env_tracking_t* EnvWrap::initTracking() { + ExtendedEnv::prefetchTxnsLock = new pthread_mutex_t; + pthread_mutex_init(ExtendedEnv::prefetchTxnsLock, nullptr); env_tracking_t* tracking = new env_tracking_t; tracking->envsLock = new pthread_mutex_t; pthread_mutex_init(tracking->envsLock, nullptr); @@ -293,7 +298,7 @@ int EnvWrap::openEnv(int flags, int jsFlags, const char* path, char* keyBuffer, this->compression = compression; this->jsFlags = jsFlags; #ifdef MDB_OVERLAPPINGSYNC - ExtendedEnv* resolutions; + ExtendedEnv* extended_env; #endif int rc; rc = mdb_env_set_maxdbs(env, maxDbs); @@ -327,8 +332,8 @@ int EnvWrap::openEnv(int flags, int jsFlags, const char* path, char* keyBuffer, flags |= MDB_PREVSNAPSHOT; } mdb_env_set_callback(env, checkExistingEnvs); 
- resolutions = new ExtendedEnv(); - mdb_env_set_userctx(env, resolutions); + extended_env = new ExtendedEnv(); + mdb_env_set_userctx(env, extended_env); #endif timeTxnWaiting = 0; @@ -341,7 +346,7 @@ int EnvWrap::openEnv(int flags, int jsFlags, const char* path, char* keyBuffer, if (rc != 0) { #ifdef MDB_OVERLAPPINGSYNC - delete resolutions; + delete extended_env; #endif if (rc == EXISTING_ENV_FOUND) { mdb_env_close(env); @@ -615,7 +620,6 @@ void EnvWrap::closeEnv(bool hasLock) { return; // unlock any record locks held by this thread/EnvWrap ExtendedEnv* extended_env = (ExtendedEnv*) mdb_env_get_userctx(env); - pthread_mutex_lock(&extended_env->locksModificationLock); auto it = extended_env->lock_callbacks.begin(); while (it != extended_env->lock_callbacks.end()) @@ -626,9 +630,6 @@ void EnvWrap::closeEnv(bool hasLock) { } else ++it; } pthread_mutex_unlock(&extended_env->locksModificationLock); - MDB_txn* txn = ExtendedEnv::getReadTxn(env, false); - if (txn) - mdb_txn_abort(txn); if (openEnvWraps) { for (auto ewRef = openEnvWraps->begin(); ewRef != openEnvWraps->end(); ) { @@ -650,6 +651,7 @@ void EnvWrap::closeEnv(bool hasLock) { envPath->hasWrites = true; if (envPath->count <= 0) { // last thread using it, we can really close it now + ExtendedEnv::removeReadTxns(env); unsigned int envFlags; // This is primarily useful for detecting termination of threads and sync'ing on their termination mdb_env_get_flags(env, &envFlags); #ifdef MDB_OVERLAPPINGSYNC @@ -980,7 +982,6 @@ int32_t writeFFI(double ewPointer, uint64_t instructionAddress) { } return rc; } -thread_local std::unordered_map* ExtendedEnv::prefetchTxns = nullptr; ExtendedEnv::ExtendedEnv() { pthread_mutex_init(&locksModificationLock, nullptr); } @@ -1076,23 +1077,51 @@ NAPI_FUNCTION(unlock) { return return_value; } -MDB_txn* ExtendedEnv::getReadTxn(MDB_env* env, bool begin_if_none) { - auto extended_env = (ExtendedEnv*) mdb_env_get_userctx(env); - if (!prefetchTxns) { - if (begin_if_none) - prefetchTxns = 
new std::unordered_map(); - else return nullptr; - } - auto txn_ref = prefetchTxns->find(extended_env); - if (txn_ref == prefetchTxns->end()) { - if (begin_if_none) { - MDB_txn *txn; - mdb_txn_begin(env, nullptr, MDB_RDONLY, &txn); - prefetchTxns->emplace(extended_env, txn); +MDB_txn* ExtendedEnv::getPrefetchReadTxn(MDB_env* env) { + MDB_txn* txn; + pthread_mutex_lock(prefetchTxnsLock); + // try to find an existing txn for this env + for (int i = 0; i < 20; i++) { + txn = prefetchTxns[i]; + if (txn && mdb_txn_env(txn) == env) { + mdb_txn_renew(txn); + prefetchTxns[i] = nullptr; // remove it, no one else can use it + pthread_mutex_unlock(prefetchTxnsLock); return txn; - } else return nullptr; + } + } + pthread_mutex_unlock(prefetchTxnsLock); + // couldn't find one, need to create a new transaction + mdb_txn_begin(env, nullptr, MDB_RDONLY, &txn); + return txn; +} +void ExtendedEnv::donePrefetchReadTxn(MDB_txn* txn) { + mdb_txn_reset(txn); + pthread_mutex_lock(prefetchTxnsLock); + // reinsert this transaction + MDB_txn* moving; + for (int i = 0; i < 20; i++) { + moving = prefetchTxns[i]; + prefetchTxns[i] = txn; + if (!moving) break; + txn = moving; + } + // if we are full and one has to be removed, abort it + if (moving) mdb_txn_abort(moving); + pthread_mutex_unlock(prefetchTxnsLock); +} + +void ExtendedEnv::removeReadTxns(MDB_env* env) { + pthread_mutex_lock(prefetchTxnsLock); + MDB_txn* txn; + for (int i = 0; i < 20; i++) { + txn = prefetchTxns[i]; + if (txn && mdb_txn_env(txn) == env) { + mdb_txn_abort(txn); + prefetchTxns[i] = nullptr; + } } - return txn_ref->second; + pthread_mutex_unlock(prefetchTxnsLock); } void EnvWrap::setupExports(Napi::Env env, Object exports) { diff --git a/src/lmdb-js.h b/src/lmdb-js.h index 9ffcf5551..3dbe92039 100644 --- a/src/lmdb-js.h +++ b/src/lmdb-js.h @@ -283,7 +283,8 @@ class ExtendedEnv { public: ExtendedEnv(); ~ExtendedEnv(); - static thread_local std::unordered_map* prefetchTxns; + static MDB_txn* prefetchTxns[20]; + static 
pthread_mutex_t* prefetchTxnsLock; std::unordered_map lock_callbacks; pthread_mutex_t locksModificationLock; uint64_t lastTime; // actually encoded as double @@ -292,7 +293,9 @@ class ExtendedEnv { bool unlock(std::string key, bool only_check); uint64_t getNextTime(); uint64_t getLastTime(); - static MDB_txn* getReadTxn(MDB_env* env, bool begin_if_none); + static MDB_txn* getPrefetchReadTxn(MDB_env* env); + static void donePrefetchReadTxn(MDB_txn* txn); + static void removeReadTxns(MDB_env* env); }; class EnvWrap : public ObjectWrap { From 879fae3d4330ac74d4840fefc9ab2922cd18d552 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Mon, 23 Oct 2023 06:33:07 -0600 Subject: [PATCH 30/33] Better handling of caching in sync transaction --- caching.js | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/caching.js b/caching.js index 68af834ba..fae9a915a 100644 --- a/caching.js +++ b/caching.js @@ -125,9 +125,18 @@ export const CachingStore = (Store, env) => { // don't cache binary data, since it will be decoded on get this.cache.delete(id); return result; - } - // sync operation, immediately add to cache, otherwise keep it pinned in memory until it is committed - let entry = this.cache.setValue(id, value, !result || result.isSync ? 
0 : -1); + } + let entry; + if (result?.isSync) { + // sync operation, immediately add to cache + if (result.result) // if it succeeds + entry = this.cache.setValue(id, value, 0); + else { + this.cache.delete(id); + return result; + } // sync failure + // otherwise keep it pinned in memory until it is committed + } else entry = this.cache.setValue(id, value, -1); if (childTxnChanges) childTxnChanges.add(id); if (version !== undefined) @@ -136,19 +145,20 @@ export const CachingStore = (Store, env) => { return result; } putSync(id, value, version, ifVersion) { + let result = super.putSync(id, value, version, ifVersion); if (id !== 'object') { // sync operation, immediately add to cache, otherwise keep it pinned in memory until it is committed - if (value && typeof value === 'object') { + if (value && typeof value === 'object' || !result) { let entry = this.cache.setValue(id, value); if (childTxnChanges) childTxnChanges.add(id); if (version !== undefined) { entry.version = typeof version === 'object' ? 
version.version : version; } - } else // it is possible that a value used to exist here + } else // it is possible that a value used to exist here this.cache.delete(id); } - return super.putSync(id, value, version, ifVersion); + return result; } remove(id, ifVersion) { this.cache.delete(id); From bf20b4f645c808716c0b546440b5bf3702c2af8d Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Mon, 23 Oct 2023 06:33:42 -0600 Subject: [PATCH 31/33] Ensure that there is not multi-threaded access to reader list string that can cause a crash --- src/env.cpp | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/env.cpp b/src/env.cpp index c213971e9..afee5b3e5 100644 --- a/src/env.cpp +++ b/src/env.cpp @@ -795,9 +795,9 @@ Napi::Value EnvWrap::readerCheck(const CallbackInfo& info) { return Number::New(info.Env(), dead); } -Array readerStrings; +thread_local Array* readerStrings = nullptr; MDB_msg_func* printReaders = ([](const char* message, void* env) -> int { - readerStrings.Set(readerStrings.Length(), String::New(*(Env*)env, message)); + readerStrings->Set(readerStrings->Length(), String::New(*(Env*)env, message)); return 0; }); @@ -805,14 +805,15 @@ Napi::Value EnvWrap::readerList(const CallbackInfo& info) { if (!this->env) { return throwError(info.Env(), "The environment is already closed."); } - readerStrings = Array::New(info.Env()); + Array reader_strings = Array::New(info.Env()); + readerStrings = &reader_strings; int rc; Napi::Env env = info.Env(); rc = mdb_reader_list(this->env, printReaders, &env); if (rc != 0) { return throwLmdbError(info.Env(), rc); } - return readerStrings; + return reader_strings; } From 4d32eba0abb5ef4cc72cab8ee8f66783c04429dd Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Mon, 23 Oct 2023 06:34:06 -0600 Subject: [PATCH 32/33] Mark resolutions for sync access to results --- write.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/write.js b/write.js index c3a419d17..1cf2232b3 100644 --- a/write.js +++ b/write.js @@ -19,7 
+19,9 @@ const CONDITIONAL_ALLOW_NOTFOUND = 0x800; const SYNC_PROMISE_SUCCESS = Promise.resolve(true); const SYNC_PROMISE_FAIL = Promise.resolve(false); SYNC_PROMISE_SUCCESS.isSync = true; +SYNC_PROMISE_SUCCESS.result = true; SYNC_PROMISE_FAIL.isSync = true; +SYNC_PROMISE_FAIL.result = false; const PROMISE_SUCCESS = Promise.resolve(true); export const ABORT = 4.452694326329068e-106; // random/unguessable numbers, which work across module/versions and native export const IF_EXISTS = 3.542694326329068e-103; From aa4b8caed1fb5492e4f6f714c67dfe3e6799fe85 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Mon, 23 Oct 2023 06:34:26 -0600 Subject: [PATCH 33/33] Properly distinguish sync and async iterators --- util/RangeIterable.js | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/util/RangeIterable.js b/util/RangeIterable.js index b3e1fccaf..d9573a429 100644 --- a/util/RangeIterable.js +++ b/util/RangeIterable.js @@ -17,7 +17,7 @@ export class RangeIterable { let source = this; let iterable = new RangeIterable(); iterable.iterate = (async) => { - let iterator = source[Symbol.iterator](async); + let iterator = source[async ? Symbol.asyncIterator : Symbol.iterator](); let i = 0; return { next(resolvedResult) { @@ -30,6 +30,7 @@ export class RangeIterable { } else { iteratorResult = iterator.next(); if (iteratorResult.then) { + if (!async) throw new Error('Can synchronously iterate with asynchronous values'); return iteratorResult.then(iteratorResult => this.next(iteratorResult)); } } @@ -40,6 +41,7 @@ export class RangeIterable { } result = func(iteratorResult.value, i++); if (result && result.then) { + if (!async) throw new Error('Can synchronously iterate with asynchronous values'); return result.then(result => result === SKIP ? 
this.next() : @@ -69,7 +71,7 @@ export class RangeIterable { return iterable; } [Symbol.asyncIterator]() { - return this.iterator = this.iterate(); + return this.iterator = this.iterate(true); } [Symbol.iterator]() { return this.iterator = this.iterate(); @@ -101,11 +103,12 @@ export class RangeIterable { iterator = secondIterable[Symbol.iterator](async); result = iterator.next(); if (concatIterable.onDone) { - if (result.then) + if (result.then) { + if (!async) throw new Error('Can synchronously iterate with asynchronous values'); result.then((result) => { if (result.done()) concatIterable.onDone(); }); - else if (result.done) concatIterable.onDone(); + } else if (result.done) concatIterable.onDone(); } } else { if (concatIterable.onDone) concatIterable.onDone(); @@ -115,11 +118,13 @@ export class RangeIterable { return { next() { let result = iterator.next(); - if (result.then) + if (result.then) { + if (!async) throw new Error('Can synchronously iterate with asynchronous values'); return result.then((result) => { if (result.done) return iteratorDone(result); return result; }); + } if (result.done) return iteratorDone(result); return result; },