Skip to content

Commit

Permalink
Fix DOMAINS REMOVE bug
Browse files Browse the repository at this point in the history
1. Check queue open status in cluster thread to prevent race when checking
if the queue is actively used
2. Consolidate purge and GC since they're both called in cluster thread
3. Decide whether or not to call d_teardownCb based on function pointer
being nullptr or not, since d_state can be rewritten when shutdown is
called after DOMAINS REMOVE
4. Remove e_REMOVING and e_REMOVED since these states are not necessary
when we check if d_teardownRemoveCb is assigned to decide whether to call it
5. Change e_PREREMOVE to e_REMOVING
6. Replace the use of e_POSTREMOVE to e_STOPPED since there's a chance
the state of a domain could be changed to e_POSTREMOVE after e_STOPPING
in a late unregisterQueue
7. Explicitly invalidate a SharedResource of DomainManage. Commented in the
code for the reason

Signed-off-by: Emelia Lei <[email protected]>
  • Loading branch information
emelialei88 committed Jan 16, 2025
1 parent f8088c9 commit 41f9b5b
Show file tree
Hide file tree
Showing 21 changed files with 273 additions and 299 deletions.
58 changes: 24 additions & 34 deletions src/groups/mqb/mqba/mqba_domainmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -566,17 +566,21 @@ void DomainManager::stop()
<< k_MAX_WAIT_SECONDS_AT_SHUTDOWN
<< " seconds while shutting down"
<< " bmqbrkr. rc: " << rc << ".";

// Note that 'self' variable will get invalidated when this function
// returns, which will ensure that any pending 'onDomainClosed'
// callbacks are not invoked. So there is no need to explicitly call
// 'self.invalidate()' here.
}

if (d_domainResolver_mp) {
d_domainResolver_mp->stop();
d_domainResolver_mp.clear();
}

// Notice that this invalidation is necessary.
// Without this explicit call, `self` will be invalidated
// when the function returns, which will ensure that any pending
// `onDomainClosed` callbacks are not invoked. But this is not enough
// since we want to prevent a (tiny) possibility where `latch` is
// destructed before `self` and `onDomainClosed` would be called on an
// invalid `latch`.
self.invalidate();
}

