diff --git a/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp b/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp index 98decdcf46..c3a674a78f 100644 --- a/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp @@ -530,10 +530,6 @@ void LocalQueue::postMessage(const bmqp::PutHeader& putHeader, 1); } } - - // If 'FileStore::d_storageEventBuilder' is flushed, flush all relevant - // queues (call 'afterNewMessage' to deliver accumulated data) - d_state_p->storage()->flushQueues(); } void LocalQueue::onPushMessage( diff --git a/src/groups/mqb/mqbi/mqbi_storage.h b/src/groups/mqb/mqbi/mqbi_storage.h index 3c21e77123..f20d161d3a 100644 --- a/src/groups/mqb/mqbi/mqbi_storage.h +++ b/src/groups/mqb/mqbi/mqbi_storage.h @@ -511,10 +511,6 @@ class Storage { /// undefined unless this cluster node is the primary for this partition. virtual void flushStorage() = 0; - /// Flush all associated weak consistency queues. Behaviour is - /// undefined unless this cluster node is the primary for this partition. - virtual void flushQueues() = 0; - /// Return the resource capacity meter associated to this storage. virtual mqbu::CapacityMeter* capacityMeter() = 0; diff --git a/src/groups/mqb/mqbs/mqbs_datastore.h b/src/groups/mqb/mqbs/mqbs_datastore.h index 41bb7e962b..3de503c969 100644 --- a/src/groups/mqb/mqbs/mqbs_datastore.h +++ b/src/groups/mqb/mqbs/mqbs_datastore.h @@ -690,10 +690,6 @@ class DataStore : public mqbi::DispatcherClient { /// undefined unless this cluster node is the primary for this partition. virtual void flushStorage() = 0; - /// Flush all associated weak consistency queues. Behaviour is - /// undefined unless this cluster node is the primary for this partition. - virtual void flushQueues() = 0; - // ACCESSORS /// Return true if this instance is open, false otherwise. diff --git a/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp b/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp index 961f23d008..9b5b090199 100644 --- a/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp +++ b/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp @@ -619,11 +619,6 @@ void FileBackedStorage::flushStorage() d_store_p->flushStorage(); } -void FileBackedStorage::flushQueues() -{ - d_store_p->flushQueues(); -} - int FileBackedStorage::gcExpiredMessages( bsls::Types::Uint64* latestMsgTimestampEpoch, bsls::Types::Int64* configuredTtlValue, diff --git a/src/groups/mqb/mqbs/mqbs_filebackedstorage.h b/src/groups/mqb/mqbs/mqbs_filebackedstorage.h index 223da9655a..d3c59a5852 100644 --- a/src/groups/mqb/mqbs/mqbs_filebackedstorage.h +++ b/src/groups/mqb/mqbs/mqbs_filebackedstorage.h @@ -475,10 +475,6 @@ class FileBackedStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage { /// undefined unless this cluster node is the primary for this partition. void flushStorage() BSLS_KEYWORD_OVERRIDE; - /// Flush all associated weak consistency queues. Behaviour is - /// undefined unless this cluster node is the primary for this partition. - void flushQueues() BSLS_KEYWORD_OVERRIDE; - /// Attempt to garbage-collect messages for which TTL has expired, and /// return the number of messages garbage-collected. Populate the /// specified `latestMsgTimestampEpoch` with the timestamp, as seconds diff --git a/src/groups/mqb/mqbs/mqbs_filebackedstorage.t.cpp b/src/groups/mqb/mqbs/mqbs_filebackedstorage.t.cpp index d624884a57..2b3421a086 100644 --- a/src/groups/mqb/mqbs/mqbs_filebackedstorage.t.cpp +++ b/src/groups/mqb/mqbs/mqbs_filebackedstorage.t.cpp @@ -410,8 +410,6 @@ class MockDataStore : public mqbs::DataStore { void flushStorage() BSLS_KEYWORD_OVERRIDE {} - void flushQueues() BSLS_KEYWORD_OVERRIDE {} - bool isOpen() const BSLS_KEYWORD_OVERRIDE { return true; } const mqbs::DataStoreConfig& config() const BSLS_KEYWORD_OVERRIDE diff --git a/src/groups/mqb/mqbs/mqbs_filestore.cpp b/src/groups/mqb/mqbs/mqbs_filestore.cpp index 17d7f5aa85..dcaae2fedf 100644 --- a/src/groups/mqb/mqbs/mqbs_filestore.cpp +++ b/src/groups/mqb/mqbs/mqbs_filestore.cpp @@ -5118,7 +5118,7 @@ void FileStore::flushIfNeeded(bool immediateFlush) d_storageEventBuilder.messageCount() >= d_nagglePacketCount) { // Should notify weak consistency queues after replicated batch flushStorage(); - flushQueues(); + notifyQueuesOnReplicatedBatch(); } } @@ -6922,7 +6922,7 @@ void FileStore::flushStorage() } } -void FileStore::flushQueues() +void FileStore::notifyQueuesOnReplicatedBatch() { if (BSLS_PERFORMANCEHINT_PREDICT_LIKELY( d_storageEventBuilder.messageCount() == 0)) { @@ -6935,7 +6935,8 @@ void FileStore::flushQueues() it++) { // TODO: possible to store ReplicatedStorage directly and have one // less lookup, but it requires to handle the case when the - // storage was removed before `flushQueues` call. + // storage was removed before `notifyQueuesOnReplicatedBatch` + // call. StoragesMap::iterator storageIt = d_storages.find(*it); if (storageIt != d_storages.end()) { ReplicatedStorage* rs = storageIt->second; diff --git a/src/groups/mqb/mqbs/mqbs_filestore.h b/src/groups/mqb/mqbs/mqbs_filestore.h index aacb67d551..9db6a44def 100644 --- a/src/groups/mqb/mqbs/mqbs_filestore.h +++ b/src/groups/mqb/mqbs/mqbs_filestore.h @@ -876,12 +876,10 @@ class FileStore : public DataStore { /// undefined unless this cluster node is the primary for this partition. void flushStorage() BSLS_KEYWORD_OVERRIDE; - /// Flush all associated weak consistency queues. Behaviour is + /// Flush weak consistency queues that have replicated messages since the + /// last call. This method has no effect if `d_storageEventBuilder` is not + /// empty, and must only be called after `flushStorage`. Behaviour is /// undefined unless this cluster node is the primary for this partition. - void flushQueues() BSLS_KEYWORD_OVERRIDE; - - /// Call `onReplicatedBatch` on all associated queues if the storage - /// builder is empty (just flushed). void notifyQueuesOnReplicatedBatch(); /// Invoke the specified `functor` with each queue associated to the diff --git a/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp b/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp index 9f280f3ca9..9cb8cae340 100644 --- a/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp +++ b/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp @@ -428,11 +428,6 @@ void InMemoryStorage::flushStorage() // NOTHING } -void InMemoryStorage::flushQueues() -{ - // NOTHING -} - int InMemoryStorage::gcExpiredMessages( bsls::Types::Uint64* latestMsgTimestampEpoch, bsls::Types::Int64* configuredTtlValue, diff --git a/src/groups/mqb/mqbs/mqbs_inmemorystorage.h b/src/groups/mqb/mqbs/mqbs_inmemorystorage.h index 018852e682..0666e8921a 100644 --- a/src/groups/mqb/mqbs/mqbs_inmemorystorage.h +++ b/src/groups/mqb/mqbs/mqbs_inmemorystorage.h @@ -377,10 +377,6 @@ class InMemoryStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage { /// undefined unless this cluster node is the primary for this partition. void flushStorage() BSLS_KEYWORD_OVERRIDE; - /// Flush all associated weak consistency queues. Behaviour is - /// undefined unless this cluster node is the primary for this partition. - void flushQueues() BSLS_KEYWORD_OVERRIDE; - /// Return the resource capacity meter associated to this storage. virtual mqbu::CapacityMeter* capacityMeter() BSLS_KEYWORD_OVERRIDE;