Skip to content

Commit

Permalink
CBL-5606 : Fix lock caused by saving doc and notifying change
Browse files Browse the repository at this point in the history
* Issue : When notify collection change, the code needs to obtain the database lock in order to get the database instance from the collection object. The deadlock could occurred if a document save happens at the same time on the other thread as the document save will need to wait to open its transaction while the other transaction is still opened by the notification thead.

* Solution : Keep the database pointer without resetting its to null inside the collection objects. As the collection objects cannot retain the database object to avoid retain cycle (collections are cached inside the database object), the database object will need to retain else where where it is being used. Now, the database is explicity retained by Document, Listener Token (Collection and Document Change Listener), and ReplicatorConfiguration objects.

* In addtion, moved the logic to generate the effective replication collections, retain Collection objects and retain Database object from CBLReplicator class into the ReplicatorConfiguration class so those logics are done together in a single place. Added missing document ids filter tests to ensure that the moved logic work correctly for the document ids filter.
  • Loading branch information
pasin committed Apr 9, 2024
1 parent 093eaca commit 74afa74
Show file tree
Hide file tree
Showing 11 changed files with 266 additions and 158 deletions.
56 changes: 40 additions & 16 deletions src/CBLCollection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,31 @@ using namespace fleece;
using namespace cbl_internal;

