Skip to content

Commit

Permalink
Fix DOMAINS REMOVE bug
Browse files Browse the repository at this point in the history
Check queue open status in cluster thread to prevent race
Decide whether or not to call d_teardownCb based on function pointer
being nullptr or not

Signed-off-by: Emelia Lei <[email protected]>
  • Loading branch information
emelialei88 committed Jan 13, 2025
1 parent e19ff33 commit 8222790
Show file tree
Hide file tree
Showing 21 changed files with 235 additions and 233 deletions.
30 changes: 8 additions & 22 deletions src/groups/mqb/mqba/mqba_domainmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -717,25 +717,22 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,
return -1; // RETURN
}

// 1. Reject if there's any opened or opening queue
// 1. Reject if there's any open queue request on the fly
// Mark DOMAIN PREREMOVE to block openQueue requests
if (!domainSp->tryRemove()) {
bmqu::MemOutStream os;
os << "Trying to remove the domain '" << name
<< "' while there are queues opened or opening";
<< "' while there are open queue requests on the fly";
result->makeError().message() = os.str();
return -1; // RETURN
}

// 2. Mark DOMAIN PREREMOVE to block openQueue requests
domainSp->removeDomainReset();

// 3. Purge inactive queues
// remove virtual storage; add a record in journal file
// 2. Purge and GC
mqbcmd::DomainResult domainResult;
mqbcmd::ClusterResult clusterResult;
mqbi::Cluster* cluster = domainSp->cluster();

cluster->purgeQueueOnDomain(&clusterResult, name);
cluster->purgeAndGCQueueOnDomain(&clusterResult, name);

if (clusterResult.isErrorValue()) {
result->makeError(clusterResult.error());
Expand All @@ -752,18 +749,7 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,
clusterResult.storageResult().purgedQueues().queues();
result->makeDomainResult(domainResult);

// 4. Force GC queues
// unregister Queue from domain;
// remove queue storage from partition
mqbcmd::ClusterResult clusterForceGCResult;
int rc = cluster->gcQueueOnDomain(&clusterForceGCResult, name);
if (clusterForceGCResult.isErrorValue()) {
result->makeError(clusterForceGCResult.error());
return -1; // RETURN
}

// 5. Mark DOMAIN REMOVED to accecpt the second pass

// 3. Mark DOMAIN REMOVED to accecpt the second pass
bmqu::SharedResource<DomainManager> self(this);
bslmt::Latch latch(1, bsls::SystemClockType::e_MONOTONIC);

Expand All @@ -777,15 +763,15 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,
bmqsys::Time::nowMonotonicClock().addSeconds(
k_MAX_WAIT_SECONDS_AT_DOMAIN_REMOVE);

rc = latch.timedWait(timeout);
int rc = latch.timedWait(timeout);
if (0 != rc) {
BALL_LOG_ERROR << "DOMAINS REMOVE fail to finish in "
<< k_MAX_WAIT_SECONDS_AT_DOMAIN_REMOVE
<< " seconds. rc: " << rc << ".";
return rc;
}

// 6. Mark DOMAINS REMOVE command first round as complete
// 4. Mark DOMAINS REMOVE command first round as complete
domainSp->removeDomainComplete();
}
// Second pass
Expand Down
66 changes: 31 additions & 35 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3665,65 +3665,61 @@ void Cluster::loadClusterStatus(mqbcmd::ClusterResult* result)
storageResult.clusterStorageSummary();
}

int Cluster::gcQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName)
void Cluster::purgeAndGCQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName)
{
// exected by *ANY* thread

dispatcher()->execute(
bdlf::BindUtil::bind(&Cluster::gcQueueOnDomainDispatched,
bdlf::BindUtil::bind(&Cluster::purgeAndGCQueueOnDomainDispatched,
this,
result,
domainName),
this);

dispatcher()->synchronize(this);

return 0;
}

void Cluster::gcQueueOnDomainDispatched(mqbcmd::ClusterResult* result,
const bsl::string& domainName)
void Cluster::purgeAndGCQueueOnDomainDispatched(mqbcmd::ClusterResult* result,
const bsl::string& domainName)
{
// executed by the *DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this));

// 'true' implies immediate
// Check if there's any live connection to a queue
if (d_clusterOrchestrator.queueHelper().hasActiveQueue(domainName)) {
BALL_LOG_ERROR << "Trying to remove the domain '" << domainName
<< "' while there are queues opened or opening";
result->makeError().message() =
"Trying to remove the domain '" + domainName +
"' while there are queues opened or opening";
return; // RETURN
}

// Purge queues on the given domain
mqbcmd::StorageResult storageResult;
d_storageManager_mp->purgeQueueOnDomain(&storageResult, domainName);
result->makeStorageResult(storageResult);

if (result->isErrorValue()) {
result->makeError(result->error());
return; // RETURN
}

// GC queues on the given domain
const int rc =
d_clusterOrchestrator.queueHelper().gcExpiredQueues(true, domainName);
if (rc == -1 || rc == -3) {
// TBD: We allow the node to not be an active primary for *any*
// partition; this has to be changed once we allow leader != primary
BALL_LOG_ERROR << "Failed to execute force GC queues command (rc: "
<< rc << ")";
result->makeError().message() = "Failed to execute command (rc: " +
bsl::to_string(rc) + ")";
BALL_LOG_ERROR << "Failed to force GC queues on domain '" << domainName
<< "' (rc: " << rc << ")";
result->makeError().message() =
"Failed to force GC queues on domain '" + domainName +
"' (rc: " + bsl::to_string(rc) + ")";
}
else {
// Otherwise the command succeeded.
result->makeSuccess();
}
}

void Cluster::purgeQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName)
{
// exected by *ANY* thread

mqbcmd::StorageResult storageResult;

dispatcher()->execute(
bdlf::BindUtil::bind(&mqbi::StorageManager::purgeQueueOnDomain,
d_storageManager_mp.get(),
&storageResult,
domainName),
this);

dispatcher()->synchronize(this);

result->makeStorageResult(storageResult);
}

