Skip to content

Commit

Permalink
more fixes and an IT
Browse files Browse the repository at this point in the history
Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo committed Oct 25, 2024
1 parent 7cea389 commit d535271
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 37 deletions.
6 changes: 2 additions & 4 deletions src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2033,10 +2033,8 @@ void ClusterStateManager::processLeaderSyncDataQuery(
}

// Populate queues info
mqbc::ClusterUtil::loadQueuesInfo(
&leaderAdvisory.queues(),
*d_state_p,
d_clusterConfig.clusterAttributes().isCSLModeEnabled());
mqbc::ClusterUtil::loadQueuesInfo(&leaderAdvisory.queues(),
*d_state_p);
}
else {
// Self is not available.
Expand Down
8 changes: 2 additions & 6 deletions src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,7 @@ void ClusterStateManager::do_applyCSLSelf(const ClusterFSMArgsSp& args)
return; // RETURN
}

ClusterUtil::loadQueuesInfo(&clusterStateSnapshot.queues(),
tempState,
true); // includeAppIds
ClusterUtil::loadQueuesInfo(&clusterStateSnapshot.queues(), tempState);
}
else {
// Verify that elector term in follower snapshot is less than the
Expand Down Expand Up @@ -1120,9 +1118,7 @@ int ClusterStateManager::loadClusterStateSnapshot(
}

mqbc::ClusterUtil::loadPartitionsInfo(&out->partitions(), tempState);
mqbc::ClusterUtil::loadQueuesInfo(&out->queues(),
tempState,
true); // includeAppIds
mqbc::ClusterUtil::loadQueuesInfo(&out->queues(), tempState);

return 0;
}
Expand Down
32 changes: 12 additions & 20 deletions src/groups/mqb/mqbc/mqbc_clusterutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1107,8 +1107,8 @@ void ClusterUtil::registerQueueInfo(ClusterState* clusterState,
return; // RETURN
}

bmqu::Printer<AppInfos> stateAppIdInfos(&qs->appInfos());
bmqu::Printer<AppInfos> storageAppIdInfos(&appInfos);
bmqu::Printer<AppInfos> stateAppInfos(&qs->appInfos());
bmqu::Printer<AppInfos> storageAppInfos(&appInfos);

// PartitionId and/or QueueKey and/or AppInfos mismatch.
if (!forceUpdate) {
Expand Down Expand Up @@ -1569,9 +1569,7 @@ void ClusterUtil::sendClusterState(
&advisory.sequenceNumber());

advisory.partitions() = partitions;
loadQueuesInfo(&advisory.queues(),
clusterState,
clusterData->cluster().isCSLModeEnabled());
loadQueuesInfo(&advisory.queues(), clusterState);
}
else if (sendPartitionPrimaryInfo) {
bmqp_ctrlmsg::PartitionPrimaryAdvisory& advisory =
Expand All @@ -1591,9 +1589,7 @@ void ClusterUtil::sendClusterState(
clusterData->electorInfo().nextLeaderMessageSequence(
&advisory.sequenceNumber());

loadQueuesInfo(&advisory.queues(),
clusterState,
clusterData->cluster().isCSLModeEnabled());
loadQueuesInfo(&advisory.queues(), clusterState);
}

if (!clusterData->cluster().isCSLModeEnabled()) {
Expand Down Expand Up @@ -2200,8 +2196,7 @@ void ClusterUtil::loadPartitionsInfo(
}

void ClusterUtil::loadQueuesInfo(bsl::vector<bmqp_ctrlmsg::QueueInfo>* out,
const ClusterState& state,
bool includeAppIds)
const ClusterState& state)
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(out);
Expand All @@ -2222,17 +2217,14 @@ void ClusterUtil::loadQueuesInfo(bsl::vector<bmqp_ctrlmsg::QueueInfo>* out,
BSLS_ASSERT_SAFE(!qCit->second->key().isNull());
qCit->second->key().loadBinary(&queueInfo.key());

if (includeAppIds) {
for (AppInfosCIter appIdCit =
qCit->second->appInfos().cbegin();
appIdCit != qCit->second->appInfos().cend();
++appIdCit) {
bmqp_ctrlmsg::AppIdInfo appIdInfo;
appIdInfo.appId() = appIdCit->first;
appIdCit->second.loadBinary(&appIdInfo.appKey());
for (AppInfosCIter appIdCit = qCit->second->appInfos().cbegin();
appIdCit != qCit->second->appInfos().cend();
++appIdCit) {
bmqp_ctrlmsg::AppIdInfo appIdInfo;
appIdInfo.appId() = appIdCit->first;
appIdCit->second.loadBinary(&appIdInfo.appKey());

queueInfo.appIds().push_back(appIdInfo);
}
queueInfo.appIds().push_back(appIdInfo);
}

out->push_back(queueInfo);
Expand Down
3 changes: 1 addition & 2 deletions src/groups/mqb/mqbc/mqbc_clusterutil.h
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,7 @@ struct ClusterUtil {
/// `state`. If the specified `includeAppIds` is true, then the appId
/// info for the queues will be loaded as well.
static void loadQueuesInfo(bsl::vector<bmqp_ctrlmsg::QueueInfo>* out,
const ClusterState& state,
bool includeAppIds);
const ClusterState& state);

/// Load into the specified `out` the list of peer nodes using the
/// specified `clusterData`.
Expand Down
29 changes: 24 additions & 5 deletions src/integration-tests/test_restart.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,18 @@ def test_restart_from_non_FSM_to_FSM(cluster: Cluster):

ensureMessageAtStorageLayer(cluster)

# Consumer for fanout queue
consumer_foo = next(proxies).create_client("consumer_foo")
consumer_foo.open(tc.URI_FANOUT_FOO, flags=["read"], succeed=True)
consumer_foo.wait_push_event()
assert wait_until(
lambda: len(consumer_foo.list(tc.URI_FANOUT_FOO, block=True)) == 1, 2
)

# Save one confirm to the storage
consumer_foo.confirm(tc.URI_FANOUT_FOO, "+1", succeed=True)
consumer_foo.close(tc.URI_FANOUT_FOO, succeed=True)

cluster.stop_nodes()

# Reconfigure the cluster from non-FSM to FSM mode
Expand All @@ -166,10 +178,17 @@ def test_restart_from_non_FSM_to_FSM(cluster: Cluster):
consumer.wait_push_event()
assert wait_until(lambda: len(consumer.list(tc.URI_PRIORITY, block=True)) == 2, 2)

# Consumer for fanout queue
consumer_fanout = next(proxies).create_client("consumer_fanout")
consumer_fanout.open(tc.URI_FANOUT_FOO, flags=["read"], succeed=True)
consumer_fanout.wait_push_event()
# Consumers for fanout queue
consumer_bar = next(proxies).create_client("consumer_bar")
consumer_bar.open(tc.URI_FANOUT_BAR, flags=["read"], succeed=True)
consumer_bar.wait_push_event()
assert wait_until(
lambda: len(consumer_fanout.list(tc.URI_FANOUT_FOO, block=True)) == 2, 2
lambda: len(consumer_bar.list(tc.URI_FANOUT_BAR, block=True)) == 2, 2
)

# make sure the previously saved confirm is not lost
consumer_foo.open(tc.URI_FANOUT_FOO, flags=["read"], succeed=True)
consumer_foo.wait_push_event()
assert wait_until(
lambda: len(consumer_foo.list(tc.URI_FANOUT_FOO, block=True)) == 1, 2
)

0 comments on commit d535271

Please sign in to comment.