namespace cbl_internal {
template<>
struct ListenerToken<CBLCollectionChangeListener> : public CBLListenerToken {
public:
ListenerToken(CBLCollection *collection, CBLCollectionChangeListener callback, void *context)
:CBLListenerToken((const void*)callback, context)
,_collection(collection)
,_database(collection->database()) { }

~ListenerToken() { }

CBLCollectionChangeListener _cbl_nullable callback() const {
return (CBLCollectionChangeListener)_callback;
}

void call(const CBLCollectionChange* change) {
std::lock_guard<std::recursive_mutex> lock(_mutex);
auto cb = callback();
if (cb) {
cb(_context, change);
}
}
private:
Retained<CBLCollection> _collection;
Retained<CBLDatabase> _database;
};

template<>
struct ListenerToken<CBLCollectionDocumentChangeListener> : public CBLListenerToken {
Expand All @@ -31,6 +56,7 @@ namespace cbl_internal {
CBLCollectionDocumentChangeListener callback, void *context)
:CBLListenerToken((const void*)callback, context)
,_collection(collection)
,_database(collection->database())
,_docID(docID)
{
_c4obs = _collection->useLocked()->observeDocument(docID, [this](C4DocumentObserver*,
Expand Down Expand Up @@ -69,32 +95,30 @@ namespace cbl_internal {
CBLDocumentChange change = {};
change.collection = _collection;
change.docID = _docID;

Retained<CBLDatabase> db;
try {
db = _collection->database();
} catch (...) {
C4Error error = C4Error::fromCurrentException();
CBL_Log(kCBLLogDomainDatabase, kCBLLogWarning,
"Document changed notification failed: %s", error.description().c_str());
}

if (db) {
db->notify(this, change);
}
_database->notify(this, change);
}

Retained<CBLCollection> _collection;
Retained<CBLDatabase> _database;
alloc_slice _docID;
std::unique_ptr<C4DocumentObserver> _c4obs;
};
}

Retained<CBLListenerToken> CBLCollection::addChangeListener(CBLCollectionChangeListener listener,
void* _cbl_nullable ctx)
{
auto lock =_c4col.useLocked(); // Ensure the database lifetime while creating the Listener oken
auto token = addListener([&] { return new ListenerToken<CBLCollectionChangeListener>(this, listener, ctx); });
_listeners.add((ListenerToken<CBLCollectionChangeListener>*)token.get());
return token;
}

Retained<CBLListenerToken>
CBLCollection::addDocumentListener(slice docID, CBLCollectionDocumentChangeListener listener,
void* _cbl_nullable ctx)
Retained<CBLListenerToken> CBLCollection::addDocumentListener(slice docID,
CBLCollectionDocumentChangeListener listener,
void* _cbl_nullable ctx)
{
auto lock =_c4col.useLocked(); // // Ensure the database lifetime while creating the Listener oken
auto token = new ListenerToken<CBLCollectionDocumentChangeListener>(this, docID, listener, ctx);
_docListeners.add(token);
return token;
Expand Down
2 changes: 1 addition & 1 deletion src/CBLCollection_CAPI.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ uint64_t CBLCollection_Count(const CBLCollection* collection) noexcept {
} catchAndWarn()
}

/** Private API */
/** Private API used in tests. */
CBLDatabase* CBLCollection_Database(const CBLCollection* collection) noexcept {
try {
return collection->database();
Expand Down
39 changes: 10 additions & 29 deletions src/CBLCollection_Internal.hh
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public:
CBLCollection(C4Collection* c4col, CBLScope* scope, CBLDatabase* database)
:_c4col(c4col, database)
,_scope(scope)
,_db(database)
,_name(c4col->getName())
{ }

Expand All @@ -47,9 +48,7 @@ public:
bool isValid() const noexcept {return _c4col.isValid();}
uint64_t count() const {return _c4col.useLocked()->getDocumentCount();}
uint64_t lastSequence() const {return static_cast<uint64_t>(_c4col.useLocked()->getLastSequence());}

/** Throw NotOpen if the collection or database is invalid */
CBLDatabase* database() const {return _c4col.database();}
CBLDatabase* database() const {return _db;}

#pragma mark - DOCUMENTS:

Expand Down Expand Up @@ -134,10 +133,7 @@ public:
#pragma mark - LISTENERS

Retained<CBLListenerToken> addChangeListener(CBLCollectionChangeListener listener,
void* _cbl_nullable ctx)
{
return addListener([&]{ return _listeners.add(listener, ctx); });
}
void* _cbl_nullable ctx);

Retained<CBLListenerToken> addDocumentListener(slice docID,
CBLCollectionDocumentChangeListener listener,
Expand Down Expand Up @@ -196,26 +192,16 @@ private:

Retained<CBLListenerToken> addListener(fleece::function_ref<Retained<CBLListenerToken>()> cb) {
Retained<CBLListenerToken> token = cb();
if (!_observer)
if (!_observer) {
_observer = _c4col.useLocked()->observe([this](C4CollectionObserver*) {
this->collectionChanged();
});
}
return token;
}

void collectionChanged() {
Retained<CBLDatabase> db;
try {
db = database();
} catch (...) {
C4Error error = C4Error::fromCurrentException();
CBL_Log(kCBLLogDomainDatabase, kCBLLogWarning,
"Collection changed notification failed: %s", error.description().c_str());
}

if (db) {
db->notify(std::bind(&CBLCollection::callCollectionChangeListeners, this));
}
_db->notify(std::bind(&CBLCollection::callCollectionChangeListeners, this));
}

void callCollectionChangeListeners() {
Expand Down Expand Up @@ -253,7 +239,6 @@ private:
:shared_access_lock(std::move(c4col), *database->c4db())
,_c4db(database->c4db())
,_col(c4col)
,_db(database)
{
_sentry = [this](C4Collection* c4col) {
if (!_isValid()) {
Expand All @@ -268,28 +253,24 @@ private:
return _isValid();
}

CBLDatabase* database() const {
auto lock = useLocked();
return _db;
}

/** Invalidate the database pointer */
void close() noexcept {
LOCK_GUARD lock(getMutex());
_db = nullptr;
_isClosed = true;
}

private:
bool _isValid() const noexcept { return _db && _col->isValid(); }
bool _isValid() const noexcept { return !_isClosed && _col->isValid(); }

CBLDatabase::SharedC4DatabaseAccessLock _c4db; // For retaining the shared lock
CBLDatabase* _cbl_nullable _db;
C4Collection* _col;
bool _isClosed {false};
};

#pragma mark - VARIABLES :

C4CollectionAccessLock _c4col; // Shared lock with _c4db
CBLDatabase* _db;

alloc_slice _name;
Retained<CBLScope> _scope;
Expand Down
9 changes: 6 additions & 3 deletions src/CBLDocument.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ using namespace cbl_internal;
CBLDocument::CBLDocument(slice docID, CBLCollection *collection, C4Document *c4doc, bool isMutable)
:_docID(docID)
,_collection(collection)
,_database(collection ? collection->database() : nullptr)
,_c4doc(c4doc)
,_mutable(isMutable)
{
Expand All @@ -68,8 +69,7 @@ CBLDocument::~CBLDocument() {


CBLDatabase* _cbl_nullable CBLDocument::database() const {
// Could throw kC4ErrorNotOpen if the collection is deleted, or database is released.
return _collection ? _collection->database() : nullptr;
return _database;
}


Expand Down Expand Up @@ -113,6 +113,7 @@ bool CBLDocument::save(CBLCollection* collection, const SaveOptions &opt) {

alloc_slice body;
C4RevisionFlags revFlags;

if (!opt.deleting) {
body = encodeBody(collection->database(), c4db, false, revFlags);
} else {
Expand Down Expand Up @@ -143,6 +144,8 @@ bool CBLDocument::save(CBLCollection* collection, const SaveOptions &opt) {
// Success:
t.commit();
_collection = collection;
_database = collection->database();

// HACK: Replace the inner reference of the c4doc with the one from newDoc.
c4doc.get() = std::move(newDoc);
_revID = c4doc->selectedRev().revID;
Expand Down Expand Up @@ -234,7 +237,7 @@ bool CBLDocument::resolveConflict(Resolution resolution, const CBLDocument * _cb
// is true, the remote revision will be kept as is and the losing branch will be pruned.
if (resolution != Resolution::useRemote) {
if (resolveDoc) {
mergeBody = resolveDoc->encodeBody(_collection->database(), c4db, true, mergeFlags);
mergeBody = resolveDoc->encodeBody(_database, c4db, true, mergeFlags);
} else {
mergeBody = alloc_slice(size_t(0));
mergeFlags = kRevDeleted;
Expand Down
1 change: 1 addition & 0 deletions src/CBLDocument_Internal.hh
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ private:
#endif

Retained<CBLCollection> _collection; // Collection (null for new doc)
Retained<CBLDatabase> _database; // Database (null for new doc)
litecore::access_lock<Retained<C4Document>> _c4doc; // LiteCore doc (null for new doc)
alloc_slice const _docID; // Document ID (never empty)
mutable alloc_slice _revID; // Revision ID
Expand Down
4 changes: 2 additions & 2 deletions src/CBLPrivate.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ CBL_CAPI_BEGIN
void CBLLog_BeginExpectingExceptions() CBLAPI;
void CBLLog_EndExpectingExceptions() CBLAPI;

/** Returns the collection's database, or NULL if the collection is invalid, or the database is released. */
CBLDatabase* _cbl_nullable CBLCollection_Database(const CBLCollection*) CBLAPI;
/** Returns the collection's database pointer which is unretained. This is used by tests. */
CBLDatabase* CBLCollection_Database(const CBLCollection*) CBLAPI;

/** Returns the last sequence number assigned in the database (default collection).
This starts at zero and increments every time a document is saved or deleted. */
Expand Down
Loading

0 comments on commit 74afa74

Please sign in to comment.