int DomainManager::locateDomain(DomainSp* domain,
Expand Down Expand Up @@ -706,7 +710,7 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,
else if (command.isRemoveValue()) {
const bsl::string& name = command.remove().domain();

// First pass
// First round
if (command.remove().finalize().isNull()) {
DomainSp domainSp;

Expand All @@ -717,25 +721,23 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,
return -1; // RETURN
}

// 1. Reject if there's any opened or opening queue
// 1. Reject if there's any open queue request on the fly
// Mark DOMAIN PREREMOVE to block openQueue requests
if (!domainSp->tryRemove()) {
bmqu::MemOutStream os;
os << "Trying to remove the domain '" << name
<< "' while there are queues opened or opening";
<< "' while there are open queue requests on the fly or "
"the domain is shutting down";
result->makeError().message() = os.str();
return -1; // RETURN
}

// 2. Mark DOMAIN PREREMOVE to block openQueue requests
domainSp->removeDomainReset();

// 3. Purge inactive queues
// remove virtual storage; add a record in journal file
// 2. Purge and GC
mqbcmd::DomainResult domainResult;
mqbcmd::ClusterResult clusterResult;
mqbi::Cluster* cluster = domainSp->cluster();

cluster->purgeQueueOnDomain(&clusterResult, name);
cluster->purgeAndGCQueueOnDomain(&clusterResult, name);

if (clusterResult.isErrorValue()) {
result->makeError(clusterResult.error());
Expand All @@ -752,18 +754,7 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,
clusterResult.storageResult().purgedQueues().queues();
result->makeDomainResult(domainResult);

// 4. Force GC queues
// unregister Queue from domain;
// remove queue storage from partition
mqbcmd::ClusterResult clusterForceGCResult;
int rc = cluster->gcQueueOnDomain(&clusterForceGCResult, name);
if (clusterForceGCResult.isErrorValue()) {
result->makeError(clusterForceGCResult.error());
return -1; // RETURN
}

// 5. Mark DOMAIN REMOVED to accecpt the second pass

// 3. Mark DOMAIN REMOVED to accecpt the second pass
bmqu::SharedResource<DomainManager> self(this);
bslmt::Latch latch(1, bsls::SystemClockType::e_MONOTONIC);

Expand All @@ -777,18 +768,19 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,
bmqsys::Time::nowMonotonicClock().addSeconds(
k_MAX_WAIT_SECONDS_AT_DOMAIN_REMOVE);

rc = latch.timedWait(timeout);
int rc = latch.timedWait(timeout);
if (0 != rc) {
BALL_LOG_ERROR << "DOMAINS REMOVE fail to finish in "
<< k_MAX_WAIT_SECONDS_AT_DOMAIN_REMOVE
<< " seconds. rc: " << rc << ".";
return rc;
}

// 6. Mark DOMAINS REMOVE command first round as complete
domainSp->removeDomainComplete();
// Refer to `DomainManager::stop` to see why we need to invalidate
// `self` explicitly.
self.invalidate();
return rc; // RETURN
}
// Second pass
// Second round
else {
DomainSp domainSp;

Expand All @@ -802,7 +794,7 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,

if (!domainSp->isRemoveComplete()) {
bmqu::MemOutStream os;
os << "First pass of DOMAINS REMOVE '" << name
os << "First round of DOMAINS REMOVE '" << name
<< "' is not completed.";
result->makeError().message() = os.str();
return -1; // RETURN
Expand All @@ -823,8 +815,6 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,
result->makeSuccess();
return 0; // RETURN
}

return 0;
}

bmqu::MemOutStream os;
Expand Down
66 changes: 31 additions & 35 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3665,65 +3665,61 @@ void Cluster::loadClusterStatus(mqbcmd::ClusterResult* result)
storageResult.clusterStorageSummary();
}

int Cluster::gcQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName)
void Cluster::purgeAndGCQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName)
{
// exected by *ANY* thread

dispatcher()->execute(
bdlf::BindUtil::bind(&Cluster::gcQueueOnDomainDispatched,
bdlf::BindUtil::bind(&Cluster::purgeAndGCQueueOnDomainDispatched,
this,
result,
domainName),
this);

dispatcher()->synchronize(this);

return 0;
}

void Cluster::gcQueueOnDomainDispatched(mqbcmd::ClusterResult* result,
const bsl::string& domainName)
void Cluster::purgeAndGCQueueOnDomainDispatched(mqbcmd::ClusterResult* result,
const bsl::string& domainName)
{
// executed by the *DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this));

// 'true' implies immediate
// Check if there's any live connection to a queue
if (d_clusterOrchestrator.queueHelper().hasActiveQueue(domainName)) {
BALL_LOG_ERROR << "Trying to remove the domain '" << domainName
<< "' while there are queues opened or opening";
result->makeError().message() =
"Trying to remove the domain '" + domainName +
"' while there are queues opened or opening";
return; // RETURN
}

// Purge queues on the given domain
mqbcmd::StorageResult storageResult;
d_storageManager_mp->purgeQueueOnDomain(&storageResult, domainName);
result->makeStorageResult(storageResult);

if (result->isErrorValue()) {
result->makeError(result->error());
return; // RETURN
}

// GC queues on the given domain
const int rc =
d_clusterOrchestrator.queueHelper().gcExpiredQueues(true, domainName);
if (rc == -1 || rc == -3) {
// TBD: We allow the node to not be an active primary for *any*
// partition; this has to be changed once we allow leader != primary
BALL_LOG_ERROR << "Failed to execute force GC queues command (rc: "
<< rc << ")";
result->makeError().message() = "Failed to execute command (rc: " +
bsl::to_string(rc) + ")";
BALL_LOG_ERROR << "Failed to force GC queues on domain '" << domainName
<< "' (rc: " << rc << ")";
result->makeError().message() =
"Failed to force GC queues on domain '" + domainName +
"' (rc: " + bsl::to_string(rc) + ")";
}
else {
// Otherwise the command succeeded.
result->makeSuccess();
}
}

void Cluster::purgeQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName)
{
// exected by *ANY* thread

mqbcmd::StorageResult storageResult;

dispatcher()->execute(
bdlf::BindUtil::bind(&mqbi::StorageManager::purgeQueueOnDomain,
d_storageManager_mp.get(),
&storageResult,
domainName),
this);

dispatcher()->synchronize(this);

result->makeStorageResult(storageResult);
}

