diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp index 56dfde63f..0fca95d49 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp @@ -2228,10 +2228,8 @@ void Cluster::onRecoveryStatusDispatched( BSLS_ASSERT_SAFE(itMp->storage()->partitionId() == static_cast(pid)); if (isCSLModeEnabled()) { - AppIdKeyPairs appIdKeyPairs; - itMp->storage()->loadVirtualStorageDetails(&appIdKeyPairs); - AppIdInfos appIdInfos(appIdKeyPairs.cbegin(), - appIdKeyPairs.cend()); + AppInfos appIdInfos; + itMp->storage()->loadVirtualStorageDetails(&appIdInfos); d_clusterOrchestrator.registerQueueInfo( uri, @@ -2245,7 +2243,7 @@ void Cluster::onRecoveryStatusDispatched( uri, pid, itMp->storage()->queueKey(), - AppIdInfos(), + AppInfos(), false); // Force-update? } @@ -2862,18 +2860,22 @@ void Cluster::onDomainReconfigured(const mqbi::Domain& domain, } // Compute list of added and removed App IDs. - bsl::vector oldCfgAppIds(oldDefn.mode().fanout().appIDs(), - d_allocator_p); - bsl::vector newCfgAppIds(newDefn.mode().fanout().appIDs(), - d_allocator_p); - - bsl::vector addedIds, removedIds; + bsl::unordered_set oldCfgAppIds( + oldDefn.mode().fanout().appIDs().cbegin(), + oldDefn.mode().fanout().appIDs().cend(), + d_allocator_p); + bsl::unordered_set newCfgAppIds( + newDefn.mode().fanout().appIDs().cbegin(), + newDefn.mode().fanout().appIDs().cend(), + d_allocator_p); + + bsl::unordered_set addedIds, removedIds; mqbc::StorageUtil::loadAddedAndRemovedEntries(&addedIds, &removedIds, oldCfgAppIds, newCfgAppIds); - bsl::vector::const_iterator it = addedIds.begin(); + bsl::unordered_set::const_iterator it = addedIds.cbegin(); for (; it != addedIds.cend(); ++it) { dispatcher()->execute( bdlf::BindUtil::bind(&ClusterOrchestrator::registerAppId, diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.h b/src/groups/mqb/mqbblp/mqbblp_cluster.h index 1609461d1..2a830f3d7 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.h +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.h @@ -138,11 +138,9 @@ class Cluster : public mqbi::Cluster, private: // PRIVATE TYPES - typedef mqbi::Storage::AppIdKeyPairs AppIdKeyPairs; - typedef mqbc::ClusterStatePartitionInfo ClusterStatePartitionInfo; - typedef mqbc::ClusterStateQueueInfo::AppIdInfos AppIdInfos; + typedef mqbc::ClusterStateQueueInfo::AppInfos AppInfos; typedef mqbc::ClusterMembership::ClusterNodeSessionSp ClusterNodeSessionSp; diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp index d95843a12..54cf715ff 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp @@ -287,7 +287,7 @@ void ClusterOrchestrator::processBufferedQueueAdvisories() void ClusterOrchestrator::registerQueueInfo(const bmqt::Uri& uri, int partitionId, const mqbu::StorageKey& queueKey, - const AppIdInfos& appIdInfos, + const AppInfos& appIdInfos, bool forceUpdate) { // executed by the *DISPATCHER* thread diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.h b/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.h index c649b62a2..1194a28d3 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.h +++ b/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.h @@ -110,7 +110,7 @@ class ClusterOrchestrator { typedef bdlmt::EventScheduler::RecurringEventHandle RecurringEventHandle; - typedef mqbc::ClusterStateQueueInfo::AppIdInfos AppIdInfos; + typedef mqbc::ClusterStateQueueInfo::AppInfos AppInfos; private: // DATA @@ -516,7 +516,7 @@ class ClusterOrchestrator { void registerQueueInfo(const bmqt::Uri& uri, int partitionId, const mqbu::StorageKey& queueKey, - const AppIdInfos& appIdInfos, + const AppInfos& appIdInfos, bool forceUpdate); /// Executed by any thread. diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp index 761f9d37d..1091b6910 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp @@ -140,8 +140,8 @@ void createQueueUriKey(bmqt::Uri* out, } void afterAppIdRegisteredDispatched( - mqbi::Queue* queue, - const mqbc::ClusterStateQueueInfo::AppIdInfo& appIdInfo) + mqbi::Queue* queue, + const mqbc::ClusterStateQueueInfo::AppInfo& appIdInfo) { // executed by the *QUEUE DISPATCHER* thread @@ -149,12 +149,12 @@ void afterAppIdRegisteredDispatched( BSLS_ASSERT_SAFE(queue->dispatcher()->inDispatcherThread(queue)); queue->queueEngine()->afterAppIdRegistered( - mqbi::Storage::AppIdKeyPair(appIdInfo.first, appIdInfo.second)); + mqbi::Storage::AppInfo(appIdInfo.first, appIdInfo.second)); } void afterAppIdUnregisteredDispatched( - mqbi::Queue* queue, - const mqbc::ClusterStateQueueInfo::AppIdInfo& appIdInfo) + mqbi::Queue* queue, + const mqbc::ClusterStateQueueInfo::AppInfo& appIdInfo) { // executed by the *QUEUE DISPATCHER* thread @@ -162,7 +162,7 @@ void afterAppIdUnregisteredDispatched( BSLS_ASSERT_SAFE(queue->dispatcher()->inDispatcherThread(queue)); queue->queueEngine()->afterAppIdUnregistered( - mqbi::Storage::AppIdKeyPair(appIdInfo.first, appIdInfo.second)); + mqbi::Storage::AppInfo(appIdInfo.first, appIdInfo.second)); } void handleHolderDummy(const bsl::shared_ptr& handle) @@ -2153,27 +2153,18 @@ bsl::shared_ptr ClusterQueueHelper::createQueueFactory( // queue but the queue is never opened, it will not be registered with // the StorageMgr. This is ok. - if (d_cluster_p->isCSLModeEnabled()) { - const AppIdInfos& appIdInfos = - context.d_queueContext_p->d_stateQInfo_sp->appIdInfos(); - const mqbi::Storage::AppIdKeyPairs appIdKeyPairs( - appIdInfos.cbegin(), - appIdInfos.cend()); - d_storageManager_p->registerQueue( - context.d_queueContext_p->uri(), - context.d_queueContext_p->key(), - context.d_queueContext_p->partitionId(), - appIdKeyPairs, - context.d_domain_p); - } - else { - d_storageManager_p->registerQueue( - context.d_queueContext_p->uri(), - context.d_queueContext_p->key(), - context.d_queueContext_p->partitionId(), - mqbi::Storage::AppIdKeyPairs(), - context.d_domain_p); - } + // Use keys in the CSL instead of generating new ones to keep CSL and + // non-CSL consistent. + + const AppInfos& appIdInfos = + context.d_queueContext_p->d_stateQInfo_sp->appInfos(); + + d_storageManager_p->registerQueue( + context.d_queueContext_p->uri(), + context.d_queueContext_p->key(), + context.d_queueContext_p->partitionId(), + appIdInfos, + context.d_domain_p); // Queue must have been registered with storage manager before // registering it with the domain, otherwise Queue.configure() will @@ -3700,17 +3691,14 @@ void ClusterQueueHelper::restoreStateCluster(int partitionId) // 'createQueueFactory'). if (d_cluster_p->isCSLModeEnabled()) { - const AppIdInfos& appIdInfos = - queueContext->d_stateQInfo_sp->appIdInfos(); - const mqbi::Storage::AppIdKeyPairs appIdKeyPairs( - appIdInfos.cbegin(), - appIdInfos.cend()); + const AppInfos& appIdInfos = + queueContext->d_stateQInfo_sp->appInfos(); d_storageManager_p->registerQueue( queueContext->uri(), queueContext->key(), queueContext->partitionId(), - appIdKeyPairs, + appIdInfos, qinfo.d_queue_sp->domain()); } else { @@ -3718,7 +3706,7 @@ void ClusterQueueHelper::restoreStateCluster(int partitionId) queueContext->uri(), queueContext->key(), queueContext->partitionId(), - mqbi::Storage::AppIdKeyPairs(), + mqbi::Storage::AppInfos(), qinfo.d_queue_sp->domain()); } @@ -4110,6 +4098,7 @@ void ClusterQueueHelper::onQueueAssigned( BSLS_ASSERT_SAFE(!d_cluster_p->isRemote()); if (!d_cluster_p->isCSLModeEnabled()) { + // REVISIT return; // RETURN } @@ -4230,14 +4219,11 @@ void ClusterQueueHelper::onQueueAssigned( ->domain(), true); // allowDuplicate - const mqbi::Storage::AppIdKeyPairs appIdKeyPairs( - info.appIdInfos().cbegin(), - info.appIdInfos().cend()); d_storageManager_p->updateQueueReplica( info.partitionId(), info.uri(), info.key(), - appIdKeyPairs, + info.appInfos(), d_clusterState_p->domainStates() .at(info.uri().qualifiedDomain()) ->domain(), @@ -4396,8 +4382,8 @@ void ClusterQueueHelper::onQueueUnassigned( void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri, const bsl::string& domain, - const AppIdInfos& addedAppIds, - const AppIdInfos& removedAppIds) + const AppInfos& addedAppIds, + const AppInfos& removedAppIds) { // executed by the cluster *DISPATCHER* thread @@ -4425,19 +4411,21 @@ void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri, const int partitionId = qiter->second->partitionId(); BSLS_ASSERT_SAFE(partitionId != mqbs::DataStore::k_INVALID_PARTITION_ID); - for (AppIdInfosCIter cit = addedAppIds.cbegin(); cit != addedAppIds.cend(); + for (AppInfosCIter cit = addedAppIds.cbegin(); cit != addedAppIds.cend(); ++cit) { if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) { // Note: In non-CSL mode, the queue creation callback is // invoked at replica nodes when they receive a queue creation // record from the primary in the partition stream. - mqbi::Storage::AppIdKeyPair appIdKeyPair(cit->first, cit->second); - mqbi::Storage::AppIdKeyPairs appIdKeyPairs(1, appIdKeyPair); + + mqbi::Storage::AppInfos one(1, d_allocator_p); + one.emplace(*cit); + d_storageManager_p->updateQueueReplica( partitionId, uri, qiter->second->key(), - appIdKeyPairs, + one, d_clusterState_p->domainStates() .at(uri.qualifiedDomain()) ->domain()); @@ -4451,7 +4439,7 @@ void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri, } } - for (AppIdInfosCIter cit = removedAppIds.cbegin(); + for (AppInfosCIter cit = removedAppIds.cbegin(); cit != removedAppIds.cend(); ++cit) { if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) { @@ -4472,8 +4460,8 @@ void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri, } } - mwcu::Printer printer1(&addedAppIds); - mwcu::Printer printer2(&removedAppIds); + mwcu::Printer printer1(&addedAppIds); + mwcu::Printer printer2(&removedAppIds); BALL_LOG_INFO << d_cluster_p->description() << ": Updated queue: " << uri << ", addedAppIds: " << printer1 << ", removedAppIds: " << printer2; diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h index bbc42d362..353dfc39e 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h @@ -421,9 +421,9 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver, /// queue which have a proper valid unique queueId. typedef bsl::unordered_map QueueContextByIdMap; - typedef AppIdInfos::const_iterator AppIdInfosCIter; + typedef AppInfos::const_iterator AppInfosCIter; - typedef mqbc::ClusterStateQueueInfo::AppIdInfos AppIdInfos; + typedef mqbc::ClusterStateQueueInfo::AppInfos AppInfos; private: // DATA @@ -998,8 +998,8 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver, /// dispatcher thread. virtual void onQueueUpdated(const bmqt::Uri& uri, const bsl::string& domain, - const AppIdInfos& addedAppIds, - const AppIdInfos& removedAppIds = AppIdInfos()) + const AppInfos& addedAppIds, + const AppInfos& removedAppIds = AppInfos()) BSLS_KEYWORD_OVERRIDE; private: diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp index e215d3ac7..f74da33ba 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp @@ -618,12 +618,12 @@ void ClusterStateManager::onLeaderSyncDataQueryResponse( const mqbu::StorageKey receivedKey( mqbu::StorageKey::BinaryRepresentation(), queueInfo.key().data()); - AppIdInfos appIdInfos; + AppInfos appIdInfos; for (bsl::vector::const_iterator cit = queueInfo.appIds().cbegin(); cit != queueInfo.appIds().cend(); ++cit) { - AppIdInfo appIdInfo; + AppInfo appIdInfo; appIdInfo.first = cit->appId(); appIdInfo.second.fromBinary(cit->appKey().data()); @@ -1181,7 +1181,7 @@ ClusterStateManager::assignQueue(const bmqt::Uri& uri, void ClusterStateManager::registerQueueInfo(const bmqt::Uri& uri, int partitionId, const mqbu::StorageKey& queueKey, - const AppIdInfos& appIdInfos, + const AppInfos& appIdInfos, bool forceUpdate) { // executed by the *DISPATCHER* thread @@ -1695,7 +1695,7 @@ void ClusterStateManager::processQueueAssignmentAdvisory( uri, queueKey, queueInfo.partitionId(), - AppIdInfos()); + AppInfos()); BSLS_ASSERT_SAFE(rc == false); } else { @@ -1732,7 +1732,7 @@ void ClusterStateManager::processQueueAssignmentAdvisory( d_state_p->assignQueue(uri, queueKey, queueInfo.partitionId(), - AppIdInfos()); + AppInfos()); d_state_p->domainStates() .at(uri.qualifiedDomain()) diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h index 771a704de..5b2dcde56 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h +++ b/src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h @@ -114,8 +114,8 @@ class ClusterStateManager : public mqbc::ClusterStateObserver, typedef bsl::vector QueueAdvisories; - typedef mqbc::ClusterStateQueueInfo::AppIdInfo AppIdInfo; - typedef mqbc::ClusterStateQueueInfo::AppIdInfos AppIdInfos; + typedef mqbc::ClusterStateQueueInfo::AppInfo AppInfo; + typedef mqbc::ClusterStateQueueInfo::AppInfos AppInfos; typedef mqbc::ClusterState::UriToQueueInfoMap UriToQueueInfoMap; typedef mqbc::ClusterState::UriToQueueInfoMapCIter UriToQueueInfoMapCIter; @@ -381,7 +381,7 @@ class ClusterStateManager : public mqbc::ClusterStateObserver, virtual void registerQueueInfo(const bmqt::Uri& uri, int partitionId, const mqbu::StorageKey& queueKey, - const AppIdInfos& appIdInfos, + const AppInfos& appIdInfos, bool forceUpdate) BSLS_KEYWORD_OVERRIDE; /// Unassign the queue in the specified `advisory` by applying the diff --git a/src/groups/mqb/mqbblp/mqbblp_domain.cpp b/src/groups/mqb/mqbblp/mqbblp_domain.cpp index 74d0a7f4c..e7e943aef 100644 --- a/src/groups/mqb/mqbblp/mqbblp_domain.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_domain.cpp @@ -78,7 +78,7 @@ void afterAppIdRegisteredDispatched(mqbi::Queue* queue, BSLS_ASSERT_SAFE(queue->dispatcher()->inDispatcherThread(queue)); queue->queueEngine()->afterAppIdRegistered( - mqbi::Storage::AppIdKeyPair(appId, mqbu::StorageKey())); + mqbi::Storage::AppInfo(appId, mqbu::StorageKey())); } void afterAppIdUnregisteredDispatched(mqbi::Queue* queue, @@ -92,7 +92,7 @@ void afterAppIdUnregisteredDispatched(mqbi::Queue* queue, // Note: Inputing nullKey here is okay since this routine will be removed // when we switch to CSL workflow. queue->queueEngine()->afterAppIdUnregistered( - mqbi::Storage::AppIdKeyPair(appId, mqbu::StorageKey())); + mqbi::Storage::AppInfo(appId, mqbu::StorageKey())); } /// Validates an application subscription. @@ -290,8 +290,8 @@ void Domain::onOpenQueueResponse( confirmationCookie)); } -void Domain::updateAuthorizedAppIds(const AppIdInfos& addedAppIds, - const AppIdInfos& removedAppIds) +void Domain::updateAuthorizedAppIds(const AppInfos& addedAppIds, + const AppInfos& removedAppIds) { mqbconfm::QueueMode& queueMode = d_config.value().mode(); if (!queueMode.isFanoutValue()) { @@ -299,10 +299,7 @@ void Domain::updateAuthorizedAppIds(const AppIdInfos& addedAppIds, } bsl::vector& authorizedAppIds = queueMode.fanout().appIDs(); - const AppIdKeyPairs addedIdKeyPairs(addedAppIds.cbegin(), - addedAppIds.cend()); - for (AppIdKeyPairsCIter cit = addedIdKeyPairs.cbegin(); - cit != addedIdKeyPairs.cend(); + for (AppInfosCIter cit = addedAppIds.cbegin(); cit != addedAppIds.cend(); ++cit) { if (bsl::find(authorizedAppIds.begin(), authorizedAppIds.end(), @@ -316,10 +313,8 @@ void Domain::updateAuthorizedAppIds(const AppIdInfos& addedAppIds, authorizedAppIds.push_back(cit->first); } - const AppIdKeyPairs removedIdKeyPairs(removedAppIds.cbegin(), - removedAppIds.cend()); - for (AppIdKeyPairsCIter cit = removedIdKeyPairs.cbegin(); - cit != removedIdKeyPairs.cend(); + for (AppInfosCIter cit = removedAppIds.cbegin(); + cit != removedAppIds.cend(); ++cit) { const bsl::vector::const_iterator it = bsl::find( authorizedAppIds.begin(), @@ -361,13 +356,13 @@ void Domain::onQueueAssigned(const mqbc::ClusterStateQueueInfo& info) return; // RETURN } - updateAuthorizedAppIds(info.appIdInfos()); + updateAuthorizedAppIds(info.appInfos()); } void Domain::onQueueUpdated(const bmqt::Uri& uri, const bsl::string& domain, - const AppIdInfos& addedAppIds, - const AppIdInfos& removedAppIds) + const AppInfos& addedAppIds, + const AppInfos& removedAppIds) { // executed by the associated CLUSTER's DISPATCHER thread @@ -501,14 +496,16 @@ int Domain::configure(bsl::ostream& errorDescription, if (!d_cluster_sp->isCSLModeEnabled() && d_config.value().mode().isFanoutValue()) { // Compute list of added and removed App IDs. - bsl::vector oldCfgAppIds( - oldConfig.value().mode().fanout().appIDs(), + bsl::unordered_set oldCfgAppIds( + oldConfig.value().mode().fanout().appIDs().cbegin(), + oldConfig.value().mode().fanout().appIDs().cend(), d_allocator_p); - bsl::vector newCfgAppIds( - d_config.value().mode().fanout().appIDs(), + bsl::unordered_set newCfgAppIds( + d_config.value().mode().fanout().appIDs().cbegin(), + d_config.value().mode().fanout().appIDs().cend(), d_allocator_p); - bsl::vector addedIds, removedIds; + bsl::unordered_set addedIds, removedIds; mqbc::StorageUtil::loadAddedAndRemovedEntries(&addedIds, &removedIds, oldCfgAppIds, @@ -517,7 +514,8 @@ int Domain::configure(bsl::ostream& errorDescription, bslmt::LockGuard guard(&d_mutex); // Invoke callbacks for each added and removed ID on each queue. - bsl::vector::const_iterator it = addedIds.cbegin(); + bsl::unordered_set::const_iterator it = + addedIds.cbegin(); QueueMap::const_iterator qIt; for (; it != addedIds.cend(); it++) { for (qIt = d_queues.cbegin(); qIt != d_queues.cend(); ++qIt) { diff --git a/src/groups/mqb/mqbblp/mqbblp_domain.h b/src/groups/mqb/mqbblp/mqbblp_domain.h index 255563656..8bf893969 100644 --- a/src/groups/mqb/mqbblp/mqbblp_domain.h +++ b/src/groups/mqb/mqbblp/mqbblp_domain.h @@ -104,8 +104,8 @@ class Domain : public mqbi::Domain, public mqbc::ClusterStateObserver { typedef QueueMap::iterator QueueMapIter; typedef QueueMap::const_iterator QueueMapCIter; - typedef mqbi::Storage::AppIdKeyPairs AppIdKeyPairs; - typedef AppIdKeyPairs::const_iterator AppIdKeyPairsCIter; + typedef mqbi::Storage::AppInfos AppInfos; + typedef AppInfos::const_iterator AppInfosCIter; enum DomainState { e_STARTED = 0, e_STOPPING = 1, e_STOPPED = 2 }; @@ -199,9 +199,8 @@ class Domain : public mqbi::Domain, public mqbc::ClusterStateObserver { /// Update the list of authorized appIds by adding the specified /// `addedAppIds` and removing the specified `removedAppIds`. - void - updateAuthorizedAppIds(const AppIdInfos& addedAppIds, - const AppIdInfos& removedAppIds = AppIdInfos()); + void updateAuthorizedAppIds(const AppInfos& addedAppIds, + const AppInfos& removedAppIds = AppInfos()); // PRIVATE MANIPULATORS // (virtual: mqbc::ClusterStateObserver) @@ -223,8 +222,8 @@ class Domain : public mqbi::Domain, public mqbc::ClusterStateObserver { /// which case this queue update is ignored. virtual void onQueueUpdated(const bmqt::Uri& uri, const bsl::string& domain, - const AppIdInfos& addedAppIds, - const AppIdInfos& removedAppIds = AppIdInfos()) + const AppInfos& addedAppIds, + const AppInfos& removedAppIds = AppInfos()) BSLS_KEYWORD_OVERRIDE; private: diff --git a/src/groups/mqb/mqbblp/mqbblp_queuestate.cpp b/src/groups/mqb/mqbblp/mqbblp_queuestate.cpp index 864c090fb..e474671c6 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queuestate.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_queuestate.cpp @@ -225,7 +225,7 @@ void QueueState::loadInternals(mqbcmd::QueueState* out) const queueStorage.numBytes() = d_storage_mp->numBytes( mqbu::StorageKey::k_NULL_KEY); if (d_storage_mp->numVirtualStorages()) { - mqbi::Storage::AppIdKeyPairs appIdKeyPairs; + mqbi::Storage::AppInfos appIdKeyPairs; d_storage_mp->loadVirtualStorageDetails(&appIdKeyPairs); BSLS_ASSERT_SAFE( appIdKeyPairs.size() == @@ -234,8 +234,13 @@ void QueueState::loadInternals(mqbcmd::QueueState* out) const bsl::vector& virtualStorages = queueStorage.virtualStorages(); virtualStorages.resize(appIdKeyPairs.size()); - for (size_t i = 0; i < appIdKeyPairs.size(); ++i) { - const mqbi::Storage::AppIdKeyPair& p = appIdKeyPairs[i]; + + size_t i = 0; + for (mqbi::Storage::AppInfos::const_iterator cit = + appIdKeyPairs.cbegin(); + cit != appIdKeyPairs.cend(); + ++cit, ++i) { + const mqbi::Storage::AppInfo& p = *cit; virtualStorages[i].appId() = p.first; os.reset(); os << p.second; diff --git a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp index 7f09e90f0..55c2059a8 100644 --- a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp @@ -1590,7 +1590,7 @@ void RelayQueueEngine::loadInternals(mqbcmd::QueueEngine* out) const mqbcmd::RelayQueueEngine& relayQueueEngine = out->makeRelay(); int numSubStreams = 0; - mqbi::Storage::AppIdKeyPairs appIdKeyPairs; + mqbi::Storage::AppInfos appIdKeyPairs; numSubStreams = storage()->numVirtualStorages(); storage()->loadVirtualStorageDetails(&appIdKeyPairs); @@ -1602,8 +1602,12 @@ void RelayQueueEngine::loadInternals(mqbcmd::QueueEngine* out) const bsl::vector& subStreams = relayQueueEngine.subStreams(); subStreams.reserve(appIdKeyPairs.size()); - for (size_t i = 0; i < appIdKeyPairs.size(); ++i) { - const mqbi::Storage::AppIdKeyPair& p = appIdKeyPairs[i]; + + for (mqbi::Storage::AppInfos::const_iterator cit = + appIdKeyPairs.cbegin(); + cit != appIdKeyPairs.cend(); + ++cit) { + const mqbi::Storage::AppInfo& p = *cit; subStreams.resize(subStreams.size() + 1); mqbcmd::RelayQueueEngineSubStream& subStream = subStreams.back(); diff --git a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp index afd8bea2c..19c9a5605 100644 --- a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp @@ -1822,7 +1822,7 @@ bool RootQueueEngine::logAlarmCb(const mqbu::StorageKey& appKey, } void RootQueueEngine::afterAppIdRegistered( - const mqbi::Storage::AppIdKeyPair& appIdKeyPair) + const mqbi::Storage::AppInfo& appIdKeyPair) { // executed by the *QUEUE DISPATCHER* thread @@ -1916,17 +1916,18 @@ void RootQueueEngine::afterAppIdRegistered( BSLS_ASSERT_SAFE(!key.isNull()); + mqbi::Storage::AppInfos one(1, d_allocator_p); + one.emplace(mqbi::Storage::AppInfo(appId, key)); d_queueState_p->storageManager()->updateQueuePrimary( d_queueState_p->uri(), d_queueState_p->key(), d_queueState_p->partitionId(), - mqbi::Storage::AppIdKeyPairs(1, - mqbi::Storage::AppIdKeyPair(appId, key)), - mqbi::Storage::AppIdKeyPairs()); + one, + mqbi::Storage::AppInfos()); } void RootQueueEngine::afterAppIdUnregistered( - const mqbi::Storage::AppIdKeyPair& appIdKeyPair) + const mqbi::Storage::AppInfo& appIdKeyPair) { // executed by the *QUEUE DISPATCHER* thread @@ -1968,15 +1969,15 @@ void RootQueueEngine::afterAppIdUnregistered( << "]"; } } + mqbi::Storage::AppInfos one(1, d_allocator_p); + one.emplace(mqbi::Storage::AppInfo(appId, appKey)); d_queueState_p->storageManager()->updateQueuePrimary( d_queueState_p->uri(), d_queueState_p->key(), d_queueState_p->partitionId(), - mqbi::Storage::AppIdKeyPairs(), - mqbi::Storage::AppIdKeyPairs(1, - mqbi::Storage::AppIdKeyPair(appId, - appKey))); + mqbi::Storage::AppInfos(), + one); // No need to log in case of failure because 'updateQueuePrimary' does it // (even in case of success FTM). diff --git a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h index 8ca5012a4..f13150de9 100644 --- a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h +++ b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h @@ -401,14 +401,14 @@ class RootQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine { /// /// THREAD: This method is called from the Queue's dispatcher thread. virtual void afterAppIdRegistered( - const mqbi::Storage::AppIdKeyPair& appIdKeyPair) BSLS_KEYWORD_OVERRIDE; + const mqbi::Storage::AppInfo& appIdKeyPair) BSLS_KEYWORD_OVERRIDE; /// Called after the specified `appIdKeyPair` has been dynamically /// unregistered. /// /// THREAD: This method is called from the Queue's dispatcher thread. virtual void afterAppIdUnregistered( - const mqbi::Storage::AppIdKeyPair& appIdKeyPair) BSLS_KEYWORD_OVERRIDE; + const mqbi::Storage::AppInfo& appIdKeyPair) BSLS_KEYWORD_OVERRIDE; /// Called after creation of a new storage for the specified /// `appIdKeyPair`. diff --git a/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp b/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp index 04454f9c9..6fc2e518b 100644 --- a/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp @@ -368,7 +368,7 @@ void StorageManager::queueCreationCb(int* status, int partitionId, const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, bool isNewQueue) { // executed by *QUEUE_DISPATCHER* thread associated with 'partitionId' @@ -1050,7 +1050,7 @@ StorageManager::~StorageManager() void StorageManager::registerQueue(const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, int partitionId, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, mqbi::Domain* domain) { // executed by the *CLUSTER DISPATCHER* thread @@ -1061,9 +1061,6 @@ void StorageManager::registerQueue(const bmqt::Uri& uri, BSLS_ASSERT_SAFE(0 <= partitionId && partitionId < static_cast(d_fileStores.size())); BSLS_ASSERT_SAFE(domain); - if (!d_cluster_p->isCSLModeEnabled()) { - BSLS_ASSERT_SAFE(appIdKeyPairs.empty()); - } mqbc::StorageUtil::registerQueue(d_cluster_p, d_dispatcher_p, @@ -1113,8 +1110,8 @@ void StorageManager::unregisterQueue(const bmqt::Uri& uri, int partitionId) int StorageManager::updateQueuePrimary(const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, int partitionId, - const AppIdKeyPairs& addedIdKeyPairs, - const AppIdKeyPairs& removedIdKeyPairs) + const AppInfos& addedIdKeyPairs, + const AppInfos& removedIdKeyPairs) { // executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId' @@ -1230,7 +1227,7 @@ void StorageManager::unregisterQueueReplica(int partitionId, void StorageManager::updateQueueReplica(int partitionId, const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, mqbi::Domain* domain, bool allowDuplicate) { @@ -1411,7 +1408,7 @@ int StorageManager::start(bsl::ostream& errorDescription) bdlf::PlaceHolders::_2, // partitionId bdlf::PlaceHolders::_3, // QueueUri bdlf::PlaceHolders::_4, // QueueKey - bdlf::PlaceHolders::_5, // AppIdKeyPairs + bdlf::PlaceHolders::_5, // AppInfos bdlf::PlaceHolders::_6), // IsNewQueue) bdlf::BindUtil::bind(&StorageManager::queueDeletionCb, this, diff --git a/src/groups/mqb/mqbblp/mqbblp_storagemanager.h b/src/groups/mqb/mqbblp/mqbblp_storagemanager.h index a351b2f20..5390f1de4 100644 --- a/src/groups/mqb/mqbblp/mqbblp_storagemanager.h +++ b/src/groups/mqb/mqbblp/mqbblp_storagemanager.h @@ -399,7 +399,7 @@ class StorageManager : public mqbi::StorageManager { int partitionId, const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, bool isNewQueue); void queueDeletionCb(int* status, @@ -520,7 +520,7 @@ class StorageManager : public mqbi::StorageManager { virtual void registerQueue(const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, int partitionId, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, mqbi::Domain* domain) BSLS_KEYWORD_OVERRIDE; /// Synchronously unregister the queue with the specified `uri` from the @@ -543,8 +543,8 @@ class StorageManager : public mqbi::StorageManager { virtual int updateQueuePrimary(const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, int partitionId, - const AppIdKeyPairs& addedIdKeyPairs, - const AppIdKeyPairs& removedIdKeyPairs) + const AppInfos& addedIdKeyPairs, + const AppInfos& removedIdKeyPairs) BSLS_KEYWORD_OVERRIDE; virtual void @@ -564,7 +564,7 @@ class StorageManager : public mqbi::StorageManager { updateQueueReplica(int partitionId, const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, mqbi::Domain* domain = 0, bool allowDuplicate = false) BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbc/mqbc_clusterstate.cpp b/src/groups/mqb/mqbc/mqbc_clusterstate.cpp index 23dee1d41..6ff5012c3 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstate.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterstate.cpp @@ -47,7 +47,7 @@ bsl::ostream& ClusterStateQueueInfo::print(bsl::ostream& stream, printer.printAttribute("uri", uri()); printer.printAttribute("queueKey", key()); printer.printAttribute("partitionId", partitionId()); - printer.printAttribute("appIdInfos", appIdInfos()); + printer.printAttribute("appIdInfos", appInfos()); printer.end(); return stream; @@ -88,8 +88,8 @@ void ClusterStateObserver::onQueueUnassigned( void ClusterStateObserver::onQueueUpdated( BSLS_ANNOTATION_UNUSED const bmqt::Uri& uri, BSLS_ANNOTATION_UNUSED const bsl::string& domain, - BSLS_ANNOTATION_UNUSED const AppIdInfos& addedAppIds, - BSLS_ANNOTATION_UNUSED const AppIdInfos& removedAppIds) + BSLS_ANNOTATION_UNUSED const AppInfos& addedAppIds, + BSLS_ANNOTATION_UNUSED const AppInfos& removedAppIds) { // NOTHING } @@ -316,7 +316,7 @@ ClusterState& ClusterState::updatePartitionNumActiveQueues(int partitionId, bool ClusterState::assignQueue(const bmqt::Uri& uri, const mqbu::StorageKey& key, int partitionId, - const AppIdInfos& appIdInfos) + const AppInfos& appIdInfos) { // executed by the cluster *DISPATCHER* thread @@ -354,14 +354,14 @@ bool ClusterState::assignQueue(const bmqt::Uri& uri, updatePartitionQueueMapped(iter->second->partitionId(), -1); iter->second->setKey(key).setPartitionId(partitionId); - iter->second->appIdInfos() = appIdInfos; + iter->second->appInfos() = appIdInfos; iter->second->setPendingUnassignment(false); } } updatePartitionQueueMapped(partitionId, 1); - mwcu::Printer printer(&appIdInfos); + mwcu::Printer printer(&appIdInfos); BALL_LOG_INFO << "Cluster [" << d_cluster_p->name() << "]: " << "Assigning queue [" << uri << "], queueKey: [" << key << "] to Partition [" << partitionId @@ -454,8 +454,8 @@ void ClusterState::clearQueues() int ClusterState::updateQueue(const bmqt::Uri& uri, const bsl::string& domain, - const AppIdInfos& addedAppIds, - const AppIdInfos& removedAppIds) + const AppInfos& addedAppIds, + const AppInfos& removedAppIds) { // executed by the cluster *DISPATCHER* thread @@ -485,8 +485,8 @@ int ClusterState::updateQueue(const bmqt::Uri& uri, return rc_QUEUE_NOT_FOUND; // RETURN } - AppIdInfos& appIdInfos = iter->second->appIdInfos(); - for (AppIdInfosCIter citer = addedAppIds.cbegin(); + AppInfos& appIdInfos = iter->second->appInfos(); + for (AppInfosCIter citer = addedAppIds.cbegin(); citer != addedAppIds.cend(); ++citer) { if (!appIdInfos.insert(*citer).second) { @@ -494,18 +494,18 @@ int ClusterState::updateQueue(const bmqt::Uri& uri, } } - for (AppIdInfosCIter citer = removedAppIds.begin(); + for (AppInfosCIter citer = removedAppIds.begin(); citer != removedAppIds.end(); ++citer) { - const AppIdInfosCIter appIdInfoCIter = appIdInfos.find(*citer); + const AppInfosCIter appIdInfoCIter = appIdInfos.find(*citer); if (appIdInfoCIter == appIdInfos.cend()) { return rc_APPID_NOT_FOUND; // RETURN } appIdInfos.erase(appIdInfoCIter); } - mwcu::Printer printer1(&addedAppIds); - mwcu::Printer printer2(&removedAppIds); + mwcu::Printer printer1(&addedAppIds); + mwcu::Printer printer2(&removedAppIds); BALL_LOG_INFO << "Cluster [" << d_cluster_p->name() << "]: " << "Updating queue [" << uri << "], queueKey: [" << iter->second->key() << "], partitionId: [" @@ -516,8 +516,8 @@ int ClusterState::updateQueue(const bmqt::Uri& uri, else { // This update is for an entire domain, instead of any individual // queue. - mwcu::Printer printer1(&addedAppIds); - mwcu::Printer printer2(&removedAppIds); + mwcu::Printer printer1(&addedAppIds); + mwcu::Printer printer2(&removedAppIds); BALL_LOG_INFO << "Cluster [" << d_cluster_p->name() << "]: " << "Updating domain: [" << domain << "], addedAppIds: " << printer1 diff --git a/src/groups/mqb/mqbc/mqbc_clusterstate.h b/src/groups/mqb/mqbc/mqbc_clusterstate.h index 82b996a08..e1c4a928c 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstate.h +++ b/src/groups/mqb/mqbc/mqbc_clusterstate.h @@ -161,9 +161,9 @@ class ClusterStatePartitionInfo { class ClusterStateQueueInfo { public: // TYPES - typedef mqbi::ClusterStateManager::AppIdInfo AppIdInfo; - typedef mqbi::ClusterStateManager::AppIdInfos AppIdInfos; - typedef mqbi::ClusterStateManager::AppIdInfosCIter AppIdInfosCIter; + typedef mqbi::ClusterStateManager::AppInfo AppInfo; + typedef mqbi::ClusterStateManager::AppInfos AppInfos; + typedef mqbi::ClusterStateManager::AppInfosCIter AppInfosCIter; private: // DATA @@ -178,7 +178,7 @@ class ClusterStateQueueInfo { // Assigned partitionId // (mqbs::DataStore::k_INVALID_PARTITION_ID if unassigned) - AppIdInfos d_appIdInfos; + AppInfos d_appInfos; // List of App id and key pairs // // TBD: Should also be added to mqbconfm::Domain @@ -210,7 +210,7 @@ class ClusterStateQueueInfo { ClusterStateQueueInfo(const bmqt::Uri& uri, const mqbu::StorageKey& key, int partitionId, - const AppIdInfos& appIdInfos, + const AppInfos& appIdInfos, bslma::Allocator* allocator); // MANIPULATORS @@ -222,7 +222,7 @@ class ClusterStateQueueInfo { ClusterStateQueueInfo& setPendingUnassignment(bool value); /// Get a modifiable reference to this object's appIdInfos. - AppIdInfos& appIdInfos(); + AppInfos& appInfos(); /// Reset the `key`, `partitionId`, `appIdInfos` members of this object. /// Note that `uri` is left untouched because it is an invariant member @@ -233,7 +233,7 @@ class ClusterStateQueueInfo { const bmqt::Uri& uri() const; const mqbu::StorageKey& key() const; int partitionId() const; - const AppIdInfos& appIdInfos() const; + const AppInfos& appInfos() const; /// Return the value of the corresponding member of this object. bool pendingUnassignment() const; @@ -271,7 +271,7 @@ bsl::ostream& operator<<(bsl::ostream& stream, class ClusterStateObserver { public: // TYPES - typedef ClusterStateQueueInfo::AppIdInfos AppIdInfos; + typedef ClusterStateQueueInfo::AppInfos AppInfos; public: // CREATORS @@ -318,11 +318,10 @@ class ClusterStateObserver { /// /// THREAD: This method is invoked in the associated cluster's /// dispatcher thread. - virtual void - onQueueUpdated(const bmqt::Uri& uri, - const bsl::string& domain, - const AppIdInfos& addedAppIds, - const AppIdInfos& removedAppIds = AppIdInfos()); + virtual void onQueueUpdated(const bmqt::Uri& uri, + const bsl::string& domain, + const AppInfos& addedAppIds, + const AppInfos& removedAppIds = AppInfos()); /// Callback invoked when a partition with the specified `partitionId` /// has been orphan above a certain threshold amount of time. @@ -366,8 +365,8 @@ class ClusterState { public: // TYPES - typedef ClusterStateQueueInfo::AppIdInfos AppIdInfos; - typedef ClusterStateQueueInfo::AppIdInfosCIter AppIdInfosCIter; + typedef ClusterStateQueueInfo::AppInfos AppInfos; + typedef ClusterStateQueueInfo::AppInfosCIter AppInfosCIter; typedef bsl::vector PartitionsInfo; @@ -567,7 +566,7 @@ class ClusterState { bool assignQueue(const bmqt::Uri& uri, const mqbu::StorageKey& key, int partitionId, - const AppIdInfos& appIdInfos); + const AppInfos& appIdInfos); /// Un-assign the queue with the specified `uri`. Return true if /// successful, or false if the queue does not exist. @@ -592,8 +591,8 @@ class ClusterState { /// cluster's dispatcher thread. int updateQueue(const bmqt::Uri& uri, const bsl::string& domain, - const AppIdInfos& addedAppIds, - const AppIdInfos& removedAppIds = AppIdInfos()); + const AppInfos& addedAppIds, + const AppInfos& removedAppIds = AppInfos()); /// Clear this cluster state object, without firing any observers. void clear(); @@ -767,7 +766,7 @@ inline ClusterStateQueueInfo::ClusterStateQueueInfo( : d_uri(uri, allocator) , d_key() , d_partitionId(mqbs::DataStore::k_INVALID_PARTITION_ID) -, d_appIdInfos(allocator) +, d_appInfos(allocator) , d_pendingUnassignment(false) { // NOTHING @@ -777,12 +776,12 @@ inline ClusterStateQueueInfo::ClusterStateQueueInfo( const bmqt::Uri& uri, const mqbu::StorageKey& key, int partitionId, - const AppIdInfos& appIdInfos, + const AppInfos& appIdInfos, bslma::Allocator* allocator) : d_uri(uri, allocator) , d_key(key) , d_partitionId(partitionId) -, d_appIdInfos(appIdInfos, allocator) +, d_appInfos(appIdInfos, allocator) , d_pendingUnassignment(false) { // NOTHING @@ -809,9 +808,9 @@ ClusterStateQueueInfo::setPendingUnassignment(bool value) return *this; } -inline ClusterStateQueueInfo::AppIdInfos& ClusterStateQueueInfo::appIdInfos() +inline ClusterStateQueueInfo::AppInfos& ClusterStateQueueInfo::appInfos() { - return d_appIdInfos; + return d_appInfos; } inline void ClusterStateQueueInfo::reset() @@ -821,7 +820,7 @@ inline void ClusterStateQueueInfo::reset() d_key.reset(); d_partitionId = mqbs::DataStore::k_INVALID_PARTITION_ID; - d_appIdInfos.clear(); + d_appInfos.clear(); } // ACCESSORS @@ -840,10 +839,10 @@ inline int ClusterStateQueueInfo::partitionId() const return d_partitionId; } -inline const ClusterStateQueueInfo::AppIdInfos& -ClusterStateQueueInfo::appIdInfos() const +inline const ClusterStateQueueInfo::AppInfos& +ClusterStateQueueInfo::appInfos() const { - return d_appIdInfos; + return d_appInfos; } inline bool ClusterStateQueueInfo::pendingUnassignment() const diff --git a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp index 801b0ca1b..cac44011f 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp @@ -1513,7 +1513,7 @@ ClusterStateManager::assignQueue(const bmqt::Uri& uri, void ClusterStateManager::registerQueueInfo(const bmqt::Uri& uri, int partitionId, const mqbu::StorageKey& queueKey, - const AppIdInfos& appIdInfos, + const AppInfos& appIdInfos, bool forceUpdate) { // executed by the *DISPATCHER* thread diff --git a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.h b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.h index 4ab48ddd7..751dfdca9 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstatemanager.h +++ b/src/groups/mqb/mqbc/mqbc_clusterstatemanager.h @@ -111,7 +111,7 @@ class ClusterStateManager typedef ClusterFSM::ClusterFSMArgs ClusterFSMArgs; typedef ClusterFSM::ClusterFSMArgsSp ClusterFSMArgsSp; - typedef mqbi::ClusterStateManager::AppIdInfos AppIdInfos; + typedef mqbi::ClusterStateManager::AppInfos AppInfos; public: // TYPES @@ -484,7 +484,7 @@ class ClusterStateManager virtual void registerQueueInfo(const bmqt::Uri& uri, int partitionId, const mqbu::StorageKey& queueKey, - const AppIdInfos& appIdInfos, + const AppInfos& appIdInfos, bool forceUpdate) BSLS_KEYWORD_OVERRIDE; /// Unassign the queue in the specified `advisory` by applying the diff --git a/src/groups/mqb/mqbc/mqbc_clusterutil.cpp b/src/groups/mqb/mqbc/mqbc_clusterutil.cpp index cdf19c625..00a3d6b70 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterutil.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterutil.cpp @@ -67,9 +67,9 @@ const char k_SELF_NODE_IS_STOPPING[] = "self node is stopping"; const char k_DOMAIN_CREATION_FAILURE[] = "failed to create domain"; // TYPES -typedef ClusterUtil::AppIdInfo AppIdInfo; -typedef ClusterUtil::AppIdInfos AppIdInfos; -typedef ClusterUtil::AppIdInfosCIter AppIdInfosCIter; +typedef ClusterUtil::AppInfo AppInfo; +typedef ClusterUtil::AppInfos AppInfos; +typedef ClusterUtil::AppInfosCIter AppInfosCIter; typedef ClusterUtil::ClusterNodeSessionMapConstIter ClusterNodeSessionMapConstIter; @@ -111,18 +111,19 @@ void applyQueueAssignment(mqbc::ClusterState* clusterState, const bsl::vector& appIds = queueInfo.appIds(); - AppIdInfos addedAppIds; + AppInfos addedAppIds; for (bsl::vector::const_iterator citer = appIds.cbegin(); citer != appIds.cend(); ++citer) { - AppIdInfo appIdInfo; + AppInfo appIdInfo; appIdInfo.first = citer->appId(); appIdInfo.second.fromBinary(citer->appKey().data()); addedAppIds.insert(appIdInfo); } + // CSL commit clusterState->assignQueue(uri, queueKey, partitionId, addedAppIds); } } @@ -221,24 +222,24 @@ void applyQueueUpdate(mqbc::ClusterState* clusterState, mqbs::DataStore::k_INVALID_PARTITION_ID); } - AppIdInfos addedAppIds; + AppInfos addedAppIds; for (bsl::vector::const_iterator citer = queueUpdate.addedAppIds().cbegin(); citer != queueUpdate.addedAppIds().cend(); ++citer) { - AppIdInfo appIdInfo; + AppInfo appIdInfo; appIdInfo.first = citer->appId(); appIdInfo.second.fromBinary(citer->appKey().data()); addedAppIds.insert(appIdInfo); } - AppIdInfos removedAppIds; + AppInfos removedAppIds; for (bsl::vector::const_iterator citer = queueUpdate.removedAppIds().begin(); citer != queueUpdate.removedAppIds().end(); ++citer) { - AppIdInfo appIdInfo; + AppInfo appIdInfo; appIdInfo.first = citer->appId(); appIdInfo.second.fromBinary(citer->appKey().data()); @@ -809,7 +810,7 @@ void ClusterUtil::populateQueueAssignmentAdvisory( key->loadBinary(&queueInfo.key()); // Generate appIds and appKeys - populateAppIdInfos(&queueInfo.appIds(), domain->config().mode()); + populateAppInfos(&queueInfo.appIds(), domain->config().mode()); BALL_LOG_INFO << clusterData->identity().description() << ": Populated QueueAssignmentAdvisory: " << *advisory; @@ -901,6 +902,8 @@ ClusterUtil::assignQueue(ClusterState* clusterState, DomainStatesIter domIt = clusterState->domainStates().find( uri.qualifiedDomain()); if (domIt == clusterState->domainStates().end()) { + // REVISIT: This is also done in 'ClusterState::assignQueue' + clusterState->domainStates()[uri.qualifiedDomain()].createInplace( allocator, allocator); @@ -978,12 +981,13 @@ ClusterUtil::assignQueue(ClusterState* clusterState, } } - // Queue is no longer pending unassignment - const DomainStatesCIter cit = clusterState->domainStates().find( + // Set the queue as no longer pending unassignment + const DomainStatesCIter citDomainState = clusterState->domainStates().find( uri.qualifiedDomain()); - if (cit != clusterState->domainStates().cend()) { - UriToQueueInfoMapCIter qcit = cit->second->queuesInfo().find(uri); - if (qcit != cit->second->queuesInfo().cend()) { + if (citDomainState != clusterState->domainStates().cend()) { + UriToQueueInfoMapCIter qcit = + citDomainState->second->queuesInfo().find(uri); + if (qcit != citDomainState->second->queuesInfo().cend()) { BSLS_ASSERT_SAFE(cluster->isCSLModeEnabled() && qcit->second->pendingUnassignment()); qcit->second->setPendingUnassignment(false); @@ -1029,11 +1033,29 @@ ClusterUtil::assignQueue(ClusterState* clusterState, // In CSL mode, we assign the queue to ClusterState upon CSL commit // callback of QueueAssignmentAdvisory, so we don't assign it here. + // In non-CSL mode this is the shortcut to call Primary CQH instead of + // waiting for the quorum of acks + + BSLS_ASSERT_SAFE(queueAdvisory.queues().size() == 1); + + bmqp_ctrlmsg::QueueInfo& queueInfo = queueAdvisory.queues().back(); + + AppInfos appInfos(allocator); + + for (bsl::vector::const_iterator cit = + queueInfo.appIds().cbegin(); + cit != queueInfo.appIds().end(); + ++cit) { + appInfos.emplace(bsl::make_pair( + cit->appId(), + mqbu::StorageKey(mqbu::StorageKey::BinaryRepresentation(), + cit->appKey().data()))); + } BSLA_MAYBE_UNUSED const bool assignRc = clusterState->assignQueue( uri, key, queueAdvisory.queues().back().partitionId(), - AppIdInfos()); + appInfos); BSLS_ASSERT_SAFE(assignRc); domIt->second->adjustQueueCount(1); @@ -1055,7 +1077,7 @@ void ClusterUtil::registerQueueInfo(ClusterState* clusterState, const bmqt::Uri& uri, int partitionId, const mqbu::StorageKey& queueKey, - const AppIdInfos& appIdInfos, + const AppInfos& appInfos, const QueueAssigningCb& queueAssigningCb, bool forceUpdate) { @@ -1097,40 +1119,39 @@ void ClusterUtil::registerQueueInfo(ClusterState* clusterState, BSLS_ASSERT_SAFE(qs->uri() == uri); if ((qs->partitionId() == partitionId) && - (qs->key() == queueKey) && (qs->appIdInfos() == appIdInfos)) { + (qs->key() == queueKey) && (qs->appInfos() == appInfos)) { // All good.. nothing to update. return; // RETURN } - mwcu::Printer stateAppIdInfos(&qs->appIdInfos()); - mwcu::Printer storageAppIdInfos(&appIdInfos); + mwcu::Printer stateAppInfos(&qs->appInfos()); + mwcu::Printer storageAppInfos(&appInfos); - // PartitionId and/or QueueKey and/or AppIdInfos mismatch. + // PartitionId and/or QueueKey and/or AppInfos mismatch. if (!forceUpdate) { MWCTSK_ALARMLOG_ALARM("CLUSTER_STATE") << cluster->description() << ": For queue [ " << uri - << "], different partitionId/queueKey/appIdInfos in " + << "], different partitionId/queueKey/appInfos in " << "cluster state and storage. " - << "PartitionId/QueueKey/AppIdInfos in cluster state [" + << "PartitionId/QueueKey/AppInfos in cluster state [" << qs->partitionId() << "], [" << qs->key() << "], [" - << stateAppIdInfos - << "]. PartitionId/QueueKey/AppIdInfos in storage [" + << stateAppInfos + << "]. PartitionId/QueueKey/AppInfos in storage [" << partitionId << "], [" << queueKey << "], [" - << storageAppIdInfos << "]." << MWCTSK_ALARMLOG_END; + << storageAppInfos << "]." << MWCTSK_ALARMLOG_END; return; // RETURN } BALL_LOG_WARN << cluster->description() << ": For queue [" << uri << "], force-updating " - << "partitionId/queueKey/appIdInfos from [" + << "partitionId/queueKey/appInfos from [" << qs->partitionId() << "], [" << qs->key() << "], [" - << stateAppIdInfos << "] to [" << partitionId - << "], [" << queueKey << "], [" << storageAppIdInfos - << "]."; + << stateAppInfos << "] to [" << partitionId << "], [" + << queueKey << "], [" << storageAppInfos << "]."; clusterState->queueKeys().erase(qs->key()); - clusterState->assignQueue(uri, queueKey, partitionId, appIdInfos); + clusterState->assignQueue(uri, queueKey, partitionId, appInfos); BALL_LOG_INFO << cluster->description() << ": Queue assigned: " << "[uri: " << uri << ", queueKey: " << queueKey @@ -1145,8 +1166,8 @@ void ClusterUtil::registerQueueInfo(ClusterState* clusterState, << ": re-registering a known queue with a stale view, " << "but queueKey is not unique. " << "QueueKey [" << queueKey << "], URI [" << uri << "], Partition [" - << partitionId << "], AppIdInfos [" - << storageAppIdInfos << "]." << MWCTSK_ALARMLOG_END; + << partitionId << "], AppInfos [" << storageAppInfos + << "]." << MWCTSK_ALARMLOG_END; return; // RETURN } @@ -1162,13 +1183,14 @@ void ClusterUtil::registerQueueInfo(ClusterState* clusterState, } // Queue is not known, so add it. - clusterState->assignQueue(uri, queueKey, partitionId, appIdInfos); + clusterState->assignQueue(uri, queueKey, partitionId, appInfos); - mwcu::Printer printer(&appIdInfos); - BALL_LOG_INFO << cluster->description() << ": Queue assigned: " - << "[uri: " << uri << ", queueKey: " << queueKey + mwcu::Printer printer(&appInfos); + BALL_LOG_INFO << cluster->description() + << ": Queue assigned: " << "[uri: " << uri + << ", queueKey: " << queueKey << ", partitionId: " << partitionId - << ", appIdInfos: " << printer << "]"; + << ", appInfos: " << printer << "]"; if (!cluster->isCSLModeEnabled()) { ClusterState::QueueKeysInsertRc insertRc = @@ -1192,11 +1214,11 @@ void ClusterUtil::registerQueueInfo(ClusterState* clusterState, queueAssigningCb(uri, false); // processingPendingRequests } -void ClusterUtil::populateAppIdInfos(AppIdInfos* appIdInfos, - const mqbconfm::QueueMode& domainConfig) +void ClusterUtil::populateAppInfos(AppInfos* appInfos, + const mqbconfm::QueueMode& domainConfig) { // PRECONDITIONS - BSLS_ASSERT_SAFE(appIdInfos && appIdInfos->empty()); + BSLS_ASSERT_SAFE(appInfos && appInfos->empty()); if (domainConfig.isFanoutValue()) { const bsl::vector& cfgAppIds = @@ -1209,21 +1231,21 @@ void ClusterUtil::populateAppIdInfos(AppIdInfos* appIdInfos, mqbu::StorageKey appKey; mqbs::StorageUtil::generateStorageKey(&appKey, &appKeys, *cit); - appIdInfos->insert(AppIdInfo(*cit, appKey)); + appInfos->insert(AppInfo(*cit, appKey)); } } else { - appIdInfos->insert(AppIdInfo(bmqp::ProtocolUtil::k_DEFAULT_APP_ID, - mqbi::QueueEngine::k_DEFAULT_APP_KEY)); + appInfos->insert(AppInfo(bmqp::ProtocolUtil::k_DEFAULT_APP_ID, + mqbi::QueueEngine::k_DEFAULT_APP_KEY)); } } -void ClusterUtil::populateAppIdInfos( - bsl::vector* appIdInfos, +void ClusterUtil::populateAppInfos( + bsl::vector* appInfos, const mqbconfm::QueueMode& domainConfig) { // PRECONDITIONS - BSLS_ASSERT_SAFE(appIdInfos && appIdInfos->empty()); + BSLS_ASSERT_SAFE(appInfos && appInfos->empty()); if (domainConfig.isFanoutValue()) { const bsl::vector& cfgAppIds = @@ -1238,9 +1260,13 @@ void ClusterUtil::populateAppIdInfos( mqbu::StorageKey appKey; mqbs::StorageUtil::generateStorageKey(&appKey, &appKeys, *cit); + // This is the only place generating keys upon queue assignment + // for both CSL (FSM) and non-CSL (non-FSM). The latter used to + // generate keys in 'StorageUtil::registerQueue'. + appKey.loadBinary(&appIdInfo.appKey()); - appIdInfos->push_back(appIdInfo); + appInfos->push_back(appIdInfo); } } else { @@ -1248,7 +1274,7 @@ void ClusterUtil::populateAppIdInfos( appIdInfo.appId() = bmqp::ProtocolUtil::k_DEFAULT_APP_ID; mqbi::QueueEngine::k_DEFAULT_APP_KEY.loadBinary(&appIdInfo.appKey()); - appIdInfos->push_back(appIdInfo); + appInfos->push_back(appIdInfo); } } @@ -1314,7 +1340,7 @@ void ClusterUtil::registerAppId(ClusterData* clusterData, mqbu::StorageKey::k_NULL_KEY.loadBinary((&queueUpdate.key())); queueUpdate.domain() = domain->name(); - // Populate AppIdInfo + // Populate AppInfo bmqp_ctrlmsg::AppIdInfo appIdInfo; appIdInfo.appId() = appId; mqbu::StorageKey::k_NULL_KEY.loadBinary(&appIdInfo.appKey()); @@ -1335,9 +1361,9 @@ void ClusterUtil::registerAppId(ClusterData* clusterData, BSLS_ASSERT_SAFE(queueUpdate.domain() == domain->name()); bsl::unordered_set appKeys; - const AppIdInfos& appIdInfos = qinfoCit->second->appIdInfos(); - for (AppIdInfosCIter appInfoCit = appIdInfos.cbegin(); - appInfoCit != appIdInfos.cend(); + const AppInfos& appInfos = qinfoCit->second->appInfos(); + for (AppInfosCIter appInfoCit = appInfos.cbegin(); + appInfoCit != appInfos.cend(); ++appInfoCit) { if (appInfoCit->first == appId) { BALL_LOG_ERROR << "Failed to register appId '" << appId @@ -1350,7 +1376,7 @@ void ClusterUtil::registerAppId(ClusterData* clusterData, appKeys.insert(appInfoCit->second); } - // Populate AppIdInfo + // Populate AppInfo bmqp_ctrlmsg::AppIdInfo appIdInfo; appIdInfo.appId() = appId; mqbu::StorageKey appKey; @@ -1442,7 +1468,7 @@ void ClusterUtil::unregisterAppId(ClusterData* clusterData, mqbu::StorageKey::k_NULL_KEY.loadBinary((&queueUpdate.key())); queueUpdate.domain() = domain->name(); - // Populate AppIdInfo + // Populate AppInfo bmqp_ctrlmsg::AppIdInfo appIdInfo; appIdInfo.appId() = appId; mqbu::StorageKey::k_NULL_KEY.loadBinary(&appIdInfo.appKey()); @@ -1463,12 +1489,12 @@ void ClusterUtil::unregisterAppId(ClusterData* clusterData, BSLS_ASSERT_SAFE(queueUpdate.domain() == domain->name()); bool appIdFound = false; - const AppIdInfos& appIdInfos = qinfoCit->second->appIdInfos(); - for (AppIdInfosCIter appInfoCit = appIdInfos.cbegin(); - appInfoCit != appIdInfos.cend(); + const AppInfos& appInfos = qinfoCit->second->appInfos(); + for (AppInfosCIter appInfoCit = appInfos.cbegin(); + appInfoCit != appInfos.cend(); ++appInfoCit) { if (appInfoCit->first == appId) { - // Populate AppIdInfo + // Populate AppInfo bmqp_ctrlmsg::AppIdInfo appIdInfo; appIdInfo.appId() = appId; appInfoCit->second.loadBinary(&appIdInfo.appKey()); @@ -2214,9 +2240,9 @@ void ClusterUtil::loadQueuesInfo(bsl::vector* out, qCit->second->key().loadBinary(&queueInfo.key()); if (includeAppIds) { - for (AppIdInfosCIter appIdCit = - qCit->second->appIdInfos().cbegin(); - appIdCit != qCit->second->appIdInfos().cend(); + for (AppInfosCIter appIdCit = + qCit->second->appInfos().cbegin(); + appIdCit != qCit->second->appInfos().cend(); ++appIdCit) { bmqp_ctrlmsg::AppIdInfo appIdInfo; appIdInfo.appId() = appIdCit->first; diff --git a/src/groups/mqb/mqbc/mqbc_clusterutil.h b/src/groups/mqb/mqbc/mqbc_clusterutil.h index 5a54e8e49..6eb85869f 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterutil.h +++ b/src/groups/mqb/mqbc/mqbc_clusterutil.h @@ -88,9 +88,9 @@ struct ClusterUtil { public: // TYPES - typedef ClusterStateQueueInfo::AppIdInfo AppIdInfo; - typedef ClusterStateQueueInfo::AppIdInfos AppIdInfos; - typedef ClusterStateQueueInfo::AppIdInfosCIter AppIdInfosCIter; + typedef ClusterStateQueueInfo::AppInfo AppInfo; + typedef ClusterStateQueueInfo::AppInfos AppInfos; + typedef ClusterStateQueueInfo::AppInfosCIter AppInfosCIter; typedef mqbc::ClusterState::QueueInfoSp QueueInfoSp; typedef ClusterState::UriToQueueInfoMap UriToQueueInfoMap; @@ -270,17 +270,17 @@ struct ClusterUtil { const bmqt::Uri& uri, int partitionId, const mqbu::StorageKey& queueKey, - const AppIdInfos& appIdInfos, + const AppInfos& appIdInfos, const QueueAssigningCb& queueAssigningCb, bool forceUpdate); /// Generate appKeys based on the appIds in the specified `domainConfig` /// and populate them into the specified `appIdInfos`. - static void populateAppIdInfos(AppIdInfos* appIdInfos, - const mqbconfm::QueueMode& domainConfig); + static void populateAppInfos(AppInfos* appIdInfos, + const mqbconfm::QueueMode& domainConfig); static void - populateAppIdInfos(bsl::vector* appIdInfos, - const mqbconfm::QueueMode& domainConfig); + populateAppInfos(bsl::vector* appIdInfos, + const mqbconfm::QueueMode& domainConfig); /// Register the specified `appId` for all queues in the specified /// `domain`, using the specified `clusterData` and `clusterState`. diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp index 6b69f05fa..8c04eebda 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp @@ -3644,10 +3644,10 @@ void StorageManager::initializeQueueKeyInfoMap( mqbs::DataStoreConfigQueueInfo qinfo; qinfo.setCanonicalQueueUri(csQinfo.uri().asString()); qinfo.setPartitionId(csQinfo.partitionId()); - for (AppIdInfosCIter appIdCit = csQinfo.appIdInfos().cbegin(); - appIdCit != csQinfo.appIdInfos().cend(); + for (AppInfosCIter appIdCit = csQinfo.appInfos().cbegin(); + appIdCit != csQinfo.appInfos().cend(); ++appIdCit) { - qinfo.addAppIdKeyPair(*appIdCit); + qinfo.addAppInfo(*appIdCit); } d_queueKeyInfoMapVec.at(csQinfo.partitionId()) @@ -3658,11 +3658,12 @@ void StorageManager::initializeQueueKeyInfoMap( d_isQueueKeyInfoMapVecInitialized = true; } -void StorageManager::registerQueue(const bmqt::Uri& uri, - const mqbu::StorageKey& queueKey, - int partitionId, - const AppIdKeyPairs& appIdKeyPairs, - mqbi::Domain* domain) +void StorageManager::registerQueue( + const bmqt::Uri& uri, + const mqbu::StorageKey& queueKey, + int partitionId, + const bsl::unordered_set& appIdKeyPairs, + mqbi::Domain* domain) { // executed by the *CLUSTER DISPATCHER* thread @@ -3721,8 +3722,8 @@ void StorageManager::unregisterQueue(const bmqt::Uri& uri, int partitionId) int StorageManager::updateQueuePrimary(const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, int partitionId, - const AppIdKeyPairs& addedIdKeyPairs, - const AppIdKeyPairs& removedIdKeyPairs) + const AppInfos& addedIdKeyPairs, + const AppInfos& removedIdKeyPairs) { // executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId' @@ -3820,7 +3821,7 @@ void StorageManager::unregisterQueueReplica(int partitionId, void StorageManager::updateQueueReplica(int partitionId, const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, mqbi::Domain* domain, bool allowDuplicate) { diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.h b/src/groups/mqb/mqbc/mqbc_storagemanager.h index 15937dc9f..50576bac8 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.h +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.h @@ -155,7 +155,7 @@ class StorageManager typedef ClusterState::DomainStatesCIter DomainStatesCIter; typedef ClusterState::UriToQueueInfoMapCIter UriToQueueInfoMapCIter; - typedef ClusterStateQueueInfo::AppIdInfosCIter AppIdInfosCIter; + typedef ClusterStateQueueInfo::AppInfosCIter AppInfosCIter; /// Vector of pairs of buffered primary status advisories and their source typedef bsl::vector< @@ -853,11 +853,12 @@ class StorageManager /// associated queue storage created. /// /// THREAD: Executed by the Client's dispatcher thread. - virtual void registerQueue(const bmqt::Uri& uri, - const mqbu::StorageKey& queueKey, - int partitionId, - const AppIdKeyPairs& appIdKeyPairs, - mqbi::Domain* domain) BSLS_KEYWORD_OVERRIDE; + virtual void + registerQueue(const bmqt::Uri& uri, + const mqbu::StorageKey& queueKey, + int partitionId, + const bsl::unordered_set& appIdKeyPairs, + mqbi::Domain* domain) BSLS_KEYWORD_OVERRIDE; /// Synchronously unregister the queue with the specified `uri` from the /// specified `partitionId`. @@ -878,8 +879,8 @@ class StorageManager virtual int updateQueuePrimary(const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, int partitionId, - const AppIdKeyPairs& addedIdKeyPairs, - const AppIdKeyPairs& removedIdKeyPairs) + const AppInfos& addedIdKeyPairs, + const AppInfos& removedIdKeyPairs) BSLS_KEYWORD_OVERRIDE; virtual void @@ -899,7 +900,7 @@ class StorageManager updateQueueReplica(int partitionId, const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, mqbi::Domain* domain = 0, bool allowDuplicate = false) BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.t.cpp b/src/groups/mqb/mqbc/mqbc_storagemanager.t.cpp index 6a99c581d..f5ed60ee2 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.t.cpp +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.t.cpp @@ -768,14 +768,14 @@ struct TestHelper { handle, uri_t, rec.d_queueKey, - mqbs::DataStore::AppIdKeyPairs(), + mqbs::DataStore::AppInfos(), rec.d_timestamp, true); // isNewQueue d_cluster_mp->_state().assignQueue(uri_t, queueKey, partitionId, - mqbc::ClusterState::AppIdInfos()); + mqbc::ClusterState::AppInfos()); BSLS_ASSERT_OPT(rc == 0); return queueKey; diff --git a/src/groups/mqb/mqbc/mqbc_storageutil.cpp b/src/groups/mqb/mqbc/mqbc_storageutil.cpp index cad304f2b..5e151a61f 100644 --- a/src/groups/mqb/mqbc/mqbc_storageutil.cpp +++ b/src/groups/mqb/mqbc/mqbc_storageutil.cpp @@ -66,7 +66,7 @@ namespace { class AppIdMatcher { private: // TYPES - typedef bsl::pair AppIdKeyPair; + typedef bsl::pair AppInfo; // DATA const bsl::string& d_expectedAppId; @@ -79,7 +79,7 @@ class AppIdMatcher { } // ACCESSORS - bool operator()(const AppIdKeyPair& appIdKeyPair) const + bool operator()(const AppInfo& appIdKeyPair) const { return d_expectedAppId == appIdKeyPair.first; } @@ -100,26 +100,26 @@ void optionalSemaphorePost(bslmt::Semaphore* semaphore) // ------------------ // PRIVATE FUNCTIONS -bool StorageUtil::loadUpdatedAppIdKeyPairs( - AppIdKeyPairs* addedAppIdKeyPairs, - AppIdKeyPairs* removedAppIdKeyPairs, +bool StorageUtil::loadUpdatedAppInfos( + AppInfos* addedAppInfos, + AppInfos* removedAppInfos, AppKeys* appKeys, bslmt::Mutex* appKeysLock, const mqbs::ReplicatedStorage& storage, - const AppIdKeyPairs& newAppIdKeyPairs, + const AppInfos& newAppInfos, const bsl::vector& cfgAppIds, bool isCSLMode) { // executed by the *CLUSTER DISPATCHER* thread // PRECONDITIONS - BSLS_ASSERT_SAFE(addedAppIdKeyPairs); - BSLS_ASSERT_SAFE(removedAppIdKeyPairs); + BSLS_ASSERT_SAFE(addedAppInfos); + BSLS_ASSERT_SAFE(removedAppInfos); if (isCSLMode) { BSLS_ASSERT_SAFE(cfgAppIds.empty()); } else { - BSLS_ASSERT_SAFE(newAppIdKeyPairs.empty()); + BSLS_ASSERT_SAFE(newAppInfos.empty()); } // This function is invoked by 'StorageManager::registerQueue' if the queue @@ -136,32 +136,38 @@ bool StorageUtil::loadUpdatedAppIdKeyPairs( // list of newly added and removed appIds, and then invoking 'updateQueue' // in the appropriate thread. - AppIdKeyPairs existingAppIdKeyPairs; - storage.loadVirtualStorageDetails(&existingAppIdKeyPairs); + AppInfos existingAppInfos; + storage.loadVirtualStorageDetails(&existingAppInfos); if (isCSLMode) { - loadAddedAndRemovedEntries(addedAppIdKeyPairs, - removedAppIdKeyPairs, - existingAppIdKeyPairs, - newAppIdKeyPairs); + loadAddedAndRemovedEntries(addedAppInfos, + removedAppInfos, + existingAppInfos, + newAppInfos); - if (addedAppIdKeyPairs->empty() && removedAppIdKeyPairs->empty()) { + if (addedAppInfos->empty() && removedAppInfos->empty()) { // No appIds to add or remove. return false; // RETURN } } else { - bsl::vector existingAppIds; - for (size_t i = 0; i < existingAppIdKeyPairs.size(); ++i) { - existingAppIds.push_back(existingAppIdKeyPairs[i].first); + bsl::unordered_set existingAppIds; + for (AppInfos::const_iterator cit = existingAppInfos.cbegin(); + cit != existingAppInfos.cend(); + ++cit) { + existingAppIds.emplace(cit->first); } - bsl::vector addedAppIds; - bsl::vector removedAppIds; + bsl::unordered_set addedAppIds; + bsl::unordered_set removedAppIds; + + bsl::unordered_set cfgAppSet(cfgAppIds.cbegin(), + cfgAppIds.cend()); + loadAddedAndRemovedEntries(&addedAppIds, &removedAppIds, existingAppIds, - cfgAppIds); + cfgAppSet); if (addedAppIds.empty() && removedAppIds.empty()) { // No appIds to add or remove. @@ -169,24 +175,27 @@ bool StorageUtil::loadUpdatedAppIdKeyPairs( } // Generate unique appKeys for the added appIds, and populate - // 'addedAppIdKeyPairs'. - for (size_t i = 0; i < addedAppIds.size(); ++i) { + // 'addedAppInfos'. + for (bsl::unordered_set::const_iterator cit = + addedAppIds.cbegin(); + cit != addedAppIds.cend(); + ++cit) { mqbu::StorageKey appKey = generateAppKey(appKeys, appKeysLock, - addedAppIds[i]); - addedAppIdKeyPairs->push_back( - bsl::make_pair(addedAppIds[i], appKey)); + *cit); + addedAppInfos->emplace(bsl::make_pair(*cit, appKey)); } - // Populate 'removedAppIdKeyPairs'. - for (size_t i = 0; i < removedAppIds.size(); ++i) { - AppIdKeyPairsCIter it = bsl::find_if( - existingAppIdKeyPairs.begin(), - existingAppIdKeyPairs.end(), - AppIdMatcher(removedAppIds[i])); - BSLS_ASSERT_SAFE(it != existingAppIdKeyPairs.end()); - removedAppIdKeyPairs->push_back( - bsl::make_pair(it->first, it->second)); + // Populate 'removedAppInfos'. + for (bsl::unordered_set::const_iterator cit = + removedAppIds.cbegin(); + cit != removedAppIds.cend(); + ++cit) { + AppInfosCIter it = bsl::find_if(existingAppInfos.cbegin(), + existingAppInfos.cend(), + AppIdMatcher(*cit)); + BSLS_ASSERT_SAFE(it != existingAppInfos.end()); + removedAppInfos->emplace(bsl::make_pair(it->first, it->second)); } } @@ -197,9 +206,9 @@ void StorageUtil::registerQueueDispatched( BSLS_ANNOTATION_UNUSED const mqbi::Dispatcher::ProcessorHandle& processor, mqbs::FileStore* fs, mqbs::ReplicatedStorage* storage, - const bsl::string& clusterDescription, - int partitionId, - const AppIdKeyPairs& appIdKeyPairs) + const bsl::string& clusterDescription, + int partitionId, + const AppInfos& appIdKeyPairs) { // executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId' @@ -256,16 +265,16 @@ void StorageUtil::registerQueueDispatched( void StorageUtil::updateQueuePrimaryDispatched( BSLS_ANNOTATION_UNUSED const mqbi::Dispatcher::ProcessorHandle& processor, mqbs::ReplicatedStorage* storage, - bslmt::Mutex* storagesLock, - mqbs::FileStore* fs, - AppKeys* appKeys, - bslmt::Mutex* appKeysLock, - const bsl::string& clusterDescription, - int partitionId, - const AppIdKeyPairs& addedIdKeyPairs, - const AppIdKeyPairs& removedIdKeyPairs, - bool isFanout, - bool isCSLMode) + bslmt::Mutex* storagesLock, + mqbs::FileStore* fs, + AppKeys* appKeys, + bslmt::Mutex* appKeysLock, + const bsl::string& clusterDescription, + int partitionId, + const AppInfos& addedIdKeyPairs, + const AppInfos& removedIdKeyPairs, + bool isFanout, + bool isCSLMode) { // executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId' @@ -294,12 +303,12 @@ int StorageUtil::updateQueuePrimaryRaw(mqbs::ReplicatedStorage* storage, mqbs::FileStore* fs, AppKeys* appKeys, bslmt::Mutex* appKeysLock, - const bsl::string& clusterDescription, - int partitionId, - const AppIdKeyPairs& addedIdKeyPairs, - const AppIdKeyPairs& removedIdKeyPairs, - bool isFanout, - bool isCSLMode) + const bsl::string& clusterDescription, + int partitionId, + const AppInfos& addedIdKeyPairs, + const AppInfos& removedIdKeyPairs, + bool isFanout, + bool isCSLMode) { // executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId' @@ -353,7 +362,7 @@ int StorageUtil::updateQueuePrimaryRaw(mqbs::ReplicatedStorage* storage, BALL_LOG_INFO_BLOCK { - mwcu::Printer printer(&addedIdKeyPairs); + mwcu::Printer printer(&addedIdKeyPairs); BALL_LOG_OUTPUT_STREAM << clusterDescription << ": Partition [" << partitionId @@ -365,7 +374,7 @@ int StorageUtil::updateQueuePrimaryRaw(mqbs::ReplicatedStorage* storage, } if (!removedIdKeyPairs.empty()) { - for (AppIdKeyPairsCIter cit = removedIdKeyPairs.begin(); + for (AppInfosCIter cit = removedIdKeyPairs.begin(); cit != removedIdKeyPairs.end(); ++cit) { // Write QueueDeletionRecord to data store for removed appIds. @@ -410,7 +419,7 @@ int StorageUtil::updateQueuePrimaryRaw(mqbs::ReplicatedStorage* storage, BALL_LOG_INFO_BLOCK { - mwcu::Printer printer(&removedIdKeyPairs); + mwcu::Printer printer(&removedIdKeyPairs); BALL_LOG_OUTPUT_STREAM << clusterDescription << ": Partition [" << partitionId @@ -425,8 +434,8 @@ int StorageUtil::updateQueuePrimaryRaw(mqbs::ReplicatedStorage* storage, // away. fs->dispatcherFlush(true, false); - mwcu::Printer printer1(&addedIdKeyPairs); - mwcu::Printer printer2(&removedIdKeyPairs); + mwcu::Printer printer1(&addedIdKeyPairs); + mwcu::Printer printer2(&removedIdKeyPairs); BALL_LOG_INFO << clusterDescription << ": Partition [" << partitionId << "] updated [" << storage->queueUri() << "], queueKey [" << storage->queueKey() << "] with the storage as primary: " @@ -440,7 +449,7 @@ int StorageUtil::addVirtualStoragesInternal( mqbs::ReplicatedStorage* storage, AppKeys* appKeys, bslmt::Mutex* appKeysLock, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, const bsl::string& clusterDescription, int partitionId, bool isFanout, @@ -467,7 +476,7 @@ int StorageUtil::addVirtualStoragesInternal( // Register appKeys with 'appKeys' and then with the underlying // physical 'storage'. - for (AppIdKeyPairsCIter cit = appIdKeyPairs.begin(); + for (AppInfosCIter cit = appIdKeyPairs.begin(); cit != appIdKeyPairs.end(); ++cit) { AppKeysInsertRc irc = appKeys->insert(cit->second); @@ -1526,13 +1535,15 @@ void StorageUtil::recoveredQueuesCb( } if (qinfo.appIdKeyPairs().size() != 1 || - qinfo.appIdKeyPairs()[0].first != + qinfo.appIdKeyPairs().cbegin()->first != bmqp::ProtocolUtil::k_DEFAULT_APP_ID) { - // This ia a fanout queue + // This is a fanout queue AppIds appIds; - for (size_t n = 0; n < qinfo.appIdKeyPairs().size(); ++n) { - const AppIdKeyPair& p = qinfo.appIdKeyPairs()[n]; + for (AppInfos::const_iterator cit = qinfo.appIdKeyPairs().cbegin(); + cit != qinfo.appIdKeyPairs().cend(); + ++cit) { + const AppInfo& p = *cit; AppIdsInsertRc appIdsIrc = appIds.insert(p.first); if (false == appIdsIrc.second) { @@ -1664,7 +1675,7 @@ void StorageUtil::recoveredQueuesCb( ++qit) { const mqbu::StorageKey& queueKey = qit->first; const mqbs::DataStoreConfigQueueInfo& qinfo = qit->second; - const AppIdKeyPairs& appIdKeyPairs = qinfo.appIdKeyPairs(); + const AppInfos& appIdKeyPairs = qinfo.appIdKeyPairs(); const bmqt::Uri queueUri(qinfo.canonicalQueueUri()); BSLS_ASSERT_SAFE(queueUri.isValid()); @@ -1698,7 +1709,7 @@ void StorageUtil::recoveredQueuesCb( BSLS_ASSERT_SAFE(queueKey == rstorage->queueKey()); BSLS_ASSERT_SAFE(partitionId == rstorage->partitionId()); - for (AppIdKeyPairsCIter ait = appIdKeyPairs.begin(); + for (AppInfosCIter ait = appIdKeyPairs.begin(); ait != appIdKeyPairs.end(); ++ait) { BSLA_MAYBE_UNUSED const bsl::string& appId = ait->first; @@ -1832,7 +1843,7 @@ void StorageUtil::recoveredQueuesCb( mwcu::MemOutStream errorDesc; int rc; if (domain->config().mode().isFanoutValue()) { - for (AppIdKeyPairsCIter ait = appIdKeyPairs.begin(); + for (AppInfosCIter ait = appIdKeyPairs.begin(); ait != appIdKeyPairs.end(); ++ait) { const bsl::string& appId = ait->first; @@ -2340,7 +2351,7 @@ void StorageUtil::registerQueue( const mqbu::StorageKey& queueKey, const bsl::string& clusterDescription, int partitionId, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, mqbi::Domain* domain) { // executed by the *CLUSTER DISPATCHER* thread @@ -2359,9 +2370,6 @@ void StorageUtil::registerQueue( partitionId < cluster->clusterConfig()->partitionConfig().numPartitions()); BSLS_ASSERT_SAFE(domain); - if (!cluster->isCSLModeEnabled()) { - BSLS_ASSERT_SAFE(appIdKeyPairs.empty()); - } // StorageMgr is either aware of the queue (the 'uri') or it isn't. If it // is already aware, either this queue was registered earlier or it was @@ -2376,7 +2384,7 @@ void StorageUtil::registerQueue( // and deployed before the node started (some appIds were added or removed // or both). We need to make sure that these appIds are handled correctly. // The logic to get a list of added and/or removed appId/key pairs is - // handled by invoking 'loadUpdatedAppIdKeyPairs' in this function if queue + // handled by invoking 'loadUpdatedAppInfos' in this function if queue // is in fanout mode. // If StorageMgr is not aware of the queue, then its a simpler process -- @@ -2440,33 +2448,31 @@ void StorageUtil::registerQueue( // to be added or removed (see comments at the beginning of this // routine for explanation). - AppIdKeyPairs addedAppIdKeyPairs, removedAppIdKeyPairs; + AppInfos addedAppInfos, removedAppInfos; bool hasUpdate = false; if (cluster->isCSLModeEnabled()) { // In CSL mode, queue assignment procedure is split into queue // assignment and queue update, so we simply remove all appIds // here, and re-add them during queue update phase. - hasUpdate = loadUpdatedAppIdKeyPairs( - &addedAppIdKeyPairs, - &removedAppIdKeyPairs, - appKeys, - appKeysLock, - *storageSp.get(), - appIdKeyPairs, - bsl::vector(), - true); // isCSLMode + hasUpdate = loadUpdatedAppInfos(&addedAppInfos, + &removedAppInfos, + appKeys, + appKeysLock, + *storageSp.get(), + appIdKeyPairs, + bsl::vector(), + true); // isCSLMode } else { - hasUpdate = loadUpdatedAppIdKeyPairs( - &addedAppIdKeyPairs, - &removedAppIdKeyPairs, - appKeys, - appKeysLock, - *storageSp.get(), - AppIdKeyPairs(), - queueMode.fanout().appIDs(), - false); // isCSLMode + hasUpdate = loadUpdatedAppInfos(&addedAppInfos, + &removedAppInfos, + appKeys, + appKeysLock, + *storageSp.get(), + AppInfos(), + queueMode.fanout().appIDs(), + false); // isCSLMode } if (!hasUpdate) { // No update needed for AppId/Key pairs. @@ -2492,8 +2498,8 @@ void StorageUtil::registerQueue( appKeysLock, clusterDescription, partitionId, - addedAppIdKeyPairs, - removedAppIdKeyPairs, + addedAppInfos, + removedAppInfos, domain->config().mode().isFanoutValue(), cluster->isCSLModeEnabled())); @@ -2503,7 +2509,7 @@ void StorageUtil::registerQueue( // Wait for 'updateQueuePrimaryDispatched' operation to complete. // We need to wait because 'updateQueuePrimaryDispatched' creates - // virtual storages corresponding to 'addedAppIdKeyPairs' (if any), + // virtual storages corresponding to 'addedAppInfos' (if any), // and the caller of 'registerQueue' expects these virtual storages // to be created this routine or its caller returns. Before // waiting, release the 'storagesLock' guard and unlock it to avoid @@ -2537,10 +2543,10 @@ void StorageUtil::registerQueue( mwcu::MemOutStream errorDesc; int rc = 0; - AppIdKeyPairs appIdKeyPairsToUse; + AppInfos appIdKeyPairsToUse; if (queueMode.isFanoutValue()) { - if (cluster->isCSLModeEnabled()) { - for (AppIdKeyPairsCIter citer = appIdKeyPairs.begin(); + if (cluster->isCSLModeEnabled() || !appIdKeyPairs.empty()) { + for (AppInfosCIter citer = appIdKeyPairs.begin(); citer != appIdKeyPairs.end(); ++citer) { rc = storageSp->addVirtualStorage(errorDesc, @@ -2568,7 +2574,7 @@ void StorageUtil::registerQueue( rc = storageSp->addVirtualStorage(errorDesc, *citer, appKey); - appIdKeyPairsToUse.push_back(bsl::make_pair(*citer, appKey)); + appIdKeyPairsToUse.emplace(bsl::make_pair(*citer, appKey)); } } } @@ -2764,8 +2770,8 @@ int StorageUtil::updateQueuePrimary(StorageSpMap* storageMap, const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, int partitionId, - const AppIdKeyPairs& addedIdKeyPairs, - const AppIdKeyPairs& removedIdKeyPairs, + const AppInfos& addedIdKeyPairs, + const AppInfos& removedIdKeyPairs, bool isCSLMode) { // executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId' @@ -2785,8 +2791,8 @@ int StorageUtil::updateQueuePrimary(StorageSpMap* storageMap, StorageSpMapIter it = storageMap->find(uri); if (storageMap->end() == it) { - mwcu::Printer printer1(&addedIdKeyPairs); - mwcu::Printer printer2(&removedIdKeyPairs); + mwcu::Printer printer1(&addedIdKeyPairs); + mwcu::Printer printer2(&removedIdKeyPairs); BALL_LOG_ERROR << clusterDescription << " Partition [" << partitionId << "]: Error when updating queue '" << uri << "' with addedAppIds: [" << printer1 @@ -3136,7 +3142,7 @@ void StorageUtil::updateQueueReplicaDispatched( int partitionId, const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, bool isCSLMode, mqbi::Domain* domain, bool allowDuplicate) @@ -3168,7 +3174,7 @@ void StorageUtil::updateQueueReplicaDispatched( if (it == storageMap->end()) { // Cluster state and/or partition are out of sync at this replica. - mwcu::Printer printer(&appIdKeyPairs); + mwcu::Printer printer(&appIdKeyPairs); MWCTSK_ALARMLOG_ALARM("REPLICATION") << "At partition [" << partitionId << "], failure while registering appIds [" << printer @@ -3200,7 +3206,7 @@ void StorageUtil::updateQueueReplicaDispatched( isCSLMode); if (rc != 0) { if (!allowDuplicate) { - mwcu::Printer printer(&appIdKeyPairs); + mwcu::Printer printer(&appIdKeyPairs); MWCTSK_ALARMLOG_ALARM("REPLICATION") << "At partition [" << partitionId << "], failure while registering appIds [" << printer @@ -3215,7 +3221,7 @@ void StorageUtil::updateQueueReplicaDispatched( return; // RETURN } - mwcu::Printer printer(&appIdKeyPairs); + mwcu::Printer printer(&appIdKeyPairs); BALL_LOG_INFO << clusterDescription << ": Partition [" << partitionId << "] updated [" << uri << "], queueKey [" << queueKey << "] with the storage as replica: " << "addedIdKeyPairs:" diff --git a/src/groups/mqb/mqbc/mqbc_storageutil.h b/src/groups/mqb/mqbc/mqbc_storageutil.h index 39231f211..270c383af 100644 --- a/src/groups/mqb/mqbc/mqbc_storageutil.h +++ b/src/groups/mqb/mqbc/mqbc_storageutil.h @@ -102,9 +102,9 @@ struct StorageUtil { private: // TYPES - typedef mqbi::StorageManager::AppIdKeyPair AppIdKeyPair; - typedef mqbi::StorageManager::AppIdKeyPairs AppIdKeyPairs; - typedef mqbi::StorageManager::AppIdKeyPairsCIter AppIdKeyPairsCIter; + typedef mqbi::StorageManager::AppInfo AppInfo; + typedef mqbi::StorageManager::AppInfos AppInfos; + typedef mqbi::StorageManager::AppInfosCIter AppInfosCIter; typedef mqbi::StorageManager::AppIds AppIds; typedef mqbi::StorageManager::AppIdsIter AppIdsIter; @@ -178,38 +178,37 @@ struct StorageUtil { /// Load into the specified `result` the list of elements present in /// `baseSet` which are not present in `subtractionSet`. template - static void loadDifference(bsl::vector* result, - const bsl::vector& baseSet, - const bsl::vector& subtractionSet); + static void loadDifference(bsl::unordered_set* result, + const bsl::unordered_set& baseSet, + const bsl::unordered_set& subtractionSet); - /// Load into the specified `addedAppIdKeyPairs` and - /// `removedAppIdKeyPairs` the appId/key pairs which have been added and + /// Load into the specified `addedAppInfos` and + /// `removedAppInfos` the appId/key pairs which have been added and /// removed respectively for the specified `storage` based on the - /// specified `newAppIdKeyPairs` or `cfgAppIds`, as well as the + /// specified `newAppInfos` or `cfgAppIds`, as well as the /// specified `isCSLMode` mode. If new app keys are generated, load /// them into the specified `appKeys`. If the optionally specified /// `appKeysLock` is provided, lock it. Return true if there are any /// added or removed appId/key pairs, false otherwise. /// /// THREAD: Executed by the cluster dispatcher thread. - static bool - loadUpdatedAppIdKeyPairs(AppIdKeyPairs* addedAppIdKeyPairs, - AppIdKeyPairs* removedAppIdKeyPairs, - AppKeys* appKeys, - bslmt::Mutex* appKeysLock, - const mqbs::ReplicatedStorage& storage, - const AppIdKeyPairs& newAppIdKeyPairs, - const bsl::vector& cfgAppIds, - bool isCSLMode); + static bool loadUpdatedAppInfos(AppInfos* addedAppInfos, + AppInfos* removedAppInfos, + AppKeys* appKeys, + bslmt::Mutex* appKeysLock, + const mqbs::ReplicatedStorage& storage, + const AppInfos& newAppInfos, + const bsl::vector& cfgAppIds, + bool isCSLMode); /// THREAD: Executed by the Queue's dispatcher thread. static void registerQueueDispatched(const mqbi::Dispatcher::ProcessorHandle& processor, mqbs::FileStore* fs, mqbs::ReplicatedStorage* storage, - const bsl::string& clusterDescription, - int partitionId, - const AppIdKeyPairs& appIdKeyPairs); + const bsl::string& clusterDescription, + int partitionId, + const AppInfos& appIdKeyPairs); /// THREAD: This method is called from the Queue's dispatcher thread. static void updateQueuePrimaryDispatched( @@ -221,8 +220,8 @@ struct StorageUtil { bslmt::Mutex* appKeysLock, const bsl::string& clusterDescription, int partitionId, - const AppIdKeyPairs& addedIdKeyPairs, - const AppIdKeyPairs& removedIdKeyPairs, + const AppInfos& addedIdKeyPairs, + const AppInfos& removedIdKeyPairs, bool isFanout, bool isCSLMode); @@ -234,18 +233,18 @@ struct StorageUtil { mqbs::FileStore* fs, AppKeys* appKeys, bslmt::Mutex* appKeysLock, - const bsl::string& clusterDescription, - int partitionId, - const AppIdKeyPairs& addedIdKeyPairs, - const AppIdKeyPairs& removedIdKeyPairs, - bool isFanout, - bool isCSLMode); + const bsl::string& clusterDescription, + int partitionId, + const AppInfos& addedIdKeyPairs, + const AppInfos& removedIdKeyPairs, + bool isFanout, + bool isCSLMode); static int addVirtualStoragesInternal(mqbs::ReplicatedStorage* storage, AppKeys* appKeys, bslmt::Mutex* appKeysLock, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, const bsl::string& clusterDescription, int partitionId, bool isFanout, @@ -388,10 +387,10 @@ struct StorageUtil { /// present in `existingEntries` but not in `newEntries`. template static void - loadAddedAndRemovedEntries(bsl::vector* addedEntries, - bsl::vector* removedEntries, - const bsl::vector& existingEntries, - const bsl::vector& newEntries); + loadAddedAndRemovedEntries(bsl::unordered_set* addedEntries, + bsl::unordered_set* removedEntries, + const bsl::unordered_set& existingEntries, + const bsl::unordered_set& newEntries); /// Return true if the queue having specified `uri` and assigned to the /// specified `partitionId` has no messages in the specified @@ -640,7 +639,7 @@ struct StorageUtil { const mqbu::StorageKey& queueKey, const bsl::string& clusterDescription, int partitionId, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, mqbi::Domain* domain); /// THREAD: Executed by the Queue's dispatcher thread. @@ -672,8 +671,8 @@ struct StorageUtil { const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, int partitionId, - const AppIdKeyPairs& addedIdKeyPairs, - const AppIdKeyPairs& removedIdKeyPairs, + const AppInfos& addedIdKeyPairs, + const AppInfos& removedIdKeyPairs, bool isCSLMode); static void @@ -715,7 +714,7 @@ struct StorageUtil { int partitionId, const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, - const AppIdKeyPairs& addedIdKeyPairs, + const AppInfos& addedIdKeyPairs, bool isCSLMode, mqbi::Domain* domain = 0, bool allowDuplicate = false); @@ -823,29 +822,28 @@ unsigned int StorageUtil::extractPartitionId(const bmqp::Event& event); // ------------------ template -void StorageUtil::loadDifference(bsl::vector* result, - const bsl::vector& baseSet, - const bsl::vector& subtractionSet) +void StorageUtil::loadDifference(bsl::unordered_set* result, + const bsl::unordered_set& baseSet, + const bsl::unordered_set& subtractionSet) { // PRECONDITIONS BSLS_ASSERT_SAFE(result); - typedef typename bsl::vector::const_iterator CIter; + typedef typename bsl::unordered_set::const_iterator CIter; - for (CIter it = baseSet.begin(); it != baseSet.end(); ++it) { - if (subtractionSet.end() == - bsl::find(subtractionSet.begin(), subtractionSet.end(), *it)) { - result->push_back(*it); + for (CIter it = baseSet.cbegin(); it != baseSet.cend(); ++it) { + if (subtractionSet.end() == subtractionSet.find(*it)) { + result->emplace(*it); } } } template void StorageUtil::loadAddedAndRemovedEntries( - bsl::vector* addedEntries, - bsl::vector* removedEntries, - const bsl::vector& existingEntries, - const bsl::vector& newEntries) + bsl::unordered_set* addedEntries, + bsl::unordered_set* removedEntries, + const bsl::unordered_set& existingEntries, + const bsl::unordered_set& newEntries) { // PRECONDITIONS BSLS_ASSERT_SAFE(addedEntries); diff --git a/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h b/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h index 886589775..fa754df9c 100644 --- a/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h +++ b/src/groups/mqb/mqbi/mqbi_clusterstatemanager.h @@ -102,9 +102,9 @@ class ClusterStateManager { AfterPartitionPrimaryAssignmentCb; /// Pair of (appId, appKey) - typedef bsl::pair AppIdInfo; - typedef bsl::unordered_set AppIdInfos; - typedef AppIdInfos::const_iterator AppIdInfosCIter; + typedef bsl::pair AppInfo; + typedef bsl::unordered_set AppInfos; + typedef AppInfos::const_iterator AppInfosCIter; struct QueueAssignmentResult { enum Enum { @@ -226,7 +226,7 @@ class ClusterStateManager { virtual void registerQueueInfo(const bmqt::Uri& uri, int partitionId, const mqbu::StorageKey& queueKey, - const AppIdInfos& appIdInfos, + const AppInfos& appIdInfos, bool forceUpdate) = 0; /// Unassign the queue in the specified `advisory` by applying the diff --git a/src/groups/mqb/mqbi/mqbi_queueengine.cpp b/src/groups/mqb/mqbi/mqbi_queueengine.cpp index 582956c6b..b26c4a293 100644 --- a/src/groups/mqb/mqbi/mqbi_queueengine.cpp +++ b/src/groups/mqb/mqbi/mqbi_queueengine.cpp @@ -36,13 +36,13 @@ QueueEngine::~QueueEngine() } void QueueEngine::afterAppIdRegistered( - BSLS_ANNOTATION_UNUSED const mqbi::Storage::AppIdKeyPair& appIdKeyPair) + BSLS_ANNOTATION_UNUSED const mqbi::Storage::AppInfo& appIdKeyPair) { // NOTHING } void QueueEngine::afterAppIdUnregistered( - BSLS_ANNOTATION_UNUSED const mqbi::Storage::AppIdKeyPair& appIdKeyPair) + BSLS_ANNOTATION_UNUSED const mqbi::Storage::AppInfo& appIdKeyPair) { // NOTHING } diff --git a/src/groups/mqb/mqbi/mqbi_queueengine.h b/src/groups/mqb/mqbi/mqbi_queueengine.h index cf6f27195..14a77e13a 100644 --- a/src/groups/mqb/mqbi/mqbi_queueengine.h +++ b/src/groups/mqb/mqbi/mqbi_queueengine.h @@ -198,14 +198,14 @@ class QueueEngine { /// /// THREAD: This method is called from the Queue's dispatcher thread. virtual void - afterAppIdRegistered(const mqbi::Storage::AppIdKeyPair& appIdKeyPair); + afterAppIdRegistered(const mqbi::Storage::AppInfo& appIdKeyPair); /// Called after the specified `appIdKeyPair` has been dynamically /// unregistered. /// /// THREAD: This method is called from the Queue's dispatcher thread. virtual void - afterAppIdUnregistered(const mqbi::Storage::AppIdKeyPair& appIdKeyPair); + afterAppIdUnregistered(const mqbi::Storage::AppInfo& appIdKeyPair); /// Called after creation of a new storage for the specified /// `appIdKeyPair`. diff --git a/src/groups/mqb/mqbi/mqbi_storage.h b/src/groups/mqb/mqbi/mqbi_storage.h index 73748f33e..acd32fa15 100644 --- a/src/groups/mqb/mqbi/mqbi_storage.h +++ b/src/groups/mqb/mqbi/mqbi_storage.h @@ -376,12 +376,12 @@ class Storage { public: // PUBLIC TYPES - /// `AppIdKeyPair` is an alias for an (appId, appKey) pairing + /// `AppInfo` is an alias for an (appId, appKey) pairing /// representing unique virtual storage identification. - typedef bsl::pair AppIdKeyPair; + typedef bsl::pair AppInfo; - /// `AppIdKeyPairs` is an alias for a list of pairs of appId and appKey - typedef bsl::vector AppIdKeyPairs; + /// `AppInfos` is an alias for a set of pairs of appId and appKey + typedef bsl::unordered_set AppInfos; typedef mwcc::Array @@ -649,7 +649,7 @@ class Storage { /// Load into the specified `buffer` the list of pairs of appId and /// appKey for all the virtual storages registered with this instance. - virtual void loadVirtualStorageDetails(AppIdKeyPairs* buffer) const = 0; + virtual void loadVirtualStorageDetails(AppInfos* buffer) const = 0; /// Return the number of auto confirmed Apps for the current message. virtual unsigned int numAutoConfirms() const = 0; diff --git a/src/groups/mqb/mqbi/mqbi_storagemanager.h b/src/groups/mqb/mqbi/mqbi_storagemanager.h index a157f66dc..e0397e582 100644 --- a/src/groups/mqb/mqbi/mqbi_storagemanager.h +++ b/src/groups/mqb/mqbi/mqbi_storagemanager.h @@ -168,9 +168,9 @@ class StorageManagerIterator { class StorageManager : public mqbi::AppKeyGenerator { public: // TYPES - typedef mqbi::Storage::AppIdKeyPair AppIdKeyPair; - typedef mqbi::Storage::AppIdKeyPairs AppIdKeyPairs; - typedef AppIdKeyPairs::const_iterator AppIdKeyPairsCIter; + typedef mqbi::Storage::AppInfo AppInfo; + typedef mqbi::Storage::AppInfos AppInfos; + typedef AppInfos::const_iterator AppInfosCIter; typedef bsl::unordered_set AppIds; typedef AppIds::iterator AppIdsIter; @@ -237,7 +237,7 @@ class StorageManager : public mqbi::AppKeyGenerator { virtual void registerQueue(const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, int partitionId, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, mqbi::Domain* domain) = 0; /// Synchronously unregister the queue with the specified `uri` from the @@ -259,8 +259,8 @@ class StorageManager : public mqbi::AppKeyGenerator { virtual int updateQueuePrimary(const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, int partitionId, - const AppIdKeyPairs& addedIdKeyPairs, - const AppIdKeyPairs& removedIdKeyPairs) = 0; + const AppInfos& addedIdKeyPairs, + const AppInfos& removedIdKeyPairs) = 0; virtual void registerQueueReplica(int partitionId, const bmqt::Uri& uri, @@ -276,7 +276,7 @@ class StorageManager : public mqbi::AppKeyGenerator { virtual void updateQueueReplica(int partitionId, const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, mqbi::Domain* domain = 0, bool allowDuplicate = false) = 0; diff --git a/src/groups/mqb/mqbmock/mqbmock_storagemanager.cpp b/src/groups/mqb/mqbmock/mqbmock_storagemanager.cpp index 3264e0110..34ddd037d 100644 --- a/src/groups/mqb/mqbmock/mqbmock_storagemanager.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_storagemanager.cpp @@ -60,7 +60,7 @@ void StorageManager::registerQueue( BSLS_ANNOTATION_UNUSED const bmqt::Uri& uri, BSLS_ANNOTATION_UNUSED const mqbu::StorageKey& queueKey, BSLS_ANNOTATION_UNUSED int partitionId, - BSLS_ANNOTATION_UNUSED const AppIdKeyPairs& appIdKeyPairs, + BSLS_ANNOTATION_UNUSED const AppInfos& appIdKeyPairs, BSLS_ANNOTATION_UNUSED mqbi::Domain* domain) { // NOTHING @@ -77,8 +77,8 @@ int StorageManager::updateQueuePrimary( BSLS_ANNOTATION_UNUSED const bmqt::Uri& uri, BSLS_ANNOTATION_UNUSED const mqbu::StorageKey& queueKey, BSLS_ANNOTATION_UNUSED int partitionId, - BSLS_ANNOTATION_UNUSED const AppIdKeyPairs& addedIdKeyPairs, - BSLS_ANNOTATION_UNUSED const AppIdKeyPairs& removedIdKeyPairs) + BSLS_ANNOTATION_UNUSED const AppInfos& addedIdKeyPairs, + BSLS_ANNOTATION_UNUSED const AppInfos& removedIdKeyPairs) { return 0; } @@ -106,7 +106,7 @@ void StorageManager::updateQueueReplica( BSLS_ANNOTATION_UNUSED int partitionId, BSLS_ANNOTATION_UNUSED const bmqt::Uri& uri, BSLS_ANNOTATION_UNUSED const mqbu::StorageKey& queueKey, - BSLS_ANNOTATION_UNUSED const AppIdKeyPairs& appIdKeyPairs, + BSLS_ANNOTATION_UNUSED const AppInfos& appIdKeyPairs, BSLS_ANNOTATION_UNUSED mqbi::Domain* domain, BSLS_ANNOTATION_UNUSED bool allowDuplicate) { diff --git a/src/groups/mqb/mqbmock/mqbmock_storagemanager.h b/src/groups/mqb/mqbmock/mqbmock_storagemanager.h index 028e17a3f..77e032778 100644 --- a/src/groups/mqb/mqbmock/mqbmock_storagemanager.h +++ b/src/groups/mqb/mqbmock/mqbmock_storagemanager.h @@ -80,7 +80,7 @@ class StorageManager : public mqbi::StorageManager { virtual void registerQueue(const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, int partitionId, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, mqbi::Domain* domain) BSLS_KEYWORD_OVERRIDE; /// Synchronously unregister the queue with the specified `uri` from the @@ -103,8 +103,8 @@ class StorageManager : public mqbi::StorageManager { virtual int updateQueuePrimary(const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, int partitionId, - const AppIdKeyPairs& addedIdKeyPairs, - const AppIdKeyPairs& removedIdKeyPairs) + const AppInfos& addedIdKeyPairs, + const AppInfos& removedIdKeyPairs) BSLS_KEYWORD_OVERRIDE; virtual void @@ -124,7 +124,7 @@ class StorageManager : public mqbi::StorageManager { updateQueueReplica(int partitionId, const bmqt::Uri& uri, const mqbu::StorageKey& queueKey, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, mqbi::Domain* domain = 0, bool allowDuplicate = false) BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbs/mqbs_datastore.h b/src/groups/mqb/mqbs/mqbs_datastore.h index 6e680641e..0d3fa8efc 100644 --- a/src/groups/mqb/mqbs/mqbs_datastore.h +++ b/src/groups/mqb/mqbs/mqbs_datastore.h @@ -272,9 +272,9 @@ struct DataStoreRecordKeyLess { class DataStoreConfigQueueInfo { public: // TYPES - typedef mqbi::Storage::AppIdKeyPair AppIdKeyPair; + typedef mqbi::Storage::AppInfo AppInfo; - typedef mqbi::Storage::AppIdKeyPairs AppIdKeyPairs; + typedef mqbi::Storage::AppInfos AppInfos; private: // DATA @@ -282,7 +282,7 @@ class DataStoreConfigQueueInfo { int d_partitionId; - AppIdKeyPairs d_appIdKeyPairs; + AppInfos d_appIdKeyPairs; public: // TRAITS @@ -300,14 +300,14 @@ class DataStoreConfigQueueInfo { void setPartitionId(int value); - void addAppIdKeyPair(const AppIdKeyPair& value); + void addAppInfo(const AppInfo& value); // ACCESSORS const bsl::string& canonicalQueueUri() const; int partitionId() const; - const AppIdKeyPairs& appIdKeyPairs() const; + const AppInfos& appIdKeyPairs() const; }; // ===================== @@ -336,15 +336,15 @@ class DataStoreConfig { typedef Records::const_iterator RecordConstIterator; - typedef mqbi::Storage::AppIdKeyPair AppIdKeyPair; + typedef mqbi::Storage::AppInfo AppInfo; - typedef mqbi::Storage::AppIdKeyPairs AppIdKeyPairs; + typedef mqbi::Storage::AppInfos AppInfos; typedef bsl::function QueueCreationCb; @@ -542,9 +542,9 @@ class DataStore : public mqbi::DispatcherClient { public: // TYPES - typedef mqbi::Storage::AppIdKeyPair AppIdKeyPair; + typedef mqbi::Storage::AppInfo AppInfo; - typedef mqbi::Storage::AppIdKeyPairs AppIdKeyPairs; + typedef mqbi::Storage::AppInfos AppInfos; typedef DataStoreConfig::QueueKeyInfoMap QueueKeyInfoMap; @@ -597,7 +597,7 @@ class DataStore : public mqbi::DispatcherClient { virtual int writeQueueCreationRecord(DataStoreRecordHandle* handle, const bmqt::Uri& queueUri, const mqbu::StorageKey& queueKey, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, bsls::Types::Uint64 timestamp, bool isNewQueue) = 0; @@ -890,10 +890,9 @@ inline void DataStoreConfigQueueInfo::setPartitionId(int value) d_partitionId = value; } -inline void -DataStoreConfigQueueInfo::addAppIdKeyPair(const AppIdKeyPair& value) +inline void DataStoreConfigQueueInfo::addAppInfo(const AppInfo& value) { - d_appIdKeyPairs.push_back(value); + d_appIdKeyPairs.insert(value); } // ACCESSORS @@ -907,7 +906,7 @@ inline int DataStoreConfigQueueInfo::partitionId() const return d_partitionId; } -inline const DataStoreConfigQueueInfo::AppIdKeyPairs& +inline const DataStoreConfigQueueInfo::AppInfos& DataStoreConfigQueueInfo::appIdKeyPairs() const { return d_appIdKeyPairs; diff --git a/src/groups/mqb/mqbs/mqbs_filebackedstorage.h b/src/groups/mqb/mqbs/mqbs_filebackedstorage.h index 8c37e08ec..b119e126a 100644 --- a/src/groups/mqb/mqbs/mqbs_filebackedstorage.h +++ b/src/groups/mqb/mqbs/mqbs_filebackedstorage.h @@ -128,9 +128,9 @@ class FileBackedStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage { public: // TYPES - typedef mqbi::Storage::AppIdKeyPair AppIdKeyPair; + typedef mqbi::Storage::AppInfo AppInfo; - typedef mqbi::Storage::AppIdKeyPairs AppIdKeyPairs; + typedef mqbi::Storage::AppInfos AppInfos; typedef ReplicatedStorage::RecordHandles RecordHandles; @@ -331,8 +331,8 @@ class FileBackedStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage { /// Load into the specified 'buffer' the list of pairs of appId and appKey // for all the virtual storages registered with this instance. - virtual void loadVirtualStorageDetails(AppIdKeyPairs* buffer) const - BSLS_KEYWORD_OVERRIDE; + virtual void + loadVirtualStorageDetails(AppInfos* buffer) const BSLS_KEYWORD_OVERRIDE; /// Store in the specified 'msgSize' the size, in bytes, of the message /// having the specified 'msgGUID' if found and return success, or return @@ -717,7 +717,7 @@ inline bool FileBackedStorage::hasVirtualStorage(const bsl::string& appId, } inline void -FileBackedStorage::loadVirtualStorageDetails(AppIdKeyPairs* buffer) const +FileBackedStorage::loadVirtualStorageDetails(AppInfos* buffer) const { return d_virtualStorageCatalog.loadVirtualStorageDetails(buffer); } diff --git a/src/groups/mqb/mqbs/mqbs_filestore.cpp b/src/groups/mqb/mqbs/mqbs_filestore.cpp index 6b273c445..15514f9e8 100644 --- a/src/groups/mqb/mqbs/mqbs_filestore.cpp +++ b/src/groups/mqb/mqbs/mqbs_filestore.cpp @@ -2020,13 +2020,15 @@ int FileStore::recoverMessages(QueueKeyInfoMap* queueKeyInfoMap, FileStoreProtocol::k_HASH_LENGTH, appIdsAreaLen); - AppIdKeyPairs appIdKeyPairs; - FileStoreProtocolUtil::loadAppIdKeyPairs(&appIdKeyPairs, - appIdsBlock, - numAppIds); - - for (size_t n = 0; n < appIdKeyPairs.size(); ++n) { - const AppIdKeyPair& p = appIdKeyPairs[n]; + AppInfos appIdKeyPairs; + FileStoreProtocolUtil::loadAppInfos(&appIdKeyPairs, + appIdsBlock, + numAppIds); + + for (AppInfos::const_iterator cit = appIdKeyPairs.cbegin(); + cit != appIdKeyPairs.cend(); + ++cit) { + const AppInfo& p = *cit; if (0 == deletedAppKeysOffsets.count(p.second)) { // This appKey is not deleted. Add it to the list // of 'alive' appId/appKey pairs for this queue. @@ -2035,7 +2037,7 @@ int FileStore::recoverMessages(QueueKeyInfoMap* queueKeyInfoMap, // StorageMgr because we have recovered all // appId/appKey pairs by that time. - qinfo.addAppIdKeyPair(p); + qinfo.addAppInfo(p); BALL_LOG_INFO << partitionDesc() @@ -4269,7 +4271,7 @@ int FileStore::writeQueueCreationRecord( } bmqt::Uri quri; - AppIdKeyPairs appIdKeyPairs; + AppInfos appIdKeyPairs; if (!d_isFSMWorkflow) { // Check qlist offset in the replicated journal record sent by the // primary vs qlist offset maintained by self. A mismatch means that @@ -4303,9 +4305,9 @@ int FileStore::writeQueueCreationRecord( queueRecHeaderLen + paddedUriLen + FileStoreProtocol::k_HASH_LENGTH, appIdsAreaSize); - FileStoreProtocolUtil::loadAppIdKeyPairs(&appIdKeyPairs, - appIdsBlock, - queueRecHeader->numAppIds()); + FileStoreProtocolUtil::loadAppInfos(&appIdKeyPairs, + appIdsBlock, + queueRecHeader->numAppIds()); } BALL_LOG_INFO_BLOCK @@ -4318,10 +4320,11 @@ int FileStore::writeQueueCreationRecord( BALL_LOG_OUTPUT_STREAM << ", queue [" << quri << "]" << ", with [" << appIdKeyPairs.size() << "] appId/appKey pairs "; - for (size_t n = 0; n < appIdKeyPairs.size(); ++n) { - BALL_LOG_OUTPUT_STREAM << " [" << appIdKeyPairs[n].first - << ", " << appIdKeyPairs[n].second - << "]"; + for (AppInfos::const_iterator cit = appIdKeyPairs.cbegin(); + cit != appIdKeyPairs.cend(); + ++cit) { + BALL_LOG_OUTPUT_STREAM << " [" << cit->first << ", " + << cit->second << "]"; } } } @@ -5595,7 +5598,7 @@ int FileStore::writeMessageRecord(mqbi::StorageMessageAttributes* attributes, int FileStore::writeQueueCreationRecord(DataStoreRecordHandle* handle, const bmqt::Uri& queueUri, const mqbu::StorageKey& queueKey, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, bsls::Types::Uint64 timestamp, bool isNewQueue) { @@ -5656,9 +5659,11 @@ int FileStore::writeQueueCreationRecord(DataStoreRecordHandle* handle, totalLength = sizeof(QueueRecordHeader) + queueUri.asString().length() + queueUriPadding + FileStoreProtocol::k_HASH_LENGTH; - - for (size_t i = 0; i < appIdKeyPairs.size(); ++i) { - const AppIdKeyPair& appIdKeyPair = appIdKeyPairs[i]; + size_t i = 0; + for (AppInfos::const_iterator cit = appIdKeyPairs.cbegin(); + cit != appIdKeyPairs.cend(); + ++cit, ++i) { + const AppInfo& appIdKeyPair = *cit; BSLS_ASSERT_SAFE(!appIdKeyPair.first.empty()); BSLS_ASSERT_SAFE(!appIdKeyPair.second.isNull()); appIdWords[i] = bmqp::ProtocolUtil::calcNumWordsAndPadding( @@ -5763,7 +5768,10 @@ int FileStore::writeQueueCreationRecord(DataStoreRecordHandle* handle, qlistFilePos += FileStoreProtocol::k_HASH_LENGTH; // 3) Append AppIds and AppKeys - for (size_t i = 0; i < appIdKeyPairs.size(); ++i) { + size_t i = 0; + for (AppInfos::const_iterator cit = appIdKeyPairs.cbegin(); + cit != appIdKeyPairs.cend(); + ++cit, ++i) { // Append AppIdHeader. OffsetPtr appIdHeader(qlistFile.block(), @@ -5775,10 +5783,8 @@ int FileStore::writeQueueCreationRecord(DataStoreRecordHandle* handle, // Append AppId. OffsetPtr appId(qlistFile.block(), qlistFilePos); - bsl::memcpy(appId.get(), - appIdKeyPairs[i].first.c_str(), - appIdKeyPairs[i].first.length()); - qlistFilePos += appIdKeyPairs[i].first.length(); + bsl::memcpy(appId.get(), cit->first.c_str(), cit->first.length()); + qlistFilePos += cit->first.length(); // Append padding after AppId. @@ -5791,7 +5797,7 @@ int FileStore::writeQueueCreationRecord(DataStoreRecordHandle* handle, // above for explanation). char appIdHash[mqbs::FileStoreProtocol::k_HASH_LENGTH] = {0}; - bsl::memcpy(appIdHash, appIdKeyPairs[i].second.data(), k_KEY_LEN); + bsl::memcpy(appIdHash, cit->second.data(), k_KEY_LEN); OffsetPtr appHash(qlistFile.block(), qlistFilePos); bsl::memcpy(appHash.get(), appIdHash, diff --git a/src/groups/mqb/mqbs/mqbs_filestore.h b/src/groups/mqb/mqbs/mqbs_filestore.h index 84ec766b4..dcc77686a 100644 --- a/src/groups/mqb/mqbs/mqbs_filestore.h +++ b/src/groups/mqb/mqbs/mqbs_filestore.h @@ -214,8 +214,8 @@ class FileStore : public DataStore { typedef DataStoreConfig::QueueKeyInfoMapConstIter QueueKeyInfoMapConstIter; typedef DataStoreConfig::QueueKeyInfoMapInsertRc QueueKeyInfoMapInsertRc; - typedef mqbi::Storage::AppIdKeyPair AppIdKeyPair; - typedef mqbi::Storage::AppIdKeyPairs AppIdKeyPairs; + typedef mqbi::Storage::AppInfo AppInfo; + typedef mqbi::Storage::AppInfos AppInfos; typedef StorageCollectionUtil::StoragesMap StoragesMap; typedef StorageCollectionUtil::StorageMapIter StorageMapIter; @@ -774,7 +774,7 @@ class FileStore : public DataStore { int writeQueueCreationRecord(DataStoreRecordHandle* handle, const bmqt::Uri& queueUri, const mqbu::StorageKey& queueKey, - const AppIdKeyPairs& appIdKeyPairs, + const AppInfos& appIdKeyPairs, bsls::Types::Uint64 timestamp, bool isNewQueue) BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbs/mqbs_filestore.t.cpp b/src/groups/mqb/mqbs/mqbs_filestore.t.cpp index c235787ab..d5c826a4b 100644 --- a/src/groups/mqb/mqbs/mqbs_filestore.t.cpp +++ b/src/groups/mqb/mqbs/mqbs_filestore.t.cpp @@ -86,7 +86,7 @@ const int k_NODE_ID = 12345; // ALIASES typedef mqbs::FileStoreTestUtil_Record Record; -typedef mqbs::DataStore::AppIdKeyPairs AppIdKeyPairs; +typedef mqbs::DataStore::AppInfos AppInfos; typedef mqbs::FileStore::SyncPointOffsetPairs SyncPointOffsetPairs; typedef bsl::pair HandleRecordPair; @@ -369,7 +369,7 @@ struct Tester { bmqt::Uri(rec.d_uri, s_allocator_p), rec.d_queueKey, - AppIdKeyPairs(), + AppInfos(), rec.d_timestamp, true); // isNewQueue diff --git a/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.cpp b/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.cpp index 42e3c1b7c..1053ff5cf 100644 --- a/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.cpp +++ b/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.cpp @@ -330,10 +330,11 @@ int FileStoreProtocolUtil::calculateMd5Digest( return 0; } -void FileStoreProtocolUtil::loadAppIdKeyPairs( - bsl::vector >* appIdKeyPairs, - const MemoryBlock& appIdsBlock, - unsigned int numAppIds) +void FileStoreProtocolUtil::loadAppInfos( + bsl::unordered_set >* + appIdKeyPairs, + const MemoryBlock& appIdsBlock, + unsigned int numAppIds) { // PRECONDITIONS BSLS_ASSERT_SAFE(appIdKeyPairs); @@ -351,7 +352,7 @@ void FileStoreProtocolUtil::loadAppIdKeyPairs( const char* appIdBegin = appIdsBlock.base() + offset + sizeof(AppIdHeader); - appIdKeyPairs->emplace_back( + appIdKeyPairs->emplace( bsl::string(appIdBegin, paddedLen - appIdBegin[paddedLen - 1], appIdKeyPairs->get_allocator()), diff --git a/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.h b/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.h index 15f71d71d..d91c05d9a 100644 --- a/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.h +++ b/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.h @@ -41,6 +41,7 @@ #include #include #include +#include #include #include #include @@ -108,10 +109,11 @@ struct FileStoreProtocolUtil { const mwcu::BlobPosition& startPos, unsigned int length); - static void loadAppIdKeyPairs( - bsl::vector >* appIdKeyPairs, - const MemoryBlock& appIdsBlock, - unsigned int numAppIds); + static void + loadAppInfos(bsl::unordered_set >* + appIdKeyPairs, + const MemoryBlock& appIdsBlock, + unsigned int numAppIds); }; } // close package namespace diff --git a/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.t.cpp b/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.t.cpp index b5db9e7c6..db2ce1bd2 100644 --- a/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.t.cpp +++ b/src/groups/mqb/mqbs/mqbs_filestoreprotocolutil.t.cpp @@ -495,25 +495,25 @@ static void test3_lastJournalSyncPoint() } } -static void test4_loadAppIdKeyPairs() +static void test4_loadAppInfos() // ------------------------------------------------------------------------ // Testing: -// loadAppIdKeyPairs() +// loadAppInfos() // ------------------------------------------------------------------------ { - typedef bsl::pair AppIdKeyPair; - typedef bsl::vector AppIdKeyPairs; + typedef bsl::pair AppInfo; + typedef bsl::vector AppInfos; { // No appIds. char* p = static_cast(s_allocator_p->allocate(1)); mqbs::MemoryBlock mb(p, 1); - AppIdKeyPairs appIdKeyPairs(s_allocator_p); + AppInfos appIdKeyPairs(s_allocator_p); - mqbs::FileStoreProtocolUtil::loadAppIdKeyPairs(&appIdKeyPairs, - mb, - 0); // no appIds + mqbs::FileStoreProtocolUtil::loadAppInfos(&appIdKeyPairs, + mb, + 0); // no appIds ASSERT_EQ(0u, appIdKeyPairs.size()); @@ -563,11 +563,11 @@ static void test4_loadAppIdKeyPairs() // Test. mqbs::MemoryBlock mb(p, totalSize); - AppIdKeyPairs appIdKeyPairs(s_allocator_p); + AppInfos appIdKeyPairs(s_allocator_p); - mqbs::FileStoreProtocolUtil::loadAppIdKeyPairs(&appIdKeyPairs, - mb, - 1); // 1 appId + mqbs::FileStoreProtocolUtil::loadAppInfos(&appIdKeyPairs, + mb, + 1); // 1 appId ASSERT_EQ(1U, appIdKeyPairs.size()); ASSERT_EQ(appId, appIdKeyPairs[0].first); @@ -652,11 +652,11 @@ static void test4_loadAppIdKeyPairs() // Test. mqbs::MemoryBlock mb(p, totalSize); - AppIdKeyPairs appIdKeyPairs(s_allocator_p); + AppInfos appIdKeyPairs(s_allocator_p); - mqbs::FileStoreProtocolUtil::loadAppIdKeyPairs(&appIdKeyPairs, - mb, - numAppIds); + mqbs::FileStoreProtocolUtil::loadAppInfos(&appIdKeyPairs, + mb, + numAppIds); ASSERT_EQ(static_cast(numAppIds), appIdKeyPairs.size()); @@ -911,7 +911,7 @@ int main(int argc, char* argv[]) switch (_testCase) { case 0: case 5: test5_calculateMd5Digest(); break; - case 4: test4_loadAppIdKeyPairs(); break; + case 4: test4_loadAppInfos(); break; case 3: test3_lastJournalSyncPoint(); break; case 2: test2_lastJournalRecord(); break; case 1: test1_hasBmqHeader(); break; diff --git a/src/groups/mqb/mqbs/mqbs_inmemorystorage.h b/src/groups/mqb/mqbs/mqbs_inmemorystorage.h index e48ea3b21..89e6a5563 100644 --- a/src/groups/mqb/mqbs/mqbs_inmemorystorage.h +++ b/src/groups/mqb/mqbs/mqbs_inmemorystorage.h @@ -168,9 +168,9 @@ class InMemoryStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage { public: // TYPES - typedef mqbi::Storage::AppIdKeyPair AppIdKeyPair; + typedef mqbi::Storage::AppInfo AppInfo; - typedef mqbi::Storage::AppIdKeyPairs AppIdKeyPairs; + typedef mqbi::Storage::AppInfos AppInfos; typedef ReplicatedStorage::RecordHandles RecordHandles; @@ -516,8 +516,8 @@ class InMemoryStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage { /// Load into the specified 'buffer' the list of pairs of appId and /// appKey for all the virtual storages registered with this instance. - virtual void loadVirtualStorageDetails(AppIdKeyPairs* buffer) const - BSLS_KEYWORD_OVERRIDE; + virtual void + loadVirtualStorageDetails(AppInfos* buffer) const BSLS_KEYWORD_OVERRIDE; virtual unsigned int numAutoConfirms() const BSLS_KEYWORD_OVERRIDE; @@ -767,8 +767,7 @@ inline bool InMemoryStorage::hasReceipt(const bmqt::MessageGUID&) const return true; } -inline void -InMemoryStorage::loadVirtualStorageDetails(AppIdKeyPairs* buffer) const +inline void InMemoryStorage::loadVirtualStorageDetails(AppInfos* buffer) const { return d_virtualStorageCatalog.loadVirtualStorageDetails(buffer); diff --git a/src/groups/mqb/mqbs/mqbs_storageprintutil.cpp b/src/groups/mqb/mqbs/mqbs_storageprintutil.cpp index 1d268a495..bd8e8e4b8 100644 --- a/src/groups/mqb/mqbs/mqbs_storageprintutil.cpp +++ b/src/groups/mqb/mqbs/mqbs_storageprintutil.cpp @@ -175,11 +175,11 @@ void StoragePrintUtil::printRecoveredStorages( out << " ["; } - AppIdKeyPairs appIdKeyPairs; + AppInfos appIdKeyPairs; rs->loadVirtualStorageDetails(&appIdKeyPairs); BSLS_ASSERT_SAFE(numVS == appIdKeyPairs.size()); - for (AppIdKeyPairsCIter vit = appIdKeyPairs.begin(); + for (AppInfosCIter vit = appIdKeyPairs.begin(); vit != appIdKeyPairs.end(); ++vit) { BSLS_ASSERT_SAFE(rs->hasVirtualStorage(vit->second)); diff --git a/src/groups/mqb/mqbs/mqbs_storageprintutil.h b/src/groups/mqb/mqbs/mqbs_storageprintutil.h index 0a20a02b5..6c420dba1 100644 --- a/src/groups/mqb/mqbs/mqbs_storageprintutil.h +++ b/src/groups/mqb/mqbs/mqbs_storageprintutil.h @@ -66,8 +66,8 @@ namespace mqbs { struct StoragePrintUtil { private: // PRIVATE TYPES - typedef mqbi::StorageManager::AppIdKeyPairs AppIdKeyPairs; - typedef AppIdKeyPairs::const_iterator AppIdKeyPairsCIter; + typedef mqbi::StorageManager::AppInfos AppInfos; + typedef AppInfos::const_iterator AppInfosCIter; public: // TYPES diff --git a/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.cpp b/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.cpp index e75bc5c9c..3a8b41ef1 100644 --- a/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.cpp +++ b/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.cpp @@ -565,8 +565,7 @@ bool VirtualStorageCatalog::hasVirtualStorage(const bsl::string& appId, return hasVs; } -void VirtualStorageCatalog::loadVirtualStorageDetails( - AppIdKeyPairs* buffer) const +void VirtualStorageCatalog::loadVirtualStorageDetails(AppInfos* buffer) const { // PRECONDITIONS BSLS_ASSERT_SAFE(buffer); @@ -575,7 +574,7 @@ void VirtualStorageCatalog::loadVirtualStorageDetails( cit != d_virtualStorages.end(); ++cit) { BSLS_ASSERT_SAFE(cit->key2() == cit->value()->appKey()); - buffer->push_back(bsl::make_pair(cit->key1(), cit->key2())); + buffer->emplace(bsl::make_pair(cit->key1(), cit->key2())); } } diff --git a/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.h b/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.h index 6b95f79d1..a3e988064 100644 --- a/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.h +++ b/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.h @@ -76,9 +76,9 @@ class VirtualStorageCatalog { public: // TYPES - typedef mqbi::Storage::AppIdKeyPair AppIdKeyPair; + typedef mqbi::Storage::AppInfo AppInfo; - typedef mqbi::Storage::AppIdKeyPairs AppIdKeyPairs; + typedef mqbi::Storage::AppInfos AppInfos; typedef unsigned int Ordinal; @@ -278,7 +278,7 @@ class VirtualStorageCatalog { /// Load into the specified 'buffer' the list of pairs of appId and /// appKey for all the virtual storages registered with this instance. - void loadVirtualStorageDetails(AppIdKeyPairs* buffer) const; + void loadVirtualStorageDetails(AppInfos* buffer) const; /// Return the number of messages in the virtual storage associated with /// the specified 'appKey'. Behavior is undefined unless a virtual