Skip to content

Commit

Permalink
Remove cancelReplicationNotification
Browse files Browse the repository at this point in the history
Signed-off-by: Evgeny Malygin <[email protected]>
  • Loading branch information
678098 committed Nov 26, 2024
1 parent 0124dd3 commit 1949b2b
Show file tree
Hide file tree
Showing 10 changed files with 0 additions and 90 deletions.
10 changes: 0 additions & 10 deletions src/groups/mqb/mqbblp/mqbblp_localqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,16 +186,6 @@ int LocalQueue::configure(bsl::ostream& errorDescription, bool isReconfigure)
domainCfg.storage().queueLimits().bytes());

if (isReconfigure) {
if (domainCfg.consistency().isStrongValue()) {
/// We register notifications for weak consistency queues to
/// deliver messages after any replication. For strong consistency
/// queues, we start to PUSH only on receipt. We have to check and
/// possibly remove from notifications the queues that changed
/// their consistency level from weak to strong.
d_state_p->storageManager()->cancelReplicationNotification(
d_state_p->partitionId(),
d_state_p->key());
}
if (domainCfg.mode().isFanoutValue()) {
d_state_p->stats().updateDomainAppIds(
domainCfg.mode().fanout().appIDs());
Expand Down
15 changes: 0 additions & 15 deletions src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2130,21 +2130,6 @@ void StorageManager::gcUnrecognizedDomainQueues()
d_unrecognizedDomains);
}

void StorageManager::cancelReplicationNotification(
int partitionId,
const mqbu::StorageKey& queueKey)
{
// executed by *QUEUE* dispatcher thread associated with partitionId

// PRECONDITIONS
BSLS_ASSERT_SAFE(0 <= partitionId &&
partitionId < static_cast<int>(d_fileStores.size()));

mqbs::FileStore* fs = d_fileStores[partitionId].get();
BSLS_ASSERT_SAFE(fs->inDispatcherThread());
fs->cancelReplicationNotification(queueKey);
}

// ACCESSORS
bool StorageManager::isStorageEmpty(const bmqt::Uri& uri,
int partitionId) const
Expand Down
7 changes: 0 additions & 7 deletions src/groups/mqb/mqbblp/mqbblp_storagemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -706,13 +706,6 @@ class StorageManager : public mqbi::StorageManager {
/// GC the queues from unrecognized domains, if any.
virtual void gcUnrecognizedDomainQueues() BSLS_KEYWORD_OVERRIDE;

/// Cancel replication notification (if any) for the specified `queueKey`.
/// Thread: executed by QUEUE dispatcher thread associated with the
/// specified `partitionId`.
void cancelReplicationNotification(int partitionId,
const mqbu::StorageKey& queueKey)
BSLS_KEYWORD_OVERRIDE;

// ACCESSORS

/// Return the processor handle in charge of the specified
Expand Down
15 changes: 0 additions & 15 deletions src/groups/mqb/mqbc/mqbc_storagemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4474,21 +4474,6 @@ void StorageManager::gcUnrecognizedDomainQueues()
d_unrecognizedDomains);
}

void StorageManager::cancelReplicationNotification(
int partitionId,
const mqbu::StorageKey& queueKey)
{
// executed by *QUEUE* dispatcher thread associated with partitionId

// PRECONDITIONS
BSLS_ASSERT_SAFE(0 <= partitionId &&
partitionId < static_cast<int>(d_fileStores.size()));

mqbs::FileStore* fs = d_fileStores[partitionId].get();
BSLS_ASSERT_SAFE(fs->inDispatcherThread());
fs->cancelReplicationNotification(queueKey);
}

