diff --git a/src/groups/mqb/mqba/mqba_domainmanager.cpp b/src/groups/mqb/mqba/mqba_domainmanager.cpp index ec9bb24704..19c82153a6 100644 --- a/src/groups/mqb/mqba/mqba_domainmanager.cpp +++ b/src/groups/mqb/mqba/mqba_domainmanager.cpp @@ -566,17 +566,21 @@ void DomainManager::stop() << k_MAX_WAIT_SECONDS_AT_SHUTDOWN << " seconds while shutting down" << " bmqbrkr. rc: " << rc << "."; - - // Note that 'self' variable will get invalidated when this function - // returns, which will ensure that any pending 'onDomainClosed' - // callbacks are not invoked. So there is no need to explicitly call - // 'self.invalidate()' here. } if (d_domainResolver_mp) { d_domainResolver_mp->stop(); d_domainResolver_mp.clear(); } + + // Notice that this invalidation is necessary. + // Without this explicit call, `self` will be invalidated + // when the function returns, which will ensure that any pending + // `onDomainClosed` callbacks are not invoked. But this is not enough + // since we want to prevent a (tiny) possibility where `latch` is + // destructed before `self` and `onDomainClosed` would be called on an + // invalid `latch`. + self.invalidate(); } int DomainManager::locateDomain(DomainSp* domain, @@ -706,7 +710,7 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, else if (command.isRemoveValue()) { const bsl::string& name = command.remove().domain(); - // First pass + // First round if (command.remove().finalize().isNull()) { DomainSp domainSp; @@ -717,25 +721,23 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, return -1; // RETURN } - // 1. Reject if there's any opened or opening queue + // 1. 
Reject if there's any open queue request on the fly + // Mark DOMAIN PREREMOVE to block openQueue requests if (!domainSp->tryRemove()) { bmqu::MemOutStream os; os << "Trying to remove the domain '" << name - << "' while there are queues opened or opening"; + << "' while there are open queue requests on the fly or " + "the domain is shutting down"; result->makeError().message() = os.str(); return -1; // RETURN } - // 2. Mark DOMAIN PREREMOVE to block openQueue requests - domainSp->removeDomainReset(); - - // 3. Purge inactive queues - // remove virtual storage; add a record in journal file + // 2. Purge and GC mqbcmd::DomainResult domainResult; mqbcmd::ClusterResult clusterResult; mqbi::Cluster* cluster = domainSp->cluster(); - cluster->purgeQueueOnDomain(&clusterResult, name); + cluster->purgeAndGCQueueOnDomain(&clusterResult, name); if (clusterResult.isErrorValue()) { result->makeError(clusterResult.error()); @@ -752,18 +754,7 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, clusterResult.storageResult().purgedQueues().queues(); result->makeDomainResult(domainResult); - // 4. Force GC queues - // unregister Queue from domain; - // remove queue storage from partition - mqbcmd::ClusterResult clusterForceGCResult; - int rc = cluster->gcQueueOnDomain(&clusterForceGCResult, name); - if (clusterForceGCResult.isErrorValue()) { - result->makeError(clusterForceGCResult.error()); - return -1; // RETURN - } - - // 5. Mark DOMAIN REMOVED to accecpt the second pass - + // 3. 
Mark DOMAIN REMOVED to accept the second round bmqu::SharedResource self(this); bslmt::Latch latch(1, bsls::SystemClockType::e_MONOTONIC); @@ -777,18 +768,19 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, bmqsys::Time::nowMonotonicClock().addSeconds( k_MAX_WAIT_SECONDS_AT_DOMAIN_REMOVE); - rc = latch.timedWait(timeout); + int rc = latch.timedWait(timeout); if (0 != rc) { BALL_LOG_ERROR << "DOMAINS REMOVE fail to finish in " << k_MAX_WAIT_SECONDS_AT_DOMAIN_REMOVE << " seconds. rc: " << rc << "."; - return rc; } - // 6. Mark DOMAINS REMOVE command first round as complete - domainSp->removeDomainComplete(); + // Refer to `DomainManager::stop` to see why we need to invalidate + // `self` explicitly. + self.invalidate(); + return rc; // RETURN } - // Second pass + // Second round else { DomainSp domainSp; @@ -802,7 +794,7 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, if (!domainSp->isRemoveComplete()) { bmqu::MemOutStream os; - os << "First pass of DOMAINS REMOVE '" << name + os << "First round of DOMAINS REMOVE '" << name << "' is not completed."; result->makeError().message() = os.str(); return -1; // RETURN @@ -823,8 +815,6 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, result->makeSuccess(); return 0; // RETURN } - - return 0; } bmqu::MemOutStream os; diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp index 96c5bd6d03..d8915ca4ae 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp @@ -3665,65 +3665,61 @@ void Cluster::loadClusterStatus(mqbcmd::ClusterResult* result) storageResult.clusterStorageSummary(); } -int Cluster::gcQueueOnDomain(mqbcmd::ClusterResult* result, - const bsl::string& domainName) +void Cluster::purgeAndGCQueueOnDomain(mqbcmd::ClusterResult* result, + const bsl::string& domainName) { // exected by *ANY* thread dispatcher()->execute( - bdlf::BindUtil::bind(&Cluster::gcQueueOnDomainDispatched, 
+ bdlf::BindUtil::bind(&Cluster::purgeAndGCQueueOnDomainDispatched, this, result, domainName), this); dispatcher()->synchronize(this); - - return 0; } -void Cluster::gcQueueOnDomainDispatched(mqbcmd::ClusterResult* result, - const bsl::string& domainName) +void Cluster::purgeAndGCQueueOnDomainDispatched(mqbcmd::ClusterResult* result, + const bsl::string& domainName) { // executed by the *DISPATCHER* thread // PRECONDITIONS BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this)); - // 'true' implies immediate + // Check if there's any live connection to a queue + if (d_clusterOrchestrator.queueHelper().hasActiveQueue(domainName)) { + BALL_LOG_ERROR << "Trying to remove the domain '" << domainName + << "' while there are queues opened or opening"; + result->makeError().message() = + "Trying to remove the domain '" + domainName + + "' while there are queues opened or opening"; + return; // RETURN + } + + // Purge queues on the given domain + mqbcmd::StorageResult storageResult; + d_storageManager_mp->purgeQueueOnDomain(&storageResult, domainName); + result->makeStorageResult(storageResult); + + if (result->isErrorValue()) { + result->makeError(result->error()); + return; // RETURN + } + + // GC queues on the given domain const int rc = d_clusterOrchestrator.queueHelper().gcExpiredQueues(true, domainName); if (rc == -1 || rc == -3) { // TBD: We allow the node to not be an active primary for *any* // partition; this has to be changed once we allow leader != primary - BALL_LOG_ERROR << "Failed to execute force GC queues command (rc: " - << rc << ")"; - result->makeError().message() = "Failed to execute command (rc: " + - bsl::to_string(rc) + ")"; + BALL_LOG_ERROR << "Failed to force GC queues on domain '" << domainName + << "' (rc: " << rc << ")"; + result->makeError().message() = + "Failed to force GC queues on domain '" + domainName + + "' (rc: " + bsl::to_string(rc) + ")"; } - else { - // Otherwise the command succeeded. 
- result->makeSuccess(); - } -} - -void Cluster::purgeQueueOnDomain(mqbcmd::ClusterResult* result, - const bsl::string& domainName) -{ - // exected by *ANY* thread - - mqbcmd::StorageResult storageResult; - - dispatcher()->execute( - bdlf::BindUtil::bind(&mqbi::StorageManager::purgeQueueOnDomain, - d_storageManager_mp.get(), - &storageResult, - domainName), - this); - - dispatcher()->synchronize(this); - - result->makeStorageResult(storageResult); } void Cluster::printClusterStateSummary(bsl::ostream& out, diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.h b/src/groups/mqb/mqbblp/mqbblp_cluster.h index 534c8eb78c..452efa3e67 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.h +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.h @@ -673,18 +673,14 @@ class Cluster : public mqbi::Cluster, /// Load the cluster state to the specified `out` object. void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE; - /// Purge queues in this cluster on a given domain. - void - purgeQueueOnDomain(mqbcmd::ClusterResult* result, - const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; - - /// Force GC queues in this cluster on a given domain. - int gcQueueOnDomain(mqbcmd::ClusterResult* result, - const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; + /// Purge and force GC queues in this cluster on a given domain. + void purgeAndGCQueueOnDomain(mqbcmd::ClusterResult* result, + const bsl::string& domainName) + BSLS_KEYWORD_OVERRIDE; /// Executed by dispatcher thread. 
- void gcQueueOnDomainDispatched(mqbcmd::ClusterResult* result, - const bsl::string& domainName); + void purgeAndGCQueueOnDomainDispatched(mqbcmd::ClusterResult* result, + const bsl::string& domainName); // MANIPULATORS // (virtual: mqbnet::SessionEventProcessor) diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp index 25d608fd69..f290f7e797 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp @@ -1342,27 +1342,14 @@ void ClusterProxy::loadClusterStatus(mqbcmd::ClusterResult* out) loadQueuesInfo(&clusterProxyStatus.queuesInfo()); } -void ClusterProxy::purgeQueueOnDomain( +void ClusterProxy::purgeAndGCQueueOnDomain( mqbcmd::ClusterResult* result, BSLS_ANNOTATION_UNUSED const bsl::string& domainName) { - bmqu::MemOutStream os; - os << "MockCluster::gcQueueOnDomain not implemented!"; - result->makeError().message() = os.str(); -} - -int ClusterProxy::gcQueueOnDomain( - mqbcmd::ClusterResult* result, - BSLS_ANNOTATION_UNUSED const bsl::string& domainName) -{ - // exected by *ANY* thread - bdlma::LocalSequentialAllocator<256> localAllocator(d_allocator_p); bmqu::MemOutStream os(&localAllocator); - os << "GC Queue not supported on a Proxy."; + os << "Purge and GC queue not supported on a Proxy."; result->makeError().message() = os.str(); - - return 0; } // MANIPULATORS diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h index 096b591301..5393c4c7b0 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h +++ b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h @@ -539,14 +539,10 @@ class ClusterProxy : public mqbc::ClusterStateObserver, /// Load the cluster state in the specified `out` object. void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE; - /// Purge queues in this cluster on a given domain. 
- void - purgeQueueOnDomain(mqbcmd::ClusterResult* result, - const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; - - /// Force GC queues in this cluster on a given domain. - int gcQueueOnDomain(mqbcmd::ClusterResult* result, - const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; + /// Purge and force GC queues in this cluster on a given domain. + void purgeAndGCQueueOnDomain(mqbcmd::ClusterResult* result, + const bsl::string& domainName) + BSLS_KEYWORD_OVERRIDE; void getPrimaryNodes(int* rc, bsl::ostream& errorDescription, diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp index e088cff811..257670f972 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp @@ -6222,6 +6222,47 @@ int ClusterQueueHelper::gcExpiredQueues(bool immediate, return rc_SUCCESS; // RETURN } +bool ClusterQueueHelper::hasActiveQueue(const bsl::string& domainName) +{ + // executed by the cluster *DISPATCHER* thread + + // PRECONDITIONS + BSLS_ASSERT_SAFE( + d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p)); + + const mqbc::ClusterState::DomainStates& domainStates = + d_clusterState_p->domainStates(); + + DomainStatesCIter domCit = domainStates.find(domainName); + + if (domCit == domainStates.end()) { + return false; // RETURN + } + + const UriToQueueInfoMap& queuesInfoPerDomain = + domCit->second->queuesInfo(); + + for (UriToQueueInfoMapCIter qCit = queuesInfoPerDomain.cbegin(); + qCit != queuesInfoPerDomain.cend(); + ++qCit) { + QueueContextMapConstIter queueContextCIt = d_queues.find( + qCit->second->uri()); + + if (queueContextCIt == d_queues.end()) { + continue; + } + + if (queueContextCIt->second->d_liveQInfo.d_inFlight != 0 || + queueContextCIt->second->d_liveQInfo + .d_numHandleCreationsInProgress != 0 || + queueContextCIt->second->d_liveQInfo.d_numQueueHandles != 0) { + return true; // RETURN + } + } + + return false; // RETURN +} + void 
ClusterQueueHelper::loadQueuesInfo(mqbcmd::StorageContent* out) const { // executed by the cluster *DISPATCHER* thread diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h index 3907bfb137..256c5449e0 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h @@ -1089,6 +1089,8 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL int gcExpiredQueues(bool immediate = false, const bsl::string& domainName = ""); + bool hasActiveQueue(const bsl::string& domainName); + /// Start executing multi-step processing of StopRequest or CLOSING node /// advisory received from the specified `clusterNode`. In the case of /// StopRequest the specified `request` references the request; in the diff --git a/src/groups/mqb/mqbblp/mqbblp_domain.cpp b/src/groups/mqb/mqbblp/mqbblp_domain.cpp index 9955ff7f00..0e907fd1e6 100644 --- a/src/groups/mqb/mqbblp/mqbblp_domain.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_domain.cpp @@ -14,6 +14,7 @@ // limitations under the License. 
// mqbblp_domain.cpp -*-C++-*- +#include #include #include @@ -375,8 +376,7 @@ Domain::Domain(const bsl::string& name, Domain::~Domain() { - BSLS_ASSERT_SAFE((e_STOPPING == d_state || e_STOPPED == d_state || - e_POSTREMOVE == d_state) && + BSLS_ASSERT_SAFE((e_STOPPING == d_state || e_STOPPED == d_state) && "'teardown' must be called before the destructor"); } @@ -485,7 +485,7 @@ int Domain::configure(bsl::ostream& errorDescription, void Domain::teardown(const mqbi::Domain::TeardownCb& teardownCb) { // PRECONDITIONS - BSLS_ASSERT_SAFE(d_state != e_STOPPING && d_state != e_STOPPED); + BSLS_ASSERT_SAFE(d_state != e_STOPPING); BSLS_ASSERT_SAFE(!d_teardownCb); BSLS_ASSERT_SAFE(teardownCb); @@ -507,7 +507,8 @@ void Domain::teardown(const mqbi::Domain::TeardownCb& teardownCb) if (d_queues.empty()) { d_teardownCb(d_name); - d_state = e_STOPPED; + d_teardownCb = bsl::nullptr_t(); + d_state = e_STOPPED; return; // RETURN } @@ -519,7 +520,7 @@ void Domain::teardown(const mqbi::Domain::TeardownCb& teardownCb) void Domain::teardownRemove(const TeardownCb& teardownCb) { - BSLS_ASSERT_SAFE(d_state != e_REMOVING && d_state != e_REMOVED); + // PRECONDITIONS BSLS_ASSERT_SAFE(!d_teardownRemoveCb); BSLS_ASSERT_SAFE(teardownCb); @@ -529,13 +530,13 @@ void Domain::teardownRemove(const TeardownCb& teardownCb) << d_queues.size() << " registered queues."; d_teardownRemoveCb = teardownCb; - d_state = e_REMOVING; d_cluster_sp->unregisterStateObserver(this); if (d_queues.empty()) { d_teardownRemoveCb(d_name); - d_state = e_REMOVED; + d_teardownRemoveCb = bsl::nullptr_t(); + d_state = e_STOPPED; return; // RETURN } @@ -565,8 +566,7 @@ void Domain::openQueue( bmqp_ctrlmsg::Status status; - if (d_state == e_REMOVING || d_state == e_REMOVED || - d_state == e_PREREMOVE || d_state == e_POSTREMOVE) { + if (d_state == e_REMOVING || d_state == e_STOPPED) { status.category() = bmqp_ctrlmsg::StatusCategory::E_REFUSED; status.code() = mqbi::ClusterErrorCode::e_UNKNOWN; status.message() = 
k_DOMAIN_IS_REMOVING_OR_REMOVED; @@ -731,20 +731,16 @@ void Domain::unregisterQueue(mqbi::Queue* queue) // Refer to note in 'teardown' routine to see why 'd_state' is updated // while 'd_mutex' is acquired. - if (d_state == e_STOPPING) { - BSLS_ASSERT_SAFE(d_teardownCb); - - if (d_queues.empty()) { + if (d_queues.empty()) { + if (d_teardownCb) { d_teardownCb(d_name); - d_state = e_STOPPED; + d_teardownCb = bsl::nullptr_t(); + d_state = e_STOPPED; } - } - else if (d_state == e_REMOVING) { - BSLS_ASSERT_SAFE(d_teardownRemoveCb); - - if (d_queues.empty()) { + if (d_teardownRemoveCb) { d_teardownRemoveCb(d_name); - d_state = e_REMOVED; + d_teardownRemoveCb = bsl::nullptr_t(); + d_state = e_STOPPED; } } } @@ -920,21 +916,6 @@ int Domain::processCommand(mqbcmd::DomainResult* result, return -1; } -void Domain::removeDomainReset() -{ - bslmt::LockGuard guard(&d_mutex); // LOCK - - d_state = e_PREREMOVE; - d_teardownRemoveCb = bsl::nullptr_t(); -} - -void Domain::removeDomainComplete() -{ - bslmt::LockGuard guard(&d_mutex); // LOCK - - d_state = e_POSTREMOVE; -} - // ACCESSORS int Domain::lookupQueue(bsl::shared_ptr* out, const bmqt::Uri& uri) const @@ -1030,34 +1011,29 @@ void Domain::loadRoutingConfiguration( } } -bool Domain::tryRemove() const +bool Domain::tryRemove() { bslmt::LockGuard guard(&d_mutex); // LOCK - if (d_pendingRequests != 0) { + if (d_state == e_STOPPING) { return false; } - // If there's queue in this domain, check to see if there's any active - // handle to it - if (!d_queues.empty()) { - for (QueueMapCIter it = d_queues.begin(); it != d_queues.end(); ++it) { - // Looks like in RootQueueEngine::releaseHandle, queueHandle is - // removed and r/w counts reset (in `proctor.releaseHandle`) before - // substreams are unregistered; should we check substream? 
- // handle->subStreamInfos().size() == 0 - if (it->second->hasActiveHandle()) { - return false; - } - } + if (d_pendingRequests != 0) { + return false; } + // Reset d_teardownRemoveCb in case the first round of + // DOMAINS REMOVE fails and we want to call it again + d_state = e_REMOVING; + d_teardownRemoveCb = bsl::nullptr_t(); + return true; } bool Domain::isRemoveComplete() const { - return d_state == e_POSTREMOVE; + return d_state == e_STOPPED; } } // close package namespace diff --git a/src/groups/mqb/mqbblp/mqbblp_domain.h b/src/groups/mqb/mqbblp/mqbblp_domain.h index 093e5b5e1f..e66f4b5122 100644 --- a/src/groups/mqb/mqbblp/mqbblp_domain.h +++ b/src/groups/mqb/mqbblp/mqbblp_domain.h @@ -112,14 +112,9 @@ class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain, e_STARTED = 0, e_STOPPING = 1, e_STOPPED = 2, - // Used for teardownRemove function - e_REMOVING = 3, - e_REMOVED = 4, - // Used as flags to indicate - // the start and finish of - // the first round for DOMAINS REMOVE - e_PREREMOVE = 5, - e_POSTREMOVE = 6, + // indicate the start of the + // first round of DOMAINS REMOVE + e_REMOVING = 3 }; private: @@ -337,13 +332,6 @@ class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain, processCommand(mqbcmd::DomainResult* result, const mqbcmd::DomainCommand& command) BSLS_KEYWORD_OVERRIDE; - /// Mark the state of domain to be PREREMOVE - void removeDomainReset() BSLS_KEYWORD_OVERRIDE; - - /// Mark the state of domain to be POSTREMOVE, - /// indicating the first round of DOMAINS REMOVE is completed - void removeDomainComplete() BSLS_KEYWORD_OVERRIDE; - // ACCESSORS /// Load into the specified `out` the queue corresponding to the @@ -381,8 +369,8 @@ class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain, const BSLS_KEYWORD_OVERRIDE; /// Check the state of the queues in this domain, return false if there's - /// queues opened or opening. - bool tryRemove() const BSLS_KEYWORD_OVERRIDE; + /// queues opened or opening, or if the domain is closed or closing. 
+ bool tryRemove() BSLS_KEYWORD_OVERRIDE; /// Check the state of the domain, return true if the first round /// of DOMAINS REMOVE is completed diff --git a/src/groups/mqb/mqbblp/mqbblp_queue.h b/src/groups/mqb/mqbblp/mqbblp_queue.h index 4b81866c41..b3c0734454 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queue.h +++ b/src/groups/mqb/mqbblp/mqbblp_queue.h @@ -448,9 +448,6 @@ class Queue BSLS_CPP11_FINAL : public mqbi::Queue { /// Return the Schema Leaner associated with this queue. bmqp::SchemaLearner& schemaLearner() const BSLS_KEYWORD_OVERRIDE; - - /// Return true if there's queue handle and they're actively used. - bool hasActiveHandle() const BSLS_KEYWORD_OVERRIDE; }; // ============================================================================ @@ -602,11 +599,6 @@ inline bmqp::SchemaLearner& Queue::schemaLearner() const return d_schemaLearner; } -inline bool Queue::hasActiveHandle() const -{ - return d_state.handleCatalog().handlesCount() != 0; -} - } // close package namespace } // close enterprise namespace diff --git a/src/groups/mqb/mqbc/mqbc_clusterstate.cpp b/src/groups/mqb/mqbc/mqbc_clusterstate.cpp index 472e44d325..6da890132c 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstate.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterstate.cpp @@ -421,8 +421,9 @@ bool ClusterState::unassignQueue(const bmqt::Uri& uri) } domIt->second->queuesInfo().erase(cit); + if (domIt->second->queuesInfo().empty()) { - domIt->second->setDomain(NULL); + d_domainStates.erase(domIt); } // POSTCONDITIONS diff --git a/src/groups/mqb/mqbi/mqbi_cluster.h b/src/groups/mqb/mqbi/mqbi_cluster.h index 894642502f..96b5367668 100644 --- a/src/groups/mqb/mqbi/mqbi_cluster.h +++ b/src/groups/mqb/mqbi/mqbi_cluster.h @@ -371,13 +371,9 @@ class Cluster : public DispatcherClient { /// Load the cluster state to the specified `out` object. virtual void loadClusterStatus(mqbcmd::ClusterResult* out) = 0; - /// Purge queues in this cluster on a given domain. 
- virtual void purgeQueueOnDomain(mqbcmd::ClusterResult* result, - const bsl::string& domainName) = 0; - - /// Force GC queues in this cluster on a given domain. - virtual int gcQueueOnDomain(mqbcmd::ClusterResult* result, - const bsl::string& domainName) = 0; + /// Purge and force GC queues in this cluster on a given domain. + virtual void purgeAndGCQueueOnDomain(mqbcmd::ClusterResult* result, + const bsl::string& domainName) = 0; // ACCESSORS diff --git a/src/groups/mqb/mqbi/mqbi_domain.h b/src/groups/mqb/mqbi/mqbi_domain.h index a80c22de9d..791bb0e918 100644 --- a/src/groups/mqb/mqbi/mqbi_domain.h +++ b/src/groups/mqb/mqbi/mqbi_domain.h @@ -185,13 +185,6 @@ class Domain { virtual int processCommand(mqbcmd::DomainResult* result, const mqbcmd::DomainCommand& command) = 0; - /// Mark the state of domain to be PREREMOVE - virtual void removeDomainReset() = 0; - - /// Mark the state of domain to be POSTREMOVE, - /// indicating the first round of DOMAINS REMOVE is completed - virtual void removeDomainComplete() = 0; - // ACCESSORS /// Load into the specified `out` the queue corresponding to the @@ -229,8 +222,8 @@ class Domain { bmqp_ctrlmsg::RoutingConfiguration* config) const = 0; /// Check the state of the queues in this domain, return false if there's - /// queues opened or opening. - virtual bool tryRemove() const = 0; + /// queues opened or opening, or if the domain is closed or closing. + virtual bool tryRemove() = 0; /// Check the state of the domain, return true if the first round /// of DOMAINS REMOVE is completed diff --git a/src/groups/mqb/mqbi/mqbi_queue.h b/src/groups/mqb/mqbi/mqbi_queue.h index a8077172fb..cd3b7dbf67 100644 --- a/src/groups/mqb/mqbi/mqbi_queue.h +++ b/src/groups/mqb/mqbi/mqbi_queue.h @@ -956,9 +956,6 @@ class Queue : public DispatcherClient { /// Return the Schema Leaner associated with this queue. virtual bmqp::SchemaLearner& schemaLearner() const = 0; - - /// Return true if there's queue handle and they're actively used. 
- virtual bool hasActiveHandle() const = 0; }; // ======================== diff --git a/src/groups/mqb/mqbmock/mqbmock_cluster.cpp b/src/groups/mqb/mqbmock/mqbmock_cluster.cpp index 82ffc442d4..75c6adabed 100644 --- a/src/groups/mqb/mqbmock/mqbmock_cluster.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_cluster.cpp @@ -489,25 +489,15 @@ void Cluster::loadClusterStatus(mqbcmd::ClusterResult* out) out->makeClusterStatus(); } -void Cluster::purgeQueueOnDomain( +void Cluster::purgeAndGCQueueOnDomain( mqbcmd::ClusterResult* result, BSLS_ANNOTATION_UNUSED const bsl::string& domainName) { bmqu::MemOutStream os; - os << "MockCluster::gcQueueOnDomain not implemented!"; + os << "MockCluster::purgeAndGCQueueOnDomain not implemented!"; result->makeError().message() = os.str(); } -int Cluster::gcQueueOnDomain( - mqbcmd::ClusterResult* result, - BSLS_ANNOTATION_UNUSED const bsl::string& domainName) -{ - bmqu::MemOutStream os; - os << "MockCluster::gcQueueOnDomain not implemented!"; - result->makeError().message() = os.str(); - return -1; -} - // MANIPULATORS // (specific to mqbmock::Cluster) Cluster& Cluster::_setIsClusterMember(bool value) diff --git a/src/groups/mqb/mqbmock/mqbmock_cluster.h b/src/groups/mqb/mqbmock/mqbmock_cluster.h index eb36702e56..098f6e6be1 100644 --- a/src/groups/mqb/mqbmock/mqbmock_cluster.h +++ b/src/groups/mqb/mqbmock/mqbmock_cluster.h @@ -402,14 +402,10 @@ class Cluster : public mqbi::Cluster { /// Load the cluster state to the specified `out` object. void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE; - /// Purge queues in this cluster on a given domain. - void - purgeQueueOnDomain(mqbcmd::ClusterResult* result, - const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; - - /// Force GC queues in this cluster on a given domain. - int gcQueueOnDomain(mqbcmd::ClusterResult* result, - const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; + /// Purge and force GC queues in this cluster on a given domain. 
+ void purgeAndGCQueueOnDomain(mqbcmd::ClusterResult* result, + const bsl::string& domainName) + BSLS_KEYWORD_OVERRIDE; // MANIPULATORS // (specific to mqbmock::Cluster) diff --git a/src/groups/mqb/mqbmock/mqbmock_domain.cpp b/src/groups/mqb/mqbmock/mqbmock_domain.cpp index c67ddcd185..57bf4209a1 100644 --- a/src/groups/mqb/mqbmock/mqbmock_domain.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_domain.cpp @@ -155,16 +155,6 @@ int Domain::processCommand(mqbcmd::DomainResult* result, return -1; } -void Domain::removeDomainReset() -{ - BSLS_ASSERT_SAFE(false && "NOT IMPLEMENTED!"); -} - -void Domain::removeDomainComplete() -{ - BSLS_ASSERT_SAFE(false && "NOT IMPLEMENTED!"); -} - int Domain::lookupQueue(bsl::shared_ptr* out, const bmqt::Uri& uri) const { @@ -236,7 +226,7 @@ void Domain::loadRoutingConfiguration( // NOTHING } -bool Domain::tryRemove() const +bool Domain::tryRemove() { BSLS_ASSERT_SAFE(false && "NOT IMPLEMENTED!"); return true; diff --git a/src/groups/mqb/mqbmock/mqbmock_domain.h b/src/groups/mqb/mqbmock/mqbmock_domain.h index 7f4fe059c8..900c08008b 100644 --- a/src/groups/mqb/mqbmock/mqbmock_domain.h +++ b/src/groups/mqb/mqbmock/mqbmock_domain.h @@ -211,13 +211,6 @@ class Domain : public mqbi::Domain { processCommand(mqbcmd::DomainResult* result, const mqbcmd::DomainCommand& command) BSLS_KEYWORD_OVERRIDE; - /// Mark the state of domain to be PREREMOVE - void removeDomainReset() BSLS_KEYWORD_OVERRIDE; - - /// Mark the state of domain to be POSTREMOVE, - /// indicating the first round of DOMAINS REMOVE is completed - void removeDomainComplete() BSLS_KEYWORD_OVERRIDE; - /// Load into the specified `out`, if `out` is not 0, the queue /// corresponding to the specified `uri`, if found. Return 0 on success, /// or a non-zero return code otherwise. @@ -254,7 +247,7 @@ class Domain : public mqbi::Domain { /// Check the state of the queues in this domain, return false if there's /// queues opened or opening. 
- bool tryRemove() const BSLS_KEYWORD_OVERRIDE; + bool tryRemove() BSLS_KEYWORD_OVERRIDE; /// Check the state of the domain, return true if the first round /// of DOMAINS REMOVE is completed diff --git a/src/groups/mqb/mqbmock/mqbmock_queue.cpp b/src/groups/mqb/mqbmock/mqbmock_queue.cpp index 0c6963c038..98eaf2ca24 100644 --- a/src/groups/mqb/mqbmock/mqbmock_queue.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_queue.cpp @@ -527,13 +527,6 @@ bmqp::SchemaLearner& Queue::schemaLearner() const return d_schemaLearner; } -bool Queue::hasActiveHandle() const -{ - BSLS_ASSERT_SAFE(false && "NOT IMPLEMENTED!"); - - return false; -} - // ------------------- // class HandleFactory // ------------------- diff --git a/src/groups/mqb/mqbmock/mqbmock_queue.h b/src/groups/mqb/mqbmock/mqbmock_queue.h index 932f9555a0..1c80082eb3 100644 --- a/src/groups/mqb/mqbmock/mqbmock_queue.h +++ b/src/groups/mqb/mqbmock/mqbmock_queue.h @@ -430,9 +430,6 @@ class Queue : public mqbi::Queue { /// Return the Schema Leaner associated with this queue. bmqp::SchemaLearner& schemaLearner() const BSLS_KEYWORD_OVERRIDE; - /// Return true if there's queue handle and they're actively used. 
- bool hasActiveHandle() const BSLS_KEYWORD_OVERRIDE; - // ACCESSORS // (specific to mqbi::MockQueue) }; diff --git a/src/integration-tests/test_domain_remove.py b/src/integration-tests/test_domain_remove.py index 57358f457e..68deabe9c6 100644 --- a/src/integration-tests/test_domain_remove.py +++ b/src/integration-tests/test_domain_remove.py @@ -50,47 +50,6 @@ def write_messages(proxy, uri, n_msgs=5, do_confirm=True): consumer.close(uri, succeed=True) -def test_remove_domain_with_queue_closed(cluster: Cluster): - """ - send DOMAINS REMOVE command after both queue closed - command should succeed - """ - proxies = cluster.proxy_cycle() - proxy = next(proxies) - - # producer and consumer open the queue, - # post and confirm messages and both close - write_messages(proxy, tc.URI_PRIORITY) - - # send remove domain admin command - admin = AdminClient() - leader = cluster.last_known_leader - admin.connect(leader.config.host, int(leader.config.port)) - res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") - assert "Purged 0 message(s)" in res - - -def test_remove_domain_with_queue_open(cluster: Cluster): - """ - send DOMAINS REMOVE command with a queue still open - command should fail - """ - proxies = cluster.proxy_cycle() - proxy = next(proxies) - - uri = tc.URI_PRIORITY - producer = proxy.create_client("producer") - producer.open(uri, flags=["write"], succeed=True) - producer.post(uri, [f"msg{i}" for i in range(5)], succeed=True, wait_ack=True) - - # send remove domain admin command - admin = AdminClient() - leader = cluster.last_known_leader - admin.connect(leader.config.host, int(leader.config.port)) - res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") - assert "while there are queues open" in res - - def test_remove_domain_when_cluster_unhealthy(multi_node: Cluster): """ send DOMAINS REMOVE command when the cluster is not healthy @@ -235,16 +194,14 @@ def test_open_queue_after_remove_domain(cluster: Cluster): assert producer.open(uri, 
flags=["write"], block=True) != Client.e_SUCCESS -def test_remove_domain_with_queue_open(cluster: Cluster): +def test_remove_domain_with_producer_queue_open(cluster: Cluster): """ - issue DOMAINS REMOVE command when both producer and consumer close connections, - both open, or one of them has the connection open + issue DOMAINS REMOVE command when consumer closes connection """ proxies = cluster.proxy_cycle() proxy = next(proxies) # producer produces messages and consumer confirms - # then both close connections producer = proxy.create_client("producer") producer.open(tc.URI_PRIORITY, flags=["write"], succeed=True) @@ -259,41 +216,148 @@ def test_remove_domain_with_queue_open(cluster: Cluster): ) consumer.confirm(tc.URI_PRIORITY, "*", succeed=True) + # consumer closes connection + consumer.close(tc.URI_PRIORITY, succeed=True) + # send admin command - # when both producer and consumer open admin = AdminClient() leader = cluster.last_known_leader admin.connect(leader.config.host, int(leader.config.port)) res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") assert ( - f"Trying to remove the domain '{tc.DOMAIN_PRIORITY}' while there are queues open" + f"Trying to remove the domain '{tc.DOMAIN_PRIORITY}' while there are queues opened or opening" in res ) - # close producer and send the command again + +def test_remove_domain_with_consumer_queue_open(cluster: Cluster): + """ + issue DOMAINS REMOVE command when producer closes connection + """ + proxies = cluster.proxy_cycle() + proxy = next(proxies) + + # producer produces messages and consumer confirms + producer = proxy.create_client("producer") + producer.open(tc.URI_PRIORITY, flags=["write"], succeed=True) + + consumer = proxy.create_client("consumer") + consumer.open(tc.URI_PRIORITY, flags=["read"], succeed=True) + + producer.post( + tc.URI_PRIORITY, + [f"msg{i}" for i in range(3)], + succeed=True, + wait_ack=True, + ) + consumer.confirm(tc.URI_PRIORITY, "*", succeed=True) + + # producer closes connection 
producer.close(tc.URI_PRIORITY, succeed=True) + + # send admin command + admin = AdminClient() + leader = cluster.last_known_leader + admin.connect(leader.config.host, int(leader.config.port)) res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") assert ( - f"Trying to remove the domain '{tc.DOMAIN_PRIORITY}' while there are queues open" + f"Trying to remove the domain '{tc.DOMAIN_PRIORITY}' while there are queues opened or opening" in res ) - # open producer and close consumer and send the command again + +def test_remove_domain_with_both_queue_open_and_closed(cluster: Cluster): + """ + issue DOMAINS REMOVE command when both producer and consumer have queue open + and both have queue closed + """ + proxies = cluster.proxy_cycle() + proxy = next(proxies) + + # producer produces messages and consumer confirms + producer = proxy.create_client("producer") producer.open(tc.URI_PRIORITY, flags=["write"], succeed=True) + + consumer = proxy.create_client("consumer") + consumer.open(tc.URI_PRIORITY, flags=["read"], succeed=True) + + producer.post( + tc.URI_PRIORITY, + [f"msg{i}" for i in range(3)], + succeed=True, + wait_ack=True, + ) + consumer.confirm(tc.URI_PRIORITY, "*", succeed=True) + + # send admin command + admin = AdminClient() + leader = cluster.last_known_leader + admin.connect(leader.config.host, int(leader.config.port)) + res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") + assert ( + f"Trying to remove the domain '{tc.DOMAIN_PRIORITY}' while there are queues opened or opening" + in res + ) + + # close connections and try again + producer.close(tc.URI_PRIORITY, succeed=True) consumer.close(tc.URI_PRIORITY, succeed=True) + + res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") + assert "Purged 0 message(s)" in res + + +def test_try_open_removed_domain(cluster: Cluster): + """ + 1. producer send messages and consumer confirms + 2. send DOMAINS REMOVE admin command + 3. close both producer and consumer + 4. 
try open both, and they should all fail + """ + proxies = cluster.proxy_cycle() + proxy = next(proxies) + + # producer produces messages and consumer confirms + producer = proxy.create_client("producer") + producer.open(tc.URI_PRIORITY, flags=["write"], succeed=True) + + consumer = proxy.create_client("consumer") + consumer.open(tc.URI_PRIORITY, flags=["read"], succeed=True) + + producer.post( + tc.URI_PRIORITY, + [f"msg{i}" for i in range(3)], + succeed=True, + wait_ack=True, + ) + consumer.confirm(tc.URI_PRIORITY, "*", succeed=True) + + # send admin command + # when both producer and consumer open + admin = AdminClient() + leader = cluster.last_known_leader + admin.connect(leader.config.host, int(leader.config.port)) res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") assert ( - f"Trying to remove the domain '{tc.DOMAIN_PRIORITY}' while there are queues open" + f"Trying to remove the domain '{tc.DOMAIN_PRIORITY}' while there are queues opened or opening" in res ) - # close both and send the command again + # close both producer and consumer and send the command again producer.close(tc.URI_PRIORITY, succeed=True) + consumer.close(tc.URI_PRIORITY, succeed=True) res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") - assert "while there are queues open" not in res + assert "Purged 0 message(s)" in res + + # try open producer and consumer again + assert producer.open(tc.URI_PRIORITY, flags=["write"], block=True) < 0 + assert consumer.open(tc.URI_PRIORITY, flags=["read"], block=True) < 0 def test_remove_domain_with_unconfirmed_message(cluster: Cluster): + """ + issue DOMAINS REMOVE command with unconfirmed messages + """ proxies = cluster.proxy_cycle() proxy = next(proxies)