void Cluster::printClusterStateSummary(bsl::ostream& out,
Expand Down
16 changes: 6 additions & 10 deletions src/groups/mqb/mqbblp/mqbblp_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -673,18 +673,14 @@ class Cluster : public mqbi::Cluster,
/// Load the cluster state to the specified `out` object.
void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE;

/// Purge queues in this cluster on a given domain.
void
purgeQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE;

/// Force GC queues in this cluster on a given domain.
int gcQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE;
/// Purge and force GC queues in this cluster on a given domain.
void purgeAndGCQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName)
BSLS_KEYWORD_OVERRIDE;

/// Executed by dispatcher thread.
void gcQueueOnDomainDispatched(mqbcmd::ClusterResult* result,
const bsl::string& domainName);
void purgeAndGCQueueOnDomainDispatched(mqbcmd::ClusterResult* result,
const bsl::string& domainName);

// MANIPULATORS
// (virtual: mqbnet::SessionEventProcessor)
Expand Down
17 changes: 2 additions & 15 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1342,27 +1342,14 @@ void ClusterProxy::loadClusterStatus(mqbcmd::ClusterResult* out)
loadQueuesInfo(&clusterProxyStatus.queuesInfo());
}

void ClusterProxy::purgeQueueOnDomain(
void ClusterProxy::purgeAndGCQueueOnDomain(
mqbcmd::ClusterResult* result,
BSLS_ANNOTATION_UNUSED const bsl::string& domainName)
{
bmqu::MemOutStream os;
os << "MockCluster::gcQueueOnDomain not implemented!";
result->makeError().message() = os.str();
}

int ClusterProxy::gcQueueOnDomain(
mqbcmd::ClusterResult* result,
BSLS_ANNOTATION_UNUSED const bsl::string& domainName)
{
// exected by *ANY* thread

bdlma::LocalSequentialAllocator<256> localAllocator(d_allocator_p);
bmqu::MemOutStream os(&localAllocator);
os << "GC Queue not supported on a Proxy.";
os << "Purge and GC queue not supported on a Proxy.";
result->makeError().message() = os.str();

return 0;
}

// MANIPULATORS
Expand Down
12 changes: 4 additions & 8 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -539,14 +539,10 @@ class ClusterProxy : public mqbc::ClusterStateObserver,
/// Load the cluster state in the specified `out` object.
void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE;

/// Purge queues in this cluster on a given domain.
void
purgeQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE;

/// Force GC queues in this cluster on a given domain.
int gcQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE;
/// Purge and force GC queues in this cluster on a given domain.
void purgeAndGCQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName)
BSLS_KEYWORD_OVERRIDE;

void getPrimaryNodes(int* rc,
bsl::ostream& errorDescription,
Expand Down
41 changes: 41 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6222,6 +6222,47 @@ int ClusterQueueHelper::gcExpiredQueues(bool immediate,
return rc_SUCCESS; // RETURN
}

bool ClusterQueueHelper::hasActiveQueue(const bsl::string& domainName)
{
// executed by the cluster *DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(
d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p));

const mqbc::ClusterState::DomainStates& domainStates =
d_clusterState_p->domainStates();

DomainStatesCIter domCit = domainStates.find(domainName);

if (domCit == domainStates.end()) {
return false; // RETURN
}

const UriToQueueInfoMap& queuesInfoPerDomain =
domCit->second->queuesInfo();

for (UriToQueueInfoMapCIter qCit = queuesInfoPerDomain.cbegin();
qCit != queuesInfoPerDomain.cend();
++qCit) {
QueueContextMapConstIter queueContextCIt = d_queues.find(
qCit->second->uri());

if (queueContextCIt == d_queues.end()) {
continue;
}

if (queueContextCIt->second->d_liveQInfo.d_inFlight != 0 ||
queueContextCIt->second->d_liveQInfo
.d_numHandleCreationsInProgress != 0 ||
queueContextCIt->second->d_liveQInfo.d_numQueueHandles != 0) {
return true; // RETURN
}
}

return false; // RETURN
}

void ClusterQueueHelper::loadQueuesInfo(mqbcmd::StorageContent* out) const
{
// executed by the cluster *DISPATCHER* thread
Expand Down
2 changes: 2 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -1089,6 +1089,8 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL
int gcExpiredQueues(bool immediate = false,
const bsl::string& domainName = "");

bool hasActiveQueue(const bsl::string& domainName);

/// Start executing multi-step processing of StopRequest or CLOSING node
/// advisory received from the specified `clusterNode`. In the case of
/// StopRequest the specified `request` references the request; in the
Expand Down
Loading

0 comments on commit 41f9b5b

Please sign in to comment.