mqbs::FileStore& StorageManager::fileStore(int partitionId)
{
// PRECONDITIONS
Expand Down
7 changes: 0 additions & 7 deletions src/groups/mqb/mqbc/mqbc_storagemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -1046,13 +1046,6 @@ class StorageManager
/// GC the queues from unrecognized domains, if any.
virtual void gcUnrecognizedDomainQueues() BSLS_KEYWORD_OVERRIDE;

/// Cancel replication notification (if any) for the specified `queueKey`.
/// Thread: executed by QUEUE dispatcher thread associated with the
/// specified `partitionId`.
void cancelReplicationNotification(int partitionId,
const mqbu::StorageKey& queueKey)
BSLS_KEYWORD_OVERRIDE;

/// Return partition corresponding to the specified `partitionId`. The
/// behavior is undefined if `partitionId` does not represent a valid
/// partition id. Note, this modifiable reference to partition is only
Expand Down
7 changes: 0 additions & 7 deletions src/groups/mqb/mqbi/mqbi_storagemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -413,13 +413,6 @@ class StorageManager {
/// GC the queues from unrecognized domains, if any.
virtual void gcUnrecognizedDomainQueues() = 0;

/// Cancel replication notification (if any) for the specified `queueKey`.
/// Thread: executed by QUEUE dispatcher thread associated with the
/// specified `partitionId`.
virtual void
cancelReplicationNotification(int partitionId,
const mqbu::StorageKey& queueKey) = 0;

// ACCESSORS

/// Return the processor handle in charge of the specified
Expand Down
7 changes: 0 additions & 7 deletions src/groups/mqb/mqbmock/mqbmock_storagemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,13 +276,6 @@ void StorageManager::gcUnrecognizedDomainQueues()
// NOTHING
}

void StorageManager::cancelReplicationNotification(
BSLS_ANNOTATION_UNUSED int partitionId,
BSLS_ANNOTATION_UNUSED const mqbu::StorageKey& queueKey)
{
// NOTHING
}

// ACCESSORS
mqbi::Dispatcher::ProcessorHandle StorageManager::processorForPartition(
BSLS_ANNOTATION_UNUSED int partitionId) const
Expand Down
7 changes: 0 additions & 7 deletions src/groups/mqb/mqbmock/mqbmock_storagemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,13 +270,6 @@ class StorageManager : public mqbi::StorageManager {
/// GC the queues from unrecognized domains, if any.
virtual void gcUnrecognizedDomainQueues() BSLS_KEYWORD_OVERRIDE;

/// Cancel replication notification (if any) for the specified `queueKey`.
/// Thread: executed by QUEUE dispatcher thread associated with the
/// specified `partitionId`.
void cancelReplicationNotification(int partitionId,
const mqbu::StorageKey& queueKey)
BSLS_KEYWORD_OVERRIDE;

// ACCESSORS

/// Return the processor handle in charge of the specified
Expand Down
9 changes: 0 additions & 9 deletions src/groups/mqb/mqbs/mqbs_filestore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6948,15 +6948,6 @@ void FileStore::flushQueues()
}
}

void FileStore::cancelReplicationNotification(const mqbu::StorageKey& queueKey)
{
bsl::unordered_set<mqbu::StorageKey>::iterator it =
d_replicationNotifications.find(queueKey);
if (it != d_replicationNotifications.end()) {
d_replicationNotifications.erase(it);
}
}

bool FileStore::gcExpiredMessages(const bdlt::Datetime& currentTimeUtc)
{
if (!d_isOpen) {
Expand Down
6 changes: 0 additions & 6 deletions src/groups/mqb/mqbs/mqbs_filestore.h
Original file line number Diff line number Diff line change
Expand Up @@ -884,12 +884,6 @@ class FileStore : public DataStore {
/// builder is empty (just flushed).
void notifyQueuesOnReplicatedBatch();

/// When reconfigure from weak consistency to strong consistency:
/// Cancel replication notification (if any) for the specified `queueKey`.
/// No effect if `queueKey` is not found in this storage or if there are no
/// current registered notifications for this queue.
void cancelReplicationNotification(const mqbu::StorageKey& queueKey);

/// Invoke the specified `functor` with each queue associated to the
/// partition represented by this FileStore if the partition was
/// successfully opened. The behavior is undefined unless invoked from
Expand Down

0 comments on commit 1949b2b

Please sign in to comment.