Skip to content

Commit

Permalink
Remove flushQueues
Browse files Browse the repository at this point in the history
Signed-off-by: Evgeny Malygin <[email protected]>
  • Loading branch information
678098 committed Nov 28, 2024
1 parent f7be85e commit 7a4de87
Show file tree
Hide file tree
Showing 10 changed files with 7 additions and 40 deletions.
4 changes: 0 additions & 4 deletions src/groups/mqb/mqbblp/mqbblp_localqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -530,10 +530,6 @@ void LocalQueue::postMessage(const bmqp::PutHeader& putHeader,
1);
}
}

// If 'FileStore::d_storageEventBuilder' is flushed, flush all relevant
// queues (call 'afterNewMessage' to deliver accumulated data)
d_state_p->storage()->flushQueues();
}

void LocalQueue::onPushMessage(
Expand Down
4 changes: 0 additions & 4 deletions src/groups/mqb/mqbi/mqbi_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -511,10 +511,6 @@ class Storage {
/// undefined unless this cluster node is the primary for this partition.
virtual void flushStorage() = 0;

/// Flush all associated weak consistency queues. Behaviour is
/// undefined unless this cluster node is the primary for this partition.
virtual void flushQueues() = 0;

/// Return the resource capacity meter associated to this storage.
virtual mqbu::CapacityMeter* capacityMeter() = 0;

Expand Down
4 changes: 0 additions & 4 deletions src/groups/mqb/mqbs/mqbs_datastore.h
Original file line number Diff line number Diff line change
Expand Up @@ -690,10 +690,6 @@ class DataStore : public mqbi::DispatcherClient {
/// undefined unless this cluster node is the primary for this partition.
virtual void flushStorage() = 0;

/// Flush all associated weak consistency queues. Behaviour is
/// undefined unless this cluster node is the primary for this partition.
virtual void flushQueues() = 0;

// ACCESSORS

/// Return true if this instance is open, false otherwise.
Expand Down
5 changes: 0 additions & 5 deletions src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -619,11 +619,6 @@ void FileBackedStorage::flushStorage()
d_store_p->flushStorage();
}

void FileBackedStorage::flushQueues()
{
d_store_p->flushQueues();
}

int FileBackedStorage::gcExpiredMessages(
bsls::Types::Uint64* latestMsgTimestampEpoch,
bsls::Types::Int64* configuredTtlValue,
Expand Down
4 changes: 0 additions & 4 deletions src/groups/mqb/mqbs/mqbs_filebackedstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -475,10 +475,6 @@ class FileBackedStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage {
/// undefined unless this cluster node is the primary for this partition.
void flushStorage() BSLS_KEYWORD_OVERRIDE;

/// Flush all associated weak consistency queues. Behaviour is
/// undefined unless this cluster node is the primary for this partition.
void flushQueues() BSLS_KEYWORD_OVERRIDE;

/// Attempt to garbage-collect messages for which TTL has expired, and
/// return the number of messages garbage-collected. Populate the
/// specified `latestMsgTimestampEpoch` with the timestamp, as seconds
Expand Down
2 changes: 0 additions & 2 deletions src/groups/mqb/mqbs/mqbs_filebackedstorage.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -410,8 +410,6 @@ class MockDataStore : public mqbs::DataStore {

void flushStorage() BSLS_KEYWORD_OVERRIDE {}

void flushQueues() BSLS_KEYWORD_OVERRIDE {}

bool isOpen() const BSLS_KEYWORD_OVERRIDE { return true; }

const mqbs::DataStoreConfig& config() const BSLS_KEYWORD_OVERRIDE
Expand Down
7 changes: 4 additions & 3 deletions src/groups/mqb/mqbs/mqbs_filestore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5118,7 +5118,7 @@ void FileStore::flushIfNeeded(bool immediateFlush)
d_storageEventBuilder.messageCount() >= d_nagglePacketCount) {
// Should notify weak consistency queues after replicated batch
flushStorage();
flushQueues();
notifyQueuesOnReplicatedBatch();
}
}

Expand Down Expand Up @@ -6922,7 +6922,7 @@ void FileStore::flushStorage()
}
}

void FileStore::flushQueues()
void FileStore::notifyQueuesOnReplicatedBatch()
{
if (BSLS_PERFORMANCEHINT_PREDICT_LIKELY(
d_storageEventBuilder.messageCount() == 0)) {
Expand All @@ -6935,7 +6935,8 @@ void FileStore::flushQueues()
it++) {
// TODO: possible to store ReplicatedStorage directly and have one
// less lookup, but it requires to handle the case when the
// storage was removed before `flushQueues` call.
// storage was removed before `notifyQueuesOnReplicatedBatch`
// call.
StoragesMap::iterator storageIt = d_storages.find(*it);
if (storageIt != d_storages.end()) {
ReplicatedStorage* rs = storageIt->second;
Expand Down
8 changes: 3 additions & 5 deletions src/groups/mqb/mqbs/mqbs_filestore.h
Original file line number Diff line number Diff line change
Expand Up @@ -876,12 +876,10 @@ class FileStore : public DataStore {
/// undefined unless this cluster node is the primary for this partition.
void flushStorage() BSLS_KEYWORD_OVERRIDE;

/// Flush all associated weak consistency queues. Behaviour is
/// Flush weak consistency queues that have replicated messages since the
/// last call. This method has no effect if `d_storageEventBuilder` is not
/// empty, and must only be called after `flushStorage`. Behaviour is
/// undefined unless this cluster node is the primary for this partition.
void flushQueues() BSLS_KEYWORD_OVERRIDE;

/// Call `onReplicatedBatch` on all associated queues if the storage
/// builder is empty (just flushed).
void notifyQueuesOnReplicatedBatch();

/// Invoke the specified `functor` with each queue associated to the
Expand Down
5 changes: 0 additions & 5 deletions src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -428,11 +428,6 @@ void InMemoryStorage::flushStorage()
// NOTHING
}

void InMemoryStorage::flushQueues()
{
// NOTHING
}

int InMemoryStorage::gcExpiredMessages(
bsls::Types::Uint64* latestMsgTimestampEpoch,
bsls::Types::Int64* configuredTtlValue,
Expand Down
4 changes: 0 additions & 4 deletions src/groups/mqb/mqbs/mqbs_inmemorystorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -377,10 +377,6 @@ class InMemoryStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage {
/// undefined unless this cluster node is the primary for this partition.
void flushStorage() BSLS_KEYWORD_OVERRIDE;

/// Flush all associated weak consistency queues. Behaviour is
/// undefined unless this cluster node is the primary for this partition.
void flushQueues() BSLS_KEYWORD_OVERRIDE;

/// Return the resource capacity meter associated to this storage.
virtual mqbu::CapacityMeter* capacityMeter() BSLS_KEYWORD_OVERRIDE;

Expand Down

0 comments on commit 7a4de87

Please sign in to comment.