diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp index a004855d0..f1f5d8d91 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp @@ -2209,27 +2209,16 @@ void Cluster::onRecoveryStatusDispatched( const bmqt::Uri uri(itMp->uri().canonical()); BSLS_ASSERT_SAFE(itMp->storage()->partitionId() == static_cast(pid)); - if (isCSLModeEnabled()) { - AppIdKeyPairs appIdKeyPairs; - itMp->storage()->loadVirtualStorageDetails(&appIdKeyPairs); - AppIdInfos appIdInfos(appIdKeyPairs.cbegin(), - appIdKeyPairs.cend()); - - d_clusterOrchestrator.registerQueueInfo( - uri, - pid, - itMp->storage()->queueKey(), - appIdInfos, - false); // Force-update? - } - else { - d_clusterOrchestrator.registerQueueInfo( - uri, - pid, - itMp->storage()->queueKey(), - AppIdInfos(), - false); // Force-update? - } + + AppInfos appIdInfos; + itMp->storage()->loadVirtualStorageDetails(&appIdInfos); + + d_clusterOrchestrator.registerQueueInfo( + uri, + pid, + itMp->storage()->queueKey(), + appIdInfos, + false); // Force-update? ++(*itMp); } @@ -2844,18 +2833,22 @@ void Cluster::onDomainReconfigured(const mqbi::Domain& domain, } // Compute list of added and removed App IDs. - bsl::vector oldCfgAppIds(oldDefn.mode().fanout().appIDs(), - d_allocator_p); - bsl::vector newCfgAppIds(newDefn.mode().fanout().appIDs(), - d_allocator_p); - - bsl::vector addedIds, removedIds; + bsl::unordered_set oldCfgAppIds( + oldDefn.mode().fanout().appIDs().cbegin(), + oldDefn.mode().fanout().appIDs().cend(), + d_allocator_p); + bsl::unordered_set newCfgAppIds( + newDefn.mode().fanout().appIDs().cbegin(), + newDefn.mode().fanout().appIDs().cend(), + d_allocator_p); + + bsl::unordered_set addedIds, removedIds; mqbc::StorageUtil::loadAddedAndRemovedEntries(&addedIds, &removedIds, oldCfgAppIds, newCfgAppIds); - bsl::vector::const_iterator it = addedIds.begin(); + bsl::unordered_set::const_iterator it = addedIds.cbegin(); for (; it != addedIds.cend(); ++it) { dispatcher()->execute( bdlf::BindUtil::bind(&ClusterOrchestrator::registerAppId, diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.h b/src/groups/mqb/mqbblp/mqbblp_cluster.h index de9c8f815..13fe60889 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.h +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.h @@ -137,11 +137,9 @@ class Cluster : public mqbi::Cluster, private: // PRIVATE TYPES - typedef mqbi::Storage::AppIdKeyPairs AppIdKeyPairs; - typedef mqbc::ClusterStatePartitionInfo ClusterStatePartitionInfo; - typedef mqbc::ClusterStateQueueInfo::AppIdInfos AppIdInfos; + typedef mqbc::ClusterStateQueueInfo::AppInfos AppInfos; typedef mqbc::ClusterMembership::ClusterNodeSessionSp ClusterNodeSessionSp; diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp index d368d1027..388544711 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp @@ -286,7 +286,7 @@ void ClusterOrchestrator::processBufferedQueueAdvisories() void ClusterOrchestrator::registerQueueInfo(const bmqt::Uri& uri, int partitionId, const mqbu::StorageKey& queueKey, - const AppIdInfos& appIdInfos, + const AppInfos& appIdInfos, bool forceUpdate) { // executed by the *DISPATCHER* thread diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.h b/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.h index 9b9fe35e6..9d090a7f9 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.h +++ b/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.h @@ -109,7 +109,7 @@ class ClusterOrchestrator { typedef bdlmt::EventScheduler::RecurringEventHandle RecurringEventHandle; - typedef mqbc::ClusterStateQueueInfo::AppIdInfos AppIdInfos; + typedef mqbc::ClusterStateQueueInfo::AppInfos AppInfos; private: // DATA @@ -515,7 +515,7 @@ class ClusterOrchestrator { void registerQueueInfo(const bmqt::Uri& uri, int partitionId, const mqbu::StorageKey& queueKey, - const AppIdInfos& appIdInfos, + const AppInfos& appIdInfos, bool forceUpdate); /// Executed by any thread. diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp index ff24e36a0..8822876f4 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp @@ -139,8 +139,8 @@ void createQueueUriKey(bmqt::Uri* out, } void afterAppIdRegisteredDispatched( - mqbi::Queue* queue, - const mqbc::ClusterStateQueueInfo::AppIdInfo& appIdInfo) + mqbi::Queue* queue, + const mqbc::ClusterStateQueueInfo::AppInfo& appIdInfo) { // executed by the *QUEUE DISPATCHER* thread @@ -148,12 +148,12 @@ void afterAppIdRegisteredDispatched( BSLS_ASSERT_SAFE(queue->dispatcher()->inDispatcherThread(queue)); queue->queueEngine()->afterAppIdRegistered( - mqbi::Storage::AppIdKeyPair(appIdInfo.first, appIdInfo.second)); + mqbi::Storage::AppInfo(appIdInfo.first, appIdInfo.second)); } void afterAppIdUnregisteredDispatched( - mqbi::Queue* queue, - const mqbc::ClusterStateQueueInfo::AppIdInfo& appIdInfo) + mqbi::Queue* queue, + const mqbc::ClusterStateQueueInfo::AppInfo& appIdInfo) { // executed by the *QUEUE DISPATCHER* thread @@ -161,7 +161,7 @@ void afterAppIdUnregisteredDispatched( BSLS_ASSERT_SAFE(queue->dispatcher()->inDispatcherThread(queue)); queue->queueEngine()->afterAppIdUnregistered( - mqbi::Storage::AppIdKeyPair(appIdInfo.first, appIdInfo.second)); + mqbi::Storage::AppInfo(appIdInfo.first, appIdInfo.second)); } void handleHolderDummy(const bsl::shared_ptr& handle) @@ -2152,27 +2152,15 @@ bsl::shared_ptr ClusterQueueHelper::createQueueFactory( // queue but the queue is never opened, it will not be registered with // the StorageMgr. This is ok. - if (d_cluster_p->isCSLModeEnabled()) { - const AppIdInfos& appIdInfos = - context.d_queueContext_p->d_stateQInfo_sp->appIdInfos(); - const mqbi::Storage::AppIdKeyPairs appIdKeyPairs( - appIdInfos.cbegin(), - appIdInfos.cend()); - d_storageManager_p->registerQueue( - context.d_queueContext_p->uri(), - context.d_queueContext_p->key(), - context.d_queueContext_p->partitionId(), - appIdKeyPairs, - context.d_domain_p); - } - else { - d_storageManager_p->registerQueue( - context.d_queueContext_p->uri(), - context.d_queueContext_p->key(), - context.d_queueContext_p->partitionId(), - mqbi::Storage::AppIdKeyPairs(), - context.d_domain_p); - } + // Use keys in the CSL instead of generating new ones to keep CSL and + // non-CSL consistent. + + d_storageManager_p->registerQueue( + context.d_queueContext_p->uri(), + context.d_queueContext_p->key(), + context.d_queueContext_p->partitionId(), + context.d_queueContext_p->d_stateQInfo_sp->appInfos(), + context.d_domain_p); // Queue must have been registered with storage manager before // registering it with the domain, otherwise Queue.configure() will @@ -3698,28 +3686,12 @@ void ClusterQueueHelper::restoreStateCluster(int partitionId) // node creates a local queue instance (see // 'createQueueFactory'). - if (d_cluster_p->isCSLModeEnabled()) { - const AppIdInfos& appIdInfos = - queueContext->d_stateQInfo_sp->appIdInfos(); - const mqbi::Storage::AppIdKeyPairs appIdKeyPairs( - appIdInfos.cbegin(), - appIdInfos.cend()); - - d_storageManager_p->registerQueue( - queueContext->uri(), - queueContext->key(), - queueContext->partitionId(), - appIdKeyPairs, - qinfo.d_queue_sp->domain()); - } - else { - d_storageManager_p->registerQueue( - queueContext->uri(), - queueContext->key(), - queueContext->partitionId(), - mqbi::Storage::AppIdKeyPairs(), - qinfo.d_queue_sp->domain()); - } + d_storageManager_p->registerQueue( + queueContext->uri(), + queueContext->key(), + queueContext->partitionId(), + queueContext->d_stateQInfo_sp->appInfos(), + qinfo.d_queue_sp->domain()); // Convert the queue from remote to local instance. queueContext->d_liveQInfo.d_queue_sp->convertToLocal(); @@ -4109,6 +4081,7 @@ void ClusterQueueHelper::onQueueAssigned( BSLS_ASSERT_SAFE(!d_cluster_p->isRemote()); if (!d_cluster_p->isCSLModeEnabled()) { + // REVISIT return; // RETURN } @@ -4229,14 +4202,11 @@ void ClusterQueueHelper::onQueueAssigned( ->domain(), true); // allowDuplicate - const mqbi::Storage::AppIdKeyPairs appIdKeyPairs( - info.appIdInfos().cbegin(), - info.appIdInfos().cend()); d_storageManager_p->updateQueueReplica( info.partitionId(), info.uri(), info.key(), - appIdKeyPairs, + info.appInfos(), d_clusterState_p->domainStates() .at(info.uri().qualifiedDomain()) ->domain(), @@ -4395,8 +4365,8 @@ void ClusterQueueHelper::onQueueUnassigned( void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri, const bsl::string& domain, - const AppIdInfos& addedAppIds, - const AppIdInfos& removedAppIds) + const AppInfos& addedAppIds, + const AppInfos& removedAppIds) { // executed by the cluster *DISPATCHER* thread @@ -4424,19 +4394,21 @@ void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri, const int partitionId = qiter->second->partitionId(); BSLS_ASSERT_SAFE(partitionId != mqbs::DataStore::k_INVALID_PARTITION_ID); - for (AppIdInfosCIter cit = addedAppIds.cbegin(); cit != addedAppIds.cend(); + for (AppInfosCIter cit = addedAppIds.cbegin(); cit != addedAppIds.cend(); ++cit) { if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) { // Note: In non-CSL mode, the queue creation callback is // invoked at replica nodes when they receive a queue creation // record from the primary in the partition stream. - mqbi::Storage::AppIdKeyPair appIdKeyPair(cit->first, cit->second); - mqbi::Storage::AppIdKeyPairs appIdKeyPairs(1, appIdKeyPair); + + mqbi::Storage::AppInfos one(1, d_allocator_p); + one.emplace(*cit); + d_storageManager_p->updateQueueReplica( partitionId, uri, qiter->second->key(), - appIdKeyPairs, + one, d_clusterState_p->domainStates() .at(uri.qualifiedDomain()) ->domain()); @@ -4450,7 +4422,7 @@ void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri, } } - for (AppIdInfosCIter cit = removedAppIds.cbegin(); + for (AppInfosCIter cit = removedAppIds.cbegin(); cit != removedAppIds.cend(); ++cit) { if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) { @@ -4471,8 +4443,8 @@ void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri, } } - bmqu::Printer printer1(&addedAppIds); - bmqu::Printer printer2(&removedAppIds); + bmqu::Printer printer1(&addedAppIds); + bmqu::Printer printer2(&removedAppIds); BALL_LOG_INFO << d_cluster_p->description() << ": Updated queue: " << uri << ", addedAppIds: " << printer1 << ", removedAppIds: " << printer2; diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h index 9a60251f2..5d448dca2 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h @@ -420,9 +420,9 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver, /// queue which have a proper valid unique queueId. typedef bsl::unordered_map QueueContextByIdMap; - typedef AppIdInfos::const_iterator AppIdInfosCIter; + typedef AppInfos::const_iterator AppInfosCIter; - typedef mqbc::ClusterStateQueueInfo::AppIdInfos AppIdInfos; + typedef mqbc::ClusterStateQueueInfo::AppInfos AppInfos; private: // DATA @@ -997,8 +997,8 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver, /// dispatcher thread. virtual void onQueueUpdated(const bmqt::Uri& uri, const bsl::string& domain, - const AppIdInfos& addedAppIds, - const AppIdInfos& removedAppIds = AppIdInfos()) + const AppInfos& addedAppIds, + const AppInfos& removedAppIds = AppInfos()) BSLS_KEYWORD_OVERRIDE; private: diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp index a42bbdd30..d1137b0e5 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp @@ -617,17 +617,12 @@ void ClusterStateManager::onLeaderSyncDataQueryResponse( const mqbu::StorageKey receivedKey( mqbu::StorageKey::BinaryRepresentation(), queueInfo.key().data()); - AppIdInfos appIdInfos; - for (bsl::vector::const_iterator cit = - queueInfo.appIds().cbegin(); - cit != queueInfo.appIds().cend(); - ++cit) { - AppIdInfo appIdInfo; - appIdInfo.first = cit->appId(); - appIdInfo.second.fromBinary(cit->appKey().data()); - - appIdInfos.insert(appIdInfo); - } + + AppInfos appIdInfos(d_allocator_p); + + mqbc::ClusterUtil::parseQueueInfo(&appIdInfos, + queueInfo, + d_allocator_p); registerQueueInfo(queueUri, queueInfo.partitionId(), @@ -1180,7 +1175,7 @@ ClusterStateManager::assignQueue(const bmqt::Uri& uri, void ClusterStateManager::registerQueueInfo(const bmqt::Uri& uri, int partitionId, const mqbu::StorageKey& queueKey, - const AppIdInfos& appIdInfos, + const AppInfos& appIdInfos, bool forceUpdate) { // executed by the *DISPATCHER* thread @@ -1689,12 +1684,17 @@ void ClusterStateManager::processQueueAssignmentAdvisory( // no need to update d_state_p->domainStates() entry // , queue was already known and registered + AppInfos appIdInfos(d_allocator_p); + + mqbc::ClusterUtil::parseQueueInfo(&appIdInfos, + queueInfo, + d_allocator_p); BSLA_MAYBE_UNUSED const bool rc = d_state_p->assignQueue( uri, queueKey, queueInfo.partitionId(), - AppIdInfos()); + appIdInfos); BSLS_ASSERT_SAFE(rc == false); } else { @@ -1728,10 +1728,16 @@ void ClusterStateManager::processQueueAssignmentAdvisory( continue; // CONTINUE } + AppInfos appIdInfos(d_allocator_p); + + mqbc::ClusterUtil::parseQueueInfo(&appIdInfos, + queueInfo, + d_allocator_p); + d_state_p->assignQueue(uri, queueKey, queueInfo.partitionId(), - AppIdInfos()); + appIdInfos); d_state_p->domainStates() .at(uri.qualifiedDomain()) @@ -2027,10 +2033,8 @@ void ClusterStateManager::processLeaderSyncDataQuery( } // Populate queues info - mqbc::ClusterUtil::loadQueuesInfo( - &leaderAdvisory.queues(), - *d_state_p, - d_clusterConfig.clusterAttributes().isCSLModeEnabled()); + mqbc::ClusterUtil::loadQueuesInfo(&leaderAdvisory.queues(), + *d_state_p); } else { // Self is not available. diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h index dfd6d38bf..25acc06fc 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h +++ b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h @@ -113,8 +113,8 @@ class ClusterStateManager : public mqbc::ClusterStateObserver, typedef bsl::vector QueueAdvisories; - typedef mqbc::ClusterStateQueueInfo::AppIdInfo AppIdInfo; - typedef mqbc::ClusterStateQueueInfo::AppIdInfos AppIdInfos; + typedef mqbc::ClusterStateQueueInfo::AppInfo AppInfo; + typedef mqbc::ClusterStateQueueInfo::AppInfos AppInfos; typedef mqbc::ClusterState::UriToQueueInfoMap UriToQueueInfoMap; typedef mqbc::ClusterState::UriToQueueInfoMapCIter UriToQueueInfoMapCIter; @@ -380,7 +380,7 @@ class ClusterStateManager : public mqbc::ClusterStateObserver, virtual void registerQueueInfo(const bmqt::Uri& uri, int partitionId, const mqbu::StorageKey& queueKey, - const AppIdInfos& appIdInfos, + const AppInfos& appIdInfos, bool forceUpdate) BSLS_KEYWORD_OVERRIDE; /// Unassign the queue in the specified `advisory` by applying the diff --git a/src/groups/mqb/mqbblp/mqbblp_domain.cpp b/src/groups/mqb/mqbblp/mqbblp_domain.cpp index 88bfd8d3c..10bcf80c7 100644 --- a/src/groups/mqb/mqbblp/mqbblp_domain.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_domain.cpp @@ -77,7 +77,7 @@ void afterAppIdRegisteredDispatched(mqbi::Queue* queue, BSLS_ASSERT_SAFE(queue->dispatcher()->inDispatcherThread(queue)); queue->queueEngine()->afterAppIdRegistered( - mqbi::Storage::AppIdKeyPair(appId, mqbu::StorageKey())); + mqbi::Storage::AppInfo(appId, mqbu::StorageKey())); } void afterAppIdUnregisteredDispatched(mqbi::Queue* queue, @@ -91,7 +91,7 @@ void afterAppIdUnregisteredDispatched(mqbi::Queue* queue, // Note: Inputing nullKey here is okay since this routine will be removed // when we switch to CSL workflow. queue->queueEngine()->afterAppIdUnregistered( - mqbi::Storage::AppIdKeyPair(appId, mqbu::StorageKey())); + mqbi::Storage::AppInfo(appId, mqbu::StorageKey())); } /// Validates an application subscription. @@ -289,8 +289,8 @@ void Domain::onOpenQueueResponse( confirmationCookie)); } -void Domain::updateAuthorizedAppIds(const AppIdInfos& addedAppIds, - const AppIdInfos& removedAppIds) +void Domain::updateAuthorizedAppIds(const AppInfos& addedAppIds, + const AppInfos& removedAppIds) { mqbconfm::QueueMode& queueMode = d_config.value().mode(); if (!queueMode.isFanoutValue()) { @@ -298,10 +298,7 @@ void Domain::updateAuthorizedAppIds(const AppIdInfos& addedAppIds, } bsl::vector& authorizedAppIds = queueMode.fanout().appIDs(); - const AppIdKeyPairs addedIdKeyPairs(addedAppIds.cbegin(), - addedAppIds.cend()); - for (AppIdKeyPairsCIter cit = addedIdKeyPairs.cbegin(); - cit != addedIdKeyPairs.cend(); + for (AppInfosCIter cit = addedAppIds.cbegin(); cit != addedAppIds.cend(); ++cit) { if (bsl::find(authorizedAppIds.begin(), authorizedAppIds.end(), @@ -315,10 +312,8 @@ void Domain::updateAuthorizedAppIds(const AppIdInfos& addedAppIds, authorizedAppIds.push_back(cit->first); } - const AppIdKeyPairs removedIdKeyPairs(removedAppIds.cbegin(), - removedAppIds.cend()); - for (AppIdKeyPairsCIter cit = removedIdKeyPairs.cbegin(); - cit != removedIdKeyPairs.cend(); + for (AppInfosCIter cit = removedAppIds.cbegin(); + cit != removedAppIds.cend(); ++cit) { const bsl::vector::const_iterator it = bsl::find( authorizedAppIds.begin(), @@ -360,13 +355,13 @@ void Domain::onQueueAssigned(const mqbc::ClusterStateQueueInfo& info) return; // RETURN } - updateAuthorizedAppIds(info.appIdInfos()); + updateAuthorizedAppIds(info.appInfos()); } void Domain::onQueueUpdated(const bmqt::Uri& uri, const bsl::string& domain, - const AppIdInfos& addedAppIds, - const AppIdInfos& removedAppIds) + const AppInfos& addedAppIds, + const AppInfos& removedAppIds) { // executed by the associated CLUSTER's DISPATCHER thread @@ -500,14 +495,16 @@ int Domain::configure(bsl::ostream& errorDescription, if (!d_cluster_sp->isCSLModeEnabled() && d_config.value().mode().isFanoutValue()) { // Compute list of added and removed App IDs. - bsl::vector oldCfgAppIds( - oldConfig.value().mode().fanout().appIDs(), + bsl::unordered_set oldCfgAppIds( + oldConfig.value().mode().fanout().appIDs().cbegin(), + oldConfig.value().mode().fanout().appIDs().cend(), d_allocator_p); - bsl::vector newCfgAppIds( - d_config.value().mode().fanout().appIDs(), + bsl::unordered_set newCfgAppIds( + d_config.value().mode().fanout().appIDs().cbegin(), + d_config.value().mode().fanout().appIDs().cend(), d_allocator_p); - bsl::vector addedIds, removedIds; + bsl::unordered_set addedIds, removedIds; mqbc::StorageUtil::loadAddedAndRemovedEntries(&addedIds, &removedIds, oldCfgAppIds, @@ -516,7 +513,8 @@ int Domain::configure(bsl::ostream& errorDescription, bslmt::LockGuard guard(&d_mutex); // Invoke callbacks for each added and removed ID on each queue. - bsl::vector::const_iterator it = addedIds.cbegin(); + bsl::unordered_set::const_iterator it = + addedIds.cbegin(); QueueMap::const_iterator qIt; for (; it != addedIds.cend(); it++) { for (qIt = d_queues.cbegin(); qIt != d_queues.cend(); ++qIt) { diff --git a/src/groups/mqb/mqbblp/mqbblp_domain.h b/src/groups/mqb/mqbblp/mqbblp_domain.h index a8ef7ee14..af3af7172 100644 --- a/src/groups/mqb/mqbblp/mqbblp_domain.h +++ b/src/groups/mqb/mqbblp/mqbblp_domain.h @@ -104,8 +104,8 @@ class Domain : public mqbi::Domain, public mqbc::ClusterStateObserver { typedef QueueMap::iterator QueueMapIter; typedef QueueMap::const_iterator QueueMapCIter; - typedef mqbi::Storage::AppIdKeyPairs AppIdKeyPairs; - typedef AppIdKeyPairs::const_iterator AppIdKeyPairsCIter; + typedef mqbi::Storage::AppInfos AppInfos; + typedef AppInfos::const_iterator AppInfosCIter; enum DomainState { e_STARTED = 0, e_STOPPING = 1, e_STOPPED = 2 }; @@ -199,9 +199,8 @@ class Domain : public mqbi::Domain, public mqbc::ClusterStateObserver { /// Update the list of authorized appIds by adding the specified /// `addedAppIds` and removing the specified `removedAppIds`. - void - updateAuthorizedAppIds(const AppIdInfos& addedAppIds, - const AppIdInfos& removedAppIds = AppIdInfos()); + void updateAuthorizedAppIds(const AppInfos& addedAppIds, + const AppInfos& removedAppIds = AppInfos()); // PRIVATE MANIPULATORS // (virtual: mqbc::ClusterStateObserver) @@ -223,8 +222,8 @@ class Domain : public mqbi::Domain, public mqbc::ClusterStateObserver { /// which case this queue update is ignored. virtual void onQueueUpdated(const bmqt::Uri& uri, const bsl::string& domain, - const AppIdInfos& addedAppIds, - const AppIdInfos& removedAppIds = AppIdInfos()) + const AppInfos& addedAppIds, + const AppInfos& removedAppIds = AppInfos()) BSLS_KEYWORD_OVERRIDE; private: diff --git a/src/groups/mqb/mqbblp/mqbblp_queuestate.cpp b/src/groups/mqb/mqbblp/mqbblp_queuestate.cpp index e262b6f28..b37129837 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queuestate.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_queuestate.cpp @@ -224,7 +224,7 @@ void QueueState::loadInternals(mqbcmd::QueueState* out) const queueStorage.numBytes() = d_storage_mp->numBytes( mqbu::StorageKey::k_NULL_KEY); if (d_storage_mp->numVirtualStorages()) { - mqbi::Storage::AppIdKeyPairs appIdKeyPairs; + mqbi::Storage::AppInfos appIdKeyPairs; d_storage_mp->loadVirtualStorageDetails(&appIdKeyPairs); BSLS_ASSERT_SAFE( appIdKeyPairs.size() == @@ -233,8 +233,13 @@ void QueueState::loadInternals(mqbcmd::QueueState* out) const bsl::vector& virtualStorages = queueStorage.virtualStorages(); virtualStorages.resize(appIdKeyPairs.size()); - for (size_t i = 0; i < appIdKeyPairs.size(); ++i) { - const mqbi::Storage::AppIdKeyPair& p = appIdKeyPairs[i]; + + size_t i = 0; + for (mqbi::Storage::AppInfos::const_iterator cit = + appIdKeyPairs.cbegin(); + cit != appIdKeyPairs.cend(); + ++cit, ++i) { + const mqbi::Storage::AppInfo& p = *cit; virtualStorages[i].appId() = p.first; os.reset(); os << p.second; diff --git a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp index 3b5ec9e8f..5796bed4a 100644 --- a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp @@ -1589,7 +1589,7 @@ void RelayQueueEngine::loadInternals(mqbcmd::QueueEngine* out) const mqbcmd::RelayQueueEngine& relayQueueEngine = out->makeRelay(); int numSubStreams = 0; - mqbi::Storage::AppIdKeyPairs appIdKeyPairs; + mqbi::Storage::AppInfos appIdKeyPairs; numSubStreams = storage()->numVirtualStorages(); storage()->loadVirtualStorageDetails(&appIdKeyPairs); @@ -1601,8 +1601,12 @@ void RelayQueueEngine::loadInternals(mqbcmd::QueueEngine* out) const bsl::vector& subStreams = relayQueueEngine.subStreams(); subStreams.reserve(appIdKeyPairs.size()); - for (size_t i = 0; i < appIdKeyPairs.size(); ++i) { - const mqbi::Storage::AppIdKeyPair& p = appIdKeyPairs[i]; + + for (mqbi::Storage::AppInfos::const_iterator cit = + appIdKeyPairs.cbegin(); + cit != appIdKeyPairs.cend(); + ++cit) { + const mqbi::Storage::AppInfo& p = *cit; subStreams.resize(subStreams.size() + 1); mqbcmd::RelayQueueEngineSubStream& subStream = subStreams.back(); diff --git a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp index 57001d455..0bfaa6726 100644 --- a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp @@ -1821,7 +1821,7 @@ bool RootQueueEngine::logAlarmCb(const mqbu::StorageKey& appKey, } void RootQueueEngine::afterAppIdRegistered( - const mqbi::Storage::AppIdKeyPair& appIdKeyPair) + const mqbi::Storage::AppInfo& appIdKeyPair) { // executed by the *QUEUE DISPATCHER* thread @@ -1915,17 +1915,18 @@ void RootQueueEngine::afterAppIdRegistered( BSLS_ASSERT_SAFE(!key.isNull()); + mqbi::Storage::AppInfos one(1, d_allocator_p); + one.emplace(mqbi::Storage::AppInfo(appId, key)); d_queueState_p->storageManager()->updateQueuePrimary( d_queueState_p->uri(), d_queueState_p->key(), d_queueState_p->partitionId(), - mqbi::Storage::AppIdKeyPairs(1, - mqbi::Storage::AppIdKeyPair(appId, key)), - mqbi::Storage::AppIdKeyPairs()); + one, + mqbi::Storage::AppInfos()); } void RootQueueEngine::afterAppIdUnregistered( - const mqbi::Storage::AppIdKeyPair& appIdKeyPair) + const mqbi::Storage::AppInfo& appIdKeyPair) { // executed by the *QUEUE DISPATCHER* thread @@ -1967,15 +1968,15 @@ void RootQueueEngine::afterAppIdUnregistered( << "]"; } } + mqbi::Storage::AppInfos one(1, d_allocator_p); + one.emplace(mqbi::Storage::AppInfo(appId, appKey)); d_queueState_p->storageManager()->updateQueuePrimary( d_queueState_p->uri(), d_queueState_p->key(), d_queueState_p->partitionId(), - mqbi::Storage::AppIdKeyPairs(), - mqbi::Storage::AppIdKeyPairs(1, - mqbi::Storage::AppIdKeyPair(appId, - appKey))); + mqbi::Storage::AppInfos(), + one); // No need to log in case of failure because 'updateQueuePrimary' does it // (even in case of success FTM). diff --git a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h index 818c71dfd..f8bd0b626 100644 --- a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h +++ b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h @@ -400,14 +400,14 @@ class RootQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine { /// /// THREAD: This method is called from the Queue's dispatcher thread. virtual void afterAppIdRegistered( - const mqbi::Storage::AppIdKeyPair& appIdKeyPair) BSLS_KEYWORD_OVERRIDE; + const mqbi::Storage::AppInfo& appIdKeyPair) BSLS_KEYWORD_OVERRIDE; /// Called after the specified `appIdKeyPair` has been dynamically /// unregistered. /// /// THREAD: This method is called from the Queue's dispatcher thread. virtual void afterAppIdUnregistered( - const mqbi::Storage::AppIdKeyPair& appIdKeyPair) BSLS_KEYWORD_OVERRIDE; + const mqbi::Storage::AppInfo& appIdKeyPair) BSLS_KEYWORD_OVERRIDE; /// Called after creation of a new storage for the specified /// `appIdKeyPair`. diff --git a/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp b/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp index c573e12f2..e319f48f3 100644 --- a/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp @@ -367,7 +367,7 @@ void StorageManager::queueCreationCb(int* status, int partitionId, const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, bool isNewQueue) { // executed by *QUEUE_DISPATCHER* thread associated with 'partitionId' @@ -1049,7 +1049,7 @@ StorageManager::~StorageManager() void StorageManager::registerQueue(const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, int partitionId, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, mqbi::Domain* domain) { // executed by the *CLUSTER DISPATCHER* thread @@ -1060,9 +1060,6 @@ void StorageManager::registerQueue(const bmqt::Uri& uri, BSLS_ASSERT_SAFE(0 <= partitionId && partitionId < static_cast(d_fileStores.size())); BSLS_ASSERT_SAFE(domain); - if (!d_cluster_p->isCSLModeEnabled()) { - BSLS_ASSERT_SAFE(appIdKeyPairs.empty()); - } mqbc::StorageUtil::registerQueue(d_cluster_p, d_dispatcher_p, @@ -1112,8 +1109,8 @@ void StorageManager::unregisterQueue(const bmqt::Uri& uri, int partitionId) int StorageManager::updateQueuePrimary(const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, int partitionId, - const AppIdKeyPairs& addedIdKeyPairs, - const AppIdKeyPairs& removedIdKeyPairs) + const AppInfos& addedIdKeyPairs, + const AppInfos& removedIdKeyPairs) { // executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId' @@ -1229,7 +1226,7 @@ void StorageManager::unregisterQueueReplica(int partitionId, void StorageManager::updateQueueReplica(int partitionId, const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, mqbi::Domain* domain, bool allowDuplicate) { @@ -1410,7 +1407,7 @@ int StorageManager::start(bsl::ostream& errorDescription) bdlf::PlaceHolders::_2, // partitionId bdlf::PlaceHolders::_3, // QueueUri bdlf::PlaceHolders::_4, // QueueKey - bdlf::PlaceHolders::_5, // AppIdKeyPairs + bdlf::PlaceHolders::_5, // AppInfos bdlf::PlaceHolders::_6), // IsNewQueue) bdlf::BindUtil::bind(&StorageManager::queueDeletionCb, this, diff --git a/src/groups/mqb/mqbblp/mqbblp_storagemanager.h b/src/groups/mqb/mqbblp/mqbblp_storagemanager.h index 86ceb2fff..86a5a366c 100644 --- a/src/groups/mqb/mqbblp/mqbblp_storagemanager.h +++ b/src/groups/mqb/mqbblp/mqbblp_storagemanager.h @@ -398,7 +398,7 @@ class StorageManager : public mqbi::StorageManager { int partitionId, const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, bool isNewQueue); void queueDeletionCb(int* status, @@ -519,7 +519,7 @@ class StorageManager : public mqbi::StorageManager { virtual void registerQueue(const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, int partitionId, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, mqbi::Domain* domain) BSLS_KEYWORD_OVERRIDE; /// Synchronously unregister the queue with the specified `uri` from the @@ -542,8 +542,8 @@ class StorageManager : public mqbi::StorageManager { virtual int updateQueuePrimary(const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, int partitionId, - const AppIdKeyPairs& addedIdKeyPairs, - const AppIdKeyPairs& removedIdKeyPairs) + const AppInfos& addedIdKeyPairs, + const AppInfos& removedIdKeyPairs) BSLS_KEYWORD_OVERRIDE; virtual void @@ -563,7 +563,7 @@ class StorageManager : public mqbi::StorageManager { updateQueueReplica(int partitionId, const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, mqbi::Domain* domain = 0, bool allowDuplicate = false) BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbc/mqbc_clusterstate.cpp b/src/groups/mqb/mqbc/mqbc_clusterstate.cpp index 8bf7084f7..3f2ab1ed5 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstate.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterstate.cpp @@ -46,7 +46,7 @@ bsl::ostream& ClusterStateQueueInfo::print(bsl::ostream& stream, printer.printAttribute("uri", uri()); printer.printAttribute("queueKey", key()); printer.printAttribute("partitionId", partitionId()); - printer.printAttribute("appIdInfos", appIdInfos()); + printer.printAttribute("appIdInfos", appInfos()); printer.end(); return stream; @@ -87,8 +87,8 @@ void ClusterStateObserver::onQueueUnassigned( void ClusterStateObserver::onQueueUpdated( BSLS_ANNOTATION_UNUSED const bmqt::Uri& uri, BSLS_ANNOTATION_UNUSED const bsl::string& domain, - BSLS_ANNOTATION_UNUSED const AppIdInfos& addedAppIds, - BSLS_ANNOTATION_UNUSED const AppIdInfos& removedAppIds) + BSLS_ANNOTATION_UNUSED const AppInfos& addedAppIds, + BSLS_ANNOTATION_UNUSED const AppInfos& removedAppIds) { // NOTHING } @@ -315,7 +315,7 @@ ClusterState& ClusterState::updatePartitionNumActiveQueues(int partitionId, bool ClusterState::assignQueue(const bmqt::Uri& uri, const mqbu::StorageKey& key, int partitionId, - const AppIdInfos& appIdInfos) + const AppInfos& appIdInfos) { // executed by the cluster *DISPATCHER* thread @@ -353,14 +353,14 @@ bool ClusterState::assignQueue(const bmqt::Uri& uri, updatePartitionQueueMapped(iter->second->partitionId(), -1); iter->second->setKey(key).setPartitionId(partitionId); - iter->second->appIdInfos() = appIdInfos; + iter->second->appInfos() = appIdInfos; iter->second->setPendingUnassignment(false); } } updatePartitionQueueMapped(partitionId, 1); - bmqu::Printer printer(&appIdInfos); + bmqu::Printer printer(&appIdInfos); BALL_LOG_INFO << "Cluster [" << d_cluster_p->name() << "]: " << "Assigning queue [" << uri << "], queueKey: [" << key << "] to Partition [" << partitionId @@ -453,8 +453,8 @@ void ClusterState::clearQueues() int ClusterState::updateQueue(const bmqt::Uri& uri, const bsl::string& domain, - const AppIdInfos& addedAppIds, - const AppIdInfos& removedAppIds) + const AppInfos& addedAppIds, + const AppInfos& removedAppIds) { // executed by the cluster *DISPATCHER* thread @@ -484,8 +484,8 @@ int ClusterState::updateQueue(const bmqt::Uri& uri, return rc_QUEUE_NOT_FOUND; // RETURN } - AppIdInfos& appIdInfos = iter->second->appIdInfos(); - for (AppIdInfosCIter citer = addedAppIds.cbegin(); + AppInfos& appIdInfos = iter->second->appInfos(); + for (AppInfosCIter citer = addedAppIds.cbegin(); citer != addedAppIds.cend(); ++citer) { if (!appIdInfos.insert(*citer).second) { @@ -493,18 +493,18 @@ int ClusterState::updateQueue(const bmqt::Uri& uri, } } - for (AppIdInfosCIter citer = removedAppIds.begin(); + for (AppInfosCIter citer = removedAppIds.begin(); citer != removedAppIds.end(); ++citer) { - const AppIdInfosCIter appIdInfoCIter = appIdInfos.find(*citer); + const AppInfosCIter appIdInfoCIter = appIdInfos.find(*citer); if (appIdInfoCIter == appIdInfos.cend()) { return rc_APPID_NOT_FOUND; // RETURN } appIdInfos.erase(appIdInfoCIter); } - bmqu::Printer printer1(&addedAppIds); - bmqu::Printer printer2(&removedAppIds); + bmqu::Printer printer1(&addedAppIds); + bmqu::Printer printer2(&removedAppIds); BALL_LOG_INFO << "Cluster [" << d_cluster_p->name() << "]: " << "Updating queue [" << uri << "], queueKey: [" << iter->second->key() << "], partitionId: [" @@ -515,8 +515,8 @@ int ClusterState::updateQueue(const bmqt::Uri& uri, else { // This update is for an entire domain, instead of any individual // queue. - bmqu::Printer printer1(&addedAppIds); - bmqu::Printer printer2(&removedAppIds); + bmqu::Printer printer1(&addedAppIds); + bmqu::Printer printer2(&removedAppIds); BALL_LOG_INFO << "Cluster [" << d_cluster_p->name() << "]: " << "Updating domain: [" << domain << "], addedAppIds: " << printer1 diff --git a/src/groups/mqb/mqbc/mqbc_clusterstate.h b/src/groups/mqb/mqbc/mqbc_clusterstate.h index 82b996a08..e1c4a928c 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstate.h +++ b/src/groups/mqb/mqbc/mqbc_clusterstate.h @@ -161,9 +161,9 @@ class ClusterStatePartitionInfo { class ClusterStateQueueInfo { public: // TYPES - typedef mqbi::ClusterStateManager::AppIdInfo AppIdInfo; - typedef mqbi::ClusterStateManager::AppIdInfos AppIdInfos; - typedef mqbi::ClusterStateManager::AppIdInfosCIter AppIdInfosCIter; + typedef mqbi::ClusterStateManager::AppInfo AppInfo; + typedef mqbi::ClusterStateManager::AppInfos AppInfos; + typedef mqbi::ClusterStateManager::AppInfosCIter AppInfosCIter; private: // DATA @@ -178,7 +178,7 @@ class ClusterStateQueueInfo { // Assigned partitionId // (mqbs::DataStore::k_INVALID_PARTITION_ID if unassigned) - AppIdInfos d_appIdInfos; + AppInfos d_appInfos; // List of App id and key pairs // // TBD: Should also be added to mqbconfm::Domain @@ -210,7 +210,7 @@ class ClusterStateQueueInfo { ClusterStateQueueInfo(const bmqt::Uri& uri, const mqbu::StorageKey& key, int partitionId, - const AppIdInfos& appIdInfos, + const AppInfos& appIdInfos, bslma::Allocator* allocator); // MANIPULATORS @@ -222,7 +222,7 @@ class ClusterStateQueueInfo { ClusterStateQueueInfo& setPendingUnassignment(bool value); /// Get a modifiable reference to this object's appIdInfos. - AppIdInfos& appIdInfos(); + AppInfos& appInfos(); /// Reset the `key`, `partitionId`, `appIdInfos` members of this object. /// Note that `uri` is left untouched because it is an invariant member @@ -233,7 +233,7 @@ class ClusterStateQueueInfo { const bmqt::Uri& uri() const; const mqbu::StorageKey& key() const; int partitionId() const; - const AppIdInfos& appIdInfos() const; + const AppInfos& appInfos() const; /// Return the value of the corresponding member of this object. bool pendingUnassignment() const; @@ -271,7 +271,7 @@ bsl::ostream& operator<<(bsl::ostream& stream, class ClusterStateObserver { public: // TYPES - typedef ClusterStateQueueInfo::AppIdInfos AppIdInfos; + typedef ClusterStateQueueInfo::AppInfos AppInfos; public: // CREATORS @@ -318,11 +318,10 @@ class ClusterStateObserver { /// /// THREAD: This method is invoked in the associated cluster's /// dispatcher thread. - virtual void - onQueueUpdated(const bmqt::Uri& uri, - const bsl::string& domain, - const AppIdInfos& addedAppIds, - const AppIdInfos& removedAppIds = AppIdInfos()); + virtual void onQueueUpdated(const bmqt::Uri& uri, + const bsl::string& domain, + const AppInfos& addedAppIds, + const AppInfos& removedAppIds = AppInfos()); /// Callback invoked when a partition with the specified `partitionId` /// has been orphan above a certain threshold amount of time. @@ -366,8 +365,8 @@ class ClusterState { public: // TYPES - typedef ClusterStateQueueInfo::AppIdInfos AppIdInfos; - typedef ClusterStateQueueInfo::AppIdInfosCIter AppIdInfosCIter; + typedef ClusterStateQueueInfo::AppInfos AppInfos; + typedef ClusterStateQueueInfo::AppInfosCIter AppInfosCIter; typedef bsl::vector PartitionsInfo; @@ -567,7 +566,7 @@ class ClusterState { bool assignQueue(const bmqt::Uri& uri, const mqbu::StorageKey& key, int partitionId, - const AppIdInfos& appIdInfos); + const AppInfos& appIdInfos); /// Un-assign the queue with the specified `uri`. Return true if /// successful, or false if the queue does not exist. @@ -592,8 +591,8 @@ class ClusterState { /// cluster's dispatcher thread. int updateQueue(const bmqt::Uri& uri, const bsl::string& domain, - const AppIdInfos& addedAppIds, - const AppIdInfos& removedAppIds = AppIdInfos()); + const AppInfos& addedAppIds, + const AppInfos& removedAppIds = AppInfos()); /// Clear this cluster state object, without firing any observers. void clear(); @@ -767,7 +766,7 @@ inline ClusterStateQueueInfo::ClusterStateQueueInfo( : d_uri(uri, allocator) , d_key() , d_partitionId(mqbs::DataStore::k_INVALID_PARTITION_ID) -, d_appIdInfos(allocator) +, d_appInfos(allocator) , d_pendingUnassignment(false) { // NOTHING @@ -777,12 +776,12 @@ inline ClusterStateQueueInfo::ClusterStateQueueInfo( const bmqt::Uri& uri, const mqbu::StorageKey& key, int partitionId, - const AppIdInfos& appIdInfos, + const AppInfos& appIdInfos, bslma::Allocator* allocator) : d_uri(uri, allocator) , d_key(key) , d_partitionId(partitionId) -, d_appIdInfos(appIdInfos, allocator) +, d_appInfos(appIdInfos, allocator) , d_pendingUnassignment(false) { // NOTHING @@ -809,9 +808,9 @@ ClusterStateQueueInfo::setPendingUnassignment(bool value) return *this; } -inline ClusterStateQueueInfo::AppIdInfos& ClusterStateQueueInfo::appIdInfos() +inline ClusterStateQueueInfo::AppInfos& ClusterStateQueueInfo::appInfos() { - return d_appIdInfos; + return d_appInfos; } inline void ClusterStateQueueInfo::reset() @@ -821,7 +820,7 @@ inline void ClusterStateQueueInfo::reset() d_key.reset(); d_partitionId = mqbs::DataStore::k_INVALID_PARTITION_ID; - d_appIdInfos.clear(); + d_appInfos.clear(); } // ACCESSORS @@ -840,10 +839,10 @@ inline int ClusterStateQueueInfo::partitionId() const return d_partitionId; } -inline const ClusterStateQueueInfo::AppIdInfos& -ClusterStateQueueInfo::appIdInfos() const +inline const ClusterStateQueueInfo::AppInfos& +ClusterStateQueueInfo::appInfos() const { - return d_appIdInfos; + return d_appInfos; } inline bool ClusterStateQueueInfo::pendingUnassignment() const diff --git a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp index 801b0ca1b..0316e7418 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp @@ -198,9 +198,7 @@ void ClusterStateManager::do_applyCSLSelf(const ClusterFSMArgsSp& args) return; // RETURN } - ClusterUtil::loadQueuesInfo(&clusterStateSnapshot.queues(), - tempState, - true); // includeAppIds + ClusterUtil::loadQueuesInfo(&clusterStateSnapshot.queues(), tempState); } else { // Verify that elector term in follower snapshot is less than the @@ -1120,9 +1118,7 @@ int ClusterStateManager::loadClusterStateSnapshot( } mqbc::ClusterUtil::loadPartitionsInfo(&out->partitions(), tempState); - mqbc::ClusterUtil::loadQueuesInfo(&out->queues(), - tempState, - true); // includeAppIds + mqbc::ClusterUtil::loadQueuesInfo(&out->queues(), tempState); return 0; } @@ -1513,7 +1509,7 @@ ClusterStateManager::assignQueue(const bmqt::Uri& uri, void ClusterStateManager::registerQueueInfo(const bmqt::Uri& uri, int partitionId, const mqbu::StorageKey& queueKey, - const AppIdInfos& appIdInfos, + const AppInfos& appIdInfos, bool forceUpdate) { // executed by the *DISPATCHER* thread diff --git a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.h b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.h index b2a8c2670..79af0a1c1 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.h +++ b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.h @@ -110,7 +110,7 @@ class ClusterStateManager typedef ClusterFSM::ClusterFSMArgs ClusterFSMArgs; typedef ClusterFSM::ClusterFSMArgsSp ClusterFSMArgsSp; - typedef mqbi::ClusterStateManager::AppIdInfos AppIdInfos; + typedef mqbi::ClusterStateManager::AppInfos AppInfos; public: // TYPES @@ -483,7 +483,7 @@ class ClusterStateManager virtual void registerQueueInfo(const bmqt::Uri& uri, int partitionId, const mqbu::StorageKey& queueKey, - const AppIdInfos& appIdInfos, + const AppInfos& appIdInfos, bool forceUpdate) BSLS_KEYWORD_OVERRIDE; /// Unassign the queue in the specified `advisory` by applying the diff --git a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.t.cpp b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.t.cpp index e8258298b..24d31a1a5 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.t.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.t.cpp @@ -518,8 +518,7 @@ struct Tester { d_cluster_mp->_state()); mqbc::ClusterUtil::loadQueuesInfo( &response.clusterStateSnapshot().queues(), - d_cluster_mp->_state(), - true); // includeAppIds + d_cluster_mp->_state()); for (TestChannelMapCIter cit = d_cluster_mp->_channels().cbegin(); cit != d_cluster_mp->_channels().cend(); diff --git a/src/groups/mqb/mqbc/mqbc_clusterutil.cpp b/src/groups/mqb/mqbc/mqbc_clusterutil.cpp index 98be24bbe..0f09d5f34 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterutil.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterutil.cpp @@ -66,9 +66,9 @@ const char k_SELF_NODE_IS_STOPPING[] = "self node is stopping"; const char k_DOMAIN_CREATION_FAILURE[] = "failed to create domain"; // TYPES -typedef ClusterUtil::AppIdInfo AppIdInfo; -typedef ClusterUtil::AppIdInfos AppIdInfos; -typedef ClusterUtil::AppIdInfosCIter AppIdInfosCIter; +typedef ClusterUtil::AppInfo AppInfo; +typedef ClusterUtil::AppInfos AppInfos; +typedef ClusterUtil::AppInfosCIter AppInfosCIter; typedef ClusterUtil::ClusterNodeSessionMapConstIter ClusterNodeSessionMapConstIter; @@ -97,31 +97,24 @@ void applyPartitionPrimary( void applyQueueAssignment(mqbc::ClusterState* clusterState, const bsl::vector& queues) { + // TODO: refactor to use allocator(s) + bslma::Allocator* allocator = 0; + for (bsl::vector::const_iterator it = queues.begin(); it != queues.end(); ++it) { const bmqp_ctrlmsg::QueueInfo& queueInfo = *it; - const bmqt::Uri uri(queueInfo.uri()); + const bmqt::Uri uri(queueInfo.uri(), allocator); const int partitionId(queueInfo.partitionId()); const mqbu::StorageKey queueKey( mqbu::StorageKey::BinaryRepresentation(), queueInfo.key().data()); - const bsl::vector& appIds = - queueInfo.appIds(); - AppIdInfos addedAppIds; - for (bsl::vector::const_iterator citer = - appIds.cbegin(); - citer != appIds.cend(); - ++citer) { - AppIdInfo appIdInfo; - appIdInfo.first = citer->appId(); - appIdInfo.second.fromBinary(citer->appKey().data()); - - addedAppIds.insert(appIdInfo); - } + AppInfos addedAppIds(allocator); + mqbc::ClusterUtil::parseQueueInfo(&addedAppIds, queueInfo, allocator); + // CSL commit clusterState->assignQueue(uri, queueKey, partitionId, addedAppIds); } } @@ -220,24 +213,24 @@ void applyQueueUpdate(mqbc::ClusterState* clusterState, mqbs::DataStore::k_INVALID_PARTITION_ID); } - AppIdInfos addedAppIds; + AppInfos addedAppIds; for (bsl::vector::const_iterator citer = queueUpdate.addedAppIds().cbegin(); citer != queueUpdate.addedAppIds().cend(); ++citer) { - AppIdInfo appIdInfo; + AppInfo appIdInfo; appIdInfo.first = citer->appId(); appIdInfo.second.fromBinary(citer->appKey().data()); addedAppIds.insert(appIdInfo); } - AppIdInfos removedAppIds; + AppInfos removedAppIds; for (bsl::vector::const_iterator citer = queueUpdate.removedAppIds().begin(); citer != queueUpdate.removedAppIds().end(); ++citer) { - AppIdInfo appIdInfo; + AppInfo appIdInfo; appIdInfo.first = citer->appId(); appIdInfo.second.fromBinary(citer->appKey().data()); @@ -808,7 +801,7 @@ void ClusterUtil::populateQueueAssignmentAdvisory( key->loadBinary(&queueInfo.key()); // Generate appIds and appKeys - populateAppIdInfos(&queueInfo.appIds(), domain->config().mode()); + populateAppInfos(&queueInfo.appIds(), domain->config().mode()); BALL_LOG_INFO << clusterData->identity().description() << ": Populated QueueAssignmentAdvisory: " << *advisory; @@ -900,6 +893,8 @@ ClusterUtil::assignQueue(ClusterState* clusterState, DomainStatesIter domIt = clusterState->domainStates().find( uri.qualifiedDomain()); if (domIt == clusterState->domainStates().end()) { + // REVISIT: This is also done in 'ClusterState::assignQueue' + clusterState->domainStates()[uri.qualifiedDomain()].createInplace( allocator, allocator); @@ -977,12 +972,13 @@ ClusterUtil::assignQueue(ClusterState* clusterState, } } - // Queue is no longer pending unassignment - const DomainStatesCIter cit = clusterState->domainStates().find( + // Set the queue as no longer pending unassignment + const DomainStatesCIter citDomainState = clusterState->domainStates().find( uri.qualifiedDomain()); - if (cit != clusterState->domainStates().cend()) { - UriToQueueInfoMapCIter qcit = cit->second->queuesInfo().find(uri); - if (qcit != cit->second->queuesInfo().cend()) { + if (citDomainState != clusterState->domainStates().cend()) { + UriToQueueInfoMapCIter qcit = + citDomainState->second->queuesInfo().find(uri); + if (qcit != citDomainState->second->queuesInfo().cend()) { BSLS_ASSERT_SAFE(cluster->isCSLModeEnabled() && qcit->second->pendingUnassignment()); qcit->second->setPendingUnassignment(false); @@ -1028,11 +1024,21 @@ ClusterUtil::assignQueue(ClusterState* clusterState, // In CSL mode, we assign the queue to ClusterState upon CSL commit // callback of QueueAssignmentAdvisory, so we don't assign it here. + // In non-CSL mode this is the shortcut to call Primary CQH instead of + // waiting for the quorum of acks in the ledger. + + BSLS_ASSERT_SAFE(queueAdvisory.queues().size() == 1); + + bmqp_ctrlmsg::QueueInfo& queueInfo = queueAdvisory.queues().back(); + + AppInfos appInfos(allocator); + mqbc::ClusterUtil::parseQueueInfo(&appInfos, queueInfo, allocator); + BSLA_MAYBE_UNUSED const bool assignRc = clusterState->assignQueue( uri, key, queueAdvisory.queues().back().partitionId(), - AppIdInfos()); + appInfos); BSLS_ASSERT_SAFE(assignRc); domIt->second->adjustQueueCount(1); @@ -1054,7 +1060,7 @@ void ClusterUtil::registerQueueInfo(ClusterState* clusterState, const bmqt::Uri& uri, int partitionId, const mqbu::StorageKey& queueKey, - const AppIdInfos& appIdInfos, + const AppInfos& appInfos, const QueueAssigningCb& queueAssigningCb, bool forceUpdate) { @@ -1096,40 +1102,39 @@ void ClusterUtil::registerQueueInfo(ClusterState* clusterState, BSLS_ASSERT_SAFE(qs->uri() == uri); if ((qs->partitionId() == partitionId) && - (qs->key() == queueKey) && (qs->appIdInfos() == appIdInfos)) { + (qs->key() == queueKey) && (qs->appInfos() == appInfos)) { // All good.. nothing to update. return; // RETURN } - bmqu::Printer stateAppIdInfos(&qs->appIdInfos()); - bmqu::Printer storageAppIdInfos(&appIdInfos); + bmqu::Printer stateAppInfos(&qs->appInfos()); + bmqu::Printer storageAppInfos(&appInfos); - // PartitionId and/or QueueKey and/or AppIdInfos mismatch. + // PartitionId and/or QueueKey and/or AppInfos mismatch. if (!forceUpdate) { BMQTSK_ALARMLOG_ALARM("CLUSTER_STATE") << cluster->description() << ": For queue [ " << uri - << "], different partitionId/queueKey/appIdInfos in " + << "], different partitionId/queueKey/appInfos in " << "cluster state and storage. " - << "PartitionId/QueueKey/AppIdInfos in cluster state [" + << "PartitionId/QueueKey/AppInfos in cluster state [" << qs->partitionId() << "], [" << qs->key() << "], [" - << stateAppIdInfos - << "]. PartitionId/QueueKey/AppIdInfos in storage [" + << stateAppInfos + << "]. PartitionId/QueueKey/AppInfos in storage [" << partitionId << "], [" << queueKey << "], [" - << storageAppIdInfos << "]." << BMQTSK_ALARMLOG_END; + << storageAppInfos << "]." << BMQTSK_ALARMLOG_END; return; // RETURN } BALL_LOG_WARN << cluster->description() << ": For queue [" << uri << "], force-updating " - << "partitionId/queueKey/appIdInfos from [" + << "partitionId/queueKey/appInfos from [" << qs->partitionId() << "], [" << qs->key() << "], [" - << stateAppIdInfos << "] to [" << partitionId - << "], [" << queueKey << "], [" << storageAppIdInfos - << "]."; + << stateAppInfos << "] to [" << partitionId << "], [" + << queueKey << "], [" << storageAppInfos << "]."; clusterState->queueKeys().erase(qs->key()); - clusterState->assignQueue(uri, queueKey, partitionId, appIdInfos); + clusterState->assignQueue(uri, queueKey, partitionId, appInfos); BALL_LOG_INFO << cluster->description() << ": Queue assigned: " << "[uri: " << uri << ", queueKey: " << queueKey @@ -1144,8 +1149,8 @@ void ClusterUtil::registerQueueInfo(ClusterState* clusterState, << ": re-registering a known queue with a stale view, " << "but queueKey is not unique. " << "QueueKey [" << queueKey << "], URI [" << uri << "], Partition [" - << partitionId << "], AppIdInfos [" - << storageAppIdInfos << "]." << BMQTSK_ALARMLOG_END; + << partitionId << "], AppInfos [" << storageAppInfos + << "]." << BMQTSK_ALARMLOG_END; return; // RETURN } @@ -1161,13 +1166,14 @@ void ClusterUtil::registerQueueInfo(ClusterState* clusterState, } // Queue is not known, so add it. - clusterState->assignQueue(uri, queueKey, partitionId, appIdInfos); + clusterState->assignQueue(uri, queueKey, partitionId, appInfos); - bmqu::Printer printer(&appIdInfos); - BALL_LOG_INFO << cluster->description() << ": Queue assigned: " - << "[uri: " << uri << ", queueKey: " << queueKey + bmqu::Printer printer(&appInfos); + BALL_LOG_INFO << cluster->description() + << ": Queue assigned: " << "[uri: " << uri + << ", queueKey: " << queueKey << ", partitionId: " << partitionId - << ", appIdInfos: " << printer << "]"; + << ", appInfos: " << printer << "]"; if (!cluster->isCSLModeEnabled()) { ClusterState::QueueKeysInsertRc insertRc = @@ -1191,11 +1197,11 @@ void ClusterUtil::registerQueueInfo(ClusterState* clusterState, queueAssigningCb(uri, false); // processingPendingRequests } -void ClusterUtil::populateAppIdInfos(AppIdInfos* appIdInfos, - const mqbconfm::QueueMode& domainConfig) +void ClusterUtil::populateAppInfos(AppInfos* appInfos, + const mqbconfm::QueueMode& domainConfig) { // PRECONDITIONS - BSLS_ASSERT_SAFE(appIdInfos && appIdInfos->empty()); + BSLS_ASSERT_SAFE(appInfos && appInfos->empty()); if (domainConfig.isFanoutValue()) { const bsl::vector& cfgAppIds = @@ -1208,21 +1214,21 @@ void ClusterUtil::populateAppIdInfos(AppIdInfos* appIdInfos, mqbu::StorageKey appKey; mqbs::StorageUtil::generateStorageKey(&appKey, &appKeys, *cit); - appIdInfos->insert(AppIdInfo(*cit, appKey)); + appInfos->insert(AppInfo(*cit, appKey)); } } else { - appIdInfos->insert(AppIdInfo(bmqp::ProtocolUtil::k_DEFAULT_APP_ID, - mqbi::QueueEngine::k_DEFAULT_APP_KEY)); + appInfos->insert(AppInfo(bmqp::ProtocolUtil::k_DEFAULT_APP_ID, + mqbi::QueueEngine::k_DEFAULT_APP_KEY)); } } -void ClusterUtil::populateAppIdInfos( - bsl::vector* appIdInfos, +void ClusterUtil::populateAppInfos( + bsl::vector* appInfos, const mqbconfm::QueueMode& domainConfig) { // PRECONDITIONS - BSLS_ASSERT_SAFE(appIdInfos && appIdInfos->empty()); + BSLS_ASSERT_SAFE(appInfos && appInfos->empty()); if (domainConfig.isFanoutValue()) { const bsl::vector& cfgAppIds = @@ -1237,9 +1243,13 @@ void ClusterUtil::populateAppIdInfos( mqbu::StorageKey appKey; mqbs::StorageUtil::generateStorageKey(&appKey, &appKeys, *cit); + // This is the only place generating keys upon queue assignment + // for both CSL (FSM) and non-CSL (non-FSM). The latter used to + // generate keys in 'StorageUtil::registerQueue'. + appKey.loadBinary(&appIdInfo.appKey()); - appIdInfos->push_back(appIdInfo); + appInfos->push_back(appIdInfo); } } else { @@ -1247,7 +1257,7 @@ void ClusterUtil::populateAppIdInfos( appIdInfo.appId() = bmqp::ProtocolUtil::k_DEFAULT_APP_ID; mqbi::QueueEngine::k_DEFAULT_APP_KEY.loadBinary(&appIdInfo.appKey()); - appIdInfos->push_back(appIdInfo); + appInfos->push_back(appIdInfo); } } @@ -1313,7 +1323,7 @@ void ClusterUtil::registerAppId(ClusterData* clusterData, mqbu::StorageKey::k_NULL_KEY.loadBinary((&queueUpdate.key())); queueUpdate.domain() = domain->name(); - // Populate AppIdInfo + // Populate AppInfo bmqp_ctrlmsg::AppIdInfo appIdInfo; appIdInfo.appId() = appId; mqbu::StorageKey::k_NULL_KEY.loadBinary(&appIdInfo.appKey()); @@ -1334,9 +1344,9 @@ void ClusterUtil::registerAppId(ClusterData* clusterData, BSLS_ASSERT_SAFE(queueUpdate.domain() == domain->name()); bsl::unordered_set appKeys; - const AppIdInfos& appIdInfos = qinfoCit->second->appIdInfos(); - for (AppIdInfosCIter appInfoCit = appIdInfos.cbegin(); - appInfoCit != appIdInfos.cend(); + const AppInfos& appInfos = qinfoCit->second->appInfos(); + for (AppInfosCIter appInfoCit = appInfos.cbegin(); + appInfoCit != appInfos.cend(); ++appInfoCit) { if (appInfoCit->first == appId) { BALL_LOG_ERROR << "Failed to register appId '" << appId @@ -1349,7 +1359,7 @@ void ClusterUtil::registerAppId(ClusterData* clusterData, appKeys.insert(appInfoCit->second); } - // Populate AppIdInfo + // Populate AppInfo bmqp_ctrlmsg::AppIdInfo appIdInfo; appIdInfo.appId() = appId; mqbu::StorageKey appKey; @@ -1441,7 +1451,7 @@ void ClusterUtil::unregisterAppId(ClusterData* clusterData, mqbu::StorageKey::k_NULL_KEY.loadBinary((&queueUpdate.key())); queueUpdate.domain() = domain->name(); - // Populate AppIdInfo + // Populate AppInfo bmqp_ctrlmsg::AppIdInfo appIdInfo; appIdInfo.appId() = appId; mqbu::StorageKey::k_NULL_KEY.loadBinary(&appIdInfo.appKey()); @@ -1462,12 +1472,12 @@ void ClusterUtil::unregisterAppId(ClusterData* clusterData, BSLS_ASSERT_SAFE(queueUpdate.domain() == domain->name()); bool appIdFound = false; - const AppIdInfos& appIdInfos = qinfoCit->second->appIdInfos(); - for (AppIdInfosCIter appInfoCit = appIdInfos.cbegin(); - appInfoCit != appIdInfos.cend(); + const AppInfos& appInfos = qinfoCit->second->appInfos(); + for (AppInfosCIter appInfoCit = appInfos.cbegin(); + appInfoCit != appInfos.cend(); ++appInfoCit) { if (appInfoCit->first == appId) { - // Populate AppIdInfo + // Populate AppInfo bmqp_ctrlmsg::AppIdInfo appIdInfo; appIdInfo.appId() = appId; appInfoCit->second.loadBinary(&appIdInfo.appKey()); @@ -1559,9 +1569,7 @@ void ClusterUtil::sendClusterState( &advisory.sequenceNumber()); advisory.partitions() = partitions; - loadQueuesInfo(&advisory.queues(), - clusterState, - clusterData->cluster().isCSLModeEnabled()); + loadQueuesInfo(&advisory.queues(), clusterState); } else if (sendPartitionPrimaryInfo) { bmqp_ctrlmsg::PartitionPrimaryAdvisory& advisory = @@ -1581,9 +1589,7 @@ void ClusterUtil::sendClusterState( clusterData->electorInfo().nextLeaderMessageSequence( &advisory.sequenceNumber()); - loadQueuesInfo(&advisory.queues(), - clusterState, - clusterData->cluster().isCSLModeEnabled()); + loadQueuesInfo(&advisory.queues(), clusterState); } if (!clusterData->cluster().isCSLModeEnabled()) { @@ -2190,8 +2196,7 @@ void ClusterUtil::loadPartitionsInfo( } void ClusterUtil::loadQueuesInfo(bsl::vector* out, - const ClusterState& state, - bool includeAppIds) + const ClusterState& state) { // PRECONDITIONS BSLS_ASSERT_SAFE(out); @@ -2212,17 +2217,14 @@ void ClusterUtil::loadQueuesInfo(bsl::vector* out, BSLS_ASSERT_SAFE(!qCit->second->key().isNull()); qCit->second->key().loadBinary(&queueInfo.key()); - if (includeAppIds) { - for (AppIdInfosCIter appIdCit = - qCit->second->appIdInfos().cbegin(); - appIdCit != qCit->second->appIdInfos().cend(); - ++appIdCit) { - bmqp_ctrlmsg::AppIdInfo appIdInfo; - appIdInfo.appId() = appIdCit->first; - appIdCit->second.loadBinary(&appIdInfo.appKey()); + for (AppInfosCIter appIdCit = qCit->second->appInfos().cbegin(); + appIdCit != qCit->second->appInfos().cend(); + ++appIdCit) { + bmqp_ctrlmsg::AppIdInfo appIdInfo; + appIdInfo.appId() = appIdCit->first; + appIdCit->second.loadBinary(&appIdInfo.appKey()); - queueInfo.appIds().push_back(appIdInfo); - } + queueInfo.appIds().push_back(appIdInfo); } out->push_back(queueInfo); @@ -2298,5 +2300,20 @@ int ClusterUtil::latestLedgerLSN(bmqp_ctrlmsg::LeaderMessageSequence* out, return rc_SUCCESS; } +void ClusterUtil::parseQueueInfo(mqbi::ClusterStateManager::AppInfos* out, + const bmqp_ctrlmsg::QueueInfo& queueInfo, + bslma::Allocator* allocator) +{ + for (bsl::vector::const_iterator cit = + queueInfo.appIds().cbegin(); + cit != queueInfo.appIds().cend(); + ++cit) { + out->emplace(mqbi::ClusterStateManager::AppInfo( + bsl::string(cit->appId(), allocator), + mqbu::StorageKey(mqbu::StorageKey::BinaryRepresentation(), + cit->appKey().data()))); + } +} + } // close package namespace } // close enterprise namespace diff --git a/src/groups/mqb/mqbc/mqbc_clusterutil.h b/src/groups/mqb/mqbc/mqbc_clusterutil.h index 5a54e8e49..970fdb3e0 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterutil.h +++ b/src/groups/mqb/mqbc/mqbc_clusterutil.h @@ -88,9 +88,9 @@ struct ClusterUtil { public: // TYPES - typedef ClusterStateQueueInfo::AppIdInfo AppIdInfo; - typedef ClusterStateQueueInfo::AppIdInfos AppIdInfos; - typedef ClusterStateQueueInfo::AppIdInfosCIter AppIdInfosCIter; + typedef ClusterStateQueueInfo::AppInfo AppInfo; + typedef ClusterStateQueueInfo::AppInfos AppInfos; + typedef ClusterStateQueueInfo::AppInfosCIter AppInfosCIter; typedef mqbc::ClusterState::QueueInfoSp QueueInfoSp; typedef ClusterState::UriToQueueInfoMap UriToQueueInfoMap; @@ -270,17 +270,17 @@ struct ClusterUtil { const bmqt::Uri& uri, int partitionId, const mqbu::StorageKey& queueKey, - const AppIdInfos& appIdInfos, + const AppInfos& appIdInfos, const QueueAssigningCb& queueAssigningCb, bool forceUpdate); /// Generate appKeys based on the appIds in the specified `domainConfig` /// and populate them into the specified `appIdInfos`. - static void populateAppIdInfos(AppIdInfos* appIdInfos, - const mqbconfm::QueueMode& domainConfig); + static void populateAppInfos(AppInfos* appIdInfos, + const mqbconfm::QueueMode& domainConfig); static void - populateAppIdInfos(bsl::vector* appIdInfos, - const mqbconfm::QueueMode& domainConfig); + populateAppInfos(bsl::vector* appIdInfos, + const mqbconfm::QueueMode& domainConfig); /// Register the specified `appId` for all queues in the specified /// `domain`, using the specified `clusterData` and `clusterState`. @@ -392,8 +392,7 @@ struct ClusterUtil { /// `state`. If the specified `includeAppIds` is true, then the appId /// info for the queues will be loaded as well. static void loadQueuesInfo(bsl::vector* out, - const ClusterState& state, - bool includeAppIds); + const ClusterState& state); /// Load into the specified `out` the list of peer nodes using the /// specified `clusterData`. @@ -411,6 +410,12 @@ struct ClusterUtil { static int latestLedgerLSN(bmqp_ctrlmsg::LeaderMessageSequence* out, const ClusterStateLedger& ledger, const ClusterData& clusterData); + + /// Load into the specified `out` all `AppInfo` data from the specified + /// `queueInfo` using the specified `allocator`. + static void parseQueueInfo(mqbi::ClusterStateManager::AppInfos* out, + const bmqp_ctrlmsg::QueueInfo& queueInfo, + bslma::Allocator* allocator); }; // ============================================================================ diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp index f7e4eefe7..d472aa809 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp @@ -3643,10 +3643,10 @@ void StorageManager::initializeQueueKeyInfoMap( mqbs::DataStoreConfigQueueInfo qinfo; qinfo.setCanonicalQueueUri(csQinfo.uri().asString()); qinfo.setPartitionId(csQinfo.partitionId()); - for (AppIdInfosCIter appIdCit = csQinfo.appIdInfos().cbegin(); - appIdCit != csQinfo.appIdInfos().cend(); + for (AppInfosCIter appIdCit = csQinfo.appInfos().cbegin(); + appIdCit != csQinfo.appInfos().cend(); ++appIdCit) { - qinfo.addAppIdKeyPair(*appIdCit); + qinfo.addAppInfo(*appIdCit); } d_queueKeyInfoMapVec.at(csQinfo.partitionId()) @@ -3657,11 +3657,12 @@ void StorageManager::initializeQueueKeyInfoMap( d_isQueueKeyInfoMapVecInitialized = true; } -void StorageManager::registerQueue(const bmqt::Uri& uri, - const mqbu::StorageKey& queueKey, - int partitionId, - const AppIdKeyPairs& appIdKeyPairs, - mqbi::Domain* domain) +void StorageManager::registerQueue( + const bmqt::Uri& uri, + const mqbu::StorageKey& queueKey, + int partitionId, + const bsl::unordered_set& appIdKeyPairs, + mqbi::Domain* domain) { // executed by the *CLUSTER DISPATCHER* thread @@ -3720,8 +3721,8 @@ void StorageManager::unregisterQueue(const bmqt::Uri& uri, int partitionId) int StorageManager::updateQueuePrimary(const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, int partitionId, - const AppIdKeyPairs& addedIdKeyPairs, - const AppIdKeyPairs& removedIdKeyPairs) + const AppInfos& addedIdKeyPairs, + const AppInfos& removedIdKeyPairs) { // executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId' @@ -3819,7 +3820,7 @@ void StorageManager::unregisterQueueReplica(int partitionId, void StorageManager::updateQueueReplica(int partitionId, const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, mqbi::Domain* domain, bool allowDuplicate) { diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.h b/src/groups/mqb/mqbc/mqbc_storagemanager.h index 25cfbd166..7fea414f3 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.h +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.h @@ -155,7 +155,7 @@ class StorageManager typedef ClusterState::DomainStatesCIter DomainStatesCIter; typedef ClusterState::UriToQueueInfoMapCIter UriToQueueInfoMapCIter; - typedef ClusterStateQueueInfo::AppIdInfosCIter AppIdInfosCIter; + typedef ClusterStateQueueInfo::AppInfosCIter AppInfosCIter; /// Vector of pairs of buffered primary status advisories and their source typedef bsl::vector< @@ -853,11 +853,12 @@ class StorageManager /// associated queue storage created. /// /// THREAD: Executed by the Client's dispatcher thread. - virtual void registerQueue(const bmqt::Uri& uri, - const mqbu::StorageKey& queueKey, - int partitionId, - const AppIdKeyPairs& appIdKeyPairs, - mqbi::Domain* domain) BSLS_KEYWORD_OVERRIDE; + virtual void + registerQueue(const bmqt::Uri& uri, + const mqbu::StorageKey& queueKey, + int partitionId, + const bsl::unordered_set& appIdKeyPairs, + mqbi::Domain* domain) BSLS_KEYWORD_OVERRIDE; /// Synchronously unregister the queue with the specified `uri` from the /// specified `partitionId`. @@ -878,8 +879,8 @@ class StorageManager virtual int updateQueuePrimary(const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, int partitionId, - const AppIdKeyPairs& addedIdKeyPairs, - const AppIdKeyPairs& removedIdKeyPairs) + const AppInfos& addedIdKeyPairs, + const AppInfos& removedIdKeyPairs) BSLS_KEYWORD_OVERRIDE; virtual void @@ -899,7 +900,7 @@ class StorageManager updateQueueReplica(int partitionId, const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, mqbi::Domain* domain = 0, bool allowDuplicate = false) BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.t.cpp b/src/groups/mqb/mqbc/mqbc_storagemanager.t.cpp index cb2055070..94916c3e4 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.t.cpp +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.t.cpp @@ -767,14 +767,14 @@ struct TestHelper { handle, uri_t, rec.d_queueKey, - mqbs::DataStore::AppIdKeyPairs(), + mqbs::DataStore::AppInfos(), rec.d_timestamp, true); // isNewQueue d_cluster_mp->_state().assignQueue(uri_t, queueKey, partitionId, - mqbc::ClusterState::AppIdInfos()); + mqbc::ClusterState::AppInfos()); BSLS_ASSERT_OPT(rc == 0); return queueKey; diff --git a/src/groups/mqb/mqbc/mqbc_storageutil.cpp b/src/groups/mqb/mqbc/mqbc_storageutil.cpp index 21a3bf7a2..3bae89492 100644 --- a/src/groups/mqb/mqbc/mqbc_storageutil.cpp +++ b/src/groups/mqb/mqbc/mqbc_storageutil.cpp @@ -60,30 +60,6 @@ namespace mqbc { namespace { -/// Unary predicate used in certain `find` algorithms to match an element -/// having the specified AppId. -class AppIdMatcher { - private: - // TYPES - typedef bsl::pair AppIdKeyPair; - - // DATA - const bsl::string& d_expectedAppId; - - public: - // CREATORS - AppIdMatcher(const bsl::string& expectedAppId) - : d_expectedAppId(expectedAppId) - { - } - - // ACCESSORS - bool operator()(const AppIdKeyPair& appIdKeyPair) const - { - return d_expectedAppId == appIdKeyPair.first; - } -}; - /// Post on the optionally specified `semaphore`. void optionalSemaphorePost(bslmt::Semaphore* semaphore) { @@ -99,27 +75,17 @@ void optionalSemaphorePost(bslmt::Semaphore* semaphore) // ------------------ // PRIVATE FUNCTIONS -bool StorageUtil::loadUpdatedAppIdKeyPairs( - AppIdKeyPairs* addedAppIdKeyPairs, - AppIdKeyPairs* removedAppIdKeyPairs, - AppKeys* appKeys, - bslmt::Mutex* appKeysLock, - const mqbs::ReplicatedStorage& storage, - const AppIdKeyPairs& newAppIdKeyPairs, - const bsl::vector& cfgAppIds, - bool isCSLMode) +bool StorageUtil::loadUpdatedAppInfos(AppInfos* addedAppInfos, + AppInfos* removedAppInfos, + const mqbs::ReplicatedStorage& storage, + const AppInfos& newAppInfos) { // executed by the *CLUSTER DISPATCHER* thread // PRECONDITIONS - BSLS_ASSERT_SAFE(addedAppIdKeyPairs); - BSLS_ASSERT_SAFE(removedAppIdKeyPairs); - if (isCSLMode) { - BSLS_ASSERT_SAFE(cfgAppIds.empty()); - } - else { - BSLS_ASSERT_SAFE(newAppIdKeyPairs.empty()); - } + BSLS_ASSERT_SAFE(addedAppInfos); + BSLS_ASSERT_SAFE(removedAppInfos); + BSLS_ASSERT_SAFE(!newAppInfos.empty()); // This function is invoked by 'StorageManager::registerQueue' if the queue // with specified 'storage' is in fanout mode, in order to add or remove @@ -135,58 +101,17 @@ bool StorageUtil::loadUpdatedAppIdKeyPairs( // list of newly added and removed appIds, and then invoking 'updateQueue' // in the appropriate thread. - AppIdKeyPairs existingAppIdKeyPairs; - storage.loadVirtualStorageDetails(&existingAppIdKeyPairs); - - if (isCSLMode) { - loadAddedAndRemovedEntries(addedAppIdKeyPairs, - removedAppIdKeyPairs, - existingAppIdKeyPairs, - newAppIdKeyPairs); - - if (addedAppIdKeyPairs->empty() && removedAppIdKeyPairs->empty()) { - // No appIds to add or remove. - return false; // RETURN - } - } - else { - bsl::vector existingAppIds; - for (size_t i = 0; i < existingAppIdKeyPairs.size(); ++i) { - existingAppIds.push_back(existingAppIdKeyPairs[i].first); - } - - bsl::vector addedAppIds; - bsl::vector removedAppIds; - loadAddedAndRemovedEntries(&addedAppIds, - &removedAppIds, - existingAppIds, - cfgAppIds); - - if (addedAppIds.empty() && removedAppIds.empty()) { - // No appIds to add or remove. - return false; // RETURN - } + AppInfos existingAppInfos; + storage.loadVirtualStorageDetails(&existingAppInfos); - // Generate unique appKeys for the added appIds, and populate - // 'addedAppIdKeyPairs'. - for (size_t i = 0; i < addedAppIds.size(); ++i) { - mqbu::StorageKey appKey = generateAppKey(appKeys, - appKeysLock, - addedAppIds[i]); - addedAppIdKeyPairs->push_back( - bsl::make_pair(addedAppIds[i], appKey)); - } + loadAddedAndRemovedEntries(addedAppInfos, + removedAppInfos, + existingAppInfos, + newAppInfos); - // Populate 'removedAppIdKeyPairs'. - for (size_t i = 0; i < removedAppIds.size(); ++i) { - AppIdKeyPairsCIter it = bsl::find_if( - existingAppIdKeyPairs.begin(), - existingAppIdKeyPairs.end(), - AppIdMatcher(removedAppIds[i])); - BSLS_ASSERT_SAFE(it != existingAppIdKeyPairs.end()); - removedAppIdKeyPairs->push_back( - bsl::make_pair(it->first, it->second)); - } + if (addedAppInfos->empty() && removedAppInfos->empty()) { + // No appIds to add or remove. + return false; // RETURN } return true; @@ -196,9 +121,9 @@ void StorageUtil::registerQueueDispatched( BSLS_ANNOTATION_UNUSED const mqbi::Dispatcher::ProcessorHandle& processor, mqbs::FileStore* fs, mqbs::ReplicatedStorage* storage, - const bsl::string& clusterDescription, - int partitionId, - const AppIdKeyPairs& appIdKeyPairs) + const bsl::string& clusterDescription, + int partitionId, + const AppInfos& appIdKeyPairs) { // executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId' @@ -255,16 +180,16 @@ void StorageUtil::registerQueueDispatched( void StorageUtil::updateQueuePrimaryDispatched( BSLS_ANNOTATION_UNUSED const mqbi::Dispatcher::ProcessorHandle& processor, mqbs::ReplicatedStorage* storage, - bslmt::Mutex* storagesLock, - mqbs::FileStore* fs, - AppKeys* appKeys, - bslmt::Mutex* appKeysLock, - const bsl::string& clusterDescription, - int partitionId, - const AppIdKeyPairs& addedIdKeyPairs, - const AppIdKeyPairs& removedIdKeyPairs, - bool isFanout, - bool isCSLMode) + bslmt::Mutex* storagesLock, + mqbs::FileStore* fs, + AppKeys* appKeys, + bslmt::Mutex* appKeysLock, + const bsl::string& clusterDescription, + int partitionId, + const AppInfos& addedIdKeyPairs, + const AppInfos& removedIdKeyPairs, + bool isFanout, + bool isCSLMode) { // executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId' @@ -293,12 +218,12 @@ int StorageUtil::updateQueuePrimaryRaw(mqbs::ReplicatedStorage* storage, mqbs::FileStore* fs, AppKeys* appKeys, bslmt::Mutex* appKeysLock, - const bsl::string& clusterDescription, - int partitionId, - const AppIdKeyPairs& addedIdKeyPairs, - const AppIdKeyPairs& removedIdKeyPairs, - bool isFanout, - bool isCSLMode) + const bsl::string& clusterDescription, + int partitionId, + const AppInfos& addedIdKeyPairs, + const AppInfos& removedIdKeyPairs, + bool isFanout, + bool isCSLMode) { // executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId' @@ -352,7 +277,7 @@ int StorageUtil::updateQueuePrimaryRaw(mqbs::ReplicatedStorage* storage, BALL_LOG_INFO_BLOCK { - bmqu::Printer printer(&addedIdKeyPairs); + bmqu::Printer printer(&addedIdKeyPairs); BALL_LOG_OUTPUT_STREAM << clusterDescription << ": Partition [" << partitionId @@ -364,7 +289,7 @@ int StorageUtil::updateQueuePrimaryRaw(mqbs::ReplicatedStorage* storage, } if (!removedIdKeyPairs.empty()) { - for (AppIdKeyPairsCIter cit = removedIdKeyPairs.begin(); + for (AppInfosCIter cit = removedIdKeyPairs.begin(); cit != removedIdKeyPairs.end(); ++cit) { // Write QueueDeletionRecord to data store for removed appIds. @@ -409,7 +334,7 @@ int StorageUtil::updateQueuePrimaryRaw(mqbs::ReplicatedStorage* storage, BALL_LOG_INFO_BLOCK { - bmqu::Printer printer(&removedIdKeyPairs); + bmqu::Printer printer(&removedIdKeyPairs); BALL_LOG_OUTPUT_STREAM << clusterDescription << ": Partition [" << partitionId @@ -424,8 +349,8 @@ int StorageUtil::updateQueuePrimaryRaw(mqbs::ReplicatedStorage* storage, // away. fs->dispatcherFlush(true, false); - bmqu::Printer printer1(&addedIdKeyPairs); - bmqu::Printer printer2(&removedIdKeyPairs); + bmqu::Printer printer1(&addedIdKeyPairs); + bmqu::Printer printer2(&removedIdKeyPairs); BALL_LOG_INFO << clusterDescription << ": Partition [" << partitionId << "] updated [" << storage->queueUri() << "], queueKey [" << storage->queueKey() << "] with the storage as primary: " @@ -439,7 +364,7 @@ int StorageUtil::addVirtualStoragesInternal( mqbs::ReplicatedStorage* storage, AppKeys* appKeys, bslmt::Mutex* appKeysLock, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, const bsl::string& clusterDescription, int partitionId, bool isFanout, @@ -466,7 +391,7 @@ int StorageUtil::addVirtualStoragesInternal( // Register appKeys with 'appKeys' and then with the underlying // physical 'storage'. - for (AppIdKeyPairsCIter cit = appIdKeyPairs.begin(); + for (AppInfosCIter cit = appIdKeyPairs.begin(); cit != appIdKeyPairs.end(); ++cit) { AppKeysInsertRc irc = appKeys->insert(cit->second); @@ -1524,13 +1449,15 @@ void StorageUtil::recoveredQueuesCb( } if (qinfo.appIdKeyPairs().size() != 1 || - qinfo.appIdKeyPairs()[0].first != + qinfo.appIdKeyPairs().cbegin()->first != bmqp::ProtocolUtil::k_DEFAULT_APP_ID) { - // This ia a fanout queue + // This is a fanout queue AppIds appIds; - for (size_t n = 0; n < qinfo.appIdKeyPairs().size(); ++n) { - const AppIdKeyPair& p = qinfo.appIdKeyPairs()[n]; + for (AppInfos::const_iterator cit = qinfo.appIdKeyPairs().cbegin(); + cit != qinfo.appIdKeyPairs().cend(); + ++cit) { + const AppInfo& p = *cit; AppIdsInsertRc appIdsIrc = appIds.insert(p.first); if (false == appIdsIrc.second) { @@ -1662,7 +1589,7 @@ void StorageUtil::recoveredQueuesCb( ++qit) { const mqbu::StorageKey& queueKey = qit->first; const mqbs::DataStoreConfigQueueInfo& qinfo = qit->second; - const AppIdKeyPairs& appIdKeyPairs = qinfo.appIdKeyPairs(); + const AppInfos& appIdKeyPairs = qinfo.appIdKeyPairs(); const bmqt::Uri queueUri(qinfo.canonicalQueueUri()); BSLS_ASSERT_SAFE(queueUri.isValid()); @@ -1696,7 +1623,7 @@ void StorageUtil::recoveredQueuesCb( BSLS_ASSERT_SAFE(queueKey == rstorage->queueKey()); BSLS_ASSERT_SAFE(partitionId == rstorage->partitionId()); - for (AppIdKeyPairsCIter ait = appIdKeyPairs.begin(); + for (AppInfosCIter ait = appIdKeyPairs.begin(); ait != appIdKeyPairs.end(); ++ait) { BSLA_MAYBE_UNUSED const bsl::string& appId = ait->first; @@ -1830,7 +1757,7 @@ void StorageUtil::recoveredQueuesCb( bmqu::MemOutStream errorDesc; int rc; if (domain->config().mode().isFanoutValue()) { - for (AppIdKeyPairsCIter ait = appIdKeyPairs.begin(); + for (AppInfosCIter ait = appIdKeyPairs.begin(); ait != appIdKeyPairs.end(); ++ait) { const bsl::string& appId = ait->first; @@ -2338,7 +2265,7 @@ void StorageUtil::registerQueue( const mqbu::StorageKey& queueKey, const bsl::string& clusterDescription, int partitionId, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, mqbi::Domain* domain) { // executed by the *CLUSTER DISPATCHER* thread @@ -2357,9 +2284,6 @@ void StorageUtil::registerQueue( partitionId < cluster->clusterConfig()->partitionConfig().numPartitions()); BSLS_ASSERT_SAFE(domain); - if (!cluster->isCSLModeEnabled()) { - BSLS_ASSERT_SAFE(appIdKeyPairs.empty()); - } // StorageMgr is either aware of the queue (the 'uri') or it isn't. If it // is already aware, either this queue was registered earlier or it was @@ -2374,7 +2298,7 @@ void StorageUtil::registerQueue( // and deployed before the node started (some appIds were added or removed // or both). We need to make sure that these appIds are handled correctly. // The logic to get a list of added and/or removed appId/key pairs is - // handled by invoking 'loadUpdatedAppIdKeyPairs' in this function if queue + // handled by invoking 'loadUpdatedAppInfos' in this function if queue // is in fanout mode. // If StorageMgr is not aware of the queue, then its a simpler process -- @@ -2438,34 +2362,12 @@ void StorageUtil::registerQueue( // to be added or removed (see comments at the beginning of this // routine for explanation). - AppIdKeyPairs addedAppIdKeyPairs, removedAppIdKeyPairs; + AppInfos addedAppInfos, removedAppInfos; - bool hasUpdate = false; - if (cluster->isCSLModeEnabled()) { - // In CSL mode, queue assignment procedure is split into queue - // assignment and queue update, so we simply remove all appIds - // here, and re-add them during queue update phase. - hasUpdate = loadUpdatedAppIdKeyPairs( - &addedAppIdKeyPairs, - &removedAppIdKeyPairs, - appKeys, - appKeysLock, - *storageSp.get(), - appIdKeyPairs, - bsl::vector(), - true); // isCSLMode - } - else { - hasUpdate = loadUpdatedAppIdKeyPairs( - &addedAppIdKeyPairs, - &removedAppIdKeyPairs, - appKeys, - appKeysLock, - *storageSp.get(), - AppIdKeyPairs(), - queueMode.fanout().appIDs(), - false); // isCSLMode - } + bool hasUpdate = loadUpdatedAppInfos(&addedAppInfos, + &removedAppInfos, + *storageSp.get(), + appIdKeyPairs); if (!hasUpdate) { // No update needed for AppId/Key pairs. return; // RETURN @@ -2490,8 +2392,8 @@ void StorageUtil::registerQueue( appKeysLock, clusterDescription, partitionId, - addedAppIdKeyPairs, - removedAppIdKeyPairs, + addedAppInfos, + removedAppInfos, domain->config().mode().isFanoutValue(), cluster->isCSLModeEnabled())); @@ -2501,7 +2403,7 @@ void StorageUtil::registerQueue( // Wait for 'updateQueuePrimaryDispatched' operation to complete. // We need to wait because 'updateQueuePrimaryDispatched' creates - // virtual storages corresponding to 'addedAppIdKeyPairs' (if any), + // virtual storages corresponding to 'addedAppInfos' (if any), // and the caller of 'registerQueue' expects these virtual storages // to be created this routine or its caller returns. Before // waiting, release the 'storagesLock' guard and unlock it to avoid @@ -2535,10 +2437,10 @@ void StorageUtil::registerQueue( bmqu::MemOutStream errorDesc; int rc = 0; - AppIdKeyPairs appIdKeyPairsToUse; + AppInfos appIdKeyPairsToUse; if (queueMode.isFanoutValue()) { - if (cluster->isCSLModeEnabled()) { - for (AppIdKeyPairsCIter citer = appIdKeyPairs.begin(); + if (cluster->isCSLModeEnabled() || !appIdKeyPairs.empty()) { + for (AppInfosCIter citer = appIdKeyPairs.begin(); citer != appIdKeyPairs.end(); ++citer) { rc = storageSp->addVirtualStorage(errorDesc, @@ -2566,7 +2468,7 @@ void StorageUtil::registerQueue( rc = storageSp->addVirtualStorage(errorDesc, *citer, appKey); - appIdKeyPairsToUse.push_back(bsl::make_pair(*citer, appKey)); + appIdKeyPairsToUse.emplace(bsl::make_pair(*citer, appKey)); } } } @@ -2762,8 +2664,8 @@ int StorageUtil::updateQueuePrimary(StorageSpMap* storageMap, const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, int partitionId, - const AppIdKeyPairs& addedIdKeyPairs, - const AppIdKeyPairs& removedIdKeyPairs, + const AppInfos& addedIdKeyPairs, + const AppInfos& removedIdKeyPairs, bool isCSLMode) { // executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId' @@ -2783,8 +2685,8 @@ int StorageUtil::updateQueuePrimary(StorageSpMap* storageMap, StorageSpMapIter it = storageMap->find(uri); if (storageMap->end() == it) { - bmqu::Printer printer1(&addedIdKeyPairs); - bmqu::Printer printer2(&removedIdKeyPairs); + bmqu::Printer printer1(&addedIdKeyPairs); + bmqu::Printer printer2(&removedIdKeyPairs); BALL_LOG_ERROR << clusterDescription << " Partition [" << partitionId << "]: Error when updating queue '" << uri << "' with addedAppIds: [" << printer1 @@ -3134,7 +3036,7 @@ void StorageUtil::updateQueueReplicaDispatched( int partitionId, const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, bool isCSLMode, mqbi::Domain* domain, bool allowDuplicate) @@ -3166,7 +3068,7 @@ void StorageUtil::updateQueueReplicaDispatched( if (it == storageMap->end()) { // Cluster state and/or partition are out of sync at this replica. - bmqu::Printer printer(&appIdKeyPairs); + bmqu::Printer printer(&appIdKeyPairs); BMQTSK_ALARMLOG_ALARM("REPLICATION") << "At partition [" << partitionId << "], failure while registering appIds [" << printer @@ -3198,7 +3100,7 @@ void StorageUtil::updateQueueReplicaDispatched( isCSLMode); if (rc != 0) { if (!allowDuplicate) { - bmqu::Printer printer(&appIdKeyPairs); + bmqu::Printer printer(&appIdKeyPairs); BMQTSK_ALARMLOG_ALARM("REPLICATION") << "At partition [" << partitionId << "], failure while registering appIds [" << printer @@ -3213,7 +3115,7 @@ void StorageUtil::updateQueueReplicaDispatched( return; // RETURN } - bmqu::Printer printer(&appIdKeyPairs); + bmqu::Printer printer(&appIdKeyPairs); BALL_LOG_INFO << clusterDescription << ": Partition [" << partitionId << "] updated [" << uri << "], queueKey [" << queueKey << "] with the storage as replica: " << "addedIdKeyPairs:" diff --git a/src/groups/mqb/mqbc/mqbc_storageutil.h b/src/groups/mqb/mqbc/mqbc_storageutil.h index 8db21fcfb..3f6e6cfe4 100644 --- a/src/groups/mqb/mqbc/mqbc_storageutil.h +++ b/src/groups/mqb/mqbc/mqbc_storageutil.h @@ -101,9 +101,9 @@ struct StorageUtil { private: // TYPES - typedef mqbi::StorageManager::AppIdKeyPair AppIdKeyPair; - typedef mqbi::StorageManager::AppIdKeyPairs AppIdKeyPairs; - typedef mqbi::StorageManager::AppIdKeyPairsCIter AppIdKeyPairsCIter; + typedef mqbi::StorageManager::AppInfo AppInfo; + typedef mqbi::StorageManager::AppInfos AppInfos; + typedef mqbi::StorageManager::AppInfosCIter AppInfosCIter; typedef mqbi::StorageManager::AppIds AppIds; typedef mqbi::StorageManager::AppIdsIter AppIdsIter; @@ -177,38 +177,33 @@ struct StorageUtil { /// Load into the specified `result` the list of elements present in /// `baseSet` which are not present in `subtractionSet`. template - static void loadDifference(bsl::vector* result, - const bsl::vector& baseSet, - const bsl::vector& subtractionSet); + static void loadDifference(bsl::unordered_set* result, + const bsl::unordered_set& baseSet, + const bsl::unordered_set& subtractionSet); - /// Load into the specified `addedAppIdKeyPairs` and - /// `removedAppIdKeyPairs` the appId/key pairs which have been added and + /// Load into the specified `addedAppInfos` and + /// `removedAppInfos` the appId/key pairs which have been added and /// removed respectively for the specified `storage` based on the - /// specified `newAppIdKeyPairs` or `cfgAppIds`, as well as the + /// specified `newAppInfos` or `cfgAppIds`, as well as the /// specified `isCSLMode` mode. If new app keys are generated, load /// them into the specified `appKeys`. If the optionally specified /// `appKeysLock` is provided, lock it. Return true if there are any /// added or removed appId/key pairs, false otherwise. /// /// THREAD: Executed by the cluster dispatcher thread. - static bool - loadUpdatedAppIdKeyPairs(AppIdKeyPairs* addedAppIdKeyPairs, - AppIdKeyPairs* removedAppIdKeyPairs, - AppKeys* appKeys, - bslmt::Mutex* appKeysLock, - const mqbs::ReplicatedStorage& storage, - const AppIdKeyPairs& newAppIdKeyPairs, - const bsl::vector& cfgAppIds, - bool isCSLMode); + static bool loadUpdatedAppInfos(AppInfos* addedAppInfos, + AppInfos* removedAppInfos, + const mqbs::ReplicatedStorage& storage, + const AppInfos& newAppInfos); /// THREAD: Executed by the Queue's dispatcher thread. static void registerQueueDispatched(const mqbi::Dispatcher::ProcessorHandle& processor, mqbs::FileStore* fs, mqbs::ReplicatedStorage* storage, - const bsl::string& clusterDescription, - int partitionId, - const AppIdKeyPairs& appIdKeyPairs); + const bsl::string& clusterDescription, + int partitionId, + const AppInfos& appIdKeyPairs); /// THREAD: This method is called from the Queue's dispatcher thread. static void updateQueuePrimaryDispatched( @@ -220,8 +215,8 @@ struct StorageUtil { bslmt::Mutex* appKeysLock, const bsl::string& clusterDescription, int partitionId, - const AppIdKeyPairs& addedIdKeyPairs, - const AppIdKeyPairs& removedIdKeyPairs, + const AppInfos& addedIdKeyPairs, + const AppInfos& removedIdKeyPairs, bool isFanout, bool isCSLMode); @@ -233,18 +228,18 @@ struct StorageUtil { mqbs::FileStore* fs, AppKeys* appKeys, bslmt::Mutex* appKeysLock, - const bsl::string& clusterDescription, - int partitionId, - const AppIdKeyPairs& addedIdKeyPairs, - const AppIdKeyPairs& removedIdKeyPairs, - bool isFanout, - bool isCSLMode); + const bsl::string& clusterDescription, + int partitionId, + const AppInfos& addedIdKeyPairs, + const AppInfos& removedIdKeyPairs, + bool isFanout, + bool isCSLMode); static int addVirtualStoragesInternal(mqbs::ReplicatedStorage* storage, AppKeys* appKeys, bslmt::Mutex* appKeysLock, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, const bsl::string& clusterDescription, int partitionId, bool isFanout, @@ -387,10 +382,10 @@ struct StorageUtil { /// present in `existingEntries` but not in `newEntries`. template static void - loadAddedAndRemovedEntries(bsl::vector* addedEntries, - bsl::vector* removedEntries, - const bsl::vector& existingEntries, - const bsl::vector& newEntries); + loadAddedAndRemovedEntries(bsl::unordered_set* addedEntries, + bsl::unordered_set* removedEntries, + const bsl::unordered_set& existingEntries, + const bsl::unordered_set& newEntries); /// Return true if the queue having specified `uri` and assigned to the /// specified `partitionId` has no messages in the specified @@ -639,7 +634,7 @@ struct StorageUtil { const mqbu::StorageKey& queueKey, const bsl::string& clusterDescription, int partitionId, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, mqbi::Domain* domain); /// THREAD: Executed by the Queue's dispatcher thread. @@ -671,8 +666,8 @@ struct StorageUtil { const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, int partitionId, - const AppIdKeyPairs& addedIdKeyPairs, - const AppIdKeyPairs& removedIdKeyPairs, + const AppInfos& addedIdKeyPairs, + const AppInfos& removedIdKeyPairs, bool isCSLMode); static void @@ -714,7 +709,7 @@ struct StorageUtil { int partitionId, const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, - const AppIdKeyPairs& addedIdKeyPairs, + const AppInfos& addedIdKeyPairs, bool isCSLMode, mqbi::Domain* domain = 0, bool allowDuplicate = false); @@ -822,29 +817,28 @@ unsigned int StorageUtil::extractPartitionId(const bmqp::Event& event); // ------------------ template -void StorageUtil::loadDifference(bsl::vector* result, - const bsl::vector& baseSet, - const bsl::vector& subtractionSet) +void StorageUtil::loadDifference(bsl::unordered_set* result, + const bsl::unordered_set& baseSet, + const bsl::unordered_set& subtractionSet) { // PRECONDITIONS BSLS_ASSERT_SAFE(result); - typedef typename bsl::vector::const_iterator CIter; + typedef typename bsl::unordered_set::const_iterator CIter; - for (CIter it = baseSet.begin(); it != baseSet.end(); ++it) { - if (subtractionSet.end() == - bsl::find(subtractionSet.begin(), subtractionSet.end(), *it)) { - result->push_back(*it); + for (CIter it = baseSet.cbegin(); it != baseSet.cend(); ++it) { + if (subtractionSet.end() == subtractionSet.find(*it)) { + result->emplace(*it); } } } template void StorageUtil::loadAddedAndRemovedEntries( - bsl::vector* addedEntries, - bsl::vector* removedEntries, - const bsl::vector& existingEntries, - const bsl::vector& newEntries) + bsl::unordered_set* addedEntries, + bsl::unordered_set* removedEntries, + const bsl::unordered_set& existingEntries, + const bsl::unordered_set& newEntries) { // PRECONDITIONS BSLS_ASSERT_SAFE(addedEntries); diff --git a/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h b/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h index 886589775..fa754df9c 100644 --- a/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h +++ b/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h @@ -102,9 +102,9 @@ class ClusterStateManager { AfterPartitionPrimaryAssignmentCb; /// Pair of (appId, appKey) - typedef bsl::pair AppIdInfo; - typedef bsl::unordered_set AppIdInfos; - typedef AppIdInfos::const_iterator AppIdInfosCIter; + typedef bsl::pair AppInfo; + typedef bsl::unordered_set AppInfos; + typedef AppInfos::const_iterator AppInfosCIter; struct QueueAssignmentResult { enum Enum { @@ -226,7 +226,7 @@ class ClusterStateManager { virtual void registerQueueInfo(const bmqt::Uri& uri, int partitionId, const mqbu::StorageKey& queueKey, - const AppIdInfos& appIdInfos, + const AppInfos& appIdInfos, bool forceUpdate) = 0; /// Unassign the queue in the specified `advisory` by applying the diff --git a/src/groups/mqb/mqbi/mqbi_queueengine.cpp b/src/groups/mqb/mqbi/mqbi_queueengine.cpp index 582956c6b..b26c4a293 100644 --- a/src/groups/mqb/mqbi/mqbi_queueengine.cpp +++ b/src/groups/mqb/mqbi/mqbi_queueengine.cpp @@ -36,13 +36,13 @@ QueueEngine::~QueueEngine() } void QueueEngine::afterAppIdRegistered( - BSLS_ANNOTATION_UNUSED const mqbi::Storage::AppIdKeyPair& appIdKeyPair) + BSLS_ANNOTATION_UNUSED const mqbi::Storage::AppInfo& appIdKeyPair) { // NOTHING } void QueueEngine::afterAppIdUnregistered( - BSLS_ANNOTATION_UNUSED const mqbi::Storage::AppIdKeyPair& appIdKeyPair) + BSLS_ANNOTATION_UNUSED const mqbi::Storage::AppInfo& appIdKeyPair) { // NOTHING } diff --git a/src/groups/mqb/mqbi/mqbi_queueengine.h b/src/groups/mqb/mqbi/mqbi_queueengine.h index cf6f27195..14a77e13a 100644 --- a/src/groups/mqb/mqbi/mqbi_queueengine.h +++ b/src/groups/mqb/mqbi/mqbi_queueengine.h @@ -198,14 +198,14 @@ class QueueEngine { /// /// THREAD: This method is called from the Queue's dispatcher thread. virtual void - afterAppIdRegistered(const mqbi::Storage::AppIdKeyPair& appIdKeyPair); + afterAppIdRegistered(const mqbi::Storage::AppInfo& appIdKeyPair); /// Called after the specified `appIdKeyPair` has been dynamically /// unregistered. /// /// THREAD: This method is called from the Queue's dispatcher thread. virtual void - afterAppIdUnregistered(const mqbi::Storage::AppIdKeyPair& appIdKeyPair); + afterAppIdUnregistered(const mqbi::Storage::AppInfo& appIdKeyPair); /// Called after creation of a new storage for the specified /// `appIdKeyPair`. diff --git a/src/groups/mqb/mqbi/mqbi_storage.h b/src/groups/mqb/mqbi/mqbi_storage.h index b05a42aa5..08b95c609 100644 --- a/src/groups/mqb/mqbi/mqbi_storage.h +++ b/src/groups/mqb/mqbi/mqbi_storage.h @@ -375,12 +375,12 @@ class Storage { public: // PUBLIC TYPES - /// `AppIdKeyPair` is an alias for an (appId, appKey) pairing + /// `AppInfo` is an alias for an (appId, appKey) pairing /// representing unique virtual storage identification. - typedef bsl::pair AppIdKeyPair; + typedef bsl::pair AppInfo; - /// `AppIdKeyPairs` is an alias for a list of pairs of appId and appKey - typedef bsl::vector AppIdKeyPairs; + /// `AppInfos` is an alias for a set of pairs of appId and appKey + typedef bsl::unordered_set AppInfos; typedef bmqc::Array @@ -648,7 +648,7 @@ class Storage { /// Load into the specified `buffer` the list of pairs of appId and /// appKey for all the virtual storages registered with this instance. - virtual void loadVirtualStorageDetails(AppIdKeyPairs* buffer) const = 0; + virtual void loadVirtualStorageDetails(AppInfos* buffer) const = 0; /// Return the number of auto confirmed Apps for the current message. virtual unsigned int numAutoConfirms() const = 0; diff --git a/src/groups/mqb/mqbi/mqbi_storagemanager.h b/src/groups/mqb/mqbi/mqbi_storagemanager.h index a157f66dc..e0397e582 100644 --- a/src/groups/mqb/mqbi/mqbi_storagemanager.h +++ b/src/groups/mqb/mqbi/mqbi_storagemanager.h @@ -168,9 +168,9 @@ class StorageManagerIterator { class StorageManager : public mqbi::AppKeyGenerator { public: // TYPES - typedef mqbi::Storage::AppIdKeyPair AppIdKeyPair; - typedef mqbi::Storage::AppIdKeyPairs AppIdKeyPairs; - typedef AppIdKeyPairs::const_iterator AppIdKeyPairsCIter; + typedef mqbi::Storage::AppInfo AppInfo; + typedef mqbi::Storage::AppInfos AppInfos; + typedef AppInfos::const_iterator AppInfosCIter; typedef bsl::unordered_set AppIds; typedef AppIds::iterator AppIdsIter; @@ -237,7 +237,7 @@ class StorageManager : public mqbi::AppKeyGenerator { virtual void registerQueue(const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, int partitionId, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, mqbi::Domain* domain) = 0; /// Synchronously unregister the queue with the specified `uri` from the @@ -259,8 +259,8 @@ class StorageManager : public mqbi::AppKeyGenerator { virtual int updateQueuePrimary(const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, int partitionId, - const AppIdKeyPairs& addedIdKeyPairs, - const AppIdKeyPairs& removedIdKeyPairs) = 0; + const AppInfos& addedIdKeyPairs, + const AppInfos& removedIdKeyPairs) = 0; virtual void registerQueueReplica(int partitionId, const bmqt::Uri& uri, @@ -276,7 +276,7 @@ class StorageManager : public mqbi::AppKeyGenerator { virtual void updateQueueReplica(int partitionId, const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, mqbi::Domain* domain = 0, bool allowDuplicate = false) = 0; diff --git a/src/groups/mqb/mqbmock/mqbmock_storagemanager.cpp b/src/groups/mqb/mqbmock/mqbmock_storagemanager.cpp index 3264e0110..34ddd037d 100644 --- a/src/groups/mqb/mqbmock/mqbmock_storagemanager.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_storagemanager.cpp @@ -60,7 +60,7 @@ void StorageManager::registerQueue( BSLS_ANNOTATION_UNUSED const bmqt::Uri& uri, BSLS_ANNOTATION_UNUSED const mqbu::StorageKey& queueKey, BSLS_ANNOTATION_UNUSED int partitionId, - BSLS_ANNOTATION_UNUSED const AppIdKeyPairs& appIdKeyPairs, + BSLS_ANNOTATION_UNUSED const AppInfos& appIdKeyPairs, BSLS_ANNOTATION_UNUSED mqbi::Domain* domain) { // NOTHING @@ -77,8 +77,8 @@ int StorageManager::updateQueuePrimary( BSLS_ANNOTATION_UNUSED const bmqt::Uri& uri, BSLS_ANNOTATION_UNUSED const mqbu::StorageKey& queueKey, BSLS_ANNOTATION_UNUSED int partitionId, - BSLS_ANNOTATION_UNUSED const AppIdKeyPairs& addedIdKeyPairs, - BSLS_ANNOTATION_UNUSED const AppIdKeyPairs& removedIdKeyPairs) + BSLS_ANNOTATION_UNUSED const AppInfos& addedIdKeyPairs, + BSLS_ANNOTATION_UNUSED const AppInfos& removedIdKeyPairs) { return 0; } @@ -106,7 +106,7 @@ void StorageManager::updateQueueReplica( BSLS_ANNOTATION_UNUSED int partitionId, BSLS_ANNOTATION_UNUSED const bmqt::Uri& uri, BSLS_ANNOTATION_UNUSED const mqbu::StorageKey& queueKey, - BSLS_ANNOTATION_UNUSED const AppIdKeyPairs& appIdKeyPairs, + BSLS_ANNOTATION_UNUSED const AppInfos& appIdKeyPairs, BSLS_ANNOTATION_UNUSED mqbi::Domain* domain, BSLS_ANNOTATION_UNUSED bool allowDuplicate) { diff --git a/src/groups/mqb/mqbmock/mqbmock_storagemanager.h b/src/groups/mqb/mqbmock/mqbmock_storagemanager.h index 028e17a3f..77e032778 100644 --- a/src/groups/mqb/mqbmock/mqbmock_storagemanager.h +++ b/src/groups/mqb/mqbmock/mqbmock_storagemanager.h @@ -80,7 +80,7 @@ class StorageManager : public mqbi::StorageManager { virtual void registerQueue(const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, int partitionId, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, mqbi::Domain* domain) BSLS_KEYWORD_OVERRIDE; /// Synchronously unregister the queue with the specified `uri` from the @@ -103,8 +103,8 @@ class StorageManager : public mqbi::StorageManager { virtual int updateQueuePrimary(const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, int partitionId, - const AppIdKeyPairs& addedIdKeyPairs, - const AppIdKeyPairs& removedIdKeyPairs) + const AppInfos& addedIdKeyPairs, + const AppInfos& removedIdKeyPairs) BSLS_KEYWORD_OVERRIDE; virtual void @@ -124,7 +124,7 @@ class StorageManager : public mqbi::StorageManager { updateQueueReplica(int partitionId, const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, mqbi::Domain* domain = 0, bool allowDuplicate = false) BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbs/mqbs_datastore.h b/src/groups/mqb/mqbs/mqbs_datastore.h index 7130a2fd0..67e7845a7 100644 --- a/src/groups/mqb/mqbs/mqbs_datastore.h +++ b/src/groups/mqb/mqbs/mqbs_datastore.h @@ -271,9 +271,9 @@ struct DataStoreRecordKeyLess { class DataStoreConfigQueueInfo { public: // TYPES - typedef mqbi::Storage::AppIdKeyPair AppIdKeyPair; + typedef mqbi::Storage::AppInfo AppInfo; - typedef mqbi::Storage::AppIdKeyPairs AppIdKeyPairs; + typedef mqbi::Storage::AppInfos AppInfos; private: // DATA @@ -281,7 +281,7 @@ class DataStoreConfigQueueInfo { int d_partitionId; - AppIdKeyPairs d_appIdKeyPairs; + AppInfos d_appIdKeyPairs; public: // TRAITS @@ -299,14 +299,14 @@ class DataStoreConfigQueueInfo { void setPartitionId(int value); - void addAppIdKeyPair(const AppIdKeyPair& value); + void addAppInfo(const AppInfo& value); // ACCESSORS const bsl::string& canonicalQueueUri() const; int partitionId() const; - const AppIdKeyPairs& appIdKeyPairs() const; + const AppInfos& appIdKeyPairs() const; }; // ===================== @@ -335,15 +335,15 @@ class DataStoreConfig { typedef Records::const_iterator RecordConstIterator; - typedef mqbi::Storage::AppIdKeyPair AppIdKeyPair; + typedef mqbi::Storage::AppInfo AppInfo; - typedef mqbi::Storage::AppIdKeyPairs AppIdKeyPairs; + typedef mqbi::Storage::AppInfos AppInfos; typedef bsl::function QueueCreationCb; @@ -541,9 +541,9 @@ class DataStore : public mqbi::DispatcherClient { public: // TYPES - typedef mqbi::Storage::AppIdKeyPair AppIdKeyPair; + typedef mqbi::Storage::AppInfo AppInfo; - typedef mqbi::Storage::AppIdKeyPairs AppIdKeyPairs; + typedef mqbi::Storage::AppInfos AppInfos; typedef DataStoreConfig::QueueKeyInfoMap QueueKeyInfoMap; @@ -596,7 +596,7 @@ class DataStore : public mqbi::DispatcherClient { virtual int writeQueueCreationRecord(DataStoreRecordHandle* handle, const bmqt::Uri& queueUri, const mqbu::StorageKey& queueKey, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, bsls::Types::Uint64 timestamp, bool isNewQueue) = 0; @@ -889,10 +889,9 @@ inline void DataStoreConfigQueueInfo::setPartitionId(int value) d_partitionId = value; } -inline void -DataStoreConfigQueueInfo::addAppIdKeyPair(const AppIdKeyPair& value) +inline void DataStoreConfigQueueInfo::addAppInfo(const AppInfo& value) { - d_appIdKeyPairs.push_back(value); + d_appIdKeyPairs.insert(value); } // ACCESSORS @@ -906,7 +905,7 @@ inline int DataStoreConfigQueueInfo::partitionId() const return d_partitionId; } -inline const DataStoreConfigQueueInfo::AppIdKeyPairs& +inline const DataStoreConfigQueueInfo::AppInfos& DataStoreConfigQueueInfo::appIdKeyPairs() const { return d_appIdKeyPairs; diff --git a/src/groups/mqb/mqbs/mqbs_filebackedstorage.h b/src/groups/mqb/mqbs/mqbs_filebackedstorage.h index 1903536f3..091f4cfd2 100644 --- a/src/groups/mqb/mqbs/mqbs_filebackedstorage.h +++ b/src/groups/mqb/mqbs/mqbs_filebackedstorage.h @@ -131,9 +131,9 @@ class FileBackedStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage { public: // TYPES - typedef mqbi::Storage::AppIdKeyPair AppIdKeyPair; + typedef mqbi::Storage::AppInfo AppInfo; - typedef mqbi::Storage::AppIdKeyPairs AppIdKeyPairs; + typedef mqbi::Storage::AppInfos AppInfos; typedef ReplicatedStorage::RecordHandles RecordHandles; @@ -334,8 +334,8 @@ class FileBackedStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage { /// Load into the specified 'buffer' the list of pairs of appId and appKey // for all the virtual storages registered with this instance. - virtual void loadVirtualStorageDetails(AppIdKeyPairs* buffer) const - BSLS_KEYWORD_OVERRIDE; + virtual void + loadVirtualStorageDetails(AppInfos* buffer) const BSLS_KEYWORD_OVERRIDE; /// Store in the specified 'msgSize' the size, in bytes, of the message /// having the specified 'msgGUID' if found and return success, or return @@ -720,7 +720,7 @@ inline bool FileBackedStorage::hasVirtualStorage(const bsl::string& appId, } inline void -FileBackedStorage::loadVirtualStorageDetails(AppIdKeyPairs* buffer) const +FileBackedStorage::loadVirtualStorageDetails(AppInfos* buffer) const { return d_virtualStorageCatalog.loadVirtualStorageDetails(buffer); } diff --git a/src/groups/mqb/mqbs/mqbs_filebackedstorage.t.cpp b/src/groups/mqb/mqbs/mqbs_filebackedstorage.t.cpp index 8a865520d..9c99911e0 100644 --- a/src/groups/mqb/mqbs/mqbs_filebackedstorage.t.cpp +++ b/src/groups/mqb/mqbs/mqbs_filebackedstorage.t.cpp @@ -342,7 +342,7 @@ class MockDataStore : public mqbs::DataStore { int writeQueueCreationRecord(mqbs::DataStoreRecordHandle*, const bmqt::Uri&, const mqbu::StorageKey&, - const AppIdKeyPairs&, + const AppInfos&, bsls::Types::Uint64, bool) BSLS_KEYWORD_OVERRIDE { diff --git a/src/groups/mqb/mqbs/mqbs_filestore.cpp b/src/groups/mqb/mqbs/mqbs_filestore.cpp index 45d5b7b06..0b59ae95a 100644 --- a/src/groups/mqb/mqbs/mqbs_filestore.cpp +++ b/src/groups/mqb/mqbs/mqbs_filestore.cpp @@ -2019,13 +2019,15 @@ int FileStore::recoverMessages(QueueKeyInfoMap* queueKeyInfoMap, FileStoreProtocol::k_HASH_LENGTH, appIdsAreaLen); - AppIdKeyPairs appIdKeyPairs; - FileStoreProtocolUtil::loadAppIdKeyPairs(&appIdKeyPairs, - appIdsBlock, - numAppIds); - - for (size_t n = 0; n < appIdKeyPairs.size(); ++n) { - const AppIdKeyPair& p = appIdKeyPairs[n]; + AppInfos appIdKeyPairs; + FileStoreProtocolUtil::loadAppInfos(&appIdKeyPairs, + appIdsBlock, + numAppIds); + + for (AppInfos::const_iterator cit = appIdKeyPairs.cbegin(); + cit != appIdKeyPairs.cend(); + ++cit) { + const AppInfo& p = *cit; if (0 == deletedAppKeysOffsets.count(p.second)) { // This appKey is not deleted. Add it to the list // of 'alive' appId/appKey pairs for this queue. @@ -2034,7 +2036,7 @@ int FileStore::recoverMessages(QueueKeyInfoMap* queueKeyInfoMap, // StorageMgr because we have recovered all // appId/appKey pairs by that time. - qinfo.addAppIdKeyPair(p); + qinfo.addAppInfo(p); BALL_LOG_INFO << partitionDesc() @@ -4267,7 +4269,7 @@ int FileStore::writeQueueCreationRecord( } bmqt::Uri quri; - AppIdKeyPairs appIdKeyPairs; + AppInfos appIdKeyPairs; if (!d_isFSMWorkflow) { // Check qlist offset in the replicated journal record sent by the // primary vs qlist offset maintained by self. A mismatch means that @@ -4301,9 +4303,9 @@ int FileStore::writeQueueCreationRecord( queueRecHeaderLen + paddedUriLen + FileStoreProtocol::k_HASH_LENGTH, appIdsAreaSize); - FileStoreProtocolUtil::loadAppIdKeyPairs(&appIdKeyPairs, - appIdsBlock, - queueRecHeader->numAppIds()); + FileStoreProtocolUtil::loadAppInfos(&appIdKeyPairs, + appIdsBlock, + queueRecHeader->numAppIds()); } BALL_LOG_INFO_BLOCK @@ -4316,10 +4318,11 @@ int FileStore::writeQueueCreationRecord( BALL_LOG_OUTPUT_STREAM << ", queue [" << quri << "]" << ", with [" << appIdKeyPairs.size() << "] appId/appKey pairs "; - for (size_t n = 0; n < appIdKeyPairs.size(); ++n) { - BALL_LOG_OUTPUT_STREAM << " [" << appIdKeyPairs[n].first - << ", " << appIdKeyPairs[n].second - << "]"; + for (AppInfos::const_iterator cit = appIdKeyPairs.cbegin(); + cit != appIdKeyPairs.cend(); + ++cit) { + BALL_LOG_OUTPUT_STREAM << " [" << cit->first << ", " + << cit->second << "]"; } } } @@ -5593,7 +5596,7 @@ int FileStore::writeMessageRecord(mqbi::StorageMessageAttributes* attributes, int FileStore::writeQueueCreationRecord(DataStoreRecordHandle* handle, const bmqt::Uri& queueUri, const mqbu::StorageKey& queueKey, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, bsls::Types::Uint64 timestamp, bool isNewQueue) { @@ -5654,9 +5657,11 @@ int FileStore::writeQueueCreationRecord(DataStoreRecordHandle* handle, totalLength = sizeof(QueueRecordHeader) + queueUri.asString().length() + queueUriPadding + FileStoreProtocol::k_HASH_LENGTH; - - for (size_t i = 0; i < appIdKeyPairs.size(); ++i) { - const AppIdKeyPair& appIdKeyPair = appIdKeyPairs[i]; + size_t i = 0; + for (AppInfos::const_iterator cit = appIdKeyPairs.cbegin(); + cit != appIdKeyPairs.cend(); + ++cit, ++i) { + const AppInfo& appIdKeyPair = *cit; BSLS_ASSERT_SAFE(!appIdKeyPair.first.empty()); BSLS_ASSERT_SAFE(!appIdKeyPair.second.isNull()); appIdWords[i] = bmqp::ProtocolUtil::calcNumWordsAndPadding( @@ -5761,7 +5766,10 @@ int FileStore::writeQueueCreationRecord(DataStoreRecordHandle* handle, qlistFilePos += FileStoreProtocol::k_HASH_LENGTH; // 3) Append AppIds and AppKeys - for (size_t i = 0; i < appIdKeyPairs.size(); ++i) { + size_t i = 0; + for (AppInfos::const_iterator cit = appIdKeyPairs.cbegin(); + cit != appIdKeyPairs.cend(); + ++cit, ++i) { // Append AppIdHeader. OffsetPtr appIdHeader(qlistFile.block(), @@ -5773,10 +5781,8 @@ int FileStore::writeQueueCreationRecord(DataStoreRecordHandle* handle, // Append AppId. OffsetPtr appId(qlistFile.block(), qlistFilePos); - bsl::memcpy(appId.get(), - appIdKeyPairs[i].first.c_str(), - appIdKeyPairs[i].first.length()); - qlistFilePos += appIdKeyPairs[i].first.length(); + bsl::memcpy(appId.get(), cit->first.c_str(), cit->first.length()); + qlistFilePos += cit->first.length(); // Append padding after AppId. @@ -5789,7 +5795,7 @@ int FileStore::writeQueueCreationRecord(DataStoreRecordHandle* handle, // above for explanation). char appIdHash[mqbs::FileStoreProtocol::k_HASH_LENGTH] = {0}; - bsl::memcpy(appIdHash, appIdKeyPairs[i].second.data(), k_KEY_LEN); + bsl::memcpy(appIdHash, cit->second.data(), k_KEY_LEN); OffsetPtr appHash(qlistFile.block(), qlistFilePos); bsl::memcpy(appHash.get(), appIdHash, diff --git a/src/groups/mqb/mqbs/mqbs_filestore.h b/src/groups/mqb/mqbs/mqbs_filestore.h index d2f1a2d23..1559d05d2 100644 --- a/src/groups/mqb/mqbs/mqbs_filestore.h +++ b/src/groups/mqb/mqbs/mqbs_filestore.h @@ -213,8 +213,8 @@ class FileStore : public DataStore { typedef DataStoreConfig::QueueKeyInfoMapConstIter QueueKeyInfoMapConstIter; typedef DataStoreConfig::QueueKeyInfoMapInsertRc QueueKeyInfoMapInsertRc; - typedef mqbi::Storage::AppIdKeyPair AppIdKeyPair; - typedef mqbi::Storage::AppIdKeyPairs AppIdKeyPairs; + typedef mqbi::Storage::AppInfo AppInfo; + typedef mqbi::Storage::AppInfos AppInfos; typedef StorageCollectionUtil::StoragesMap StoragesMap; typedef StorageCollectionUtil::StorageMapIter StorageMapIter; @@ -773,7 +773,7 @@ class FileStore : public DataStore { int writeQueueCreationRecord(DataStoreRecordHandle* handle, const bmqt::Uri& queueUri, const mqbu::StorageKey& queueKey, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, bsls::Types::Uint64 timestamp, bool isNewQueue) BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbs/mqbs_filestore.t.cpp b/src/groups/mqb/mqbs/mqbs_filestore.t.cpp index 6977a451b..0bddb7814 100644 --- a/src/groups/mqb/mqbs/mqbs_filestore.t.cpp +++ b/src/groups/mqb/mqbs/mqbs_filestore.t.cpp @@ -85,7 +85,7 @@ const int k_NODE_ID = 12345; // ALIASES typedef mqbs::FileStoreTestUtil_Record Record; -typedef mqbs::DataStore::AppIdKeyPairs AppIdKeyPairs; +typedef mqbs::DataStore::AppInfos AppInfos; typedef mqbs::FileStore::SyncPointOffsetPairs SyncPointOffsetPairs; typedef bsl::pair HandleRecordPair; @@ -368,7 +368,7 @@ struct Tester { bmqt::Uri(rec.d_uri, s_allocator_p), rec.d_queueKey, - AppIdKeyPairs(), + AppInfos(), rec.d_timestamp, true); // isNewQueue diff --git a/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.cpp b/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.cpp index 9a5856ea9..c6824b675 100644 --- a/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.cpp +++ b/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.cpp @@ -330,10 +330,11 @@ int FileStoreProtocolUtil::calculateMd5Digest( return 0; } -void FileStoreProtocolUtil::loadAppIdKeyPairs( - bsl::vector >* appIdKeyPairs, - const MemoryBlock& appIdsBlock, - unsigned int numAppIds) +void FileStoreProtocolUtil::loadAppInfos( + bsl::unordered_set >* + appIdKeyPairs, + const MemoryBlock& appIdsBlock, + unsigned int numAppIds) { // PRECONDITIONS BSLS_ASSERT_SAFE(appIdKeyPairs); @@ -351,7 +352,7 @@ void FileStoreProtocolUtil::loadAppIdKeyPairs( const char* appIdBegin = appIdsBlock.base() + offset + sizeof(AppIdHeader); - appIdKeyPairs->emplace_back( + appIdKeyPairs->emplace( bsl::string(appIdBegin, paddedLen - appIdBegin[paddedLen - 1], appIdKeyPairs->get_allocator()), diff --git a/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.h b/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.h index 9fbd417a0..d6789c841 100644 --- a/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.h +++ b/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.h @@ -40,6 +40,7 @@ #include #include #include +#include #include #include #include @@ -107,10 +108,11 @@ struct FileStoreProtocolUtil { const bmqu::BlobPosition& startPos, unsigned int length); - static void loadAppIdKeyPairs( - bsl::vector >* appIdKeyPairs, - const MemoryBlock& appIdsBlock, - unsigned int numAppIds); + static void + loadAppInfos(bsl::unordered_set >* + appIdKeyPairs, + const MemoryBlock& appIdsBlock, + unsigned int numAppIds); }; } // close package namespace diff --git a/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.t.cpp b/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.t.cpp index 5948826f7..3eb2b8072 100644 --- a/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.t.cpp +++ b/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.t.cpp @@ -495,25 +495,25 @@ static void test3_lastJournalSyncPoint() } } -static void test4_loadAppIdKeyPairs() +static void test4_loadAppInfos() // ------------------------------------------------------------------------ // Testing: -// loadAppIdKeyPairs() +// loadAppInfos() // ------------------------------------------------------------------------ { - typedef bsl::pair AppIdKeyPair; - typedef bsl::vector AppIdKeyPairs; + typedef bsl::pair AppInfo; + typedef bsl::unordered_set AppInfos; { // No appIds. char* p = static_cast(s_allocator_p->allocate(1)); mqbs::MemoryBlock mb(p, 1); - AppIdKeyPairs appIdKeyPairs(s_allocator_p); + AppInfos appIdKeyPairs(s_allocator_p); - mqbs::FileStoreProtocolUtil::loadAppIdKeyPairs(&appIdKeyPairs, - mb, - 0); // no appIds + mqbs::FileStoreProtocolUtil::loadAppInfos(&appIdKeyPairs, + mb, + 0); // no appIds ASSERT_EQ(0u, appIdKeyPairs.size()); @@ -563,15 +563,15 @@ static void test4_loadAppIdKeyPairs() // Test. mqbs::MemoryBlock mb(p, totalSize); - AppIdKeyPairs appIdKeyPairs(s_allocator_p); + AppInfos appIdKeyPairs(s_allocator_p); - mqbs::FileStoreProtocolUtil::loadAppIdKeyPairs(&appIdKeyPairs, - mb, - 1); // 1 appId + mqbs::FileStoreProtocolUtil::loadAppInfos(&appIdKeyPairs, + mb, + 1); // 1 appId ASSERT_EQ(1U, appIdKeyPairs.size()); - ASSERT_EQ(appId, appIdKeyPairs[0].first); - ASSERT_EQ(appKey, appIdKeyPairs[0].second); + ASSERT_EQ(appId, appIdKeyPairs.begin()->first); + ASSERT_EQ(appKey, appIdKeyPairs.begin()->second); s_allocator_p->deallocate(p); } @@ -601,10 +601,9 @@ static void test4_loadAppIdKeyPairs() mqbs::FileStoreProtocol::k_HASH_LENGTH; } - char* p = static_cast(s_allocator_p->allocate(totalSize)); - size_t offset = 0; - bsl::vector expectedAppIds(s_allocator_p); - bsl::vector expectedAppKeys(s_allocator_p); + char* p = static_cast(s_allocator_p->allocate(totalSize)); + size_t offset = 0; + AppInfos expectedAppInfos(s_allocator_p); for (int n = 0; n < numAppIds; ++n) { // Append AppIdHeader. @@ -624,7 +623,6 @@ static void test4_loadAppIdKeyPairs() s_allocator_p); bsl::memcpy(p + offset, appId.c_str(), appIdLenVec[n]); offset += appIdLenVec[n]; - expectedAppIds.push_back(appId); // Append AppId padding. bmqp::ProtocolUtil::appendPaddingRaw(p + offset, @@ -646,24 +644,21 @@ static void test4_loadAppIdKeyPairs() bsl::memcpy(p + offset, appHash, mqbs::FileStoreProtocol::k_HASH_LENGTH); - expectedAppKeys.push_back(appKey); + + expectedAppInfos.emplace( + AppInfo(bsl::string(appId, s_allocator_p), appKey)); offset += mqbs::FileStoreProtocol::k_HASH_LENGTH; } - // Test. mqbs::MemoryBlock mb(p, totalSize); - AppIdKeyPairs appIdKeyPairs(s_allocator_p); + AppInfos appIdKeyPairs(s_allocator_p); - mqbs::FileStoreProtocolUtil::loadAppIdKeyPairs(&appIdKeyPairs, - mb, - numAppIds); + mqbs::FileStoreProtocolUtil::loadAppInfos(&appIdKeyPairs, + mb, + numAppIds); ASSERT_EQ(static_cast(numAppIds), appIdKeyPairs.size()); - - for (size_t n = 0; n < appIdKeyPairs.size(); ++n) { - ASSERT_EQ_D(n, expectedAppIds[n], appIdKeyPairs[n].first); - ASSERT_EQ_D(n, expectedAppKeys[n], appIdKeyPairs[n].second); - } + ASSERT_EQ(appIdKeyPairs, expectedAppInfos); s_allocator_p->deallocate(p); } @@ -911,7 +906,7 @@ int main(int argc, char* argv[]) switch (_testCase) { case 0: case 5: test5_calculateMd5Digest(); break; - case 4: test4_loadAppIdKeyPairs(); break; + case 4: test4_loadAppInfos(); break; case 3: test3_lastJournalSyncPoint(); break; case 2: test2_lastJournalRecord(); break; case 1: test1_hasBmqHeader(); break; diff --git a/src/groups/mqb/mqbs/mqbs_inmemorystorage.h b/src/groups/mqb/mqbs/mqbs_inmemorystorage.h index caf70ae0f..4f4de320d 100644 --- a/src/groups/mqb/mqbs/mqbs_inmemorystorage.h +++ b/src/groups/mqb/mqbs/mqbs_inmemorystorage.h @@ -167,9 +167,9 @@ class InMemoryStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage { public: // TYPES - typedef mqbi::Storage::AppIdKeyPair AppIdKeyPair; + typedef mqbi::Storage::AppInfo AppInfo; - typedef mqbi::Storage::AppIdKeyPairs AppIdKeyPairs; + typedef mqbi::Storage::AppInfos AppInfos; typedef ReplicatedStorage::RecordHandles RecordHandles; @@ -515,8 +515,8 @@ class InMemoryStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage { /// Load into the specified 'buffer' the list of pairs of appId and /// appKey for all the virtual storages registered with this instance. - virtual void loadVirtualStorageDetails(AppIdKeyPairs* buffer) const - BSLS_KEYWORD_OVERRIDE; + virtual void + loadVirtualStorageDetails(AppInfos* buffer) const BSLS_KEYWORD_OVERRIDE; virtual unsigned int numAutoConfirms() const BSLS_KEYWORD_OVERRIDE; @@ -766,8 +766,7 @@ inline bool InMemoryStorage::hasReceipt(const bmqt::MessageGUID&) const return true; } -inline void -InMemoryStorage::loadVirtualStorageDetails(AppIdKeyPairs* buffer) const +inline void InMemoryStorage::loadVirtualStorageDetails(AppInfos* buffer) const { return d_virtualStorageCatalog.loadVirtualStorageDetails(buffer); diff --git a/src/groups/mqb/mqbs/mqbs_storageprintutil.cpp b/src/groups/mqb/mqbs/mqbs_storageprintutil.cpp index 53519711a..98ed74991 100644 --- a/src/groups/mqb/mqbs/mqbs_storageprintutil.cpp +++ b/src/groups/mqb/mqbs/mqbs_storageprintutil.cpp @@ -174,11 +174,11 @@ void StoragePrintUtil::printRecoveredStorages( out << " ["; } - AppIdKeyPairs appIdKeyPairs; + AppInfos appIdKeyPairs; rs->loadVirtualStorageDetails(&appIdKeyPairs); BSLS_ASSERT_SAFE(numVS == appIdKeyPairs.size()); - for (AppIdKeyPairsCIter vit = appIdKeyPairs.begin(); + for (AppInfosCIter vit = appIdKeyPairs.begin(); vit != appIdKeyPairs.end(); ++vit) { BSLS_ASSERT_SAFE(rs->hasVirtualStorage(vit->second)); diff --git a/src/groups/mqb/mqbs/mqbs_storageprintutil.h b/src/groups/mqb/mqbs/mqbs_storageprintutil.h index 0a20a02b5..6c420dba1 100644 --- a/src/groups/mqb/mqbs/mqbs_storageprintutil.h +++ b/src/groups/mqb/mqbs/mqbs_storageprintutil.h @@ -66,8 +66,8 @@ namespace mqbs { struct StoragePrintUtil { private: // PRIVATE TYPES - typedef mqbi::StorageManager::AppIdKeyPairs AppIdKeyPairs; - typedef AppIdKeyPairs::const_iterator AppIdKeyPairsCIter; + typedef mqbi::StorageManager::AppInfos AppInfos; + typedef AppInfos::const_iterator AppInfosCIter; public: // TYPES diff --git a/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.cpp b/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.cpp index 6ff76dc84..8cd24b5bf 100644 --- a/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.cpp +++ b/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.cpp @@ -565,8 +565,7 @@ bool VirtualStorageCatalog::hasVirtualStorage(const bsl::string& appId, return hasVs; } -void VirtualStorageCatalog::loadVirtualStorageDetails( - AppIdKeyPairs* buffer) const +void VirtualStorageCatalog::loadVirtualStorageDetails(AppInfos* buffer) const { // PRECONDITIONS BSLS_ASSERT_SAFE(buffer); @@ -575,7 +574,7 @@ void VirtualStorageCatalog::loadVirtualStorageDetails( cit != d_virtualStorages.end(); ++cit) { BSLS_ASSERT_SAFE(cit->key2() == cit->value()->appKey()); - buffer->push_back(bsl::make_pair(cit->key1(), cit->key2())); + buffer->emplace(bsl::make_pair(cit->key1(), cit->key2())); } } diff --git a/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.h b/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.h index 767887890..94f89762f 100644 --- a/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.h +++ b/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.h @@ -75,9 +75,9 @@ class VirtualStorageCatalog { public: // TYPES - typedef mqbi::Storage::AppIdKeyPair AppIdKeyPair; + typedef mqbi::Storage::AppInfo AppInfo; - typedef mqbi::Storage::AppIdKeyPairs AppIdKeyPairs; + typedef mqbi::Storage::AppInfos AppInfos; typedef unsigned int Ordinal; @@ -277,7 +277,7 @@ class VirtualStorageCatalog { /// Load into the specified 'buffer' the list of pairs of appId and /// appKey for all the virtual storages registered with this instance. - void loadVirtualStorageDetails(AppIdKeyPairs* buffer) const; + void loadVirtualStorageDetails(AppInfos* buffer) const; /// Return the number of messages in the virtual storage associated with /// the specified 'appKey'. Behavior is undefined unless a virtual diff --git a/src/integration-tests/test_restart.py b/src/integration-tests/test_restart.py index 064f0c076..102805e68 100644 --- a/src/integration-tests/test_restart.py +++ b/src/integration-tests/test_restart.py @@ -141,6 +141,18 @@ def test_restart_from_non_FSM_to_FSM(cluster: Cluster): ensureMessageAtStorageLayer(cluster) + # Consumer for fanout queue + consumer_foo = next(proxies).create_client("consumer_foo") + consumer_foo.open(tc.URI_FANOUT_FOO, flags=["read"], succeed=True) + consumer_foo.wait_push_event() + assert wait_until( + lambda: len(consumer_foo.list(tc.URI_FANOUT_FOO, block=True)) == 1, 2 + ) + + # Save one confirm to the storage + consumer_foo.confirm(tc.URI_FANOUT_FOO, "+1", succeed=True) + consumer_foo.close(tc.URI_FANOUT_FOO, succeed=True) + cluster.stop_nodes() # Reconfigure the cluster from non-FSM to FSM mode @@ -166,10 +178,17 @@ def test_restart_from_non_FSM_to_FSM(cluster: Cluster): consumer.wait_push_event() assert wait_until(lambda: len(consumer.list(tc.URI_PRIORITY, block=True)) == 2, 2) - # Consumer for fanout queue - consumer_fanout = next(proxies).create_client("consumer_fanout") - consumer_fanout.open(tc.URI_FANOUT_FOO, flags=["read"], succeed=True) - consumer_fanout.wait_push_event() + # Consumers for fanout queue + consumer_bar = next(proxies).create_client("consumer_bar") + consumer_bar.open(tc.URI_FANOUT_BAR, flags=["read"], succeed=True) + consumer_bar.wait_push_event() + assert wait_until( + lambda: len(consumer_bar.list(tc.URI_FANOUT_BAR, block=True)) == 2, 2 + ) + + # make sure the previously saved confirm is not lost + consumer_foo.open(tc.URI_FANOUT_FOO, flags=["read"], succeed=True) + consumer_foo.wait_push_event() assert wait_until( - lambda: len(consumer_fanout.list(tc.URI_FANOUT_FOO, block=True)) == 2, 2 + lambda: len(consumer_foo.list(tc.URI_FANOUT_FOO, block=True)) == 1, 2 )