void Cluster::printClusterStateSummary(bsl::ostream& out,
Expand Down
16 changes: 6 additions & 10 deletions src/groups/mqb/mqbblp/mqbblp_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -673,18 +673,14 @@ class Cluster : public mqbi::Cluster,
/// Load the cluster state to the specified `out` object.
void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE;

/// Purge queues in this cluster on a given domain.
void
purgeQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE;

/// Force GC queues in this cluster on a given domain.
int gcQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE;
/// Purge and force GC queues in this cluster on a given domain.
void purgeAndGCQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName)
BSLS_KEYWORD_OVERRIDE;

/// Executed by dispatcher thread.
void gcQueueOnDomainDispatched(mqbcmd::ClusterResult* result,
const bsl::string& domainName);
void purgeAndGCQueueOnDomainDispatched(mqbcmd::ClusterResult* result,
const bsl::string& domainName);

// MANIPULATORS
// (virtual: mqbnet::SessionEventProcessor)
Expand Down
17 changes: 2 additions & 15 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1342,27 +1342,14 @@ void ClusterProxy::loadClusterStatus(mqbcmd::ClusterResult* out)
loadQueuesInfo(&clusterProxyStatus.queuesInfo());
}

void ClusterProxy::purgeQueueOnDomain(
void ClusterProxy::purgeAndGCQueueOnDomain(
mqbcmd::ClusterResult* result,
BSLS_ANNOTATION_UNUSED const bsl::string& domainName)
{
bmqu::MemOutStream os;
os << "MockCluster::gcQueueOnDomain not implemented!";
result->makeError().message() = os.str();
}

int ClusterProxy::gcQueueOnDomain(
mqbcmd::ClusterResult* result,
BSLS_ANNOTATION_UNUSED const bsl::string& domainName)
{
// exected by *ANY* thread

bdlma::LocalSequentialAllocator<256> localAllocator(d_allocator_p);
bmqu::MemOutStream os(&localAllocator);
os << "GC Queue not supported on a Proxy.";
os << "Purge and GC queue not supported on a Proxy.";
result->makeError().message() = os.str();

return 0;
}

// MANIPULATORS
Expand Down
12 changes: 4 additions & 8 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -539,14 +539,10 @@ class ClusterProxy : public mqbc::ClusterStateObserver,
/// Load the cluster state in the specified `out` object.
void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE;

/// Purge queues in this cluster on a given domain.
void
purgeQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE;

/// Force GC queues in this cluster on a given domain.
int gcQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE;
/// Purge and force GC queues in this cluster on a given domain.
void purgeAndGCQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName)
BSLS_KEYWORD_OVERRIDE;

void getPrimaryNodes(int* rc,
bsl::ostream& errorDescription,
Expand Down
35 changes: 35 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6222,6 +6222,41 @@ int ClusterQueueHelper::gcExpiredQueues(bool immediate,
return rc_SUCCESS; // RETURN
}

bool ClusterQueueHelper::hasActiveQueue(const bsl::string& domainName)
{
const mqbc::ClusterState::DomainStates& domainStates =
d_clusterState_p->domainStates();

DomainStatesCIter domCit = domainStates.find(domainName);

if (domCit == domainStates.end()) {
return false; // RETURN
}

const UriToQueueInfoMap& queuesInfoPerDomain =
domCit->second->queuesInfo();

for (UriToQueueInfoMapCIter qCit = queuesInfoPerDomain.cbegin();
qCit != queuesInfoPerDomain.cend();
++qCit) {
QueueContextMapIter queueContextIt = d_queues.find(
qCit->second->uri());

if (queueContextIt == d_queues.end()) {
continue;
}

if (queueContextIt->second->d_liveQInfo.d_inFlight != 0 ||
queueContextIt->second->d_liveQInfo
.d_numHandleCreationsInProgress != 0 ||
queueContextIt->second->d_liveQInfo.d_numQueueHandles != 0) {
return true; // RETURN
}
}

return false; // RETURN
}

void ClusterQueueHelper::loadQueuesInfo(mqbcmd::StorageContent* out) const
{
// executed by the cluster *DISPATCHER* thread
Expand Down
2 changes: 2 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -1089,6 +1089,8 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL
int gcExpiredQueues(bool immediate = false,
const bsl::string& domainName = "");

bool hasActiveQueue(const bsl::string& domainName);

/// Start executing multi-step processing of StopRequest or CLOSING node
/// advisory received from the specified `clusterNode`. In the case of
/// StopRequest the specified `request` references the request; in the
Expand Down
Loading

0 comments on commit 8222790

Please sign in to comment.