diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp index 6acb240e0..d1137b0e5 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp @@ -2033,10 +2033,8 @@ void ClusterStateManager::processLeaderSyncDataQuery( } // Populate queues info - mqbc::ClusterUtil::loadQueuesInfo( - &leaderAdvisory.queues(), - *d_state_p, - d_clusterConfig.clusterAttributes().isCSLModeEnabled()); + mqbc::ClusterUtil::loadQueuesInfo(&leaderAdvisory.queues(), + *d_state_p); } else { // Self is not available. diff --git a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp index cac44011f..0316e7418 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp @@ -198,9 +198,7 @@ void ClusterStateManager::do_applyCSLSelf(const ClusterFSMArgsSp& args) return; // RETURN } - ClusterUtil::loadQueuesInfo(&clusterStateSnapshot.queues(), - tempState, - true); // includeAppIds + ClusterUtil::loadQueuesInfo(&clusterStateSnapshot.queues(), tempState); } else { // Verify that elector term in follower snapshot is less than the @@ -1120,9 +1118,7 @@ int ClusterStateManager::loadClusterStateSnapshot( } mqbc::ClusterUtil::loadPartitionsInfo(&out->partitions(), tempState); - mqbc::ClusterUtil::loadQueuesInfo(&out->queues(), - tempState, - true); // includeAppIds + mqbc::ClusterUtil::loadQueuesInfo(&out->queues(), tempState); return 0; } diff --git a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.t.cpp b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.t.cpp index e8258298b..24d31a1a5 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.t.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.t.cpp @@ -518,8 +518,7 @@ struct Tester { d_cluster_mp->_state()); mqbc::ClusterUtil::loadQueuesInfo( &response.clusterStateSnapshot().queues(), - d_cluster_mp->_state(), - true); // includeAppIds + d_cluster_mp->_state()); for (TestChannelMapCIter cit = d_cluster_mp->_channels().cbegin(); cit != d_cluster_mp->_channels().cend(); diff --git a/src/groups/mqb/mqbc/mqbc_clusterutil.cpp b/src/groups/mqb/mqbc/mqbc_clusterutil.cpp index 4eddd8e0c..0f09d5f34 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterutil.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterutil.cpp @@ -1107,8 +1107,8 @@ void ClusterUtil::registerQueueInfo(ClusterState* clusterState, return; // RETURN } - bmqu::Printer stateAppIdInfos(&qs->appInfos()); - bmqu::Printer storageAppIdInfos(&appInfos); + bmqu::Printer stateAppInfos(&qs->appInfos()); + bmqu::Printer storageAppInfos(&appInfos); // PartitionId and/or QueueKey and/or AppInfos mismatch. if (!forceUpdate) { @@ -1569,9 +1569,7 @@ void ClusterUtil::sendClusterState( &advisory.sequenceNumber()); advisory.partitions() = partitions; - loadQueuesInfo(&advisory.queues(), - clusterState, - clusterData->cluster().isCSLModeEnabled()); + loadQueuesInfo(&advisory.queues(), clusterState); } else if (sendPartitionPrimaryInfo) { bmqp_ctrlmsg::PartitionPrimaryAdvisory& advisory = @@ -1591,9 +1589,7 @@ void ClusterUtil::sendClusterState( clusterData->electorInfo().nextLeaderMessageSequence( &advisory.sequenceNumber()); - loadQueuesInfo(&advisory.queues(), - clusterState, - clusterData->cluster().isCSLModeEnabled()); + loadQueuesInfo(&advisory.queues(), clusterState); } if (!clusterData->cluster().isCSLModeEnabled()) { @@ -2200,8 +2196,7 @@ void ClusterUtil::loadPartitionsInfo( } void ClusterUtil::loadQueuesInfo(bsl::vector* out, - const ClusterState& state, - bool includeAppIds) + const ClusterState& state) { // PRECONDITIONS BSLS_ASSERT_SAFE(out); @@ -2222,17 +2217,14 @@ void ClusterUtil::loadQueuesInfo(bsl::vector* out, BSLS_ASSERT_SAFE(!qCit->second->key().isNull()); qCit->second->key().loadBinary(&queueInfo.key()); - if (includeAppIds) { - for (AppInfosCIter appIdCit = - qCit->second->appInfos().cbegin(); - appIdCit != qCit->second->appInfos().cend(); - ++appIdCit) { - bmqp_ctrlmsg::AppIdInfo appIdInfo; - appIdInfo.appId() = appIdCit->first; - appIdCit->second.loadBinary(&appIdInfo.appKey()); + for (AppInfosCIter appIdCit = qCit->second->appInfos().cbegin(); + appIdCit != qCit->second->appInfos().cend(); + ++appIdCit) { + bmqp_ctrlmsg::AppIdInfo appIdInfo; + appIdInfo.appId() = appIdCit->first; + appIdCit->second.loadBinary(&appIdInfo.appKey()); - queueInfo.appIds().push_back(appIdInfo); - } + queueInfo.appIds().push_back(appIdInfo); } out->push_back(queueInfo); diff --git a/src/groups/mqb/mqbc/mqbc_clusterutil.h b/src/groups/mqb/mqbc/mqbc_clusterutil.h index 5ba3ccb4b..970fdb3e0 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterutil.h +++ b/src/groups/mqb/mqbc/mqbc_clusterutil.h @@ -392,8 +392,7 @@ struct ClusterUtil { /// `state`. If the specified `includeAppIds` is true, then the appId /// info for the queues will be loaded as well. static void loadQueuesInfo(bsl::vector* out, - const ClusterState& state, - bool includeAppIds); + const ClusterState& state); /// Load into the specified `out` the list of peer nodes using the /// specified `clusterData`. diff --git a/src/groups/mqb/mqbs/mqbs_filebackedstorage.t.cpp b/src/groups/mqb/mqbs/mqbs_filebackedstorage.t.cpp index 8a865520d..9c99911e0 100644 --- a/src/groups/mqb/mqbs/mqbs_filebackedstorage.t.cpp +++ b/src/groups/mqb/mqbs/mqbs_filebackedstorage.t.cpp @@ -342,7 +342,7 @@ class MockDataStore : public mqbs::DataStore { int writeQueueCreationRecord(mqbs::DataStoreRecordHandle*, const bmqt::Uri&, const mqbu::StorageKey&, - const AppIdKeyPairs&, + const AppInfos&, bsls::Types::Uint64, bool) BSLS_KEYWORD_OVERRIDE { diff --git a/src/integration-tests/test_restart.py b/src/integration-tests/test_restart.py index 064f0c076..102805e68 100644 --- a/src/integration-tests/test_restart.py +++ b/src/integration-tests/test_restart.py @@ -141,6 +141,18 @@ def test_restart_from_non_FSM_to_FSM(cluster: Cluster): ensureMessageAtStorageLayer(cluster) + # Consumer for fanout queue + consumer_foo = next(proxies).create_client("consumer_foo") + consumer_foo.open(tc.URI_FANOUT_FOO, flags=["read"], succeed=True) + consumer_foo.wait_push_event() + assert wait_until( + lambda: len(consumer_foo.list(tc.URI_FANOUT_FOO, block=True)) == 1, 2 + ) + + # Save one confirm to the storage + consumer_foo.confirm(tc.URI_FANOUT_FOO, "+1", succeed=True) + consumer_foo.close(tc.URI_FANOUT_FOO, succeed=True) + cluster.stop_nodes() # Reconfigure the cluster from non-FSM to FSM mode @@ -166,10 +178,17 @@ def test_restart_from_non_FSM_to_FSM(cluster: Cluster): consumer.wait_push_event() assert wait_until(lambda: len(consumer.list(tc.URI_PRIORITY, block=True)) == 2, 2) - # Consumer for fanout queue - consumer_fanout = next(proxies).create_client("consumer_fanout") - consumer_fanout.open(tc.URI_FANOUT_FOO, flags=["read"], succeed=True) - consumer_fanout.wait_push_event() + # Consumers for fanout queue + consumer_bar = next(proxies).create_client("consumer_bar") + consumer_bar.open(tc.URI_FANOUT_BAR, flags=["read"], succeed=True) + consumer_bar.wait_push_event() + assert wait_until( + lambda: len(consumer_bar.list(tc.URI_FANOUT_BAR, block=True)) == 2, 2 + ) + + # make sure the previously saved confirm is not lost + consumer_foo.open(tc.URI_FANOUT_FOO, flags=["read"], succeed=True) + consumer_foo.wait_push_event() assert wait_until( - lambda: len(consumer_fanout.list(tc.URI_FANOUT_FOO, block=True)) == 2, 2 + lambda: len(consumer_foo.list(tc.URI_FANOUT_FOO, block=True)) == 1, 2 )