From 74afa740c302e27bd4775de2434d03a270e6ddfa Mon Sep 17 00:00:00 2001 From: Pasin Suriyentrakorn Date: Tue, 9 Apr 2024 13:05:01 -0700 Subject: [PATCH] CBL-5606 : Fix lock caused by saving doc and notifying change * Issue : When notify collection change, the code needs to obtain the database lock in order to get the database instance from the collection object. The deadlock could occurred if a document save happens at the same time on the other thread as the document save will need to wait to open its transaction while the other transaction is still opened by the notification thead. * Solution : Keep the database pointer without resetting its to null inside the collection objects. As the collection objects cannot retain the database object to avoid retain cycle (collections are cached inside the database object), the database object will need to retain else where where it is being used. Now, the database is explicity retained by Document, Listener Token (Collection and Document Change Listener), and ReplicatorConfiguration objects. * In addtion, moved the logic to generate the effective replication collections, retain Collection objects and retain Database object from CBLReplicator class into the ReplicatorConfiguration class so those logics are done together in a single place. Added missing document ids filter tests to ensure that the moved logic work correctly for the document ids filter. --- src/CBLCollection.cc | 56 ++++++++---- src/CBLCollection_CAPI.cc | 2 +- src/CBLCollection_Internal.hh | 39 +++------ src/CBLDocument.cc | 9 +- src/CBLDocument_Internal.hh | 1 + src/CBLPrivate.h | 4 +- src/CBLReplicatorConfig.hh | 158 +++++++++++++++++++++------------- src/CBLReplicator_Internal.hh | 49 ++--------- src/Listener.hh | 2 +- test/DatabaseTest.cc | 66 ++++++++++++-- test/ReplicatorEETest.cc | 38 ++++++++ 11 files changed, 266 insertions(+), 158 deletions(-) diff --git a/src/CBLCollection.cc b/src/CBLCollection.cc index f7b91481..ce96b49c 100644 --- a/src/CBLCollection.cc +++ b/src/CBLCollection.cc @@ -23,6 +23,31 @@ using namespace fleece; using namespace cbl_internal; namespace cbl_internal { + template<> + struct ListenerToken : public CBLListenerToken { + public: + ListenerToken(CBLCollection *collection, CBLCollectionChangeListener callback, void *context) + :CBLListenerToken((const void*)callback, context) + ,_collection(collection) + ,_database(collection->database()) { } + + ~ListenerToken() { } + + CBLCollectionChangeListener _cbl_nullable callback() const { + return (CBLCollectionChangeListener)_callback; + } + + void call(const CBLCollectionChange* change) { + std::lock_guard lock(_mutex); + auto cb = callback(); + if (cb) { + cb(_context, change); + } + } + private: + Retained _collection; + Retained _database; + }; template<> struct ListenerToken : public CBLListenerToken { @@ -31,6 +56,7 @@ namespace cbl_internal { CBLCollectionDocumentChangeListener callback, void *context) :CBLListenerToken((const void*)callback, context) ,_collection(collection) + ,_database(collection->database()) ,_docID(docID) { _c4obs = _collection->useLocked()->observeDocument(docID, [this](C4DocumentObserver*, @@ -69,32 +95,30 @@ namespace cbl_internal { CBLDocumentChange change = {}; change.collection = _collection; change.docID = _docID; - - Retained db; - try { - db = _collection->database(); - } catch (...) { - C4Error error = C4Error::fromCurrentException(); - CBL_Log(kCBLLogDomainDatabase, kCBLLogWarning, - "Document changed notification failed: %s", error.description().c_str()); - } - - if (db) { - db->notify(this, change); - } + _database->notify(this, change); } Retained _collection; + Retained _database; alloc_slice _docID; std::unique_ptr _c4obs; }; +} +Retained CBLCollection::addChangeListener(CBLCollectionChangeListener listener, + void* _cbl_nullable ctx) +{ + auto lock =_c4col.useLocked(); // Ensure the database lifetime while creating the Listener oken + auto token = addListener([&] { return new ListenerToken(this, listener, ctx); }); + _listeners.add((ListenerToken*)token.get()); + return token; } -Retained -CBLCollection::addDocumentListener(slice docID, CBLCollectionDocumentChangeListener listener, - void* _cbl_nullable ctx) +Retained CBLCollection::addDocumentListener(slice docID, + CBLCollectionDocumentChangeListener listener, + void* _cbl_nullable ctx) { + auto lock =_c4col.useLocked(); // // Ensure the database lifetime while creating the Listener oken auto token = new ListenerToken(this, docID, listener, ctx); _docListeners.add(token); return token; diff --git a/src/CBLCollection_CAPI.cc b/src/CBLCollection_CAPI.cc index 89d2e8a5..034d1033 100644 --- a/src/CBLCollection_CAPI.cc +++ b/src/CBLCollection_CAPI.cc @@ -113,7 +113,7 @@ uint64_t CBLCollection_Count(const CBLCollection* collection) noexcept { } catchAndWarn() } -/** Private API */ +/** Private API used in tests. */ CBLDatabase* CBLCollection_Database(const CBLCollection* collection) noexcept { try { return collection->database(); diff --git a/src/CBLCollection_Internal.hh b/src/CBLCollection_Internal.hh index bc6817fc..0c9ea994 100644 --- a/src/CBLCollection_Internal.hh +++ b/src/CBLCollection_Internal.hh @@ -36,6 +36,7 @@ public: CBLCollection(C4Collection* c4col, CBLScope* scope, CBLDatabase* database) :_c4col(c4col, database) ,_scope(scope) + ,_db(database) ,_name(c4col->getName()) { } @@ -47,9 +48,7 @@ public: bool isValid() const noexcept {return _c4col.isValid();} uint64_t count() const {return _c4col.useLocked()->getDocumentCount();} uint64_t lastSequence() const {return static_cast(_c4col.useLocked()->getLastSequence());} - - /** Throw NotOpen if the collection or database is invalid */ - CBLDatabase* database() const {return _c4col.database();} + CBLDatabase* database() const {return _db;} #pragma mark - DOCUMENTS: @@ -134,10 +133,7 @@ public: #pragma mark - LISTENERS Retained addChangeListener(CBLCollectionChangeListener listener, - void* _cbl_nullable ctx) - { - return addListener([&]{ return _listeners.add(listener, ctx); }); - } + void* _cbl_nullable ctx); Retained addDocumentListener(slice docID, CBLCollectionDocumentChangeListener listener, @@ -196,26 +192,16 @@ private: Retained addListener(fleece::function_ref()> cb) { Retained token = cb(); - if (!_observer) + if (!_observer) { _observer = _c4col.useLocked()->observe([this](C4CollectionObserver*) { this->collectionChanged(); }); + } return token; } void collectionChanged() { - Retained db; - try { - db = database(); - } catch (...) { - C4Error error = C4Error::fromCurrentException(); - CBL_Log(kCBLLogDomainDatabase, kCBLLogWarning, - "Collection changed notification failed: %s", error.description().c_str()); - } - - if (db) { - db->notify(std::bind(&CBLCollection::callCollectionChangeListeners, this)); - } + _db->notify(std::bind(&CBLCollection::callCollectionChangeListeners, this)); } void callCollectionChangeListeners() { @@ -253,7 +239,6 @@ private: :shared_access_lock(std::move(c4col), *database->c4db()) ,_c4db(database->c4db()) ,_col(c4col) - ,_db(database) { _sentry = [this](C4Collection* c4col) { if (!_isValid()) { @@ -268,28 +253,24 @@ private: return _isValid(); } - CBLDatabase* database() const { - auto lock = useLocked(); - return _db; - } - /** Invalidate the database pointer */ void close() noexcept { LOCK_GUARD lock(getMutex()); - _db = nullptr; + _isClosed = true; } private: - bool _isValid() const noexcept { return _db && _col->isValid(); } + bool _isValid() const noexcept { return !_isClosed && _col->isValid(); } CBLDatabase::SharedC4DatabaseAccessLock _c4db; // For retaining the shared lock - CBLDatabase* _cbl_nullable _db; C4Collection* _col; + bool _isClosed {false}; }; #pragma mark - VARIABLES : C4CollectionAccessLock _c4col; // Shared lock with _c4db + CBLDatabase* _db; alloc_slice _name; Retained _scope; diff --git a/src/CBLDocument.cc b/src/CBLDocument.cc index b2650a3d..fea36243 100644 --- a/src/CBLDocument.cc +++ b/src/CBLDocument.cc @@ -47,6 +47,7 @@ using namespace cbl_internal; CBLDocument::CBLDocument(slice docID, CBLCollection *collection, C4Document *c4doc, bool isMutable) :_docID(docID) ,_collection(collection) +,_database(collection ? collection->database() : nullptr) ,_c4doc(c4doc) ,_mutable(isMutable) { @@ -68,8 +69,7 @@ CBLDocument::~CBLDocument() { CBLDatabase* _cbl_nullable CBLDocument::database() const { - // Could throw kC4ErrorNotOpen if the collection is deleted, or database is released. - return _collection ? _collection->database() : nullptr; + return _database; } @@ -113,6 +113,7 @@ bool CBLDocument::save(CBLCollection* collection, const SaveOptions &opt) { alloc_slice body; C4RevisionFlags revFlags; + if (!opt.deleting) { body = encodeBody(collection->database(), c4db, false, revFlags); } else { @@ -143,6 +144,8 @@ bool CBLDocument::save(CBLCollection* collection, const SaveOptions &opt) { // Success: t.commit(); _collection = collection; + _database = collection->database(); + // HACK: Replace the inner reference of the c4doc with the one from newDoc. c4doc.get() = std::move(newDoc); _revID = c4doc->selectedRev().revID; @@ -234,7 +237,7 @@ bool CBLDocument::resolveConflict(Resolution resolution, const CBLDocument * _cb // is true, the remote revision will be kept as is and the losing branch will be pruned. if (resolution != Resolution::useRemote) { if (resolveDoc) { - mergeBody = resolveDoc->encodeBody(_collection->database(), c4db, true, mergeFlags); + mergeBody = resolveDoc->encodeBody(_database, c4db, true, mergeFlags); } else { mergeBody = alloc_slice(size_t(0)); mergeFlags = kRevDeleted; diff --git a/src/CBLDocument_Internal.hh b/src/CBLDocument_Internal.hh index 104ff4a8..b460a7f8 100644 --- a/src/CBLDocument_Internal.hh +++ b/src/CBLDocument_Internal.hh @@ -319,6 +319,7 @@ private: #endif Retained _collection; // Collection (null for new doc) + Retained _database; // Database (null for new doc) litecore::access_lock> _c4doc; // LiteCore doc (null for new doc) alloc_slice const _docID; // Document ID (never empty) mutable alloc_slice _revID; // Revision ID diff --git a/src/CBLPrivate.h b/src/CBLPrivate.h index 355bc19d..36962066 100644 --- a/src/CBLPrivate.h +++ b/src/CBLPrivate.h @@ -28,8 +28,8 @@ CBL_CAPI_BEGIN void CBLLog_BeginExpectingExceptions() CBLAPI; void CBLLog_EndExpectingExceptions() CBLAPI; -/** Returns the collection's database, or NULL if the collection is invalid, or the database is released. */ - CBLDatabase* _cbl_nullable CBLCollection_Database(const CBLCollection*) CBLAPI; +/** Returns the collection's database pointer which is unretained. This is used by tests. */ + CBLDatabase* CBLCollection_Database(const CBLCollection*) CBLAPI; /** Returns the last sequence number assigned in the database (default collection). This starts at zero and increments every time a document is saved or deleted. */ diff --git a/src/CBLReplicatorConfig.hh b/src/CBLReplicatorConfig.hh index 2ee82d8a..4fde110b 100644 --- a/src/CBLReplicatorConfig.hh +++ b/src/CBLReplicatorConfig.hh @@ -175,23 +175,18 @@ namespace cbl_internal using Dict = fleece::Dict; using slice = fleece::slice; using Array = fleece::Array; + template using Retained = fleece::Retained; public: ReplicatorConfiguration(const CBLReplicatorConfiguration &conf) { *(CBLReplicatorConfiguration*)this = conf; - retain(database); - if (endpoint) - endpoint = endpoint->clone(); - if (collections) { - // Copy collections and retain the collection object inside: - for (int i = 0; i < collectionCount; i++) { - retain(collections[i].collection); - _collections.push_back(collections[i]); - } - collections = _collections.data(); + validate(); + + if (endpoint) { + endpoint = endpoint->clone(); } - + authenticator = authenticator ? authenticator->clone() : nullptr; headers = FLDict_MutableCopy(headers, kFLDeepCopyImmutables); channels = FLArray_MutableCopy(channels, kFLDeepCopyImmutables); @@ -214,57 +209,54 @@ namespace cbl_internal Dict headersDict = Dict(headers); fleece::Value userAgent = headersDict[kCBLReplicatorUserAgent]; _userAgent = userAgent ? userAgent.asstring() : createUserAgentHeader(); - } - - ~ReplicatorConfiguration() { - release(database); - for (int i = 0; i < collectionCount; i++) { - release(_collections[i].collection); + if (collections) { + // Copy replication collections, channels, and document ids: + for (int i = 0; i < collectionCount; i++) { + CBLReplicationCollection col = collections[i]; + col.channels = FLArray_MutableCopy(col.channels, kFLDeepCopyImmutables); + col.documentIDs = FLArray_MutableCopy(col.documentIDs, kFLDeepCopyImmutables); + _effectiveCollections.push_back(col); + } + collections = _effectiveCollections.data(); + } else { + // Create a replication collection using the default collection: + CBLReplicationCollection col {}; + col.collection = database->getDefaultCollection(true).get(); + col.conflictResolver = conflictResolver; + col.pushFilter = pushFilter; + col.pullFilter = pullFilter; + col.channels = FLArray_Retain(channels); // Already copied + col.documentIDs = FLArray_Retain(documentIDs); // Already copied + _effectiveCollections.push_back(col); } + // Check valid & retain the collections and database: + for (auto& col : _effectiveCollections) { + if (!col.collection->isValid()) { + C4Error::raise(LiteCoreDomain, kC4ErrorInvalidParameter, + "An invalid collection was found in the configuration."); + } + _retainedCollections.push_back(col.collection); + if (!_retainedDatabase) { + _retainedDatabase = col.collection->database(); + } + } + } + + ~ReplicatorConfiguration() { CBLEndpoint_Free(endpoint); CBLAuth_Free(authenticator); FLDict_Release(headers); FLArray_Release(channels); FLArray_Release(documentIDs); + + for (auto& col : _effectiveCollections) { + FLArray_Release(col.channels); + FLArray_Release(col.documentIDs); + } } - - void validate() const { - const char *problem = nullptr; - if (!database && !collections) - problem = "Invalid config: Missing both database and collections"; - else if (database && collections) - problem = "Invalid config: Both database and collections are set at same time"; - else if (collections && collectionCount == 0) - problem = "Invalid config: collectionCount is zero"; - else if ((documentIDs || channels || pushFilter || pullFilter) && !database) - problem = "Invalid config: Cannot use documentIDs, channels, pushFilter or " - "pullFilter when collections is set. Set the properties in " - "CBLReplicationCollection instead."; - else if (conflictResolver && !database) - problem = "Invalid config: Cannot use conflictResolver when collections is set. " - "Set the property in CBLReplicationCollection instead."; - #ifdef COUCHBASE_ENTERPRISE - else if ((propertyEncryptor || propertyDecryptor ) && !database) - problem = "Invalid config: Cannot use propertyEncryptor or propertyDecryptor " - "when collections is set. Use documentPropertyEncryptor or " - "documentPropertyDecryptor instead."; - #endif - else if (!endpoint || replicatorType > kCBLReplicatorTypePull) - problem = "Invalid config: Missing endpoints or bad type"; - else if (!endpoint->valid()) - problem = "Invalid endpoint"; - else if (proxy && (proxy->type > kCBLProxyHTTPS || - !proxy->hostname.buf || !proxy->port)) - problem = "Invalid replicator proxy settings"; - - if (problem) - C4Error::raise(LiteCoreDomain, kC4ErrorInvalidParameter, "%s", problem); - } - - // Writes a LiteCore replicator optionsDict void writeOptions(Encoder &enc) const { fleece::MutableDict mHeaders = headers ? FLDict_AsMutable(headers) : FLMutableDict_New(); @@ -345,17 +337,63 @@ namespace cbl_internal writeOptionalKey(enc, kC4ReplicatorOptionChannels, Array(collection.channels)); } - slice getUserAgent() const { - return slice(_userAgent); - } - + slice getUserAgent() const { return slice(_userAgent); } + CBLDatabase* effectiveDatabase() const { return _retainedDatabase; } + const std::vector& effectiveCollections() const { return _effectiveCollections; } + ReplicatorConfiguration(const ReplicatorConfiguration&) =delete; ReplicatorConfiguration& operator=(const ReplicatorConfiguration&) =delete; - + private: using string = std::string; using alloc_slice = fleece::alloc_slice; + void validate() const { + const char *problem = nullptr; + if (!database && !collections) + problem = "Invalid config: Missing both database and collections"; + else if (database && collections) + problem = "Invalid config: Both database and collections are set at same time"; + else if (collections && collectionCount == 0) + problem = "Invalid config: collectionCount is zero"; + else if ((documentIDs || channels || pushFilter || pullFilter) && !database) + problem = "Invalid config: Cannot use documentIDs, channels, pushFilter or " + "pullFilter when collections is set. Set the properties in " + "CBLReplicationCollection instead."; + else if (conflictResolver && !database) + problem = "Invalid config: Cannot use conflictResolver when collections is set. " + "Set the property in CBLReplicationCollection instead."; + #ifdef COUCHBASE_ENTERPRISE + else if ((propertyEncryptor || propertyDecryptor ) && !database) + problem = "Invalid config: Cannot use propertyEncryptor or propertyDecryptor " + "when collections is set. Use documentPropertyEncryptor or " + "documentPropertyDecryptor instead."; + #endif + else if (!endpoint || replicatorType > kCBLReplicatorTypePull) + problem = "Invalid config: Missing endpoints or bad type"; + else if (!endpoint->valid()) + problem = "Invalid endpoint"; + else if (proxy && (proxy->type > kCBLProxyHTTPS || + !proxy->hostname.buf || !proxy->port)) + problem = "Invalid replicator proxy settings"; + + if (collections) { + CBLDatabase* db = nullptr; + for (int i = 0; i < collectionCount; i++) { + if (!db) { + db = collections[i].collection->database(); + } else { + if (db != collections[i].collection->database()) { + problem = "Invalid config: collections are not from the same database instance."; + } + } + } + } + + if (problem) + C4Error::raise(LiteCoreDomain, kC4ErrorInvalidParameter, "%s", problem); + } + static slice copyString(slice str, alloc_slice &allocated) { allocated = alloc_slice(str); @@ -363,7 +401,11 @@ namespace cbl_internal } string _userAgent; - std::vector _collections; + std::vector _effectiveCollections; + + std::vector> _retainedCollections; + Retained _retainedDatabase; + alloc_slice _pinnedServerCert, _trustedRootCerts; CBLProxySettings _proxy; alloc_slice _proxyHostname, _proxyUsername, _proxyPassword; diff --git a/src/CBLReplicator_Internal.hh b/src/CBLReplicator_Internal.hh index 9ddf7435..ecc05a5a 100644 --- a/src/CBLReplicator_Internal.hh +++ b/src/CBLReplicator_Internal.hh @@ -71,55 +71,20 @@ public: static once_flag once; call_once(once, std::bind(&C4RegisterBuiltInWebSocket)); - if (_conf.database) { - _defaultCollection = _conf.database->getDefaultCollection(true); - } - - _conf.validate(); - // Set up the LiteCore replicator parameters: C4ReplicatorParameters params = { }; - // Construct params.collections and validate if collections - // are from the same database instance: auto type = _conf.continuous ? kC4Continuous : kC4OneShot; - size_t colsCount = _conf.collections ? _conf.collectionCount : 1; + auto effectiveCollections = _conf.effectiveCollections(); std::vector c4ReplCols; - c4ReplCols.reserve(colsCount); + c4ReplCols.reserve(effectiveCollections.size()); std::vector optionDicts; - optionDicts.reserve(colsCount); + optionDicts.reserve(effectiveCollections.size()); - for (int i = 0; i < colsCount; i++) { - CBLReplicationCollection replCol; - if (_conf.database) { - // If using .database, a C4ReplicationCollection with the default collection - // and the outer conflict resolver and filters will be construct: - assert(colsCount == 1); - replCol.collection = _defaultCollection.get(); - replCol.conflictResolver = _conf.conflictResolver; - replCol.pushFilter = _conf.pushFilter; - replCol.pullFilter = _conf.pullFilter; - replCol.channels = _conf.channels; - replCol.documentIDs = _conf.documentIDs; - } else { - replCol = _conf.collections[i]; - } - - if (!replCol.collection->isValid()) { - C4Error::raise(LiteCoreDomain, kC4ErrorInvalidParameter, - "An invalid collection was found in the configuration."); - } - - if (!_db) { - _db = replCol.collection->database(); - } else if (_db != replCol.collection->database()) { - C4Error::raise(LiteCoreDomain, kC4ErrorInvalidParameter, - "The collections are not from the same database object."); - } - + for (CBLReplicationCollection& replCol : effectiveCollections) { auto& col = c4ReplCols.emplace_back(); auto spec = replCol.collection->spec(); @@ -164,7 +129,7 @@ public: } params.collections = c4ReplCols.data(); - params.collectionCount = colsCount; + params.collectionCount = c4ReplCols.size(); params.callbackContext = this; params.onStatusChanged = [](C4Replicator* c4repl, C4ReplicatorStatus status, void *ctx) { @@ -217,6 +182,7 @@ public: params.optionsDictFleece = options; // Create the LiteCore replicator: + _db = _conf.effectiveDatabase(); _db->useLocked([&](C4Database *c4db) { #ifdef COUCHBASE_ENTERPRISE if (_conf.endpoint->otherLocalDB()) { @@ -527,8 +493,7 @@ private: recursive_mutex _mutex; ReplicatorConfiguration const _conf; - Retained _db; - Retained _defaultCollection; + CBLDatabase* _db; // Retained by _conf Retained _c4repl; unique_ptr _stoppable; ReplicationCollectionsMap _collections; // For filters and conflict resolver diff --git a/src/Listener.hh b/src/Listener.hh index f6977161..6d4cd605 100644 --- a/src/Listener.hh +++ b/src/Listener.hh @@ -177,7 +177,7 @@ namespace cbl_internal { return t; } - void add(ListenerToken* _cbl_nonnull token) {ListenersBase::add(token);} + void add(ListenerToken* _cbl_nonnull token) {ListenersBase::add(token);} void clear() {ListenersBase::clear();} bool empty() const {return ListenersBase::empty();} diff --git a/test/DatabaseTest.cc b/test/DatabaseTest.cc index b8afb914..259c2917 100644 --- a/test/DatabaseTest.cc +++ b/test/DatabaseTest.cc @@ -20,19 +20,34 @@ #include "CBLPrivate.h" #include "fleece/Fleece.hh" #include "fleece/Mutable.hh" +#include #include #include using namespace std; using namespace fleece; - static constexpr const slice kOtherDBName = "CBLTest_OtherDB"; static int dbListenerCalls = 0; static int fooListenerCalls = 0; +static mutex _sListenerMutex; static void dbListener(void *context, const CBLDatabase *db, unsigned nDocs, FLString *docIDs) { + lock_guard lock(_sListenerMutex); + + ++dbListenerCalls; + auto test = (CBLTest*)context; + CHECK(test->db == db); + CHECK(nDocs == 1); + CHECK(slice(docIDs[0]) == "foo"_sl); +} + +static void dbListenerWithDelay(void *context, const CBLDatabase *db, unsigned nDocs, FLString *docIDs) { + lock_guard lock(_sListenerMutex); + + this_thread::sleep_for(1000ms); + ++dbListenerCalls; auto test = (CBLTest*)context; CHECK(test->db == db); @@ -41,6 +56,8 @@ static void dbListener(void *context, const CBLDatabase *db, unsigned nDocs, FLS } static void fooListener(void *context, const CBLDatabase *db, FLString docID) { + lock_guard lock(_sListenerMutex); + ++fooListenerCalls; auto test = (CBLTest*)context; CHECK(test->db == db); @@ -1420,7 +1437,7 @@ TEST_CASE_METHOD(DatabaseTest, "Transaction Abort") { #pragma mark - LISTENERS: -TEST_CASE_METHOD(DatabaseTest, "Legacy -Database notifications") { +TEST_CASE_METHOD(DatabaseTest, "Legacy - Database notifications") { // Add a listener: dbListenerCalls = fooListenerCalls = 0; auto token = CBLDatabase_AddChangeListener(db, dbListener, this); @@ -1466,16 +1483,19 @@ TEST_CASE_METHOD(DatabaseTest, "Legacy - Remove Database Listener after releasin CBLListener_Remove(docToken); } - static int notificationsReadyCalls = 0; static void notificationsReady(void *context, CBLDatabase* db) { + lock_guard lock(_sListenerMutex); + ++notificationsReadyCalls; auto test = (CBLTest*)context; CHECK(test->db == db); } -static void dbListener2(void *context, const CBLDatabase *db, unsigned nDocs, FLString *docIDs) { +static void dbListenerForBufferNotification(void *context, const CBLDatabase *db, unsigned nDocs, FLString *docIDs) { + lock_guard lock(_sListenerMutex); + ++dbListenerCalls; auto test = (CBLTest*)context; CHECK(test->db == db); @@ -1487,17 +1507,18 @@ static void dbListener2(void *context, const CBLDatabase *db, unsigned nDocs, FL int barListenerCalls = 0; static void barListener(void *context, const CBLDatabase *db, FLString docID) { + lock_guard lock(_sListenerMutex); + ++barListenerCalls; auto test = (CBLTest*)context; CHECK(test->db == db); CHECK(docID == "bar"_sl); } - TEST_CASE_METHOD(DatabaseTest, "Scheduled database notifications") { // Add a listener: dbListenerCalls = fooListenerCalls = barListenerCalls = 0; - auto token = CBLDatabase_AddChangeListener(db, dbListener2, this); + auto token = CBLDatabase_AddChangeListener(db, dbListenerForBufferNotification, this); auto fooToken = CBLDatabase_AddDocumentChangeListener(db, "foo"_sl, fooListener, this); auto barToken = CBLDatabase_AddDocumentChangeListener(db, "bar"_sl, barListener, this); CBLDatabase_BufferNotifications(db, notificationsReady, this); @@ -1532,6 +1553,39 @@ TEST_CASE_METHOD(DatabaseTest, "Scheduled database notifications") { CBLListener_Remove(barToken); } +// CBSE-16738 +TEST_CASE_METHOD(DatabaseTest, "Legacy - Database change notifications from different db threads") { + CBLError error {}; + auto config = databaseConfig(); + auto anotherDB = CBLDatabase_Open(kDatabaseName, &config, &error); + REQUIRE(anotherDB); + + // Add a listener: + dbListenerCalls = fooListenerCalls = 0; + auto token = CBLDatabase_AddChangeListener(db, dbListenerWithDelay, this); + + auto createDoc = [&] (CBLDatabase* database) + { + CBLError error {}; + CBLDocument* doc = CBLDocument_CreateWithID("foo"_sl); + MutableDict props = CBLDocument_MutableProperties(doc); + props["greeting"] = "hello"; + CBLDatabase_SaveDocument(database, doc, &error); + CBLDocument_Release(doc); + }; + + thread t1([=]() { createDoc(db); }); + thread t2([=]() { createDoc(anotherDB); }); + + t1.join(); + t2.join(); + + CHECK(dbListenerCalls == 2); + CBLListener_Remove(token); + + CBLDatabase_Close(anotherDB, &error); + CBLDatabase_Release(anotherDB); +} #pragma mark - BLOBS: diff --git a/test/ReplicatorEETest.cc b/test/ReplicatorEETest.cc index 6411c1a6..5536a8ec 100644 --- a/test/ReplicatorEETest.cc +++ b/test/ReplicatorEETest.cc @@ -601,6 +601,44 @@ TEST_CASE_METHOD(ReplicatorLocalTest, "Document Replication Listener", "[Replica CHECK(replicatedDocIDs.empty()); } +TEST_CASE_METHOD(ReplicatorLocalTest, "DocIDs Push Filters", "[Replicator]") { + MutableDocument doc1("foo1"); + doc1["greeting"] = "Howdy!"; + db.saveDocument(doc1); + + MutableDocument doc2("foo2"); + doc2["greeting"] = "Howdy!"; + db.saveDocument(doc2); + + auto docIDs = FLMutableArray_NewFromJSON("[\"foo1\"]"_sl, NULL);; + config.replicatorType = kCBLReplicatorTypePush; + config.documentIDs = FLMutableArray_NewFromJSON("[\"foo1\"]"_sl, NULL);; + expectedDocumentCount = 1; + replicate(); + CHECK(asVector(replicatedDocIDs) == vector{"foo1"}); + + FLMutableArray_Release(docIDs); +} + +TEST_CASE_METHOD(ReplicatorLocalTest, "DocIDs Pull Filters", "[Replicator]") { + MutableDocument doc1("foo1"); + doc1["greeting"] = "Howdy!"; + otherDB.saveDocument(doc1); + + MutableDocument doc2("foo2"); + doc2["greeting"] = "Howdy!"; + otherDB.saveDocument(doc2); + + auto docIDs = FLMutableArray_NewFromJSON("[\"foo1\"]"_sl, NULL);; + config.replicatorType = kCBLReplicatorTypePull; + config.documentIDs = FLMutableArray_NewFromJSON("[\"foo1\"]"_sl, NULL);; + expectedDocumentCount = 1; + replicate(); + CHECK(asVector(replicatedDocIDs) == vector{"foo1"}); + + FLMutableArray_Release(docIDs); +} + class ReplicatorFilterTest : public ReplicatorLocalTest { public: