diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp index f1f5d8d91..2723c0830 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp @@ -2848,6 +2848,7 @@ void Cluster::onDomainReconfigured(const mqbi::Domain& domain, oldCfgAppIds, newCfgAppIds); + // TODO: This should be one call - one QueueUpdateAdvisory for all Apps bsl::unordered_set::const_iterator it = addedIds.cbegin(); for (; it != addedIds.cend(); ++it) { dispatcher()->execute( diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp index 8822876f4..609df8b01 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp @@ -139,29 +139,27 @@ void createQueueUriKey(bmqt::Uri* out, } void afterAppIdRegisteredDispatched( - mqbi::Queue* queue, - const mqbc::ClusterStateQueueInfo::AppInfo& appIdInfo) + mqbi::Queue* queue, + const mqbc::ClusterStateQueueInfo::AppInfos& appInfos) { // executed by the *QUEUE DISPATCHER* thread // PRECONDITIONS BSLS_ASSERT_SAFE(queue->dispatcher()->inDispatcherThread(queue)); - queue->queueEngine()->afterAppIdRegistered( - mqbi::Storage::AppInfo(appIdInfo.first, appIdInfo.second)); + queue->queueEngine()->afterAppIdRegistered(appInfos); } void afterAppIdUnregisteredDispatched( - mqbi::Queue* queue, - const mqbc::ClusterStateQueueInfo::AppInfo& appIdInfo) + mqbi::Queue* queue, + const mqbc::ClusterStateQueueInfo::AppInfos& appInfos) { // executed by the *QUEUE DISPATCHER* thread // PRECONDITIONS BSLS_ASSERT_SAFE(queue->dispatcher()->inDispatcherThread(queue)); - queue->queueEngine()->afterAppIdUnregistered( - mqbi::Storage::AppInfo(appIdInfo.first, appIdInfo.second)); + queue->queueEngine()->afterAppIdUnregistered(appInfos); } void handleHolderDummy(const bsl::shared_ptr& handle) @@ -4375,10 +4373,6 @@ void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri, d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p)); BSLS_ASSERT_SAFE(!d_cluster_p->isRemote()); - if (!d_cluster_p->isCSLModeEnabled()) { - return; // RETURN - } - if (!uri.isValid()) { // This is an appID update for the entire domain, instead of any // individual queue. Nothing to do for the queue helper. @@ -4394,53 +4388,51 @@ void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri, const int partitionId = qiter->second->partitionId(); BSLS_ASSERT_SAFE(partitionId != mqbs::DataStore::k_INVALID_PARTITION_ID); - for (AppInfosCIter cit = addedAppIds.cbegin(); cit != addedAppIds.cend(); - ++cit) { + if (d_cluster_p->isCSLModeEnabled()) { if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) { // Note: In non-CSL mode, the queue creation callback is // invoked at replica nodes when they receive a queue creation // record from the primary in the partition stream. - mqbi::Storage::AppInfos one(1, d_allocator_p); - one.emplace(*cit); - d_storageManager_p->updateQueueReplica( partitionId, uri, qiter->second->key(), - one, + addedAppIds, d_clusterState_p->domainStates() .at(uri.qualifiedDomain()) ->domain()); } - if (queue) { - d_cluster_p->dispatcher()->execute( - bdlf::BindUtil::bind(afterAppIdRegisteredDispatched, - queue, - *cit), - queue); + + for (AppInfosCIter cit = removedAppIds.cbegin(); + cit != removedAppIds.cend(); + ++cit) { + if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) { + // Note: In non-CSL mode, the queue deletion callback is + // invoked at replica nodes when they receive a queue deletion + // record from the primary in the partition stream. + d_storageManager_p->unregisterQueueReplica( + partitionId, + uri, + qiter->second->key(), + cit->second); + } } } - for (AppInfosCIter cit = removedAppIds.cbegin(); - cit != removedAppIds.cend(); - ++cit) { - if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) { - // Note: In non-CSL mode, the queue deletion callback is - // invoked at replica nodes when they receive a queue deletion - // record from the primary in the partition stream. - d_storageManager_p->unregisterQueueReplica(partitionId, - uri, - qiter->second->key(), - cit->second); - } - if (queue) { - d_cluster_p->dispatcher()->execute( - bdlf::BindUtil::bind(afterAppIdUnregisteredDispatched, - queue, - *cit), - queue); - } + if (queue) { + // TODO: replace with one call + d_cluster_p->dispatcher()->execute( + bdlf::BindUtil::bind(afterAppIdRegisteredDispatched, + queue, + addedAppIds), + queue); + + d_cluster_p->dispatcher()->execute( + bdlf::BindUtil::bind(afterAppIdUnregisteredDispatched, + queue, + removedAppIds), + queue); } bmqu::Printer printer1(&addedAppIds); diff --git a/src/groups/mqb/mqbblp/mqbblp_domain.cpp b/src/groups/mqb/mqbblp/mqbblp_domain.cpp index 750da298c..1f1928ffd 100644 --- a/src/groups/mqb/mqbblp/mqbblp_domain.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_domain.cpp @@ -68,32 +68,6 @@ void queueHolderDummy(const bsl::shared_ptr& queue) BALL_LOG_INFO << "Deleted queue '" << queue->uri().canonical() << "'"; } -void afterAppIdRegisteredDispatched(mqbi::Queue* queue, - const bsl::string& appId) -{ - // executed by the *QUEUE DISPATCHER* thread - - // PRECONDITIONS - BSLS_ASSERT_SAFE(queue->dispatcher()->inDispatcherThread(queue)); - - queue->queueEngine()->afterAppIdRegistered( - mqbi::Storage::AppInfo(appId, mqbu::StorageKey())); -} - -void afterAppIdUnregisteredDispatched(mqbi::Queue* queue, - const bsl::string& appId) -{ - // executed by the *QUEUE DISPATCHER* thread - - // PRECONDITIONS - BSLS_ASSERT_SAFE(queue->dispatcher()->inDispatcherThread(queue)); - - // Note: Inputing nullKey here is okay since this routine will be removed - // when we switch to CSL workflow. - queue->queueEngine()->afterAppIdUnregistered( - mqbi::Storage::AppInfo(appId, mqbu::StorageKey())); -} - /// Validates an application subscription. bool validdateSubscriptionExpression(bsl::ostream& errorDescription, const mqbconfm::Expression& expression, @@ -463,51 +437,6 @@ int Domain::configure(bsl::ostream& errorDescription, BSLS_ASSERT_OPT(oldConfig.has_value()); BSLS_ASSERT_OPT(d_config.has_value()); - // In non-CSL mode, manually dispatch AppId registration callbacks. - if (!d_cluster_sp->isCSLModeEnabled() && - d_config.value().mode().isFanoutValue()) { - // Compute list of added and removed App IDs. - bsl::unordered_set oldCfgAppIds( - oldConfig.value().mode().fanout().appIDs().cbegin(), - oldConfig.value().mode().fanout().appIDs().cend(), - d_allocator_p); - bsl::unordered_set newCfgAppIds( - d_config.value().mode().fanout().appIDs().cbegin(), - d_config.value().mode().fanout().appIDs().cend(), - d_allocator_p); - - bsl::unordered_set addedIds, removedIds; - mqbc::StorageUtil::loadAddedAndRemovedEntries(&addedIds, - &removedIds, - oldCfgAppIds, - newCfgAppIds); - - bslmt::LockGuard guard(&d_mutex); - - // Invoke callbacks for each added and removed ID on each queue. - bsl::unordered_set::const_iterator it = - addedIds.cbegin(); - QueueMap::const_iterator qIt; - for (; it != addedIds.cend(); it++) { - for (qIt = d_queues.cbegin(); qIt != d_queues.cend(); ++qIt) { - d_dispatcher_p->execute( - bdlf::BindUtil::bind(afterAppIdRegisteredDispatched, - qIt->second.get(), - *it), - qIt->second.get()); - } - } - for (it = removedIds.cbegin(); it != removedIds.cend(); ++it) { - for (qIt = d_queues.cbegin(); qIt != d_queues.cend(); ++qIt) { - d_dispatcher_p->execute( - bdlf::BindUtil::bind(afterAppIdUnregisteredDispatched, - qIt->second.get(), - *it), - qIt->second.get()); - } - } - } - // Notify the 'cluster' of the updated configuration, so it can write // any needed update-advisories to the CSL. d_cluster_sp->onDomainReconfigured(*this, diff --git a/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp b/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp index c300dc02d..e7aa17af5 100644 --- a/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp @@ -161,7 +161,7 @@ int LocalQueue::configure(bsl::ostream& errorDescription, bool isReconfigure) d_allocator_p); } - rc = d_queueEngine_mp->configure(errorDescription); + rc = d_queueEngine_mp->configure(errorDescription, isReconfigure); if (rc != 0) { return 10 * rc + rc_QUEUE_ENGINE_CFG_FAILURE; // RETURN } diff --git a/src/groups/mqb/mqbblp/mqbblp_queue.cpp b/src/groups/mqb/mqbblp/mqbblp_queue.cpp index 3fc107fee..b3c565103 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queue.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_queue.cpp @@ -343,7 +343,7 @@ void Queue::convertToLocalDispatched() d_state.setId(bmqp::QueueId::k_PRIMARY_QUEUE_ID); createLocal(); - rc = d_localQueue_mp->configure(errorDescription, true); + rc = d_localQueue_mp->configure(errorDescription, false); if (rc != 0) { BALL_LOG_ERROR << "#QUEUE_CONVERTION_FAILURE " << d_state.uri() @@ -483,7 +483,6 @@ Queue::Queue(const bmqt::Uri& uri, // storage. d_state.setStorageManager(storageManager) - .setAppKeyGenerator(storageManager) .setMiscWorkThreadPool(threadPool) .setRoutingConfig(routingCfg) .setMessageThrottleConfig(messageThrottleConfig); diff --git a/src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.cpp b/src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.cpp index dfa95e75b..bca29a0fb 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.cpp @@ -170,22 +170,18 @@ QueueConsumptionMonitor::setMaxIdleTime(bsls::Types::Int64 value) return *this; } -void QueueConsumptionMonitor::registerSubStream(const mqbu::StorageKey& key) +void QueueConsumptionMonitor::registerSubStream(const bsl::string& id) { // Should always be called from the queue thread, but will be invoked from // the cluster thread once upon queue creation. // PRECONDITIONS - BSLS_ASSERT_SAFE(key != mqbu::StorageKey::k_NULL_KEY || - d_subStreamInfos.empty()); - BSLS_ASSERT_SAFE(d_subStreamInfos.find(mqbu::StorageKey::k_NULL_KEY) == - d_subStreamInfos.end()); - BSLS_ASSERT_SAFE(d_subStreamInfos.find(key) == d_subStreamInfos.end()); + BSLS_ASSERT_SAFE(d_subStreamInfos.find(id) == d_subStreamInfos.end()); - d_subStreamInfos.insert(bsl::make_pair(key, SubStreamInfo())); + d_subStreamInfos.insert(bsl::make_pair(id, SubStreamInfo())); } -void QueueConsumptionMonitor::unregisterSubStream(const mqbu::StorageKey& key) +void QueueConsumptionMonitor::unregisterSubStream(const bsl::string& id) { // executed by the *QUEUE DISPATCHER* thread @@ -193,7 +189,7 @@ void QueueConsumptionMonitor::unregisterSubStream(const mqbu::StorageKey& key) BSLS_ASSERT_SAFE(d_queueState_p->queue()->dispatcher()->inDispatcherThread( d_queueState_p->queue())); - SubStreamInfoMapConstIter iter = d_subStreamInfos.find(key); + SubStreamInfoMapConstIter iter = d_subStreamInfos.find(id); BSLS_ASSERT_SAFE(iter != d_subStreamInfos.end()); d_subStreamInfos.erase(iter); } @@ -231,7 +227,7 @@ void QueueConsumptionMonitor::onTimer(bsls::Types::Int64 currentTimer) iter != last; ++iter) { SubStreamInfo& info = iter->second; - const mqbu::StorageKey& appKey = iter->first; + const bsl::string& id = iter->first; if (info.d_messageSent) { // Queue is 'alive' because at least one message was sent // since the last 'timer'. @@ -241,7 +237,7 @@ void QueueConsumptionMonitor::onTimer(bsls::Types::Int64 currentTimer) if (info.d_state == State::e_IDLE) { // object was in idle state - onTransitionToAlive(&info, appKey); + onTransitionToAlive(&info, id); continue; // CONTINUE } @@ -253,7 +249,7 @@ void QueueConsumptionMonitor::onTimer(bsls::Types::Int64 currentTimer) // No delivered messages in the last 'maxIdleTime'. // Call callback to log alarm if there are undelivered messages. - const bool haveUndelivered = d_loggingCb(appKey, + const bool haveUndelivered = d_loggingCb(id, info.d_state == State::e_ALIVE); @@ -269,16 +265,15 @@ void QueueConsumptionMonitor::onTimer(bsls::Types::Int64 currentTimer) // so transition to alive. if (info.d_state == State::e_IDLE) { info.d_lastKnownGoodTimer = d_currentTimer; - onTransitionToAlive(&info, appKey); + onTransitionToAlive(&info, id); } } } } } -void QueueConsumptionMonitor::onTransitionToAlive( - SubStreamInfo* subStreamInfo, - const mqbu::StorageKey& appKey) +void QueueConsumptionMonitor::onTransitionToAlive(SubStreamInfo* subStreamInfo, + const bsl::string& id) { // executed by the *QUEUE DISPATCHER* thread @@ -291,12 +286,7 @@ void QueueConsumptionMonitor::onTransitionToAlive( bdlma::LocalSequentialAllocator<2048> localAllocator(0); bmqt::UriBuilder uriBuilder(d_queueState_p->uri(), &localAllocator); - bsl::string appId; - - if (!appKey.isNull() && - d_queueState_p->storage()->hasVirtualStorage(appKey, &appId)) { - uriBuilder.setId(appId); - } + uriBuilder.setId(id); bmqt::Uri uri(&localAllocator); uriBuilder.uri(&uri); diff --git a/src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.h b/src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.h index 0d570e64d..7fde6762f 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.h +++ b/src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.h @@ -82,7 +82,7 @@ // // // 15 seconds later - T + 60s // // consume first message -// monitor.onMessageSent(mqbu::StorageKey::k_NULL_KEY); +// monitor.onMessageSent(id); // // // 15 seconds later - T + 75s // monitor.onTimer(bsls::TimeUtil::getTimer()); // log INFO: back to active @@ -95,7 +95,7 @@ // // // 15 seconds later - T + 120s // // consume second message -// monitor.onMessageSent(mqbu::StorageKey::k_NULL_KEY); +// monitor.onMessageSent(id); // // // 15 seconds later - T + 135s // monitor.onTimer(bsls::TimeUtil::getTimer()); // log INFO: back to active @@ -105,7 +105,6 @@ #include #include -#include // BDE #include @@ -211,10 +210,10 @@ class QueueConsumptionMonitor { }; /// Callback function to log alarm info when queue state transitions to - /// idle. First argument is the app key, second argument is a boolean flag + /// idle. First argument is the app id, second argument is a boolean flag /// to enable logging. If `enableLog` is `false`, logging is skipped. /// Return `true` if there are un-delivered messages and `false` otherwise. - typedef bsl::function + typedef bsl::function LoggingCb; private: @@ -239,10 +238,7 @@ class QueueConsumptionMonitor { State::Enum d_state; // The current state. }; - typedef bsl::unordered_map > - SubStreamInfoMap; + typedef bsl::unordered_map SubStreamInfoMap; typedef SubStreamInfoMap::iterator SubStreamInfoMapIter; @@ -273,20 +269,20 @@ class QueueConsumptionMonitor { // ACCESSORS - /// Return the `SubStreamInfo` corresponding to the specified `key`. - const SubStreamInfo& subStreamInfo(const mqbu::StorageKey& key) const; + /// Return the `SubStreamInfo` corresponding to the specified `id`. + const SubStreamInfo& subStreamInfo(const bsl::string& id) const; - /// Return the `SubStreamInfo` corresponding to the specified `key`. It - /// is an error to specify a `key` that has not been previously + /// Return the `SubStreamInfo` corresponding to the specified `id`. It + /// is an error to specify an `id` that has not been previously /// registered via `registerSubStream`. - SubStreamInfo& subStreamInfo(const mqbu::StorageKey& key); + SubStreamInfo& subStreamInfo(const bsl::string& id); // MANIPULATORS /// Update the specified `subStreamInfo`, associated to the specified - /// `appKey`, and write log, upon transition to alive state. - void onTransitionToAlive(SubStreamInfo* subStreamInfo, - const mqbu::StorageKey& appKey); + /// `id`, and write log, upon transition to alive state. + void onTransitionToAlive(SubStreamInfo* subStreamInfo, + const bsl::string& id); public: // TRAITS @@ -317,15 +313,12 @@ class QueueConsumptionMonitor { /// this object. QueueConsumptionMonitor& setMaxIdleTime(bsls::Types::Int64 value); - /// Register the substream identified by the specified `key`. - /// `key` may be `StorageKey::k_NULL_KEY`, in which case no other key may - /// be registered via this function. It is illegal to register the same - /// substream more than once. - void registerSubStream(const mqbu::StorageKey& key); + /// Register the substream identified by the specified `id`. + void registerSubStream(const bsl::string& id); - /// Stop monitoring the substream identified by the specified `key`. - /// `key` must have been previously registered via `registerSubStream`. - void unregisterSubStream(const mqbu::StorageKey& key); + /// Stop monitoring the substream identified by the specified `id`. + /// `id` must have been previously registered via `registerSubStream`. + void unregisterSubStream(const bsl::string& id); /// Put the object back in construction state. void reset(); @@ -337,17 +330,17 @@ class QueueConsumptionMonitor { void onTimer(bsls::Types::Int64 currentTimer); /// Notify the monitor that one or more messages were sent during the - /// current time period for the substream specified by `key`. It is an - /// error to specify a `key` that has not been previously registered via + /// current time period for the substream specified by `id`. It is an + /// error to specify an `id` that has not been previously registered via /// `registerSubStream`. - void onMessageSent(const mqbu::StorageKey& key); + void onMessageSent(const bsl::string& id); // ACCESSORS /// Return the current activity status for the monitored queue for the - /// substream specified by `key`. It is an error to specify a `key` + /// substream specified by `id`. It is an error to specify a `id` /// that has not been previously registered via `registerSubStream`. - State::Enum state(const mqbu::StorageKey& key) const; + State::Enum state(const bsl::string& id) const; }; // FREE OPERATORS @@ -383,7 +376,7 @@ bsl::ostream& operator<<(bsl::ostream& stream, // ----------------------------- inline QueueConsumptionMonitor::SubStreamInfo& -QueueConsumptionMonitor::subStreamInfo(const mqbu::StorageKey& key) +QueueConsumptionMonitor::subStreamInfo(const bsl::string& id) { // executed by the *QUEUE DISPATCHER* thread @@ -391,13 +384,13 @@ QueueConsumptionMonitor::subStreamInfo(const mqbu::StorageKey& key) BSLS_ASSERT_SAFE(d_queueState_p->queue()->dispatcher()->inDispatcherThread( d_queueState_p->queue())); - SubStreamInfoMapIter iter = d_subStreamInfos.find(key); + SubStreamInfoMapIter iter = d_subStreamInfos.find(id); BSLS_ASSERT_SAFE(iter != d_subStreamInfos.end()); return iter->second; } inline const QueueConsumptionMonitor::SubStreamInfo& -QueueConsumptionMonitor::subStreamInfo(const mqbu::StorageKey& key) const +QueueConsumptionMonitor::subStreamInfo(const bsl::string& id) const { // executed by the *QUEUE DISPATCHER* thread @@ -405,12 +398,12 @@ QueueConsumptionMonitor::subStreamInfo(const mqbu::StorageKey& key) const BSLS_ASSERT_SAFE(d_queueState_p->queue()->dispatcher()->inDispatcherThread( d_queueState_p->queue())); - SubStreamInfoMapConstIter iter = d_subStreamInfos.find(key); + SubStreamInfoMapConstIter iter = d_subStreamInfos.find(id); BSLS_ASSERT_SAFE(iter != d_subStreamInfos.end()); return iter->second; } -inline void QueueConsumptionMonitor::onMessageSent(const mqbu::StorageKey& key) +inline void QueueConsumptionMonitor::onMessageSent(const bsl::string& id) { // executed by the *QUEUE DISPATCHER* thread @@ -424,11 +417,11 @@ inline void QueueConsumptionMonitor::onMessageSent(const mqbu::StorageKey& key) return; // RETURN } - subStreamInfo(key).d_messageSent = true; + subStreamInfo(id).d_messageSent = true; } inline QueueConsumptionMonitor::State::Enum -QueueConsumptionMonitor::state(const mqbu::StorageKey& key) const +QueueConsumptionMonitor::state(const bsl::string& id) const { // executed by the *QUEUE DISPATCHER* thread @@ -436,7 +429,7 @@ QueueConsumptionMonitor::state(const mqbu::StorageKey& key) const BSLS_ASSERT_SAFE(d_queueState_p->queue()->dispatcher()->inDispatcherThread( d_queueState_p->queue())); - return subStreamInfo(key).d_state; + return subStreamInfo(id).d_state; } } // close package namespace diff --git a/src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.t.cpp b/src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.t.cpp index 421ee6bed..a196b8e35 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.t.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.t.cpp @@ -65,7 +65,7 @@ static mqbconfm::Domain getDomainConfig() struct Test : bmqtst::Test { // PUBLIC DATA - mqbu::StorageKey d_storageKey; + bsl::string d_id; mqbmock::Dispatcher d_dispatcher; bdlbb::PooledBlobBufferFactory d_bufferFactory; mqbmock::Cluster d_cluster; @@ -74,19 +74,19 @@ struct Test : bmqtst::Test { QueueState d_queueState; QueueConsumptionMonitor d_monitor; mqbs::InMemoryStorage d_storage; - bsl::set d_haveUndelivered; + bsl::set d_haveUndelivered; // CREATORS Test(); ~Test() BSLS_KEYWORD_OVERRIDE; // MANIPULATORS - void putMessage(mqbu::StorageKey key = mqbu::StorageKey::k_NULL_KEY); - bool loggingCb(const mqbu::StorageKey& appKey, bool enableLog); + void putMessage(const bsl::string& id = bsl::string()); + bool loggingCb(const bsl::string& id, bool enableLog); }; Test::Test() -: d_storageKey(mqbu::StorageKey::k_NULL_KEY) +: d_id() , d_dispatcher(s_allocator_p) , d_bufferFactory(1024, s_allocator_p) , d_cluster(&d_bufferFactory, s_allocator_p) @@ -95,7 +95,7 @@ Test::Test() , d_queueState(&d_queue, bmqt::Uri("bmq://bmq.test.local/test_queue"), 802701, - d_storageKey, + mqbu::StorageKey::k_NULL_KEY, 1, &d_domain, d_cluster._resources(), @@ -103,12 +103,12 @@ Test::Test() , d_monitor(&d_queueState, bdlf::BindUtil::bind(&Test::loggingCb, this, - bdlf::PlaceHolders::_1, // appKey + bdlf::PlaceHolders::_1, // id bdlf::PlaceHolders::_2), // enableLog s_allocator_p) , d_storage(d_queue.uri(), - d_storageKey, + mqbu::StorageKey::k_NULL_KEY, mqbs::DataStore::k_INVALID_PARTITION_ID, getDomainConfig(), d_domain.capacityMeter(), @@ -153,17 +153,17 @@ Test::~Test() d_domain.unregisterQueue(&d_queue); } -void Test::putMessage(mqbu::StorageKey key) +void Test::putMessage(const bsl::string& id) { - d_monitor.onMessageSent(key); - d_haveUndelivered.insert(key); + d_monitor.onMessageSent(id); + d_haveUndelivered.insert(id); } -bool Test::loggingCb(const mqbu::StorageKey& appKey, const bool enableLog) +bool Test::loggingCb(const bsl::string& id, const bool enableLog) { BALL_LOG_SET_CATEGORY("MQBBLP.QUEUECONSUMPTIONMONITORTEST"); - bool haveUndelivered = d_haveUndelivered.contains(appKey); + bool haveUndelivered = d_haveUndelivered.contains(id); if (enableLog && haveUndelivered) { BMQTSK_ALARMLOG_ALARM("QUEUE_STUCK") @@ -190,17 +190,15 @@ TEST_F(Test, doNotMonitor) bmqtst::ScopedLogObserver observer(ball::Severity::INFO, s_allocator_p); - d_monitor.registerSubStream(d_storageKey); + d_monitor.registerSubStream(d_id); - ASSERT_EQ(d_monitor.state(d_storageKey), - QueueConsumptionMonitor::State::e_ALIVE); + ASSERT_EQ(d_monitor.state(d_id), QueueConsumptionMonitor::State::e_ALIVE); d_monitor.onTimer(0); d_monitor.onTimer(1000000); - ASSERT_EQ(d_monitor.state(d_storageKey), - QueueConsumptionMonitor::State::e_ALIVE); + ASSERT_EQ(d_monitor.state(d_id), QueueConsumptionMonitor::State::e_ALIVE); ASSERT_EQ(observer.records().size(), 0U); } @@ -220,16 +218,14 @@ TEST_F(Test, emptyQueue) d_monitor.setMaxIdleTime(k_MAX_IDLE_TIME); - d_monitor.registerSubStream(d_storageKey); + d_monitor.registerSubStream(d_id); d_monitor.onTimer(k_MAX_IDLE_TIME); - ASSERT_EQ(d_monitor.state(d_storageKey), - QueueConsumptionMonitor::State::e_ALIVE); + ASSERT_EQ(d_monitor.state(d_id), QueueConsumptionMonitor::State::e_ALIVE); ASSERT_EQ(logObserver.records().size(), expectedLogRecords); d_monitor.onTimer(k_MAX_IDLE_TIME + 1); - ASSERT_EQ(d_monitor.state(d_storageKey), - QueueConsumptionMonitor::State::e_ALIVE); + ASSERT_EQ(d_monitor.state(d_id), QueueConsumptionMonitor::State::e_ALIVE); ASSERT_EQ(logObserver.records().size(), expectedLogRecords); } @@ -252,28 +248,24 @@ TEST_F(Test, putAliveIdleSendAlive) d_monitor.setMaxIdleTime(k_MAX_IDLE_TIME); - d_monitor.registerSubStream(d_storageKey); + d_monitor.registerSubStream(d_id); putMessage(); d_monitor.onTimer(k_MAX_IDLE_TIME); - ASSERT_EQ(d_monitor.state(d_storageKey), - QueueConsumptionMonitor::State::e_ALIVE); + ASSERT_EQ(d_monitor.state(d_id), QueueConsumptionMonitor::State::e_ALIVE); ASSERT_EQ(logObserver.records().size(), expectedLogRecords); d_monitor.onTimer(2 * k_MAX_IDLE_TIME - 1); - ASSERT_EQ(d_monitor.state(d_storageKey), - QueueConsumptionMonitor::State::e_ALIVE); + ASSERT_EQ(d_monitor.state(d_id), QueueConsumptionMonitor::State::e_ALIVE); ASSERT_EQ(logObserver.records().size(), expectedLogRecords); d_monitor.onTimer(2 * k_MAX_IDLE_TIME); - ASSERT_EQ(d_monitor.state(d_storageKey), - QueueConsumptionMonitor::State::e_ALIVE); + ASSERT_EQ(d_monitor.state(d_id), QueueConsumptionMonitor::State::e_ALIVE); ASSERT_EQ(logObserver.records().size(), expectedLogRecords); d_monitor.onTimer(2 * k_MAX_IDLE_TIME + 1); - ASSERT_EQ(d_monitor.state(d_storageKey), - QueueConsumptionMonitor::State::e_IDLE); + ASSERT_EQ(d_monitor.state(d_id), QueueConsumptionMonitor::State::e_IDLE); ASSERT_EQ(logObserver.records().size(), ++expectedLogRecords); ASSERT(bmqtst::ScopedLogObserverUtil::recordMessageMatch( logObserver.records().back(), @@ -281,12 +273,10 @@ TEST_F(Test, putAliveIdleSendAlive) s_allocator_p)); d_monitor.onTimer(2 * k_MAX_IDLE_TIME + 2); - ASSERT_EQ(d_monitor.state(d_storageKey), - QueueConsumptionMonitor::State::e_IDLE); + ASSERT_EQ(d_monitor.state(d_id), QueueConsumptionMonitor::State::e_IDLE); - d_monitor.onMessageSent(d_storageKey); - ASSERT_EQ(d_monitor.state(d_storageKey), - QueueConsumptionMonitor::State::e_IDLE); + d_monitor.onMessageSent(d_id); + ASSERT_EQ(d_monitor.state(d_id), QueueConsumptionMonitor::State::e_IDLE); ASSERT_EQ(logObserver.records().size(), expectedLogRecords); d_monitor.onTimer(3 * k_MAX_IDLE_TIME + 2); @@ -295,8 +285,7 @@ TEST_F(Test, putAliveIdleSendAlive) logObserver.records().back(), "no longer appears to be stuck", s_allocator_p)); - ASSERT_EQ(d_monitor.state(d_storageKey), - QueueConsumptionMonitor::State::e_ALIVE); + ASSERT_EQ(d_monitor.state(d_id), QueueConsumptionMonitor::State::e_ALIVE); } TEST_F(Test, putAliveIdleEmptyAlive) @@ -312,23 +301,20 @@ TEST_F(Test, putAliveIdleEmptyAlive) d_monitor.setMaxIdleTime(k_MAX_IDLE_TIME); - d_monitor.registerSubStream(d_storageKey); + d_monitor.registerSubStream(d_id); putMessage(); d_monitor.onTimer(k_MAX_IDLE_TIME); - ASSERT_EQ(d_monitor.state(d_storageKey), - QueueConsumptionMonitor::State::e_ALIVE); + ASSERT_EQ(d_monitor.state(d_id), QueueConsumptionMonitor::State::e_ALIVE); d_monitor.onTimer(2 * k_MAX_IDLE_TIME + 1); - ASSERT_EQ(d_monitor.state(d_storageKey), - QueueConsumptionMonitor::State::e_IDLE); + ASSERT_EQ(d_monitor.state(d_id), QueueConsumptionMonitor::State::e_IDLE); - d_haveUndelivered.erase(d_storageKey); + d_haveUndelivered.erase(d_id); d_monitor.onTimer(2 * k_MAX_IDLE_TIME + 1); - ASSERT_EQ(d_monitor.state(d_storageKey), - QueueConsumptionMonitor::State::e_ALIVE); + ASSERT_EQ(d_monitor.state(d_id), QueueConsumptionMonitor::State::e_ALIVE); } TEST_F(Test, changeMaxIdleTime) @@ -345,36 +331,30 @@ TEST_F(Test, changeMaxIdleTime) d_monitor.setMaxIdleTime(k_MAX_IDLE_TIME); - d_monitor.registerSubStream(d_storageKey); + d_monitor.registerSubStream(d_id); putMessage(); d_monitor.onTimer(0); - ASSERT_EQ(d_monitor.state(d_storageKey), - QueueConsumptionMonitor::State::e_ALIVE); + ASSERT_EQ(d_monitor.state(d_id), QueueConsumptionMonitor::State::e_ALIVE); d_monitor.onTimer(k_MAX_IDLE_TIME + 1); - ASSERT_EQ(d_monitor.state(d_storageKey), - QueueConsumptionMonitor::State::e_IDLE); + ASSERT_EQ(d_monitor.state(d_id), QueueConsumptionMonitor::State::e_IDLE); bmqtst::ScopedLogObserver logObserver(ball::Severity::INFO, s_allocator_p); d_monitor.setMaxIdleTime(k_MAX_IDLE_TIME * 2); - ASSERT_EQ(d_monitor.state(d_storageKey), - QueueConsumptionMonitor::State::e_ALIVE); + ASSERT_EQ(d_monitor.state(d_id), QueueConsumptionMonitor::State::e_ALIVE); ASSERT_EQ(logObserver.records().size(), 0u); d_monitor.onTimer(k_MAX_IDLE_TIME * 2); - ASSERT_EQ(d_monitor.state(d_storageKey), - QueueConsumptionMonitor::State::e_ALIVE); + ASSERT_EQ(d_monitor.state(d_id), QueueConsumptionMonitor::State::e_ALIVE); d_monitor.onTimer(k_MAX_IDLE_TIME * 2 + k_MAX_IDLE_TIME * 2); - ASSERT_EQ(d_monitor.state(d_storageKey), - QueueConsumptionMonitor::State::e_ALIVE); + ASSERT_EQ(d_monitor.state(d_id), QueueConsumptionMonitor::State::e_ALIVE); d_monitor.onTimer(k_MAX_IDLE_TIME * 2 + k_MAX_IDLE_TIME * 2 + 1); - ASSERT_EQ(d_monitor.state(d_storageKey), - QueueConsumptionMonitor::State::e_IDLE); + ASSERT_EQ(d_monitor.state(d_id), QueueConsumptionMonitor::State::e_IDLE); } TEST_F(Test, reset) @@ -390,13 +370,12 @@ TEST_F(Test, reset) d_monitor.setMaxIdleTime(k_MAX_IDLE_TIME); - d_monitor.registerSubStream(d_storageKey); + d_monitor.registerSubStream(d_id); putMessage(); d_monitor.onTimer(0); - ASSERT_EQ(d_monitor.state(d_storageKey), - QueueConsumptionMonitor::State::e_ALIVE); + ASSERT_EQ(d_monitor.state(d_id), QueueConsumptionMonitor::State::e_ALIVE); bmqtst::ScopedLogObserver logObserver(ball::Severity::INFO, s_allocator_p); @@ -425,37 +404,39 @@ TEST_F(Test, putAliveIdleSendAliveTwoSubstreams) mqbu::StorageKey key1, key2; key1.fromHex("ABCDEF1234"); key2.fromHex("1234ABCDEF"); + bsl::string id1("app1"); + bsl::string id2("app2"); d_monitor.setMaxIdleTime(k_MAX_IDLE_TIME); bmqu::MemOutStream errorDescription(s_allocator_p); - d_storage.addVirtualStorage(errorDescription, "app1", key1); - d_storage.addVirtualStorage(errorDescription, "app2", key2); + d_storage.addVirtualStorage(errorDescription, id1, key1); + d_storage.addVirtualStorage(errorDescription, id2, key2); - d_monitor.registerSubStream(key1); - d_monitor.registerSubStream(key2); + d_monitor.registerSubStream(id1); + d_monitor.registerSubStream(id2); - putMessage(key1); - putMessage(key2); + putMessage(id1); + putMessage(id2); d_monitor.onTimer(k_MAX_IDLE_TIME); - ASSERT_EQ(d_monitor.state(key1), QueueConsumptionMonitor::State::e_ALIVE); - ASSERT_EQ(d_monitor.state(key2), QueueConsumptionMonitor::State::e_ALIVE); + ASSERT_EQ(d_monitor.state(id1), QueueConsumptionMonitor::State::e_ALIVE); + ASSERT_EQ(d_monitor.state(id2), QueueConsumptionMonitor::State::e_ALIVE); ASSERT_EQ(logObserver.records().size(), expectedLogRecords); d_monitor.onTimer(2 * k_MAX_IDLE_TIME - 1); - ASSERT_EQ(d_monitor.state(key1), QueueConsumptionMonitor::State::e_ALIVE); - ASSERT_EQ(d_monitor.state(key2), QueueConsumptionMonitor::State::e_ALIVE); + ASSERT_EQ(d_monitor.state(id1), QueueConsumptionMonitor::State::e_ALIVE); + ASSERT_EQ(d_monitor.state(id2), QueueConsumptionMonitor::State::e_ALIVE); ASSERT_EQ(logObserver.records().size(), expectedLogRecords); d_monitor.onTimer(2 * k_MAX_IDLE_TIME); - ASSERT_EQ(d_monitor.state(key1), QueueConsumptionMonitor::State::e_ALIVE); - ASSERT_EQ(d_monitor.state(key2), QueueConsumptionMonitor::State::e_ALIVE); + ASSERT_EQ(d_monitor.state(id1), QueueConsumptionMonitor::State::e_ALIVE); + ASSERT_EQ(d_monitor.state(id2), QueueConsumptionMonitor::State::e_ALIVE); ASSERT_EQ(logObserver.records().size(), expectedLogRecords); d_monitor.onTimer(2 * k_MAX_IDLE_TIME + 1); - ASSERT_EQ(d_monitor.state(key1), QueueConsumptionMonitor::State::e_IDLE); - ASSERT_EQ(d_monitor.state(key2), QueueConsumptionMonitor::State::e_IDLE); + ASSERT_EQ(d_monitor.state(id1), QueueConsumptionMonitor::State::e_IDLE); + ASSERT_EQ(d_monitor.state(id2), QueueConsumptionMonitor::State::e_IDLE); ASSERT_EQ(logObserver.records().size(), expectedLogRecords += 2); @@ -467,12 +448,12 @@ TEST_F(Test, putAliveIdleSendAliveTwoSubstreams) } d_monitor.onTimer(2 * k_MAX_IDLE_TIME + 2); - ASSERT_EQ(d_monitor.state(key1), QueueConsumptionMonitor::State::e_IDLE); - ASSERT_EQ(d_monitor.state(key2), QueueConsumptionMonitor::State::e_IDLE); + ASSERT_EQ(d_monitor.state(id1), QueueConsumptionMonitor::State::e_IDLE); + ASSERT_EQ(d_monitor.state(id2), QueueConsumptionMonitor::State::e_IDLE); - d_monitor.onMessageSent(key1); - ASSERT_EQ(d_monitor.state(key1), QueueConsumptionMonitor::State::e_IDLE); - ASSERT_EQ(d_monitor.state(key2), QueueConsumptionMonitor::State::e_IDLE); + d_monitor.onMessageSent(id1); + ASSERT_EQ(d_monitor.state(id1), QueueConsumptionMonitor::State::e_IDLE); + ASSERT_EQ(d_monitor.state(id2), QueueConsumptionMonitor::State::e_IDLE); ASSERT_EQ(logObserver.records().size(), expectedLogRecords); d_monitor.onTimer(3 * k_MAX_IDLE_TIME + 2); @@ -483,10 +464,10 @@ TEST_F(Test, putAliveIdleSendAliveTwoSubstreams) "to be stuck.", s_allocator_p)); - ASSERT_EQ(d_monitor.state(key1), QueueConsumptionMonitor::State::e_ALIVE); - ASSERT_EQ(d_monitor.state(key2), QueueConsumptionMonitor::State::e_IDLE); + ASSERT_EQ(d_monitor.state(id1), QueueConsumptionMonitor::State::e_ALIVE); + ASSERT_EQ(d_monitor.state(id2), QueueConsumptionMonitor::State::e_IDLE); - d_monitor.onMessageSent(key2); + d_monitor.onMessageSent(id2); d_monitor.onTimer(3 * k_MAX_IDLE_TIME + 3); ASSERT_EQ(logObserver.records().size(), expectedLogRecords += 1); ASSERT(bmqtst::ScopedLogObserverUtil::recordMessageMatch( @@ -494,8 +475,8 @@ TEST_F(Test, putAliveIdleSendAliveTwoSubstreams) "Queue 'bmq://bmq.test.local/test_queue\\?id=app2' no longer appears " "to be stuck.", s_allocator_p)); - ASSERT_EQ(d_monitor.state(key1), QueueConsumptionMonitor::State::e_ALIVE); - ASSERT_EQ(d_monitor.state(key2), QueueConsumptionMonitor::State::e_ALIVE); + ASSERT_EQ(d_monitor.state(id1), QueueConsumptionMonitor::State::e_ALIVE); + ASSERT_EQ(d_monitor.state(id2), QueueConsumptionMonitor::State::e_ALIVE); } TEST_F(Test, usage) @@ -512,7 +493,7 @@ TEST_F(Test, usage) monitor.setMaxIdleTime(20); - d_monitor.registerSubStream(d_storageKey); + d_monitor.registerSubStream(d_id); putMessage(); putMessage(); @@ -531,7 +512,7 @@ TEST_F(Test, usage) monitor.onTimer(T += 15); // nothing is logged ASSERT_EQ(logObserver.records().size(), 1u); // 15 seconds later - T + 60s - consume first message, inform monitor: - monitor.onMessageSent(d_storageKey); + monitor.onMessageSent(d_id); // 15 seconds later - T + 75s monitor.onTimer(T += 15); // log INFO: back to active @@ -543,7 +524,7 @@ TEST_F(Test, usage) monitor.onTimer(T += 15); // log ALARM ASSERT_EQ(logObserver.records().size(), 3u); // 15 seconds later - T + 120s - d_haveUndelivered.erase(d_storageKey); + d_haveUndelivered.erase(d_id); monitor.onTimer(T += 15); // log INFO: back to active ASSERT_EQ(logObserver.records().size(), 4u); diff --git a/src/groups/mqb/mqbblp/mqbblp_queueenginetester.cpp b/src/groups/mqb/mqbblp/mqbblp_queueenginetester.cpp index 4cae090d1..ccdd413ca 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queueenginetester.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_queueenginetester.cpp @@ -499,8 +499,6 @@ void QueueEngineTester::init(const mqbconfm::Domain& domainConfig, d_allocator_p), d_allocator_p); - d_queueState_mp->setAppKeyGenerator(&d_appKeyGenerator); - bmqp_ctrlmsg::RoutingConfiguration routingConfig; if (domainConfig.mode().isBroadcastValue()) { diff --git a/src/groups/mqb/mqbblp/mqbblp_queueenginetester.h b/src/groups/mqb/mqbblp/mqbblp_queueenginetester.h index 519fd1802..bdd19a6a8 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queueenginetester.h +++ b/src/groups/mqb/mqbblp/mqbblp_queueenginetester.h @@ -98,7 +98,6 @@ #include #include #include -#include #include #include #include @@ -237,7 +236,6 @@ class QueueEngineTester { // dropped (i.e. fully released) but // needed to stay alive to correctly // test post-drop state - mqbmock::AppKeyGenerator d_appKeyGenerator; size_t d_messageCount; @@ -466,8 +464,6 @@ class QueueEngineTester { /// Note that pointers to these handles may be left dangling. void dropHandles(); - mqbmock::AppKeyGenerator& appKeyGenerator(); - /// Load into the specified `value` previously cached parameters sent /// upstream for the specified `appId`. bool getUpstreamParameters(bmqp_ctrlmsg::StreamParameters* value, @@ -623,7 +619,7 @@ QueueEngineTester::createQueueEngineHelper(mqbi::QueueEngine* engine) BSLS_ASSERT_OPT(d_mockQueue_sp); bmqu::MemOutStream errorDescription(d_allocator_p); - int rc = engine->configure(errorDescription); + int rc = engine->configure(errorDescription, false); BSLS_ASSERT_OPT(rc == 0); // Set the engine on the Queue @@ -650,11 +646,6 @@ inline T* QueueEngineTester::createQueueEngine() return result; } -inline mqbmock::AppKeyGenerator& QueueEngineTester::appKeyGenerator() -{ - return d_appKeyGenerator; -} - inline void QueueEngineTester::synchronizeScheduler() { d_mockCluster_mp->waitForScheduler(); diff --git a/src/groups/mqb/mqbblp/mqbblp_queuestate.h b/src/groups/mqb/mqbblp/mqbblp_queuestate.h index 1d2468d4c..8a7a9a2b8 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queuestate.h +++ b/src/groups/mqb/mqbblp/mqbblp_queuestate.h @@ -62,9 +62,6 @@ namespace mqbcmd { class QueueState; } namespace mqbi { -class AppKeyGenerator; -} -namespace mqbi { class Storage; } namespace mqbi { @@ -148,9 +145,6 @@ class QueueState { mqbi::StorageManager* d_storageManager_p; // Storage manager to use. - mqbi::AppKeyGenerator* d_appKeyGenerator_p; - // App key generator to use. - const mqbi::ClusterResources d_resources; bdlmt::FixedThreadPool* d_miscWorkThreadPool_p; @@ -221,7 +215,6 @@ class QueueState { QueueState& setPartitionId(int value); QueueState& setStorage(StorageMp& value); QueueState& setStorageManager(mqbi::StorageManager* value); - QueueState& setAppKeyGenerator(mqbi::AppKeyGenerator* value); QueueState& setRoutingConfig(const bmqp_ctrlmsg::RoutingConfiguration& routingConfig); QueueState& setMessageThrottleConfig( @@ -306,8 +299,6 @@ class QueueState { mqbi::Queue* queue() const; mqbi::Storage* storage() const; mqbi::StorageManager* storageManager() const; - mqbi::AppKeyGenerator* appKeyGenerator() const; - bool isCSLModeEnabled() const; const bmqp_ctrlmsg::RoutingConfiguration& routingConfig() const; const mqbcfg::MessageThrottleConfig& messageThrottleConfig() const; const bmqt::Uri& uri() const; @@ -404,12 +395,6 @@ inline QueueState& QueueState::setStorageManager(mqbi::StorageManager* value) return *this; } -inline QueueState& QueueState::setAppKeyGenerator(mqbi::AppKeyGenerator* value) -{ - d_appKeyGenerator_p = value; - return *this; -} - inline QueueState& QueueState::setRoutingConfig( const bmqp_ctrlmsg::RoutingConfiguration& routingConfig) { @@ -598,11 +583,6 @@ inline mqbi::StorageManager* QueueState::storageManager() const return d_storageManager_p; } -inline mqbi::AppKeyGenerator* QueueState::appKeyGenerator() const -{ - return d_appKeyGenerator_p; -} - inline const bmqp_ctrlmsg::RoutingConfiguration& QueueState::routingConfig() const { @@ -620,11 +600,6 @@ inline const bmqt::Uri& QueueState::uri() const return d_uri; } -inline bool QueueState::isCSLModeEnabled() const -{ - return d_domain_p->cluster()->isCSLModeEnabled(); -} - inline const QueueState::SubQueues& QueueState::subQueues() const { return d_subStreams; diff --git a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp index 5796bed4a..559af8142 100644 --- a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp @@ -929,7 +929,8 @@ RelayQueueEngine::~RelayQueueEngine() // MANIPULATORS int RelayQueueEngine::configure( - BSLS_ANNOTATION_UNUSED bsl::ostream& errorDescription) + BSLS_ANNOTATION_UNUSED bsl::ostream& errorDescription, + BSLS_ANNOTATION_UNUSED bool isReconfigure) { return 0; } diff --git a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.h b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.h index dcb06963b..5a32aaede 100644 --- a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.h +++ b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.h @@ -381,10 +381,11 @@ class RelayQueueEngine : public mqbi::QueueEngine { // MANIPULATORS // (virtual mqbi::QueueEngine) - /// Configure this instance. Return zero on success, non-zero value + /// Configure this instance. The specified `isReconfigure` flag indicate + /// if queue is being reconfigured. Return zero on success, non-zero value /// otherwise and populate the specified `errorDescription`. - virtual int - configure(bsl::ostream& errorDescription) BSLS_KEYWORD_OVERRIDE; + virtual int configure(bsl::ostream& errorDescription, + bool isReconfigure) BSLS_KEYWORD_OVERRIDE; /// Reset the internal state of this engine. If the optionally specified /// 'keepConfirming' is 'true', keep the data structures for CONFIRMs diff --git a/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp b/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp index 30595ba0c..d5cf95dee 100644 --- a/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp @@ -152,7 +152,7 @@ int RemoteQueue::configureAsProxy(bsl::ostream& errorDescription, RelayQueueEngine(d_state_p, mqbconfm::Domain(), d_allocator_p), d_allocator_p); - rc = d_queueEngine_mp->configure(errorDescription); + rc = d_queueEngine_mp->configure(errorDescription, isReconfigure); if (rc != 0) { return 10 * rc + rc_QUEUE_ENGINE_CFG_FAILURE; // RETURN } @@ -256,7 +256,7 @@ int RemoteQueue::configureAsClusterMember(bsl::ostream& errorDescription, bdlma::LocalSequentialAllocator<1024> localAllocator(d_allocator_p); bmqu::MemOutStream errorDesc(&localAllocator); - rc = d_queueEngine_mp->configure(errorDesc); + rc = d_queueEngine_mp->configure(errorDesc, isReconfigure); if (rc != 0) { BMQTSK_ALARMLOG_ALARM("CLUSTER_STATE") << d_state_p->domain()->cluster()->name() << ": Partition [" diff --git a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp index 39b5d038a..4e6425b41 100644 --- a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp @@ -25,7 +25,6 @@ #include #include #include -#include #include #include #include @@ -138,7 +137,7 @@ void RootQueueEngine::deliverMessages(AppState* app) } if (BSLS_PERFORMANCEHINT_PREDICT_LIKELY(numMessages > 0)) { - d_consumptionMonitor.onMessageSent(app->appKey()); + d_consumptionMonitor.onMessageSent(app->appId()); } if (app->isReadyForDelivery()) { @@ -152,9 +151,9 @@ void RootQueueEngine::deliverMessages(AppState* app) } RootQueueEngine::Apps::iterator -RootQueueEngine::makeSubStream(const bsl::string& appId, - const AppKeyCount& appKey, - unsigned int upstreamSubQueueId) +RootQueueEngine::makeSubStream(const bsl::string& appId, + const mqbu::StorageKey& appKey, + unsigned int upstreamSubQueueId) { AppStateSp app(new (*d_allocator_p) AppState(d_queueState_p->queue(), @@ -162,14 +161,12 @@ RootQueueEngine::makeSubStream(const bsl::string& appId, d_queueState_p->routingContext(), upstreamSubQueueId, appId, - appKey.first, + appKey, d_allocator_p), d_allocator_p); - bsl::pair rc = d_apps.insert(appId, - appKey, - app); - BSLS_ASSERT_SAFE(rc.second == Apps::e_INSERTED); + bsl::pair rc = d_apps.emplace(appId, app); + BSLS_ASSERT_SAFE(rc.second); return rc.first; } @@ -268,7 +265,6 @@ RootQueueEngine::RootQueueEngine(QueueState* queueState, bdlf::PlaceHolders::_2), // enableLog allocator) , d_apps(allocator) -, d_nullKeyCount(0) , d_hasAutoSubscriptions(false) , d_isFanout(domainConfig.mode().isFanoutValue()) , d_scheduler_p(queueState->scheduler()) @@ -299,7 +295,8 @@ RootQueueEngine::RootQueueEngine(QueueState* queueState, // MANIPULATORS // (virtual mqbi::QueueEngine) -int RootQueueEngine::configure(bsl::ostream& errorDescription) +int RootQueueEngine::configure(bsl::ostream& errorDescription, + bool isReconfigure) { enum RcEnum { // Return values @@ -325,14 +322,15 @@ int RootQueueEngine::configure(bsl::ostream& errorDescription) for (numApps = 0; numApps < cfgAppIds.size(); ++numApps) { if (initializeAppId(cfgAppIds[numApps], errorDescription, - bmqp::QueueId::k_UNASSIGNED_SUBQUEUE_ID)) { + bmqp::QueueId::k_UNASSIGNED_SUBQUEUE_ID, + isReconfigure)) { return rc_APP_INITIALIZATION_ERROR; // RETURN } } for (unsigned int i = 0; i < subscriptions.size(); ++i) { - Apps::iterator itApp = d_apps.findByKey1(subscriptions[i].appId()); + Apps::iterator itApp = d_apps.find(subscriptions[i].appId()); if (itApp != d_apps.end()) { - int rc = itApp->value()->setSubscription( + int rc = itApp->second->setSubscription( subscriptions[i].expression()); if (rc != 0) { @@ -344,8 +342,8 @@ int RootQueueEngine::configure(bsl::ostream& errorDescription) << d_queueState_p->queue()->description() << "' failed to compile auto subscription: '" << subscriptions[i].expression().text() - << "' for the '" << itApp->key1() - << "' app, rc: " << rc << ", reason: '" + << "' for the '" << itApp->first << "' app, rc: " << rc + << ", reason: '" << bmqeval::ErrorType::toString(errorType) << "'"; return rc_AUTO_SUBSCRIPTION_ERROR; // RETURN } @@ -362,7 +360,8 @@ int RootQueueEngine::configure(bsl::ostream& errorDescription) numApps = 1; if (initializeAppId(bmqp::ProtocolUtil::k_DEFAULT_APP_ID, errorDescription, - bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID)) { + bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID, + isReconfigure)) { return rc_APP_INITIALIZATION_ERROR; // RETURN } // TODO: what is auto subscription "appId" for priority/broadcast? @@ -379,12 +378,11 @@ int RootQueueEngine::configure(bsl::ostream& errorDescription) int rc = 0; if (subscriptions.size() == 1) { - rc = itApp->value()->setSubscription( - subscriptions[0].expression()); + rc = itApp->second->setSubscription(subscriptions[0].expression()); } else { mqbconfm::Expression empty(d_allocator_p); - rc = itApp->value()->setSubscription(empty); + rc = itApp->second->setSubscription(empty); } if (rc != 0) { @@ -412,21 +410,27 @@ int RootQueueEngine::configure(bsl::ostream& errorDescription) int RootQueueEngine::initializeAppId(const bsl::string& appId, bsl::ostream& errorDescription, - unsigned int upstreamSubQueueId) + unsigned int upstreamSubQueueId, + bool isReconfigure) { - Apps::iterator iter = d_apps.findByKey1(appId); + Apps::iterator iter = d_apps.find(appId); if (iter != d_apps.end()) { mqbconfm::Expression empty(d_allocator_p); - iter->value()->setSubscription(empty); + iter->second->setSubscription(empty); // Don't reconfigure an AppId that is already registered. return 0; // RETURN } - mqbu::StorageKey appKey; + mqbu::StorageKey appKey = mqbu::StorageKey::k_NULL_KEY; unsigned int ordinal = 0; - if (!d_queueState_p->storage()->hasVirtualStorage(appId, + + // Do not attempt to find VirtualStorage if this is reconfiguration. + // Reconfiguration results in 'afterAppIdRegistered' which results in + // 'registerStorage' which calls 'authorize'. + if (!isReconfigure && + !d_queueState_p->storage()->hasVirtualStorage(appId, &appKey, &ordinal)) { BALL_LOG_ERROR << "#QUEUE_STORAGE_NOTFOUND " @@ -438,20 +442,24 @@ int RootQueueEngine::initializeAppId(const bsl::string& appId, << appId << "], queue: '" << d_queueState_p->queue()->description() << "'"; + // TODO: handle w/o asserting BSLS_ASSERT_SAFE(false && "Virtual storage does not exist for appId"); return -1; // RETURN } - BSLS_ASSERT_SAFE(!appKey.isNull()); - iter = makeSubStream(appId, AppKeyCount(appKey, 0), upstreamSubQueueId); + iter = makeSubStream(appId, appKey, upstreamSubQueueId); - iter->value()->authorize(appKey, ordinal); + if (!isReconfigure) { + BSLS_ASSERT_SAFE(!appKey.isNull()); + iter->second->authorize(appKey, ordinal); - d_consumptionMonitor.registerSubStream(appKey); + d_consumptionMonitor.registerSubStream(appId); - BALL_LOG_INFO << "Found virtual storage for appId [" << appId - << "], queue [" << d_queueState_p->uri() << "], appKey [" - << appKey << "], ordinal [" << ordinal << "]"; + BALL_LOG_INFO << "Found virtual storage for appId [" << appId + << "], queue [" << d_queueState_p->uri() << "], appKey [" + << appKey << "], ordinal [" << ordinal << "]"; + } + // else 'registerStorage' will update the authorization status of the App. return 0; } @@ -459,8 +467,8 @@ int RootQueueEngine::initializeAppId(const bsl::string& appId, void RootQueueEngine::resetState(bool isShuttingDown) { for (Apps::iterator it = d_apps.begin(); it != d_apps.end(); ++it) { - it->value()->undoRouting(); - it->value()->routing()->reset(); + it->second->undoRouting(); + it->second->routing()->reset(); } d_consumptionMonitor.reset(); @@ -487,13 +495,13 @@ void RootQueueEngine::rebuildSelectedApp( { BSLS_ASSERT_SAFE(handle); - const AppStateSp& app = itApp->value(); + const AppStateSp& app = itApp->second; BSLS_ASSERT_SAFE(app->routing()); bmqu::MemOutStream errorStream(d_allocator_p); - app->routing()->loadApp(itApp->key1().c_str(), + app->routing()->loadApp(itApp->first.c_str(), handle, &errorStream, info, @@ -540,7 +548,7 @@ int RootQueueEngine::rebuildInternalState(bsl::ostream& errorDescription) if (!previous) { continue; // CONTINUE } - Apps::iterator itApp = d_apps.findByKey1(previous->appId()); + Apps::iterator itApp = d_apps.find(previous->appId()); unsigned int upstreamSubQueueId = previous->upstreamSubQueueId(); if (itApp == d_apps.end()) { @@ -551,27 +559,12 @@ int RootQueueEngine::rebuildInternalState(bsl::ostream& errorDescription) // won't be part of 'd_apps'. So we explicitly update 'd_apps' // with this appId with the similar logic that we execute when // invoking 'getHandle' with an unregistered appId. - - AppKeyCount key2; - - if (d_queueState_p->isCSLModeEnabled()) { - key2 = AppKeyCount(mqbu::StorageKey::k_NULL_KEY, - d_nullKeyCount++); - } - else { - key2 = AppKeyCount( - d_queueState_p->appKeyGenerator()->generateAppKey( - previous->appId(), - d_queueState_p->partitionId()), - 0); - } - itApp = makeSubStream(previous->appId(), - key2, + mqbu::StorageKey::k_NULL_KEY, bmqp::QueueId::k_UNASSIGNED_SUBQUEUE_ID); } - AppStateSp& app = itApp->value(); + AppStateSp& app = itApp->second; app->routing() = previous->routing(); app->setUpstreamSubQueueId(upstreamSubQueueId); @@ -772,7 +765,7 @@ mqbi::QueueHandle* RootQueueEngine::getHandle( if (handleParameters.readCount()) { // Ensure appId is authorized const bsl::string& appId = subStreamInfo.appId(); - Apps::iterator iter = d_apps.findByKey1(appId); + Apps::iterator iter = d_apps.find(appId); if (iter == d_apps.end()) { BMQTSK_ALARMLOG_ALARM("FANOUT_UNREGISTERED_APPID") @@ -782,35 +775,22 @@ mqbi::QueueHandle* RootQueueEngine::getHandle( " of this AppId" << BMQTSK_ALARMLOG_END; - AppKeyCount key2; - if (d_queueState_p->isCSLModeEnabled()) { - key2 = AppKeyCount(mqbu::StorageKey::k_NULL_KEY, - d_nullKeyCount++); - } - else { - key2 = AppKeyCount( - d_queueState_p->appKeyGenerator()->generateAppKey( - appId, - d_queueState_p->partitionId()), - 0); - } iter = makeSubStream(appId, - key2, + mqbu::StorageKey::k_NULL_KEY, bmqp::QueueId::k_UNASSIGNED_SUBQUEUE_ID); } BSLS_ASSERT_SAFE(iter != d_apps.end()); - BSLS_ASSERT_SAFE(iter->key1() == subStreamInfo.appId()); + BSLS_ASSERT_SAFE(iter->first == subStreamInfo.appId()); // Do not insert the handle to the AppState::d_consumers until // configureHandle (which specifies priority) - d_queueState_p->adopt(iter->value()); - upstreamSubQueueId = iter->value()->upstreamSubQueueId(); + d_queueState_p->adopt(iter->second); + upstreamSubQueueId = iter->second->upstreamSubQueueId(); - if (!iter->value()->isAuthorized()) { - if (iter->value()->authorize()) { - d_consumptionMonitor.registerSubStream( - iter->value()->appKey()); + if (!iter->second->isAuthorized()) { + if (iter->second->authorize()) { + d_consumptionMonitor.registerSubStream(appId); } } } @@ -935,12 +915,10 @@ void RootQueueEngine::configureHandle( return; // RETURN } - Apps::iterator iter = d_apps.findByKey1(appId); + Apps::iterator iter = d_apps.find(appId); BSLS_ASSERT_SAFE(iter != d_apps.end()); - const AppStateSp& affectedApp = iter->value(); - - BSLS_ASSERT_SAFE(affectedApp->appKey() == iter->key2().first); + const AppStateSp& affectedApp = iter->second; // prepare the App for rebuilding consumers affectedApp->undoRouting(); @@ -973,7 +951,7 @@ void RootQueueEngine::configureHandle( BALL_LOG_INFO << "Rebuilt active consumers of the highest " << "priority for queue '" << d_queueState_p->queue()->description() << "', appId = '" - << iter->key1() << "'. Now there are " + << iter->first << "'. Now there are " << affectedApp->routing()->priorityCount() << " consumers."; // Inform the requester of the success before attempting to deliver new @@ -1130,11 +1108,11 @@ void RootQueueEngine::releaseHandle( mqbi::QueueHandleReleaseResult result = proctor.releaseStream(copy); if (copy.readCount()) { - Apps::iterator itApp = d_apps.findByKey1(citer->first); + Apps::iterator itApp = d_apps.find(citer->first); BSLS_ASSERT_SAFE(itApp != d_apps.end()); - AppState* app(itApp->value().get()); + AppState* app(itApp->second.get()); if (result.hasNoHandleStreamConsumers()) { // No re-delivery attempts until entire handle stops consuming @@ -1215,10 +1193,10 @@ void RootQueueEngine::releaseHandle( // state. On the surface it results in alarm being // (re)generated if the unauthorized app is used again // after all previous clients are gone. - if (!itApp->value()->isAuthorized()) { + if (!itApp->second->isAuthorized()) { BALL_LOG_INFO << "There are no more clients for the unauthorized" - << " appId [" << itApp->key1() + << " appId [" << itApp->first << "] and the appId was not registered with BMQ." << "Removing this appId from queue engine since " << "all clients have gone away. The 'unregistered" @@ -1304,12 +1282,12 @@ void RootQueueEngine::afterNewMessage( // Assume, all Apps need to deliver (some may be at capacity) for (Apps::iterator iter = d_apps.begin(); iter != d_apps.end(); ++iter) { - AppStateSp& app = iter->value(); + AppStateSp& app = iter->second; if (d_appsDeliveryContext.processApp(*app, app->ordinal())) { // Consider this message as sent out - d_consumptionMonitor.onMessageSent(iter->key2().first); + d_consumptionMonitor.onMessageSent(iter->first); // Report queue time metric per App // Report 'queue time' metric for all active appIds @@ -1590,11 +1568,9 @@ void RootQueueEngine::afterQueuePurged(const bsl::string& appId, d_queueState_p->queue())); if (appKey.isNull()) { - // NOTE: Since in CSL mode when a consumer opens the queue with an - // unauthorized appId, we insert an item having a pair of (appId, - // nullKey) as its key to d_apps. Thus, to avoid accidentally treating - // a nullKey resulting from unauthorized appId as wildcard matching, we - // add an additional assert that the appId must be empty. + // 'mqbu::StorageKey::k_NULL_KEY' indicates the entire queue in which + // case there must be 'bmqp::ProtocolUtil::k_NULL_APP_ID' + BSLS_ASSERT_SAFE(appId == bmqp::ProtocolUtil::k_NULL_APP_ID); d_storageIter_mp->reset(); @@ -1602,10 +1578,10 @@ void RootQueueEngine::afterQueuePurged(const bsl::string& appId, return; // RETURN } - Apps::iterator iter = d_apps.findByKey2(AppKeyCount(appKey, 0)); + Apps::iterator iter = d_apps.find(appId); BSLS_ASSERT_SAFE(iter != d_apps.end()); - BSLS_ASSERT_SAFE(iter->key1() == appId); - iter->value()->clear(); + BSLS_ASSERT_SAFE(iter->first == appId); + iter->second->clear(); } void RootQueueEngine::onTimer(bsls::Types::Int64 currentTimer) @@ -1620,8 +1596,8 @@ void RootQueueEngine::onTimer(bsls::Types::Int64 currentTimer) } bsl::ostream& -RootQueueEngine::logAppSubscriptionInfo(bsl::ostream& stream, - const mqbu::StorageKey& appKey) const +RootQueueEngine::logAppSubscriptionInfo(bsl::ostream& stream, + const bsl::string& appId) const { // executed by the *QUEUE DISPATCHER* thread @@ -1630,14 +1606,14 @@ RootQueueEngine::logAppSubscriptionInfo(bsl::ostream& stream, d_queueState_p->queue())); // Get AppState by appKey. - Apps::const_iterator cItApp = d_apps.findByKey2(AppKeyCount(appKey, 0)); + Apps::const_iterator cItApp = d_apps.find(appId); if (cItApp == d_apps.end()) { - BALL_LOG_WARN << "No app found for appKey: " << appKey; - stream << "\nSubscription info: no app found for appKey: " << appKey; + BALL_LOG_WARN << "No app found for appId: " << appId; + stream << "\nSubscription info: no app found for appId: " << appId; return stream; // RETURN } - const AppStateSp& app = cItApp->value(); + const AppStateSp& app = cItApp->second; return logAppSubscriptionInfo(stream, app); } @@ -1744,8 +1720,8 @@ RootQueueEngine::logAppSubscriptionInfo(bsl::ostream& stream, return stream; } -bool RootQueueEngine::logAlarmCb(const mqbu::StorageKey& appKey, - bool enableLog) const +bool RootQueueEngine::logAlarmCb(const bsl::string& appId, + bool enableLog) const { // executed by the *QUEUE DISPATCHER* thread @@ -1754,12 +1730,12 @@ bool RootQueueEngine::logAlarmCb(const mqbu::StorageKey& appKey, d_queueState_p->queue())); // Get AppState by appKey. - Apps::const_iterator cItApp = d_apps.findByKey2(AppKeyCount(appKey, 0)); + Apps::const_iterator cItApp = d_apps.find(appId); if (cItApp == d_apps.end()) { - BALL_LOG_WARN << "No app found for appKey: " << appKey; + BALL_LOG_WARN << "No app found for appId: " << appId; return false; // RETURN } - const AppStateSp& app = cItApp->value(); + const AppStateSp& app = cItApp->second; // Check if there are un-delivered messages bslma::ManagedPtr headIt = head(app); @@ -1870,7 +1846,7 @@ bool RootQueueEngine::logAlarmCb(const mqbu::StorageKey& appKey, } void RootQueueEngine::afterAppIdRegistered( - const mqbi::Storage::AppInfo& appIdKeyPair) + const mqbi::Storage::AppInfos& addedAppIds) { // executed by the *QUEUE DISPATCHER* thread @@ -1880,102 +1856,48 @@ void RootQueueEngine::afterAppIdRegistered( if (!d_isFanout) { BALL_LOG_ERROR << "RootQueueEngine::afterAppIdRegistered() should " - << "never be called for a non-Fanout queue. Received " - << "call to register appId '" << appIdKeyPair.first - << "', appKey '" << appIdKeyPair.second << "'."; + << "never be called for a non-Fanout queue: " + << d_queueState_p->uri(); return; // RETURN; } - // We need to handle 2 scenarios here: a consumer with the specified - // 'appId' may have already opened the queue, or otherwise. + for (mqbi::Storage::AppInfos::const_iterator cit = addedAppIds.cbegin(); + cit != addedAppIds.cend(); + ++cit) { + // We need to handle 2 scenarios here: a consumer with the specified + // 'appId' may have already opened the queue, or otherwise. - const bsl::string& appId = appIdKeyPair.first; - const mqbu::StorageKey& appKey = appIdKeyPair.second; - if (d_queueState_p->isCSLModeEnabled()) { - BSLS_ASSERT_SAFE(!appKey.isNull()); - } - else { - BSLS_ASSERT_SAFE(appKey.isNull()); - } + const bsl::string& appId = cit->first; + const mqbu::StorageKey& appKey = cit->second; - Apps::iterator iter = d_apps.findByKey1(appId); - mqbu::StorageKey key; + BSLS_ASSERT_SAFE(!appKey.isNull()); - if (iter == d_apps.end()) { - // No consumer has opened the queue with 'appId'. - if (d_queueState_p->isCSLModeEnabled()) { - key = appKey; - } - else { - // TODO_CSL Remove this snippet when pre-CSL workflow has been - // retired from all clusters. + Apps::const_iterator citApp = d_apps.find(appId); - key = d_queueState_p->appKeyGenerator()->generateAppKey( - appId, - d_queueState_p->partitionId()); - } + if (citApp == d_apps.end()) { + // No consumer has opened the queue with 'appId'. - iter = makeSubStream(appId, - AppKeyCount(key, 0), - bmqp::QueueId::k_UNASSIGNED_SUBQUEUE_ID); - } - else { - // A consumer has already opened the queue with 'appId'. - if (d_queueState_p->isCSLModeEnabled()) { - // Two possible scenarios: - // - // 1. When a consumer opened the queue with an unauthorized appId, - // we inserted an item having a pair of (appId, nullKey) as its - // key to d_apps. Now, we replace the nullKey with the actual - // queue key from the appId registration. - // - // 2. The appId was previously registered and then unregistered. - // However, the old appId/appKey pair is still lingering because - // during RootQueueEngine::afterAppIdUnregistered() we still - // keep the app but invalidate the iterator. If not all - // consumers using the old appId have closed the queue, then - // there will still be an "ghost" entry in d_apps. For this - // very reason, we *cannot* assert that the appKey we have is a - // nullKey. - - // Cannot assert due to Scenario (2.) above - // BSLS_ASSERT_SAFE(iter->key2().first.isNull()); - - key = appKey; - - AppStateSp appStateSp = iter->value(); - d_apps.erase(iter); - - iter = d_apps.insert(appId, AppKeyCount(key, 0), appStateSp).first; + citApp = makeSubStream(appId, + appKey, + bmqp::QueueId::k_UNASSIGNED_SUBQUEUE_ID); } - else { - // TODO_CSL Remove this snippet when pre-CSL workflow has been - // retired from all clusters. - key = iter->key2().first; - } + BSLS_ASSERT_SAFE(!citApp->second->isAuthorized()); - // Update the 'isAuthorized' boolean in app's state to true, now that - // the appId has been registered. This is critical to ensure that the - // now-registered appId continues to be available across restarts of - // its consumers. + // 'registerStorage' will update the authorization status of the App. } - BSLS_ASSERT_SAFE(!key.isNull()); - - mqbi::Storage::AppInfos one(1, d_allocator_p); - one.emplace(mqbi::Storage::AppInfo(appId, key)); d_queueState_p->storageManager()->updateQueuePrimary( d_queueState_p->uri(), d_queueState_p->key(), d_queueState_p->partitionId(), - one, + addedAppIds, mqbi::Storage::AppInfos()); } void RootQueueEngine::afterAppIdUnregistered( - const mqbi::Storage::AppInfo& appIdKeyPair) + const mqbi::Storage::AppInfos& removedAppIds) { // executed by the *QUEUE DISPATCHER* thread @@ -1984,28 +1906,29 @@ void RootQueueEngine::afterAppIdUnregistered( d_queueState_p->queue())); if (!d_isFanout) { - BALL_LOG_ERROR << "It should be not possible to unregister appId '" - << appIdKeyPair.first << "', appKey '" - << appIdKeyPair.second << "' for a non-Fanout queue."; + BALL_LOG_ERROR << "Invalid queue type for unregistering appId." + << d_queueState_p->uri(); return; // RETURN } - const bsl::string& appId = appIdKeyPair.first; - Apps::iterator iter = d_apps.findByKey1(appId); - BSLS_ASSERT_SAFE(iter != d_apps.end()); + for (mqbi::Storage::AppInfos::const_iterator cit = removedAppIds.cbegin(); + cit != removedAppIds.cend(); + ++cit) { + const bsl::string& appId = cit->first; + const mqbu::StorageKey& appKey = cit->second; - const mqbu::StorageKey& appKey = d_queueState_p->isCSLModeEnabled() - ? appIdKeyPair.second - : iter->key2().first; - BSLS_ASSERT_SAFE(iter->key2().first == appKey); + Apps::iterator iter = d_apps.find(appId); + BSLS_ASSERT_SAFE(iter != d_apps.end()); - // we still keep the app but invalidate the authorization - iter->value()->unauthorize(); + BSLS_ASSERT_SAFE(iter->second->isAuthorized()); + + // we still keep the app but invalidate the authorization + iter->second->unauthorize(); + + // Do a best effort to confirm the messages and remove the storage. If + // either fails, just log the condition. - // Do a best effort to confirm the messages and remove the storage. If - // either fails, just log the condition. - { const mqbi::StorageResult::Enum rc = d_queueState_p->storage()->removeAll(appKey); if (rc != mqbi::StorageResult::e_SUCCESS) { @@ -2016,20 +1939,18 @@ void RootQueueEngine::afterAppIdUnregistered( << "' [reason: " << mqbi::StorageResult::toAscii(rc) << "]"; } + + d_consumptionMonitor.unregisterSubStream(appId); } - mqbi::Storage::AppInfos one(1, d_allocator_p); - one.emplace(mqbi::Storage::AppInfo(appId, appKey)); d_queueState_p->storageManager()->updateQueuePrimary( d_queueState_p->uri(), d_queueState_p->key(), d_queueState_p->partitionId(), mqbi::Storage::AppInfos(), - one); + removedAppIds); // No need to log in case of failure because 'updateQueuePrimary' does it // (even in case of success FTM). - - d_consumptionMonitor.unregisterSubStream(appKey); } void RootQueueEngine::registerStorage(const bsl::string& appId, @@ -2047,18 +1968,18 @@ void RootQueueEngine::registerStorage(const bsl::string& appId, << ") now has storage: [App Id: " << appId << ", key: " << appKey << ", ordinal: " << appOrdinal << "]"; - Apps::iterator iter = d_apps.findByKey1(appId); + Apps::iterator iter = d_apps.find(appId); BSLS_ASSERT_SAFE(iter != d_apps.end()); - BSLS_ASSERT_SAFE(iter->key2().first == appKey); - iter->value()->authorize(appKey, appOrdinal); + iter->second->authorize(appKey, appOrdinal); - d_consumptionMonitor.registerSubStream(appKey); + d_consumptionMonitor.registerSubStream(appId); } -void RootQueueEngine::unregisterStorage(const bsl::string& appId, - const mqbu::StorageKey& appKey, - unsigned int appOrdinal) +void RootQueueEngine::unregisterStorage( + const bsl::string& appId, + BSLS_ANNOTATION_UNUSED const mqbu::StorageKey& appKey, + BSLS_ANNOTATION_UNUSED unsigned int appOrdinal) { // executed by the *QUEUE DISPATCHER* thread @@ -2066,14 +1987,11 @@ void RootQueueEngine::unregisterStorage(const bsl::string& appId, BSLS_ASSERT_SAFE(d_queueState_p->queue()->dispatcher()->inDispatcherThread( d_queueState_p->queue())); - Apps::iterator iter = d_apps.findByKey1(appId); + Apps::iterator iter = d_apps.find(appId); BSLS_ASSERT_SAFE(iter != d_apps.end()); - BSLS_ASSERT_SAFE(iter->key2().first == appKey); // we still keep the app but invalidate the authorization - iter->value()->unauthorize(); - - (void)appOrdinal; + iter->second->unauthorize(); } mqbi::StorageResult::Enum RootQueueEngine::evaluateAutoSubscriptions( @@ -2103,9 +2021,9 @@ mqbi::StorageResult::Enum RootQueueEngine::evaluateAutoSubscriptions( putHeader.messageGUID()); for (Apps::iterator it = d_apps.begin(); it != d_apps.end(); ++it) { - AppStateSp& app = it->value(); + AppStateSp& app = it->second; if (!app->evaluateAutoSubcription()) { - result = d_queueState_p->storage()->autoConfirm(it->key2().first, + result = d_queueState_p->storage()->autoConfirm(app->appKey(), timestamp); if (result != mqbi::StorageResult::e_SUCCESS) { @@ -2185,14 +2103,14 @@ void RootQueueEngine::loadInternals(mqbcmd::QueueEngine* out) const ++iter) { consumerStates.resize(consumerStates.size() + 1); mqbcmd::ConsumerState& consumerState = consumerStates.back(); - consumerState.appId() = iter->key1(); + consumerState.appId() = iter->first; - if (d_queueState_p->storage()->hasVirtualStorage(iter->key1())) { - const bool isAtEndOfStorage = iter->value()->isAtEndOfStorage() && + if (d_queueState_p->storage()->hasVirtualStorage(iter->first)) { + const bool isAtEndOfStorage = iter->second->isAtEndOfStorage() && d_storageIter_mp->atEnd(); consumerState.isAtEndOfStorage().makeValue(isAtEndOfStorage); - consumerState.status() = (!iter->value()->hasConsumers() + consumerState.status() = (!iter->second->hasConsumers() ? mqbcmd::ConsumerStatus::REGISTERED : mqbcmd::ConsumerStatus::ALIVE); } @@ -2200,7 +2118,7 @@ void RootQueueEngine::loadInternals(mqbcmd::QueueEngine* out) const consumerState.status() = mqbcmd::ConsumerStatus::UNAUTHORIZED; } - iter->value()->loadInternals(&consumerState.appState()); + iter->second->loadInternals(&consumerState.appState()); } d_queueState_p->routingContext().loadInternals( @@ -2210,10 +2128,10 @@ void RootQueueEngine::loadInternals(mqbcmd::QueueEngine* out) const bool RootQueueEngine::hasHandle(const bsl::string& appId, mqbi::QueueHandle* handle) const { - Apps::iterator iter = - const_cast(this)->d_apps.findByKey1(appId); + Apps::iterator iter = const_cast(this)->d_apps.find( + appId); - return (iter != d_apps.end() && iter->value()->find(handle)); + return (iter != d_apps.end() && iter->second->find(handle)); } } // close package namespace diff --git a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h index 7e1fd8fb6..edc41cdbb 100644 --- a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h +++ b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h @@ -87,15 +87,8 @@ class RootQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine { /// instead of bsl::unique_ptr typedef bsl::shared_ptr AppStateSp; - /// Pair of (AppKey, number of times the key has shown up before). Note - /// that non-null keys *always* have a count of 0 as we do *not* allow - /// duplicate keys. If multiple consumers open a queue with different - /// unregistered appIds, then there will be nullKeys showing up multiple - /// times. The actual value of second field (the count) is meaningless. - typedef bsl::pair AppKeyCount; - - /// (appId, appKeyCount) -> AppStateSp - typedef bmqc::TwoKeyHashMap Apps; + /// appId -> AppStateSp + typedef bsl::unordered_map Apps; private: // DATA @@ -105,18 +98,7 @@ class RootQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine { QueueConsumptionMonitor d_consumptionMonitor; Apps d_apps; - // Map of (appId, appKeyCount) to - // AppState - - unsigned int d_nullKeyCount; - // Number of times the nullKey has shown - // up before. Needed because multiple - // consumers could open a queue with - // different unregistered appIds, - // resulting in multiple instances of - // nullKeys. We need to differentiate - // them to use them as keys in - // bmqc::TwoKeyHashMap. + // Map of appId to AppState bool d_hasAutoSubscriptions; // Does this queue engine have any auto subscriptions configured @@ -182,7 +164,8 @@ class RootQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine { /// THREAD: This method is called from any thread. int initializeAppId(const bsl::string& appId, bsl::ostream& errorDescription, - unsigned int upstreamSubQueueId); + unsigned int upstreamSubQueueId, + bool isReconfigure); /// Return true if the specified `handle` is registered for the /// specified `appId`. Return false otherwise. @@ -197,9 +180,9 @@ class RootQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine { const Apps::iterator& itApp, const Routers::AppContext* previous); - Apps::iterator makeSubStream(const bsl::string& appId, - const AppKeyCount& appKey, - unsigned int upstreamSubQueueId); + Apps::iterator makeSubStream(const bsl::string& appId, + const mqbu::StorageKey& appKey, + unsigned int upstreamSubQueueId); bool validate(unsigned int upstreamSubQueueId) const; @@ -209,7 +192,7 @@ class RootQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine { /// If there are un-delivered messages for the specified `appKey` and /// `enableLog` is `true` it logs alarm data. Return `true` if there are /// un-delivered messages and `false` otherwise. - bool logAlarmCb(const mqbu::StorageKey& appKey, bool enableLog) const; + bool logAlarmCb(const bsl::string& appId, bool enableLog) const; public: // TRAITS @@ -261,10 +244,11 @@ class RootQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine { // MANIPULATORS // (virtual mqbi::QueueEngine) - /// Configure this instance. Return zero on success, non-zero value + /// Configure this instance. The specified `isReconfigure` flag indicates + /// if queue is being reconfigured. Return zero on success, non-zero value /// otherwise and populate the specified `errorDescription`. - virtual int - configure(bsl::ostream& errorDescription) BSLS_KEYWORD_OVERRIDE; + virtual int configure(bsl::ostream& errorDescription, + bool isReconfigure) BSLS_KEYWORD_OVERRIDE; /// Reset the internal state of this engine. If the optionally specified /// 'keepConfirming' is 'true', keep the data structures for CONFIRMs @@ -395,19 +379,19 @@ class RootQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine { virtual void onTimer(bsls::Types::Int64 currentTimer) BSLS_KEYWORD_OVERRIDE; - /// Called after the specified `appIdKeyPair` has been dynamically + /// Called after the specified `addedAppIds` have been dynamically /// registered. /// /// THREAD: This method is called from the Queue's dispatcher thread. virtual void afterAppIdRegistered( - const mqbi::Storage::AppInfo& appIdKeyPair) BSLS_KEYWORD_OVERRIDE; + const mqbi::Storage::AppInfos& addedAppIds) BSLS_KEYWORD_OVERRIDE; - /// Called after the specified `appIdKeyPair` has been dynamically + /// Called after the specified `removedAppIds` have been dynamically /// unregistered. /// /// THREAD: This method is called from the Queue's dispatcher thread. virtual void afterAppIdUnregistered( - const mqbi::Storage::AppInfo& appIdKeyPair) BSLS_KEYWORD_OVERRIDE; + const mqbi::Storage::AppInfos& removedAppIds) BSLS_KEYWORD_OVERRIDE; /// Called after creation of a new storage for the specified /// `appIdKeyPair`. @@ -459,17 +443,17 @@ class RootQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine { virtual void loadInternals(mqbcmd::QueueEngine* out) const BSLS_KEYWORD_OVERRIDE; - /// Log appllication subscription info for the specified `appKey` into the + /// Log application subscription info for the specified `appId` into the /// specified `stream`. /// /// THREAD: This method is called from the Queue's /// dispatcher thread. virtual bsl::ostream& logAppSubscriptionInfo( - bsl::ostream& stream, - const mqbu::StorageKey& appKey) const BSLS_KEYWORD_OVERRIDE; + bsl::ostream& stream, + const bsl::string& appId) const BSLS_KEYWORD_OVERRIDE; private: - /// Log appllication subscription info for the specified `appState` into + /// Log application subscription info for the specified `appState` into /// the specified `stream`. bsl::ostream& logAppSubscriptionInfo(bsl::ostream& stream, const AppStateSp& appState) const; diff --git a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.t.cpp b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.t.cpp index 99eeedb10..abc1844ad 100644 --- a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.t.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.t.cpp @@ -1464,7 +1464,7 @@ static void test9_priorityCreateAndConfigure() bmqu::MemOutStream errorDescription(s_allocator_p); errorDescription.reset(); - int rc = queueEngineMp->configure(errorDescription); + int rc = queueEngineMp->configure(errorDescription, false); ASSERT_EQ(errorDescription.length(), 0U); ASSERT_EQ(rc, 0); @@ -2779,7 +2779,7 @@ static void test22_createAndConfigure() // configuring it successfully bmqu::MemOutStream errorDescription(s_allocator_p); errorDescription.reset(); - int rc = queueEngineMp->configure(errorDescription); + int rc = queueEngineMp->configure(errorDescription, false); ASSERT_EQ(errorDescription.length(), 0U); ASSERT_EQ(rc, 0); diff --git a/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp b/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp index e319f48f3..dcd41f5e1 100644 --- a/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp @@ -1271,17 +1271,6 @@ void StorageManager::updateQueueReplica(int partitionId, d_fileStores[partitionId]->dispatchEvent(queueEvent); } -mqbu::StorageKey StorageManager::generateAppKey(const bsl::string& appId, - int partitionId) -{ - // executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId' - // or by *CLUSTER DISPATCHER* thread. - - return mqbc::StorageUtil::generateAppKey(&d_appKeysVec[partitionId], - &d_appKeysLock, - appId); -} - void StorageManager::setQueue(mqbi::Queue* queue, const bmqt::Uri& uri, int partitionId) diff --git a/src/groups/mqb/mqbblp/mqbblp_storagemanager.h b/src/groups/mqb/mqbblp/mqbblp_storagemanager.h index 86a5a366c..6ff9ba213 100644 --- a/src/groups/mqb/mqbblp/mqbblp_storagemanager.h +++ b/src/groups/mqb/mqbblp/mqbblp_storagemanager.h @@ -35,7 +35,6 @@ #include #include #include -#include #include #include #include @@ -567,13 +566,6 @@ class StorageManager : public mqbi::StorageManager { mqbi::Domain* domain = 0, bool allowDuplicate = false) BSLS_KEYWORD_OVERRIDE; - /// Return a unique appKey for the specified `appId` for a queue - /// assigned to the specified `partitionId`. This routine can be - /// invoked by any thread. - virtual mqbu::StorageKey - generateAppKey(const bsl::string& appId, - int partitionId) BSLS_KEYWORD_OVERRIDE; - /// Set the queue instance associated with the file-backed storage for /// the specified `uri` mapped to the specified `partitionId` to the /// specified `queue` value. Note that this method *does* *not* diff --git a/src/groups/mqb/mqbc/mqbc_clusterutil.cpp b/src/groups/mqb/mqbc/mqbc_clusterutil.cpp index 0f09d5f34..8b9ae5668 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterutil.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterutil.cpp @@ -1263,7 +1263,7 @@ void ClusterUtil::populateAppInfos( void ClusterUtil::registerAppId(ClusterData* clusterData, ClusterStateLedger* ledger, - const ClusterState& clusterState, + ClusterState& clusterState, const bsl::string& appId, const mqbi::Domain* domain, bslma::Allocator* allocator) @@ -1368,6 +1368,29 @@ void ClusterUtil::registerAppId(ClusterData* clusterData, queueUpdate.addedAppIds().push_back(appIdInfo); queueAdvisory.queueUpdates().push_back(queueUpdate); + + if (!clusterData->cluster().isCSLModeEnabled()) { + // In CSL mode, we update the queue to ClusterState upon CSL + // commit callback of QueueUpdateAdvisory. + + // In non-CSL mode this is the shortcut to call Primary CQH + // instead of waiting for the quorum of acks in the ledger. + + AppInfos addedApps(allocator); + mqbc::ClusterUtil::parseQueueInfo(&addedApps, + queueUpdate.addedAppIds(), + allocator); + + BSLA_MAYBE_UNUSED const int assignRc = + clusterState.updateQueue(queueUpdate.uri(), + queueUpdate.domain(), + addedApps, + AppInfos(allocator)); + BSLS_ASSERT_SAFE(assignRc == 0); + + BALL_LOG_INFO << clusterData->cluster().description() + << ": Queue updated: " << queueAdvisory; + } } } @@ -1390,7 +1413,7 @@ void ClusterUtil::registerAppId(ClusterData* clusterData, void ClusterUtil::unregisterAppId(ClusterData* clusterData, ClusterStateLedger* ledger, - const ClusterState& clusterState, + ClusterState& clusterState, const bsl::string& appId, const mqbi::Domain* domain, bslma::Allocator* allocator) @@ -1444,7 +1467,6 @@ void ClusterUtil::unregisterAppId(ClusterData* clusterData, // QueueUpdateAdvisory to indicate that we are updating appIds for the // entire domain. - bmqt::Uri uriii("bmq://bmq.test.mmap.priority/q1"); bmqp_ctrlmsg::QueueInfoUpdate queueUpdate; queueUpdate.uri() = ""; queueUpdate.partitionId() = mqbs::DataStore::k_INVALID_PARTITION_ID; @@ -1497,6 +1519,29 @@ void ClusterUtil::unregisterAppId(ClusterData* clusterData, return; // RETURN } + + if (!clusterData->cluster().isCSLModeEnabled()) { + // In CSL mode, we update the queue to ClusterState upon CSL + // commit callback of QueueUpdateAdvisory. + + // In non-CSL mode this is the shortcut to call Primary CQH + // instead of waiting for the quorum of acks in the ledger. + + AppInfos removedApps(allocator); + mqbc::ClusterUtil::parseQueueInfo(&removedApps, + queueUpdate.removedAppIds(), + allocator); + + BSLA_MAYBE_UNUSED const int assignRc = + clusterState.updateQueue(queueUpdate.uri(), + queueUpdate.domain(), + AppInfos(allocator), + removedApps); + BSLS_ASSERT_SAFE(assignRc == 0); + + BALL_LOG_INFO << clusterData->cluster().description() + << ": Queue updated: " << queueAdvisory; + } } } @@ -2303,10 +2348,18 @@ int ClusterUtil::latestLedgerLSN(bmqp_ctrlmsg::LeaderMessageSequence* out, void ClusterUtil::parseQueueInfo(mqbi::ClusterStateManager::AppInfos* out, const bmqp_ctrlmsg::QueueInfo& queueInfo, bslma::Allocator* allocator) +{ + parseQueueInfo(out, queueInfo.appIds(), allocator); +} + +void ClusterUtil::parseQueueInfo( + mqbi::ClusterStateManager::AppInfos* out, + const bsl::vector& apps, + bslma::Allocator* allocator) { for (bsl::vector::const_iterator cit = - queueInfo.appIds().cbegin(); - cit != queueInfo.appIds().cend(); + apps.cbegin(); + cit != apps.cend(); ++cit) { out->emplace(mqbi::ClusterStateManager::AppInfo( bsl::string(cit->appId(), allocator), diff --git a/src/groups/mqb/mqbc/mqbc_clusterutil.h b/src/groups/mqb/mqbc/mqbc_clusterutil.h index 970fdb3e0..3eb4f648e 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterutil.h +++ b/src/groups/mqb/mqbc/mqbc_clusterutil.h @@ -291,7 +291,7 @@ struct ClusterUtil { /// dispatcher thread. static void registerAppId(ClusterData* clusterData, ClusterStateLedger* ledger, - const ClusterState& clusterState, + ClusterState& clusterState, const bsl::string& appId, const mqbi::Domain* domain, bslma::Allocator* allocator); @@ -305,7 +305,7 @@ struct ClusterUtil { /// dispatcher thread. static void unregisterAppId(ClusterData* clusterData, ClusterStateLedger* ledger, - const ClusterState& clusterState, + ClusterState& clusterState, const bsl::string& appId, const mqbi::Domain* domain, bslma::Allocator* allocator); @@ -416,6 +416,10 @@ struct ClusterUtil { static void parseQueueInfo(mqbi::ClusterStateManager::AppInfos* out, const bmqp_ctrlmsg::QueueInfo& queueInfo, bslma::Allocator* allocator); + static void + parseQueueInfo(mqbi::ClusterStateManager::AppInfos* out, + const bsl::vector& apps, + bslma::Allocator* allocator); }; // ============================================================================ diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp index d472aa809..fcd681715 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp @@ -3857,17 +3857,6 @@ void StorageManager::updateQueueReplica(int partitionId, d_fileStores[partitionId]->dispatchEvent(queueEvent); } -mqbu::StorageKey StorageManager::generateAppKey(const bsl::string& appId, - int partitionId) -{ - // executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId' - // or by *CLUSTER DISPATCHER* thread. - - return StorageUtil::generateAppKey(&d_appKeysVec[partitionId], - &d_appKeysLock, - appId); -} - void StorageManager::setQueue(mqbi::Queue* queue, const bmqt::Uri& uri, int partitionId) diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.h b/src/groups/mqb/mqbc/mqbc_storagemanager.h index 7fea414f3..5ba16299b 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.h +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.h @@ -904,13 +904,6 @@ class StorageManager mqbi::Domain* domain = 0, bool allowDuplicate = false) BSLS_KEYWORD_OVERRIDE; - /// Return a unique appKey for the specified `appId` for a queue - /// assigned to the specified `partitionId`. This routine can be - /// invoked by any thread. - virtual mqbu::StorageKey - generateAppKey(const bsl::string& appId, - int partitionId) BSLS_KEYWORD_OVERRIDE; - /// Set the queue instance associated with the file-backed storage for /// the specified `uri` mapped to the specified `partitionId` to the /// specified `queue` value. Note that this method *does* *not* diff --git a/src/groups/mqb/mqbi/mqbi_appkeygenerator.cpp b/src/groups/mqb/mqbi/mqbi_appkeygenerator.cpp deleted file mode 100644 index 6ab010d6e..000000000 --- a/src/groups/mqb/mqbi/mqbi_appkeygenerator.cpp +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright 2018-2023 Bloomberg Finance L.P. -// SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// mqbi_appkeygenerator.cpp -*-C++-*- -#include - -#include -namespace BloombergLP { -namespace mqbi { - -// --------------------- -// class AppKeyGenerator -// --------------------- - -// CREATORS -AppKeyGenerator::~AppKeyGenerator() -{ -} - -} // close package namespace -} // close enterprise namespace diff --git a/src/groups/mqb/mqbi/mqbi_appkeygenerator.h b/src/groups/mqb/mqbi/mqbi_appkeygenerator.h deleted file mode 100644 index ecb33bfab..000000000 --- a/src/groups/mqb/mqbi/mqbi_appkeygenerator.h +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright 2018-2023 Bloomberg Finance L.P. -// SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// mqbi_appkeygenerator.h -*-C++-*- -#ifndef INCLUDED_MQBI_APPKEYGENERATOR -#define INCLUDED_MQBI_APPKEYGENERATOR - -//@PURPOSE: Provide an interface for an AppKeyGenerator. -// -//@CLASSES: -// mqbi::AppKeyGenerator: interface for an AppKeyGenerator -// -//@DESCRIPTION: 'mqbi::AppKeyGenerator' is an interface for a generator of -// 'mqbu::StorageKey' objects. - -// MQB - -// BDE -#include - -namespace BloombergLP { - -// FORWARD DECLARATIONS -namespace mqbu { -class StorageKey; -} - -namespace mqbi { - -// ===================== -// class AppKeyGenerator -// ===================== - -/// Interface for an AppKeyGenerator. -class AppKeyGenerator { - public: - // CREATORS - - /// Destroy this object. - virtual ~AppKeyGenerator(); - - // MANIPULATORS - - /// Return a unique appKey for the specified `appId` for a queue - /// assigned to the specified `partitionId`. Behavior is undefined - /// unless this method is invoked in the dispatcher thread of the - /// `partitionId`. - virtual mqbu::StorageKey generateAppKey(const bsl::string& appId, - int partitionId) = 0; -}; - -} // close package namespace -} // close enterprise namespace - -#endif diff --git a/src/groups/mqb/mqbi/mqbi_queueengine.cpp b/src/groups/mqb/mqbi/mqbi_queueengine.cpp index aeb77a513..6509b7b97 100644 --- a/src/groups/mqb/mqbi/mqbi_queueengine.cpp +++ b/src/groups/mqb/mqbi/mqbi_queueengine.cpp @@ -36,13 +36,13 @@ QueueEngine::~QueueEngine() } void QueueEngine::afterAppIdRegistered( - BSLS_ANNOTATION_UNUSED const mqbi::Storage::AppInfo& appIdKeyPair) + BSLS_ANNOTATION_UNUSED const mqbi::Storage::AppInfos& addedAppIds) { // NOTHING } void QueueEngine::afterAppIdUnregistered( - BSLS_ANNOTATION_UNUSED const mqbi::Storage::AppInfo& appIdKeyPair) + BSLS_ANNOTATION_UNUSED const mqbi::Storage::AppInfos& removedAppIds) { // NOTHING } @@ -64,8 +64,8 @@ void QueueEngine::unregisterStorage( } bsl::ostream& -QueueEngine::logAppSubscriptionInfo(bsl::ostream& stream, - const mqbu::StorageKey& appKey) const +QueueEngine::logAppSubscriptionInfo(bsl::ostream& stream, + const bsl::string& appId) const { return stream; }; diff --git a/src/groups/mqb/mqbi/mqbi_queueengine.h b/src/groups/mqb/mqbi/mqbi_queueengine.h index b2ad21082..5a858bb32 100644 --- a/src/groups/mqb/mqbi/mqbi_queueengine.h +++ b/src/groups/mqb/mqbi/mqbi_queueengine.h @@ -70,9 +70,11 @@ class QueueEngine { // MANIPULATORS - /// Configure this instance. Return zero on success, non-zero value + /// Configure this instance. The specified `isReconfigure` flag indicates + /// if queue is being reconfigured. Return zero on success, non-zero value /// otherwise and populate the specified `errorDescription`. - virtual int configure(bsl::ostream& errorDescription) = 0; + virtual int configure(bsl::ostream& errorDescription, + bool isReconfigure) = 0; /// Reset the internal state of this engine. If the optionally specified /// 'isShuttingDown' is 'true', clear the routing state but keep the Apps @@ -193,19 +195,19 @@ class QueueEngine { /// THREAD: This method is called from the Queue's dispatcher thread. virtual void onTimer(bsls::Types::Int64 currentTimer) = 0; - /// Called after the specified `appIdKeyPair` has been dynamically + /// Called after the specified `addedAppIds` have been dynamically /// registered. /// /// THREAD: This method is called from the Queue's dispatcher thread. virtual void - afterAppIdRegistered(const mqbi::Storage::AppInfo& appIdKeyPair); + afterAppIdRegistered(const mqbi::Storage::AppInfos& addedAppIds); - /// Called after the specified `appIdKeyPair` has been dynamically + /// Called after the specified `removedAppIds` have been dynamically /// unregistered. /// /// THREAD: This method is called from the Queue's dispatcher thread. virtual void - afterAppIdUnregistered(const mqbi::Storage::AppInfo& appIdKeyPair); + afterAppIdUnregistered(const mqbi::Storage::AppInfos& removedAppIds); /// Called after creation of a new storage for the specified /// `appIdKeyPair`. @@ -247,13 +249,13 @@ class QueueEngine { /// this queue engine and associated queue handles. virtual void loadInternals(mqbcmd::QueueEngine* out) const = 0; - /// Log appllication subscription info for the specified `appKey` into the + /// Log appllication subscription info for the specified `appId` into the /// specified `stream`. /// /// THREAD: This method is called from the Queue's dispatcher thread. virtual bsl::ostream& - logAppSubscriptionInfo(bsl::ostream& stream, - const mqbu::StorageKey& appKey) const; + logAppSubscriptionInfo(bsl::ostream& stream, + const bsl::string& appId) const; }; } // close package namespace diff --git a/src/groups/mqb/mqbi/mqbi_storagemanager.h b/src/groups/mqb/mqbi/mqbi_storagemanager.h index e0397e582..4d5ee2bec 100644 --- a/src/groups/mqb/mqbi/mqbi_storagemanager.h +++ b/src/groups/mqb/mqbi/mqbi_storagemanager.h @@ -31,7 +31,6 @@ // MQB -#include #include #include #include @@ -165,7 +164,7 @@ class StorageManagerIterator { // ==================== /// Storage Manager, in charge of all the partitions. -class StorageManager : public mqbi::AppKeyGenerator { +class StorageManager { public: // TYPES typedef mqbi::Storage::AppInfo AppInfo; @@ -208,7 +207,7 @@ class StorageManager : public mqbi::AppKeyGenerator { // CREATORS /// Destructor - ~StorageManager() BSLS_KEYWORD_OVERRIDE; + virtual ~StorageManager(); // MANIPULATORS @@ -280,12 +279,6 @@ class StorageManager : public mqbi::AppKeyGenerator { mqbi::Domain* domain = 0, bool allowDuplicate = false) = 0; - /// Return a unique appKey for the specified `appId` for a queue - /// assigned to the specified `partitionId`. This routine can be - /// invoked by any thread. - mqbu::StorageKey generateAppKey(const bsl::string& appId, - int partitionId) BSLS_KEYWORD_OVERRIDE = 0; - /// Set the queue instance associated with the file-backed storage for /// the specified `uri` mapped to the specified `partitionId` to the /// specified `queue` value. Note that this method *does* *not* diff --git a/src/groups/mqb/mqbi/package/mqbi.mem b/src/groups/mqb/mqbi/package/mqbi.mem index 850c8cb66..d8d713f24 100644 --- a/src/groups/mqb/mqbi/package/mqbi.mem +++ b/src/groups/mqb/mqbi/package/mqbi.mem @@ -1,4 +1,3 @@ -mqbi_appkeygenerator mqbi_cluster mqbi_clusterstatemanager mqbi_dispatcher diff --git a/src/groups/mqb/mqbmock/mqbmock_appkeygenerator.cpp b/src/groups/mqb/mqbmock/mqbmock_appkeygenerator.cpp deleted file mode 100644 index e0fec21a1..000000000 --- a/src/groups/mqb/mqbmock/mqbmock_appkeygenerator.cpp +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright 2018-2023 Bloomberg Finance L.P. -// SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// mqbmock_appkeygenerator.cpp -*-C++-*- -#include - -#include -namespace BloombergLP { -namespace mqbmock { - -// --------------------- -// class AppKeyGenerator -// --------------------- - -AppKeyGenerator::~AppKeyGenerator() -{ - // NOTHING -} - -mqbu::StorageKey AppKeyGenerator::generateAppKey( - BSLS_ANNOTATION_UNUSED const bsl::string& appId, - BSLS_ANNOTATION_UNUSED int partitionId) -{ - return d_key; -} - -} // close package namespace -} // close enterprise namespace diff --git a/src/groups/mqb/mqbmock/mqbmock_appkeygenerator.h b/src/groups/mqb/mqbmock/mqbmock_appkeygenerator.h deleted file mode 100644 index 92a6b3868..000000000 --- a/src/groups/mqb/mqbmock/mqbmock_appkeygenerator.h +++ /dev/null @@ -1,84 +0,0 @@ -// Copyright 2018-2023 Bloomberg Finance L.P. -// SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// mqbmock_appkeygenerator.h -*-C++-*- -#ifndef INCLUDED_MQBMOCK_APPKEYGENERATOR -#define INCLUDED_MQBMOCK_APPKEYGENERATOR - -//@PURPOSE: Provide a mock implementation of 'mqbi::AppKeyGenerator'. -// -//@CLASSES: -// mqbmock::AppKeyGenerator: mock AppKeyGenerator implementation -// -//@DESCRIPTION: This component provides a mock implementation, -// 'mqbmock::AppKeyGenerator'. - -// MQB - -#include -#include - -// BDE -#include - -namespace BloombergLP { -namespace mqbmock { - -// ===================== -// class AppKeyGenerator -// ===================== - -class AppKeyGenerator : public mqbi::AppKeyGenerator { - // DATA - mqbu::StorageKey d_key; - - // NOT IMPLEMENTED - AppKeyGenerator& operator=(const AppKeyGenerator&) BSLS_CPP11_DELETED; - - public: - // CREATORS - virtual ~AppKeyGenerator() BSLS_CPP11_OVERRIDE; - - // MANIPULATORS - - /// Return a unique appKey for the specified `appId` for a queue - /// assigned to the specified `partitionId`. Behavior is undefined - /// unless this method is invoked in the dispatcher thread of the - /// `partitionId`. - virtual mqbu::StorageKey - generateAppKey(const bsl::string& appId, - int partitionId) BSLS_CPP11_OVERRIDE; - - /// Set the key returned by `generateAppKey` to the specified `key`. - void setKey(const mqbu::StorageKey& key); -}; - -// ============================================================================ -// INLINE DEFINITIONS -// ============================================================================ - -// --------------------- -// class AppKeyGenerator -// --------------------- - -inline void AppKeyGenerator::setKey(const mqbu::StorageKey& key) -{ - d_key = key; -} - -} // close package namespace -} // close enterprise namespace - -#endif diff --git a/src/groups/mqb/mqbmock/mqbmock_queueengine.cpp b/src/groups/mqb/mqbmock/mqbmock_queueengine.cpp index 208a65d22..51b0a1c10 100644 --- a/src/groups/mqb/mqbmock/mqbmock_queueengine.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_queueengine.cpp @@ -41,7 +41,8 @@ QueueEngine::~QueueEngine() // MANIPULATORS int QueueEngine::configure( - BSLS_ANNOTATION_UNUSED bsl::ostream& errorDescription) + BSLS_ANNOTATION_UNUSED bsl::ostream& errorDescription, + BSLS_ANNOTATION_UNUSED bool isReconfigure) { return 0; } diff --git a/src/groups/mqb/mqbmock/mqbmock_queueengine.h b/src/groups/mqb/mqbmock/mqbmock_queueengine.h index b156ba56a..1de541c35 100644 --- a/src/groups/mqb/mqbmock/mqbmock_queueengine.h +++ b/src/groups/mqb/mqbmock/mqbmock_queueengine.h @@ -83,10 +83,11 @@ class QueueEngine : public mqbi::QueueEngine { // MANIPULATORS // (virtual mqbi::QueueEngine) - /// Configure this instance. Return zero on success, non-zero value + /// Configure this instance. The specified `isReconfigure` flag indicate + /// if queue is being reconfigured. Return zero on success, non-zero value /// otherwise and populate the specified `errorDescription`. - virtual int - configure(bsl::ostream& errorDescription) BSLS_KEYWORD_OVERRIDE; + virtual int configure(bsl::ostream& errorDescription, + bool isReconfigure) BSLS_KEYWORD_OVERRIDE; /// Reset the internal state of this engine. If the optionally specified /// 'keepConfirming' is 'true', keep the data structures for CONFIRMs diff --git a/src/groups/mqb/mqbmock/mqbmock_storagemanager.cpp b/src/groups/mqb/mqbmock/mqbmock_storagemanager.cpp index 34ddd037d..236cf17b3 100644 --- a/src/groups/mqb/mqbmock/mqbmock_storagemanager.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_storagemanager.cpp @@ -113,13 +113,6 @@ void StorageManager::updateQueueReplica( // NOTHING } -mqbu::StorageKey -StorageManager::generateAppKey(BSLS_ANNOTATION_UNUSED const bsl::string& appId, - BSLS_ANNOTATION_UNUSED int partitionId) -{ - return mqbu::StorageKey(); -} - void StorageManager::setQueue(BSLS_ANNOTATION_UNUSED mqbi::Queue* queue, BSLS_ANNOTATION_UNUSED const bmqt::Uri& uri, BSLS_ANNOTATION_UNUSED int partitionId) diff --git a/src/groups/mqb/mqbmock/mqbmock_storagemanager.h b/src/groups/mqb/mqbmock/mqbmock_storagemanager.h index 77e032778..bde6f1924 100644 --- a/src/groups/mqb/mqbmock/mqbmock_storagemanager.h +++ b/src/groups/mqb/mqbmock/mqbmock_storagemanager.h @@ -128,12 +128,6 @@ class StorageManager : public mqbi::StorageManager { mqbi::Domain* domain = 0, bool allowDuplicate = false) BSLS_KEYWORD_OVERRIDE; - /// Return a unique appKey for the specified `appId` for a queue - /// assigned to the specified `partitionId`. This routine can be - /// invoked by any thread. - mqbu::StorageKey generateAppKey(const bsl::string& appId, - int partitionId) BSLS_KEYWORD_OVERRIDE; - /// Set the queue instance associated with the file-backed storage for /// the specified `uri` mapped to the specified `partitionId` to the /// specified `queue` value. Note that this method *does* *not* diff --git a/src/groups/mqb/mqbmock/package/mqbmock.mem b/src/groups/mqb/mqbmock/package/mqbmock.mem index 98846d1b6..9cd1517fc 100644 --- a/src/groups/mqb/mqbmock/package/mqbmock.mem +++ b/src/groups/mqb/mqbmock/package/mqbmock.mem @@ -1,4 +1,3 @@ -mqbmock_appkeygenerator mqbmock_cluster mqbmock_clusterstateledger mqbmock_clusterstateledgeriterator diff --git a/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp b/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp index 247b14031..f3074bc5a 100644 --- a/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp +++ b/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp @@ -1035,8 +1035,7 @@ FileBackedStorage::logAppsSubscriptionInfoCb(bsl::ostream& stream) const for (mqbi::Storage::AppInfos::const_iterator cit = appInfos.begin(); cit != appInfos.end(); ++cit) { - queue()->queueEngine()->logAppSubscriptionInfo(stream, - cit->second); + queue()->queueEngine()->logAppSubscriptionInfo(stream, cit->first); } } diff --git a/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp b/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp index 03f0c5988..2acd7159d 100644 --- a/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp +++ b/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp @@ -616,8 +616,7 @@ InMemoryStorage::logAppsSubscriptionInfoCb(bsl::ostream& stream) const for (mqbi::Storage::AppInfos::const_iterator cit = appInfos.begin(); cit != appInfos.end(); ++cit) { - queue()->queueEngine()->logAppSubscriptionInfo(stream, - cit->second); + queue()->queueEngine()->logAppSubscriptionInfo(stream, cit->first); } } diff --git a/src/integration-tests/test_appids.py b/src/integration-tests/test_appids.py index d6296102e..d0e333433 100644 --- a/src/integration-tests/test_appids.py +++ b/src/integration-tests/test_appids.py @@ -30,7 +30,7 @@ pytestmark = order(3) -authorized_app_ids = ["foo", "bar", "baz"] +default_app_ids = ["foo", "bar", "baz"] timeout = 60 max_msgs = 3 @@ -51,14 +51,14 @@ def test_open_alarm_authorize_post(cluster: Cluster): producer = next(proxies).create_client("producer") producer.open(tc.URI_FANOUT, flags=["write,ack"], succeed=True) - all_app_ids = authorized_app_ids + ["quux"] + all_app_ids = default_app_ids + ["quux"] # --------------------------------------------------------------------- # Create a consumer for each authorized substream. consumers = {} - for app_id in authorized_app_ids: + for app_id in default_app_ids: consumer = next(proxies).create_client(app_id) consumers[app_id] = consumer consumer.open(f"{tc.URI_FANOUT}?id={app_id}", flags=["read"], succeed=True) @@ -111,7 +111,7 @@ def test_open_alarm_authorize_post(cluster: Cluster): # --------------------------------------------------------------------- # Authorize 'quux'. - set_app_ids(cluster, authorized_app_ids + ["quux"]) + set_app_ids(cluster, default_app_ids + ["quux"]) # --------------------------------------------------------------------- # Check that all substreams are alive. @@ -133,7 +133,7 @@ def test_open_alarm_authorize_post(cluster: Cluster): leader.dump_queue_internals(tc.DOMAIN_FANOUT, tc.TEST_QUEUE) # pylint: disable=cell-var-from-loop; passing lambda to 'wait_until' is safe - for app_id in authorized_app_ids: + for app_id in default_app_ids: test_logger.info(f"Check if {app_id} has seen 2 messages") assert wait_until( lambda: len( @@ -171,7 +171,7 @@ def test_create_authorize_open_post(cluster: Cluster): # --------------------------------------------------------------------- # Authorize 'quux'. - set_app_ids(cluster, authorized_app_ids + ["quux"]) + set_app_ids(cluster, default_app_ids + ["quux"]) # --------------------------------------------------------------------- # Create a consumer for 'quux. This should succeed. @@ -198,7 +198,7 @@ def test_load_domain_authorize_open_post(cluster: Cluster): # --------------------------------------------------------------------- # Authorize 'quux'. - set_app_ids(cluster, authorized_app_ids + ["quux"]) + set_app_ids(cluster, default_app_ids + ["quux"]) # --------------------------------------------------------------------- # Create a consumer for 'quux. This should succeed. @@ -221,7 +221,7 @@ def _test_authorize_before_domain_loaded(cluster): # --------------------------------------------------------------------- # Authorize 'quux'. - set_app_ids(cluster, authorized_app_ids + ["quux"]) + set_app_ids(cluster, default_app_ids + ["quux"]) # --------------------------------------------------------------------- # Create the queue. @@ -249,9 +249,9 @@ def _test_command_errors(cluster): proxies = cluster.proxy_cycle() next(proxies).create_client("producer") - set_app_ids(cluster, authorized_app_ids + ["quux"]) + set_app_ids(cluster, default_app_ids + ["quux"]) - set_app_ids(cluster, authorized_app_ids) + set_app_ids(cluster, default_app_ids) def test_unregister_in_presence_of_queues(cluster: Cluster): @@ -276,7 +276,7 @@ def test_unregister_in_presence_of_queues(cluster: Cluster): # message posted while 'foo' was still valid. foo.wait_push_event() - set_app_ids(cluster, [a for a in authorized_app_ids if a not in ["foo"]]) + set_app_ids(cluster, [a for a in default_app_ids if a not in ["foo"]]) @attempt(3) def _(): @@ -320,7 +320,7 @@ def _(): assert Client.e_SUCCESS == foo.close(tc.URI_FANOUT_FOO, block=True) # Re-authorize - set_app_ids(cluster, authorized_app_ids) + set_app_ids(cluster, default_app_ids) foo.open(tc.URI_FANOUT_FOO, flags=["read"], succeed=True) producer.post(tc.URI_FANOUT, ["after-reauthorize"], block=True) @@ -408,7 +408,7 @@ def test_unauthorized_appid_doesnt_hold_messages(cluster: Cluster): # consume all the messages in all the authorized substreams # pylint: disable=cell-var-from-loop; passing lambda to 'wait_until' is safe - for app_id in authorized_app_ids: + for app_id in default_app_ids: appid_uri = f"{tc.URI_FANOUT}?id={app_id}" consumer = next(proxies).create_client(app_id) consumer.open(appid_uri, flags=["read"], succeed=True) @@ -446,7 +446,7 @@ def test_deauthorized_appid_doesnt_hold_messages(cluster: Cluster): # --------------------------------------------------------------------- # unauthorize 'bar' and 'baz' - set_app_ids(cluster, [a for a in authorized_app_ids if a not in ["bar", "baz"]]) + set_app_ids(cluster, [a for a in default_app_ids if a not in ["bar", "baz"]]) # --------------------------------------------------------------------- # fill queue to capacity @@ -537,3 +537,107 @@ def test_two_consumers_of_unauthorized_app(multi_node: Cluster): # shutdown and wait leader.stop() + + +@tweak.cluster.cluster_attributes.is_cslmode_enabled(False) +@tweak.cluster.cluster_attributes.is_fsmworkflow(False) +def test_open_authorize_restart_from_non_FSM_to_FSM(cluster: Cluster): + leader = cluster.last_known_leader + proxies = cluster.proxy_cycle() + + producer = next(proxies).create_client("producer") + producer.open(tc.URI_FANOUT, flags=["write,ack"], succeed=True) + + all_app_ids = default_app_ids + ["quux"] + + # --------------------------------------------------------------------- + # Create a consumer for each authorized substream. + + consumers = {} + + for app_id in all_app_ids: + consumer = next(proxies).create_client(app_id) + consumers[app_id] = consumer + consumer.open(f"{tc.URI_FANOUT}?id={app_id}", flags=["read"], succeed=True) + + # --------------------------------------------------------------------- + # Authorize 'quux'. + set_app_ids(cluster, default_app_ids + ["quux"]) + + # --------------------------------------------------------------------- + # Post a message. + producer.post(tc.URI_FANOUT, ["msg1"], succeed=True, wait_ack=True) + + # --------------------------------------------------------------------- + # Post a second message. + + producer.post(tc.URI_FANOUT, ["msg2"]) + assert producer.outputs_regex(r"MESSAGE.*ACK", timeout) + + # --------------------------------------------------------------------- + # Ensure that all substreams get 2 messages + + leader.dump_queue_internals(tc.DOMAIN_FANOUT, tc.TEST_QUEUE) + # pylint: disable=cell-var-from-loop; passing lambda to 'wait_until' is safe + for app_id in all_app_ids: + test_logger.info(f"Check if {app_id} has seen 2 messages") + assert wait_until( + lambda: len( + consumers[app_id].list(f"{tc.URI_FANOUT}?id={app_id}", block=True) + ) + == 2, + 3, + ) + + # Save one confirm to the storage for 'quux' only + consumers["quux"].confirm(f"{tc.URI_FANOUT}?id=quux", "+1", succeed=True) + + for app_id in all_app_ids: + assert ( + consumers[app_id].close(f"{tc.URI_FANOUT}?id={app_id}", block=True) + == Client.e_SUCCESS + ) + + cluster.stop_nodes() + + # Reconfigure the cluster from non-FSM to FSM mode + for broker in cluster.configurator.brokers.values(): + my_clusters = broker.clusters.my_clusters + if len(my_clusters) > 0: + my_clusters[0].cluster_attributes.is_cslmode_enabled = True + my_clusters[0].cluster_attributes.is_fsmworkflow = True + cluster.deploy_domains() + + cluster.start_nodes(wait_leader=True, wait_ready=True) + # For a standard cluster, states have already been restored as part of + # leader re-election. + if cluster.is_single_node: + producer.wait_state_restored() + + for app_id in all_app_ids: + consumer = next(proxies).create_client(app_id) + consumers[app_id] = consumer + consumer.open(f"{tc.URI_FANOUT}?id={app_id}", flags=["read"], succeed=True) + + # pylint: disable=cell-var-from-loop; passing lambda to 'wait_until' is safe + for app_id in default_app_ids: + test_logger.info(f"Check if {app_id} has seen 2 messages") + assert wait_until( + lambda: len( + consumers[app_id].list(f"{tc.URI_FANOUT}?id={app_id}", block=True) + ) + == 2, + 3, + ) + + assert wait_until( + lambda: len(consumers["quux"].list(f"{tc.URI_FANOUT}?id=quux", block=True)) + == 1, + 3, + ) + + for app_id in all_app_ids: + assert ( + consumers[app_id].close(f"{tc.URI_FANOUT}?id={app_id}", block=True) + == Client.e_SUCCESS + )