From 41f9b5b401fd0430f27ae9fd86c52c78516f2fa0 Mon Sep 17 00:00:00 2001 From: Emelia Lei Date: Mon, 13 Jan 2025 11:35:52 -0500 Subject: [PATCH] Fix DOMAINS REMOVE bug 1. Check queue open status in cluster thread to prevent race when checking if the queue is actively used 2. Consolidate purge and GC since they're both called in cluster thread 3. Decide whether or not to call d_teardownCb based on function pointer being nullptr or not, since d_state can be rewritten when shutdown is called after DOMAINS REMOVE 4. Remove e_REMOVING and e_REMOVED since these states are not necessary when we check if d_teardownRemoveCb is assigned to decide whether to call it 5. Change e_PREREMOVE to e_REMOVING 6. Replace the use of e_POSTREMOVE to e_STOPPED since there's a chance the state of a domain could be changed to e_POSTREMOVE after e_STOPPING in a late unregisterQueue 7. Explicitly invalidate a SharedResource of DomainManage. Commented in the code for the reason Signed-off-by: Emelia Lei --- src/groups/mqb/mqba/mqba_domainmanager.cpp | 58 +++--- src/groups/mqb/mqbblp/mqbblp_cluster.cpp | 66 ++++--- src/groups/mqb/mqbblp/mqbblp_cluster.h | 16 +- src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp | 17 +- src/groups/mqb/mqbblp/mqbblp_clusterproxy.h | 12 +- .../mqb/mqbblp/mqbblp_clusterqueuehelper.cpp | 41 +++++ .../mqb/mqbblp/mqbblp_clusterqueuehelper.h | 2 + src/groups/mqb/mqbblp/mqbblp_domain.cpp | 76 +++----- src/groups/mqb/mqbblp/mqbblp_domain.h | 22 +-- src/groups/mqb/mqbblp/mqbblp_queue.h | 8 - src/groups/mqb/mqbc/mqbc_clusterstate.cpp | 3 +- src/groups/mqb/mqbi/mqbi_cluster.h | 10 +- src/groups/mqb/mqbi/mqbi_domain.h | 11 +- src/groups/mqb/mqbi/mqbi_queue.h | 3 - src/groups/mqb/mqbmock/mqbmock_cluster.cpp | 14 +- src/groups/mqb/mqbmock/mqbmock_cluster.h | 12 +- src/groups/mqb/mqbmock/mqbmock_domain.cpp | 12 +- src/groups/mqb/mqbmock/mqbmock_domain.h | 9 +- src/groups/mqb/mqbmock/mqbmock_queue.cpp | 7 - src/groups/mqb/mqbmock/mqbmock_queue.h | 3 - src/integration-tests/test_domain_remove.py | 170 ++++++++++++------ 21 files changed, 273 insertions(+), 299 deletions(-) diff --git a/src/groups/mqb/mqba/mqba_domainmanager.cpp b/src/groups/mqb/mqba/mqba_domainmanager.cpp index ec9bb24704..19c82153a6 100644 --- a/src/groups/mqb/mqba/mqba_domainmanager.cpp +++ b/src/groups/mqb/mqba/mqba_domainmanager.cpp @@ -566,17 +566,21 @@ void DomainManager::stop() << k_MAX_WAIT_SECONDS_AT_SHUTDOWN << " seconds while shutting down" << " bmqbrkr. rc: " << rc << "."; - - // Note that 'self' variable will get invalidated when this function - // returns, which will ensure that any pending 'onDomainClosed' - // callbacks are not invoked. So there is no need to explicitly call - // 'self.invalidate()' here. } if (d_domainResolver_mp) { d_domainResolver_mp->stop(); d_domainResolver_mp.clear(); } + + // Notice that this invalidation is necessary. + // Without this explicit call, `self` will be invalidated + // when the function returns, which will ensure that any pending + // `onDomainClosed` callbacks are not invoked. But this is not enough + // since we want to prevent a (tiny) possibility where `latch` is + // destructed before `self` and `onDomainClosed` would be called on an + // invalid `latch`. + self.invalidate(); } int DomainManager::locateDomain(DomainSp* domain, @@ -706,7 +710,7 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, else if (command.isRemoveValue()) { const bsl::string& name = command.remove().domain(); - // First pass + // First round if (command.remove().finalize().isNull()) { DomainSp domainSp; @@ -717,25 +721,23 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, return -1; // RETURN } - // 1. Reject if there's any opened or opening queue + // 1. Reject if there's any open queue request on the fly + // Mark DOMAIN PREREMOVE to block openQueue requests if (!domainSp->tryRemove()) { bmqu::MemOutStream os; os << "Trying to remove the domain '" << name - << "' while there are queues opened or opening"; + << "' while there are open queue requests on the fly or " + "the domain is shutting down"; result->makeError().message() = os.str(); return -1; // RETURN } - // 2. Mark DOMAIN PREREMOVE to block openQueue requests - domainSp->removeDomainReset(); - - // 3. Purge inactive queues - // remove virtual storage; add a record in journal file + // 2. Purge and GC mqbcmd::DomainResult domainResult; mqbcmd::ClusterResult clusterResult; mqbi::Cluster* cluster = domainSp->cluster(); - cluster->purgeQueueOnDomain(&clusterResult, name); + cluster->purgeAndGCQueueOnDomain(&clusterResult, name); if (clusterResult.isErrorValue()) { result->makeError(clusterResult.error()); @@ -752,18 +754,7 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, clusterResult.storageResult().purgedQueues().queues(); result->makeDomainResult(domainResult); - // 4. Force GC queues - // unregister Queue from domain; - // remove queue storage from partition - mqbcmd::ClusterResult clusterForceGCResult; - int rc = cluster->gcQueueOnDomain(&clusterForceGCResult, name); - if (clusterForceGCResult.isErrorValue()) { - result->makeError(clusterForceGCResult.error()); - return -1; // RETURN - } - - // 5. Mark DOMAIN REMOVED to accecpt the second pass - + // 3. Mark DOMAIN REMOVED to accecpt the second pass bmqu::SharedResource self(this); bslmt::Latch latch(1, bsls::SystemClockType::e_MONOTONIC); @@ -777,18 +768,19 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, bmqsys::Time::nowMonotonicClock().addSeconds( k_MAX_WAIT_SECONDS_AT_DOMAIN_REMOVE); - rc = latch.timedWait(timeout); + int rc = latch.timedWait(timeout); if (0 != rc) { BALL_LOG_ERROR << "DOMAINS REMOVE fail to finish in " << k_MAX_WAIT_SECONDS_AT_DOMAIN_REMOVE << " seconds. rc: " << rc << "."; - return rc; } - // 6. Mark DOMAINS REMOVE command first round as complete - domainSp->removeDomainComplete(); + // Refer to `DomainManager::stop` to see why we need to invalidate + // `self` explicitly. + self.invalidate(); + return rc; // RETURN } - // Second pass + // Second round else { DomainSp domainSp; @@ -802,7 +794,7 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, if (!domainSp->isRemoveComplete()) { bmqu::MemOutStream os; - os << "First pass of DOMAINS REMOVE '" << name + os << "First round of DOMAINS REMOVE '" << name << "' is not completed."; result->makeError().message() = os.str(); return -1; // RETURN @@ -823,8 +815,6 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, result->makeSuccess(); return 0; // RETURN } - - return 0; } bmqu::MemOutStream os; diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp index 96c5bd6d03..d8915ca4ae 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp @@ -3665,65 +3665,61 @@ void Cluster::loadClusterStatus(mqbcmd::ClusterResult* result) storageResult.clusterStorageSummary(); } -int Cluster::gcQueueOnDomain(mqbcmd::ClusterResult* result, - const bsl::string& domainName) +void Cluster::purgeAndGCQueueOnDomain(mqbcmd::ClusterResult* result, + const bsl::string& domainName) { // exected by *ANY* thread dispatcher()->execute( - bdlf::BindUtil::bind(&Cluster::gcQueueOnDomainDispatched, + bdlf::BindUtil::bind(&Cluster::purgeAndGCQueueOnDomainDispatched, this, result, domainName), this); dispatcher()->synchronize(this); - - return 0; } -void Cluster::gcQueueOnDomainDispatched(mqbcmd::ClusterResult* result, - const bsl::string& domainName) +void Cluster::purgeAndGCQueueOnDomainDispatched(mqbcmd::ClusterResult* result, + const bsl::string& domainName) { // executed by the *DISPATCHER* thread // PRECONDITIONS BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this)); - // 'true' implies immediate + // Check if there's any live connection to a queue + if (d_clusterOrchestrator.queueHelper().hasActiveQueue(domainName)) { + BALL_LOG_ERROR << "Trying to remove the domain '" << domainName + << "' while there are queues opened or opening"; + result->makeError().message() = + "Trying to remove the domain '" + domainName + + "' while there are queues opened or opening"; + return; // RETURN + } + + // Purge queues on the given domain + mqbcmd::StorageResult storageResult; + d_storageManager_mp->purgeQueueOnDomain(&storageResult, domainName); + result->makeStorageResult(storageResult); + + if (result->isErrorValue()) { + result->makeError(result->error()); + return; // RETURN + } + + // GC queues on the given domain const int rc = d_clusterOrchestrator.queueHelper().gcExpiredQueues(true, domainName); if (rc == -1 || rc == -3) { // TBD: We allow the node to not be an active primary for *any* // partition; this has to be changed once we allow leader != primary - BALL_LOG_ERROR << "Failed to execute force GC queues command (rc: " - << rc << ")"; - result->makeError().message() = "Failed to execute command (rc: " + - bsl::to_string(rc) + ")"; + BALL_LOG_ERROR << "Failed to force GC queues on domain '" << domainName + << "' (rc: " << rc << ")"; + result->makeError().message() = + "Failed to force GC queues on domain '" + domainName + + "' (rc: " + bsl::to_string(rc) + ")"; } - else { - // Otherwise the command succeeded. - result->makeSuccess(); - } -} - -void Cluster::purgeQueueOnDomain(mqbcmd::ClusterResult* result, - const bsl::string& domainName) -{ - // exected by *ANY* thread - - mqbcmd::StorageResult storageResult; - - dispatcher()->execute( - bdlf::BindUtil::bind(&mqbi::StorageManager::purgeQueueOnDomain, - d_storageManager_mp.get(), - &storageResult, - domainName), - this); - - dispatcher()->synchronize(this); - - result->makeStorageResult(storageResult); } void Cluster::printClusterStateSummary(bsl::ostream& out, diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.h b/src/groups/mqb/mqbblp/mqbblp_cluster.h index 534c8eb78c..452efa3e67 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.h +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.h @@ -673,18 +673,14 @@ class Cluster : public mqbi::Cluster, /// Load the cluster state to the specified `out` object. void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE; - /// Purge queues in this cluster on a given domain. - void - purgeQueueOnDomain(mqbcmd::ClusterResult* result, - const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; - - /// Force GC queues in this cluster on a given domain. - int gcQueueOnDomain(mqbcmd::ClusterResult* result, - const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; + /// Purge and force GC queues in this cluster on a given domain. + void purgeAndGCQueueOnDomain(mqbcmd::ClusterResult* result, + const bsl::string& domainName) + BSLS_KEYWORD_OVERRIDE; /// Executed by dispatcher thread. - void gcQueueOnDomainDispatched(mqbcmd::ClusterResult* result, - const bsl::string& domainName); + void purgeAndGCQueueOnDomainDispatched(mqbcmd::ClusterResult* result, + const bsl::string& domainName); // MANIPULATORS // (virtual: mqbnet::SessionEventProcessor) diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp index 25d608fd69..f290f7e797 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp @@ -1342,27 +1342,14 @@ void ClusterProxy::loadClusterStatus(mqbcmd::ClusterResult* out) loadQueuesInfo(&clusterProxyStatus.queuesInfo()); } -void ClusterProxy::purgeQueueOnDomain( +void ClusterProxy::purgeAndGCQueueOnDomain( mqbcmd::ClusterResult* result, BSLS_ANNOTATION_UNUSED const bsl::string& domainName) { - bmqu::MemOutStream os; - os << "MockCluster::gcQueueOnDomain not implemented!"; - result->makeError().message() = os.str(); -} - -int ClusterProxy::gcQueueOnDomain( - mqbcmd::ClusterResult* result, - BSLS_ANNOTATION_UNUSED const bsl::string& domainName) -{ - // exected by *ANY* thread - bdlma::LocalSequentialAllocator<256> localAllocator(d_allocator_p); bmqu::MemOutStream os(&localAllocator); - os << "GC Queue not supported on a Proxy."; + os << "Purge and GC queue not supported on a Proxy."; result->makeError().message() = os.str(); - - return 0; } // MANIPULATORS diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h index 096b591301..5393c4c7b0 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h +++ b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h @@ -539,14 +539,10 @@ class ClusterProxy : public mqbc::ClusterStateObserver, /// Load the cluster state in the specified `out` object. void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE; - /// Purge queues in this cluster on a given domain. - void - purgeQueueOnDomain(mqbcmd::ClusterResult* result, - const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; - - /// Force GC queues in this cluster on a given domain. - int gcQueueOnDomain(mqbcmd::ClusterResult* result, - const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; + /// Purge and force GC queues in this cluster on a given domain. + void purgeAndGCQueueOnDomain(mqbcmd::ClusterResult* result, + const bsl::string& domainName) + BSLS_KEYWORD_OVERRIDE; void getPrimaryNodes(int* rc, bsl::ostream& errorDescription, diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp index e088cff811..257670f972 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp @@ -6222,6 +6222,47 @@ int ClusterQueueHelper::gcExpiredQueues(bool immediate, return rc_SUCCESS; // RETURN } +bool ClusterQueueHelper::hasActiveQueue(const bsl::string& domainName) +{ + // executed by the cluster *DISPATCHER* thread + + // PRECONDITIONS + BSLS_ASSERT_SAFE( + d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p)); + + const mqbc::ClusterState::DomainStates& domainStates = + d_clusterState_p->domainStates(); + + DomainStatesCIter domCit = domainStates.find(domainName); + + if (domCit == domainStates.end()) { + return false; // RETURN + } + + const UriToQueueInfoMap& queuesInfoPerDomain = + domCit->second->queuesInfo(); + + for (UriToQueueInfoMapCIter qCit = queuesInfoPerDomain.cbegin(); + qCit != queuesInfoPerDomain.cend(); + ++qCit) { + QueueContextMapConstIter queueContextCIt = d_queues.find( + qCit->second->uri()); + + if (queueContextCIt == d_queues.end()) { + continue; + } + + if (queueContextCIt->second->d_liveQInfo.d_inFlight != 0 || + queueContextCIt->second->d_liveQInfo + .d_numHandleCreationsInProgress != 0 || + queueContextCIt->second->d_liveQInfo.d_numQueueHandles != 0) { + return true; // RETURN + } + } + + return false; // RETURN +} + void ClusterQueueHelper::loadQueuesInfo(mqbcmd::StorageContent* out) const { // executed by the cluster *DISPATCHER* thread diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h index 3907bfb137..256c5449e0 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h @@ -1089,6 +1089,8 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL int gcExpiredQueues(bool immediate = false, const bsl::string& domainName = ""); + bool hasActiveQueue(const bsl::string& domainName); + /// Start executing multi-step processing of StopRequest or CLOSING node /// advisory received from the specified `clusterNode`. In the case of /// StopRequest the specified `request` references the request; in the diff --git a/src/groups/mqb/mqbblp/mqbblp_domain.cpp b/src/groups/mqb/mqbblp/mqbblp_domain.cpp index 9955ff7f00..0e907fd1e6 100644 --- a/src/groups/mqb/mqbblp/mqbblp_domain.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_domain.cpp @@ -14,6 +14,7 @@ // limitations under the License. // mqbblp_domain.cpp -*-C++-*- +#include #include #include @@ -375,8 +376,7 @@ Domain::Domain(const bsl::string& name, Domain::~Domain() { - BSLS_ASSERT_SAFE((e_STOPPING == d_state || e_STOPPED == d_state || - e_POSTREMOVE == d_state) && + BSLS_ASSERT_SAFE((e_STOPPING == d_state || e_STOPPED == d_state) && "'teardown' must be called before the destructor"); } @@ -485,7 +485,7 @@ int Domain::configure(bsl::ostream& errorDescription, void Domain::teardown(const mqbi::Domain::TeardownCb& teardownCb) { // PRECONDITIONS - BSLS_ASSERT_SAFE(d_state != e_STOPPING && d_state != e_STOPPED); + BSLS_ASSERT_SAFE(d_state != e_STOPPING); BSLS_ASSERT_SAFE(!d_teardownCb); BSLS_ASSERT_SAFE(teardownCb); @@ -507,7 +507,8 @@ void Domain::teardown(const mqbi::Domain::TeardownCb& teardownCb) if (d_queues.empty()) { d_teardownCb(d_name); - d_state = e_STOPPED; + d_teardownCb = bsl::nullptr_t(); + d_state = e_STOPPED; return; // RETURN } @@ -519,7 +520,7 @@ void Domain::teardown(const mqbi::Domain::TeardownCb& teardownCb) void Domain::teardownRemove(const TeardownCb& teardownCb) { - BSLS_ASSERT_SAFE(d_state != e_REMOVING && d_state != e_REMOVED); + // PRECONDITIONS BSLS_ASSERT_SAFE(!d_teardownRemoveCb); BSLS_ASSERT_SAFE(teardownCb); @@ -529,13 +530,13 @@ void Domain::teardownRemove(const TeardownCb& teardownCb) << d_queues.size() << " registered queues."; d_teardownRemoveCb = teardownCb; - d_state = e_REMOVING; d_cluster_sp->unregisterStateObserver(this); if (d_queues.empty()) { d_teardownRemoveCb(d_name); - d_state = e_REMOVED; + d_teardownRemoveCb = bsl::nullptr_t(); + d_state = e_STOPPED; return; // RETURN } @@ -565,8 +566,7 @@ void Domain::openQueue( bmqp_ctrlmsg::Status status; - if (d_state == e_REMOVING || d_state == e_REMOVED || - d_state == e_PREREMOVE || d_state == e_POSTREMOVE) { + if (d_state == e_REMOVING || d_state == e_STOPPED) { status.category() = bmqp_ctrlmsg::StatusCategory::E_REFUSED; status.code() = mqbi::ClusterErrorCode::e_UNKNOWN; status.message() = k_DOMAIN_IS_REMOVING_OR_REMOVED; @@ -731,20 +731,16 @@ void Domain::unregisterQueue(mqbi::Queue* queue) // Refer to note in 'teardown' routine to see why 'd_state' is updated // while 'd_mutex' is acquired. - if (d_state == e_STOPPING) { - BSLS_ASSERT_SAFE(d_teardownCb); - - if (d_queues.empty()) { + if (d_queues.empty()) { + if (d_teardownCb) { d_teardownCb(d_name); - d_state = e_STOPPED; + d_teardownCb = bsl::nullptr_t(); + d_state = e_STOPPED; } - } - else if (d_state == e_REMOVING) { - BSLS_ASSERT_SAFE(d_teardownRemoveCb); - - if (d_queues.empty()) { + if (d_teardownRemoveCb) { d_teardownRemoveCb(d_name); - d_state = e_REMOVED; + d_teardownRemoveCb = bsl::nullptr_t(); + d_state = e_STOPPED; } } } @@ -920,21 +916,6 @@ int Domain::processCommand(mqbcmd::DomainResult* result, return -1; } -void Domain::removeDomainReset() -{ - bslmt::LockGuard guard(&d_mutex); // LOCK - - d_state = e_PREREMOVE; - d_teardownRemoveCb = bsl::nullptr_t(); -} - -void Domain::removeDomainComplete() -{ - bslmt::LockGuard guard(&d_mutex); // LOCK - - d_state = e_POSTREMOVE; -} - // ACCESSORS int Domain::lookupQueue(bsl::shared_ptr* out, const bmqt::Uri& uri) const @@ -1030,34 +1011,29 @@ void Domain::loadRoutingConfiguration( } } -bool Domain::tryRemove() const +bool Domain::tryRemove() { bslmt::LockGuard guard(&d_mutex); // LOCK - if (d_pendingRequests != 0) { + if (d_state == e_STOPPING) { return false; } - // If there's queue in this domain, check to see if there's any active - // handle to it - if (!d_queues.empty()) { - for (QueueMapCIter it = d_queues.begin(); it != d_queues.end(); ++it) { - // Looks like in RootQueueEngine::releaseHandle, queueHandle is - // removed and r/w counts reset (in `proctor.releaseHandle`) before - // substreams are unregistered; should we check substream? - // handle->subStreamInfos().size() == 0 - if (it->second->hasActiveHandle()) { - return false; - } - } + if (d_pendingRequests != 0) { + return false; } + // Reset d_teardownRemoveCb in case the first round of + // DOMAINS REMOVE fails and we want to call it again + d_state = e_REMOVING; + d_teardownRemoveCb = bsl::nullptr_t(); + return true; } bool Domain::isRemoveComplete() const { - return d_state == e_POSTREMOVE; + return d_state == e_STOPPED; } } // close package namespace diff --git a/src/groups/mqb/mqbblp/mqbblp_domain.h b/src/groups/mqb/mqbblp/mqbblp_domain.h index 093e5b5e1f..e66f4b5122 100644 --- a/src/groups/mqb/mqbblp/mqbblp_domain.h +++ b/src/groups/mqb/mqbblp/mqbblp_domain.h @@ -112,14 +112,9 @@ class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain, e_STARTED = 0, e_STOPPING = 1, e_STOPPED = 2, - // Used for teardownRemove function - e_REMOVING = 3, - e_REMOVED = 4, - // Used as flags to indicate - // the start and finish of - // the first round for DOMAINS REMOVE - e_PREREMOVE = 5, - e_POSTREMOVE = 6, + // indicate the start of the + // first round of DOMAINS REMOVE + e_REMOVING = 3 }; private: @@ -337,13 +332,6 @@ class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain, processCommand(mqbcmd::DomainResult* result, const mqbcmd::DomainCommand& command) BSLS_KEYWORD_OVERRIDE; - /// Mark the state of domain to be PREREMOVE - void removeDomainReset() BSLS_KEYWORD_OVERRIDE; - - /// Mark the state of domain to be POSTREMOVE, - /// indicating the first round of DOMAINS REMOVE is completed - void removeDomainComplete() BSLS_KEYWORD_OVERRIDE; - // ACCESSORS /// Load into the specified `out` the queue corresponding to the @@ -381,8 +369,8 @@ class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain, const BSLS_KEYWORD_OVERRIDE; /// Check the state of the queues in this domain, return false if there's - /// queues opened or opening. - bool tryRemove() const BSLS_KEYWORD_OVERRIDE; + /// queues opened or opening, or if the domain is closed or closing. + bool tryRemove() BSLS_KEYWORD_OVERRIDE; /// Check the state of the domain, return true if the first round /// of DOMAINS REMOVE is completed diff --git a/src/groups/mqb/mqbblp/mqbblp_queue.h b/src/groups/mqb/mqbblp/mqbblp_queue.h index 4b81866c41..b3c0734454 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queue.h +++ b/src/groups/mqb/mqbblp/mqbblp_queue.h @@ -448,9 +448,6 @@ class Queue BSLS_CPP11_FINAL : public mqbi::Queue { /// Return the Schema Leaner associated with this queue. bmqp::SchemaLearner& schemaLearner() const BSLS_KEYWORD_OVERRIDE; - - /// Return true if there's queue handle and they're actively used. - bool hasActiveHandle() const BSLS_KEYWORD_OVERRIDE; }; // ============================================================================ @@ -602,11 +599,6 @@ inline bmqp::SchemaLearner& Queue::schemaLearner() const return d_schemaLearner; } -inline bool Queue::hasActiveHandle() const -{ - return d_state.handleCatalog().handlesCount() != 0; -} - } // close package namespace } // close enterprise namespace diff --git a/src/groups/mqb/mqbc/mqbc_clusterstate.cpp b/src/groups/mqb/mqbc/mqbc_clusterstate.cpp index 472e44d325..6da890132c 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstate.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterstate.cpp @@ -421,8 +421,9 @@ bool ClusterState::unassignQueue(const bmqt::Uri& uri) } domIt->second->queuesInfo().erase(cit); + if (domIt->second->queuesInfo().empty()) { - domIt->second->setDomain(NULL); + d_domainStates.erase(domIt); } // POSTCONDITIONS diff --git a/src/groups/mqb/mqbi/mqbi_cluster.h b/src/groups/mqb/mqbi/mqbi_cluster.h index 894642502f..96b5367668 100644 --- a/src/groups/mqb/mqbi/mqbi_cluster.h +++ b/src/groups/mqb/mqbi/mqbi_cluster.h @@ -371,13 +371,9 @@ class Cluster : public DispatcherClient { /// Load the cluster state to the specified `out` object. virtual void loadClusterStatus(mqbcmd::ClusterResult* out) = 0; - /// Purge queues in this cluster on a given domain. - virtual void purgeQueueOnDomain(mqbcmd::ClusterResult* result, - const bsl::string& domainName) = 0; - - /// Force GC queues in this cluster on a given domain. - virtual int gcQueueOnDomain(mqbcmd::ClusterResult* result, - const bsl::string& domainName) = 0; + /// Purge and force GC queues in this cluster on a given domain. + virtual void purgeAndGCQueueOnDomain(mqbcmd::ClusterResult* result, + const bsl::string& domainName) = 0; // ACCESSORS diff --git a/src/groups/mqb/mqbi/mqbi_domain.h b/src/groups/mqb/mqbi/mqbi_domain.h index a80c22de9d..791bb0e918 100644 --- a/src/groups/mqb/mqbi/mqbi_domain.h +++ b/src/groups/mqb/mqbi/mqbi_domain.h @@ -185,13 +185,6 @@ class Domain { virtual int processCommand(mqbcmd::DomainResult* result, const mqbcmd::DomainCommand& command) = 0; - /// Mark the state of domain to be PREREMOVE - virtual void removeDomainReset() = 0; - - /// Mark the state of domain to be POSTREMOVE, - /// indicating the first round of DOMAINS REMOVE is completed - virtual void removeDomainComplete() = 0; - // ACCESSORS /// Load into the specified `out` the queue corresponding to the @@ -229,8 +222,8 @@ class Domain { bmqp_ctrlmsg::RoutingConfiguration* config) const = 0; /// Check the state of the queues in this domain, return false if there's - /// queues opened or opening. - virtual bool tryRemove() const = 0; + /// queues opened or opening, or if the domain is closed or closing. + virtual bool tryRemove() = 0; /// Check the state of the domain, return true if the first round /// of DOMAINS REMOVE is completed diff --git a/src/groups/mqb/mqbi/mqbi_queue.h b/src/groups/mqb/mqbi/mqbi_queue.h index a8077172fb..cd3b7dbf67 100644 --- a/src/groups/mqb/mqbi/mqbi_queue.h +++ b/src/groups/mqb/mqbi/mqbi_queue.h @@ -956,9 +956,6 @@ class Queue : public DispatcherClient { /// Return the Schema Leaner associated with this queue. virtual bmqp::SchemaLearner& schemaLearner() const = 0; - - /// Return true if there's queue handle and they're actively used. - virtual bool hasActiveHandle() const = 0; }; // ======================== diff --git a/src/groups/mqb/mqbmock/mqbmock_cluster.cpp b/src/groups/mqb/mqbmock/mqbmock_cluster.cpp index 82ffc442d4..75c6adabed 100644 --- a/src/groups/mqb/mqbmock/mqbmock_cluster.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_cluster.cpp @@ -489,25 +489,15 @@ void Cluster::loadClusterStatus(mqbcmd::ClusterResult* out) out->makeClusterStatus(); } -void Cluster::purgeQueueOnDomain( +void Cluster::purgeAndGCQueueOnDomain( mqbcmd::ClusterResult* result, BSLS_ANNOTATION_UNUSED const bsl::string& domainName) { bmqu::MemOutStream os; - os << "MockCluster::gcQueueOnDomain not implemented!"; + os << "MockCluster::purgeAndGCQueueOnDomain not implemented!"; result->makeError().message() = os.str(); } -int Cluster::gcQueueOnDomain( - mqbcmd::ClusterResult* result, - BSLS_ANNOTATION_UNUSED const bsl::string& domainName) -{ - bmqu::MemOutStream os; - os << "MockCluster::gcQueueOnDomain not implemented!"; - result->makeError().message() = os.str(); - return -1; -} - // MANIPULATORS // (specific to mqbmock::Cluster) Cluster& Cluster::_setIsClusterMember(bool value) diff --git a/src/groups/mqb/mqbmock/mqbmock_cluster.h b/src/groups/mqb/mqbmock/mqbmock_cluster.h index eb36702e56..098f6e6be1 100644 --- a/src/groups/mqb/mqbmock/mqbmock_cluster.h +++ b/src/groups/mqb/mqbmock/mqbmock_cluster.h @@ -402,14 +402,10 @@ class Cluster : public mqbi::Cluster { /// Load the cluster state to the specified `out` object. void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE; - /// Purge queues in this cluster on a given domain. - void - purgeQueueOnDomain(mqbcmd::ClusterResult* result, - const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; - - /// Force GC queues in this cluster on a given domain. - int gcQueueOnDomain(mqbcmd::ClusterResult* result, - const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; + /// Purge and force GC queues in this cluster on a given domain. + void purgeAndGCQueueOnDomain(mqbcmd::ClusterResult* result, + const bsl::string& domainName) + BSLS_KEYWORD_OVERRIDE; // MANIPULATORS // (specific to mqbmock::Cluster) diff --git a/src/groups/mqb/mqbmock/mqbmock_domain.cpp b/src/groups/mqb/mqbmock/mqbmock_domain.cpp index c67ddcd185..57bf4209a1 100644 --- a/src/groups/mqb/mqbmock/mqbmock_domain.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_domain.cpp @@ -155,16 +155,6 @@ int Domain::processCommand(mqbcmd::DomainResult* result, return -1; } -void Domain::removeDomainReset() -{ - BSLS_ASSERT_SAFE(false && "NOT IMPLEMENTED!"); -} - -void Domain::removeDomainComplete() -{ - BSLS_ASSERT_SAFE(false && "NOT IMPLEMENTED!"); -} - int Domain::lookupQueue(bsl::shared_ptr* out, const bmqt::Uri& uri) const { @@ -236,7 +226,7 @@ void Domain::loadRoutingConfiguration( // NOTHING } -bool Domain::tryRemove() const +bool Domain::tryRemove() { BSLS_ASSERT_SAFE(false && "NOT IMPLEMENTED!"); return true; diff --git a/src/groups/mqb/mqbmock/mqbmock_domain.h b/src/groups/mqb/mqbmock/mqbmock_domain.h index 7f4fe059c8..900c08008b 100644 --- a/src/groups/mqb/mqbmock/mqbmock_domain.h +++ b/src/groups/mqb/mqbmock/mqbmock_domain.h @@ -211,13 +211,6 @@ class Domain : public mqbi::Domain { processCommand(mqbcmd::DomainResult* result, const mqbcmd::DomainCommand& command) BSLS_KEYWORD_OVERRIDE; - /// Mark the state of domain to be PREREMOVE - void removeDomainReset() BSLS_KEYWORD_OVERRIDE; - - /// Mark the state of domain to be POSTREMOVE, - /// indicating the first round of DOMAINS REMOVE is completed - void removeDomainComplete() BSLS_KEYWORD_OVERRIDE; - /// Load into the specified `out`, if `out` is not 0, the queue /// corresponding to the specified `uri`, if found. Return 0 on success, /// or a non-zero return code otherwise. @@ -254,7 +247,7 @@ class Domain : public mqbi::Domain { /// Check the state of the queues in this domain, return false if there's /// queues opened or opening. - bool tryRemove() const BSLS_KEYWORD_OVERRIDE; + bool tryRemove() BSLS_KEYWORD_OVERRIDE; /// Check the state of the domain, return true if the first round /// of DOMAINS REMOVE is completed diff --git a/src/groups/mqb/mqbmock/mqbmock_queue.cpp b/src/groups/mqb/mqbmock/mqbmock_queue.cpp index 0c6963c038..98eaf2ca24 100644 --- a/src/groups/mqb/mqbmock/mqbmock_queue.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_queue.cpp @@ -527,13 +527,6 @@ bmqp::SchemaLearner& Queue::schemaLearner() const return d_schemaLearner; } -bool Queue::hasActiveHandle() const -{ - BSLS_ASSERT_SAFE(false && "NOT IMPLEMENTED!"); - - return false; -} - // ------------------- // class HandleFactory // ------------------- diff --git a/src/groups/mqb/mqbmock/mqbmock_queue.h b/src/groups/mqb/mqbmock/mqbmock_queue.h index 932f9555a0..1c80082eb3 100644 --- a/src/groups/mqb/mqbmock/mqbmock_queue.h +++ b/src/groups/mqb/mqbmock/mqbmock_queue.h @@ -430,9 +430,6 @@ class Queue : public mqbi::Queue { /// Return the Schema Leaner associated with this queue. bmqp::SchemaLearner& schemaLearner() const BSLS_KEYWORD_OVERRIDE; - /// Return true if there's queue handle and they're actively used. - bool hasActiveHandle() const BSLS_KEYWORD_OVERRIDE; - // ACCESSORS // (specific to mqbi::MockQueue) }; diff --git a/src/integration-tests/test_domain_remove.py b/src/integration-tests/test_domain_remove.py index 57358f457e..68deabe9c6 100644 --- a/src/integration-tests/test_domain_remove.py +++ b/src/integration-tests/test_domain_remove.py @@ -50,47 +50,6 @@ def write_messages(proxy, uri, n_msgs=5, do_confirm=True): consumer.close(uri, succeed=True) -def test_remove_domain_with_queue_closed(cluster: Cluster): - """ - send DOMAINS REMOVE command after both queue closed - command should succeed - """ - proxies = cluster.proxy_cycle() - proxy = next(proxies) - - # producer and consumer open the queue, - # post and confirm messages and both close - write_messages(proxy, tc.URI_PRIORITY) - - # send remove domain admin command - admin = AdminClient() - leader = cluster.last_known_leader - admin.connect(leader.config.host, int(leader.config.port)) - res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") - assert "Purged 0 message(s)" in res - - -def test_remove_domain_with_queue_open(cluster: Cluster): - """ - send DOMAINS REMOVE command with a queue still open - command should fail - """ - proxies = cluster.proxy_cycle() - proxy = next(proxies) - - uri = tc.URI_PRIORITY - producer = proxy.create_client("producer") - producer.open(uri, flags=["write"], succeed=True) - producer.post(uri, [f"msg{i}" for i in range(5)], succeed=True, wait_ack=True) - - # send remove domain admin command - admin = AdminClient() - leader = cluster.last_known_leader - admin.connect(leader.config.host, int(leader.config.port)) - res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") - assert "while there are queues open" in res - - def test_remove_domain_when_cluster_unhealthy(multi_node: Cluster): """ send DOMAINS REMOVE command when the cluster is not healthy @@ -235,16 +194,14 @@ def test_open_queue_after_remove_domain(cluster: Cluster): assert producer.open(uri, flags=["write"], block=True) != Client.e_SUCCESS -def test_remove_domain_with_queue_open(cluster: Cluster): +def test_remove_domain_with_producer_queue_open(cluster: Cluster): """ - issue DOMAINS REMOVE command when both producer and consumer close connections, - both open, or one of them has the connection open + issue DOMAINS REMOVE command when consumer closes connection """ proxies = cluster.proxy_cycle() proxy = next(proxies) # producer produces messages and consumer confirms - # then both close connections producer = proxy.create_client("producer") producer.open(tc.URI_PRIORITY, flags=["write"], succeed=True) @@ -259,41 +216,148 @@ def test_remove_domain_with_queue_open(cluster: Cluster): ) consumer.confirm(tc.URI_PRIORITY, "*", succeed=True) + # consumer closes connection + consumer.close(tc.URI_PRIORITY, succeed=True) + # send admin command - # when both producer and consumer open admin = AdminClient() leader = cluster.last_known_leader admin.connect(leader.config.host, int(leader.config.port)) res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") assert ( - f"Trying to remove the domain '{tc.DOMAIN_PRIORITY}' while there are queues open" + f"Trying to remove the domain '{tc.DOMAIN_PRIORITY}' while there are queues opened or opening" in res ) - # close producer and send the command again + +def test_remove_domain_with_consumer_queue_open(cluster: Cluster): + """ + issue DOMAINS REMOVE command when producer closes connection + """ + proxies = cluster.proxy_cycle() + proxy = next(proxies) + + # producer produces messages and consumer confirms + producer = proxy.create_client("producer") + producer.open(tc.URI_PRIORITY, flags=["write"], succeed=True) + + consumer = proxy.create_client("consumer") + consumer.open(tc.URI_PRIORITY, flags=["read"], succeed=True) + + producer.post( + tc.URI_PRIORITY, + [f"msg{i}" for i in range(3)], + succeed=True, + wait_ack=True, + ) + consumer.confirm(tc.URI_PRIORITY, "*", succeed=True) + + # producer closes connection producer.close(tc.URI_PRIORITY, succeed=True) + + # send admin command + admin = AdminClient() + leader = cluster.last_known_leader + admin.connect(leader.config.host, int(leader.config.port)) res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") assert ( - f"Trying to remove the domain '{tc.DOMAIN_PRIORITY}' while there are queues open" + f"Trying to remove the domain '{tc.DOMAIN_PRIORITY}' while there are queues opened or opening" in res ) - # open producer and close consumer and send the command again + +def test_remove_domain_with_both_queue_open_and_closed(cluster: Cluster): + """ + issue DOMAINS REMOVE command when both producer and consumer have queue open + and both have queue closed + """ + proxies = cluster.proxy_cycle() + proxy = next(proxies) + + # producer produces messages and consumer confirms + producer = proxy.create_client("producer") producer.open(tc.URI_PRIORITY, flags=["write"], succeed=True) + + consumer = proxy.create_client("consumer") + consumer.open(tc.URI_PRIORITY, flags=["read"], succeed=True) + + producer.post( + tc.URI_PRIORITY, + [f"msg{i}" for i in range(3)], + succeed=True, + wait_ack=True, + ) + consumer.confirm(tc.URI_PRIORITY, "*", succeed=True) + + # send admin command + admin = AdminClient() + leader = cluster.last_known_leader + admin.connect(leader.config.host, int(leader.config.port)) + res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") + assert ( + f"Trying to remove the domain '{tc.DOMAIN_PRIORITY}' while there are queues opened or opening" + in res + ) + + # close connections and try again + producer.close(tc.URI_PRIORITY, succeed=True) consumer.close(tc.URI_PRIORITY, succeed=True) + + res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") + assert "Purged 0 message(s)" in res + + +def test_try_open_removed_domain(cluster: Cluster): + """ + 1. producer send messages and consumer confirms + 2. send DOMAINS REMOVE admin command + 3. close both producer and consumer + 4. try open both, and they should all fail + """ + proxies = cluster.proxy_cycle() + proxy = next(proxies) + + # producer produces messages and consumer confirms + producer = proxy.create_client("producer") + producer.open(tc.URI_PRIORITY, flags=["write"], succeed=True) + + consumer = proxy.create_client("consumer") + consumer.open(tc.URI_PRIORITY, flags=["read"], succeed=True) + + producer.post( + tc.URI_PRIORITY, + [f"msg{i}" for i in range(3)], + succeed=True, + wait_ack=True, + ) + consumer.confirm(tc.URI_PRIORITY, "*", succeed=True) + + # send admin command + # when both producer and consumer open + admin = AdminClient() + leader = cluster.last_known_leader + admin.connect(leader.config.host, int(leader.config.port)) res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") assert ( - f"Trying to remove the domain '{tc.DOMAIN_PRIORITY}' while there are queues open" + f"Trying to remove the domain '{tc.DOMAIN_PRIORITY}' while there are queues opened or opening" in res ) - # close both and send the command again + # close producer and send the command again producer.close(tc.URI_PRIORITY, succeed=True) + consumer.close(tc.URI_PRIORITY, succeed=True) res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") - assert "while there are queues open" not in res + assert "Purged 0 message(s)" in res + + # try open producer and consumer again + assert producer.open(tc.URI_PRIORITY, flags=["write"], block=True) < 0 + assert consumer.open(tc.URI_PRIORITY, flags=["read"], block=True) < 0 def test_remove_domain_with_unconfirmed_message(cluster: Cluster): + """ + issue DOMAINS REMOVE command with unconfirmed messages + """ proxies = cluster.proxy_cycle() proxy = next(proxies)