Skip to content

Commit

Permalink
Fix: handle replication notifications on domain reconfigure
Browse files Browse the repository at this point in the history
Signed-off-by: Evgeny Malygin <[email protected]>
  • Loading branch information
678098 committed Nov 25, 2024
1 parent b31d995 commit b9f5335
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 0 deletions.
10 changes: 10 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_localqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,16 @@ int LocalQueue::configure(bsl::ostream& errorDescription, bool isReconfigure)
domainCfg.storage().queueLimits().bytes());

if (isReconfigure) {
if (domainCfg.consistency().isStrongValue()) {
/// We register notifications for weak consistency queues to
/// deliver messages after any replication. For strong consistency
/// queues, we start to PUSH only on receipt. We have to check and
/// possibly remove from notifications the queues that changed
/// their consistency level from weak to strong.
d_state_p->storageManager()->cancelReplicationNotification(
d_state_p->partitionId(),
d_state_p->key());
}
if (domainCfg.mode().isFanoutValue()) {
d_state_p->stats().updateDomainAppIds(
domainCfg.mode().fanout().appIDs());
Expand Down
15 changes: 15 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2130,6 +2130,21 @@ void StorageManager::gcUnrecognizedDomainQueues()
d_unrecognizedDomains);
}

void StorageManager::cancelReplicationNotification(
int partitionId,
const mqbu::StorageKey& queueKey)
{
// executed by *QUEUE* dispatcher thread associated with partitionId

// PRECONDITIONS
BSLS_ASSERT_SAFE(0 <= partitionId &&
partitionId < static_cast<int>(d_fileStores.size()));

mqbs::FileStore* fs = d_fileStores[partitionId].get();
BSLS_ASSERT_SAFE(fs->inDispatcherThread());
fs->cancelReplicationNotification(queueKey);
}

// ACCESSORS
bool StorageManager::isStorageEmpty(const bmqt::Uri& uri,
int partitionId) const
Expand Down
7 changes: 7 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_storagemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,13 @@ class StorageManager : public mqbi::StorageManager {
/// GC the queues from unrecognized domains, if any.
virtual void gcUnrecognizedDomainQueues() BSLS_KEYWORD_OVERRIDE;

/// Cancel replication notification (if any) for the specified `queueKey`.
/// Thread: executed by QUEUE dispatcher thread associated with the
/// specified `partitionId`.
void cancelReplicationNotification(int partitionId,
const mqbu::StorageKey& queueKey)
BSLS_KEYWORD_OVERRIDE;

// ACCESSORS

/// Return the processor handle in charge of the specified
Expand Down
15 changes: 15 additions & 0 deletions src/groups/mqb/mqbc/mqbc_storagemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4474,6 +4474,21 @@ void StorageManager::gcUnrecognizedDomainQueues()
d_unrecognizedDomains);
}

void StorageManager::cancelReplicationNotification(
int partitionId,
const mqbu::StorageKey& queueKey)
{
// executed by *QUEUE* dispatcher thread associated with partitionId

// PRECONDITIONS
BSLS_ASSERT_SAFE(0 <= partitionId &&
partitionId < static_cast<int>(d_fileStores.size()));

mqbs::FileStore* fs = d_fileStores[partitionId].get();
BSLS_ASSERT_SAFE(fs->inDispatcherThread());
fs->cancelReplicationNotification(queueKey);
}

mqbs::FileStore& StorageManager::fileStore(int partitionId)
{
// PRECONDITIONS
Expand Down
7 changes: 7 additions & 0 deletions src/groups/mqb/mqbc/mqbc_storagemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -1046,6 +1046,13 @@ class StorageManager
/// GC the queues from unrecognized domains, if any.
virtual void gcUnrecognizedDomainQueues() BSLS_KEYWORD_OVERRIDE;

/// Cancel replication notification (if any) for the specified `queueKey`.
/// Thread: executed by QUEUE dispatcher thread associated with the
/// specified `partitionId`.
void cancelReplicationNotification(int partitionId,
const mqbu::StorageKey& queueKey)
BSLS_KEYWORD_OVERRIDE;

/// Return partition corresponding to the specified `partitionId`. The
/// behavior is undefined if `partitionId` does not represent a valid
/// partition id. Note, this modifiable reference to partition is only
Expand Down
7 changes: 7 additions & 0 deletions src/groups/mqb/mqbi/mqbi_storagemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,13 @@ class StorageManager {
/// GC the queues from unrecognized domains, if any.
virtual void gcUnrecognizedDomainQueues() = 0;

/// Cancel replication notification (if any) for the specified `queueKey`.
/// Thread: executed by QUEUE dispatcher thread associated with the
/// specified `partitionId`.
virtual void
cancelReplicationNotification(int partitionId,
const mqbu::StorageKey& queueKey) = 0;

// ACCESSORS

/// Return the processor handle in charge of the specified
Expand Down
7 changes: 7 additions & 0 deletions src/groups/mqb/mqbmock/mqbmock_storagemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,13 @@ void StorageManager::gcUnrecognizedDomainQueues()
// NOTHING
}

void StorageManager::cancelReplicationNotification(
BSLS_ANNOTATION_UNUSED int partitionId,
BSLS_ANNOTATION_UNUSED const mqbu::StorageKey& queueKey)
{
// NOTHING
}

// ACCESSORS
mqbi::Dispatcher::ProcessorHandle StorageManager::processorForPartition(
BSLS_ANNOTATION_UNUSED int partitionId) const
Expand Down
7 changes: 7 additions & 0 deletions src/groups/mqb/mqbmock/mqbmock_storagemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,13 @@ class StorageManager : public mqbi::StorageManager {
/// GC the queues from unrecognized domains, if any.
virtual void gcUnrecognizedDomainQueues() BSLS_KEYWORD_OVERRIDE;

/// Cancel replication notification (if any) for the specified `queueKey`.
/// Thread: executed by QUEUE dispatcher thread associated with the
/// specified `partitionId`.
void cancelReplicationNotification(int partitionId,
const mqbu::StorageKey& queueKey)
BSLS_KEYWORD_OVERRIDE;

// ACCESSORS

/// Return the processor handle in charge of the specified
Expand Down
9 changes: 9 additions & 0 deletions src/groups/mqb/mqbs/mqbs_filestore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6962,6 +6962,15 @@ void FileStore::notifyQueuesOnReplicatedBatch()
}
}

void FileStore::cancelReplicationNotification(const mqbu::StorageKey& queueKey)
{
bsl::unordered_set<mqbu::StorageKey>::iterator it =
d_replicationNotifications.find(queueKey);
if (it != d_replicationNotifications.end()) {
d_replicationNotifications.erase(it);
}
}

bool FileStore::gcExpiredMessages(const bdlt::Datetime& currentTimeUtc)
{
if (!d_isOpen) {
Expand Down
6 changes: 6 additions & 0 deletions src/groups/mqb/mqbs/mqbs_filestore.h
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,12 @@ class FileStore : public DataStore {
/// builder is empty (just flushed).
void notifyQueuesOnReplicatedBatch();

/// When reconfigure from weak consistency to strong consistency:
/// Cancel replication notification (if any) for the specified `queueKey`.
/// No effect if `queueKey` is not found in this storage or if there are no
/// current registered notifications for this queue.
void cancelReplicationNotification(const mqbu::StorageKey& queueKey);

/// Invoke the specified `functor` with each queue associated to the
/// partition represented by this FileStore if the partition was
/// successfully opened. The behavior is undefined unless invoked from
Expand Down

0 comments on commit b9f5335

Please sign in to comment.