Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX[MQB]: race in admin domain remove #567

Merged
merged 1 commit into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 24 additions & 34 deletions src/groups/mqb/mqba/mqba_domainmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -566,17 +566,21 @@ void DomainManager::stop()
<< k_MAX_WAIT_SECONDS_AT_SHUTDOWN
<< " seconds while shutting down"
<< " bmqbrkr. rc: " << rc << ".";

// Note that 'self' variable will get invalidated when this function
// returns, which will ensure that any pending 'onDomainClosed'
// callbacks are not invoked. So there is no need to explicitly call
// 'self.invalidate()' here.
}

if (d_domainResolver_mp) {
d_domainResolver_mp->stop();
d_domainResolver_mp.clear();
}

// Notice that this invalidation is necessary.
// Without this explicit call, `self` will be invalidated
// when the function returns, which will ensure that any pending
// `onDomainClosed` callbacks are not invoked. But this is not enough
// since we want to prevent a (tiny) possibility where `latch` is
// destructed before `self` and `onDomainClosed` would be called on an
// invalid `latch`.
self.invalidate();
}

int DomainManager::locateDomain(DomainSp* domain,
Expand Down Expand Up @@ -706,7 +710,7 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,
else if (command.isRemoveValue()) {
const bsl::string& name = command.remove().domain();

// First pass
// First round
if (command.remove().finalize().isNull()) {
DomainSp domainSp;

Expand All @@ -717,25 +721,23 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,
return -1; // RETURN
}

// 1. Reject if there's any opened or opening queue
// 1. Reject if there's any open queue request on the fly
// Mark DOMAIN PREREMOVE to block openQueue requests
if (!domainSp->tryRemove()) {
bmqu::MemOutStream os;
os << "Trying to remove the domain '" << name
<< "' while there are queues opened or opening";
<< "' while there are open queue requests on the fly or "
"the domain is shutting down";
result->makeError().message() = os.str();
return -1; // RETURN
}

// 2. Mark DOMAIN PREREMOVE to block openQueue requests
domainSp->removeDomainReset();

// 3. Purge inactive queues
// remove virtual storage; add a record in journal file
// 2. Purge and GC
mqbcmd::DomainResult domainResult;
mqbcmd::ClusterResult clusterResult;
mqbi::Cluster* cluster = domainSp->cluster();

cluster->purgeQueueOnDomain(&clusterResult, name);
cluster->purgeAndGCQueueOnDomain(&clusterResult, name);

if (clusterResult.isErrorValue()) {
result->makeError(clusterResult.error());
Expand All @@ -752,18 +754,7 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,
clusterResult.storageResult().purgedQueues().queues();
result->makeDomainResult(domainResult);

// 4. Force GC queues
// unregister Queue from domain;
// remove queue storage from partition
mqbcmd::ClusterResult clusterForceGCResult;
int rc = cluster->gcQueueOnDomain(&clusterForceGCResult, name);
if (clusterForceGCResult.isErrorValue()) {
result->makeError(clusterForceGCResult.error());
return -1; // RETURN
}

// 5. Mark DOMAIN REMOVED to accecpt the second pass

// 3. Mark DOMAIN REMOVED to accecpt the second pass
bmqu::SharedResource<DomainManager> self(this);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Careful with bmqu::SharedResource. Calling self.acquire() followed by destructing/invalidating without releasing the shared_ptr will result in a deadlock.

bslmt::Latch latch(1, bsls::SystemClockType::e_MONOTONIC);

Expand All @@ -777,18 +768,19 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,
bmqsys::Time::nowMonotonicClock().addSeconds(
k_MAX_WAIT_SECONDS_AT_DOMAIN_REMOVE);

rc = latch.timedWait(timeout);
int rc = latch.timedWait(timeout);
if (0 != rc) {
pniedzielski marked this conversation as resolved.
Show resolved Hide resolved
BALL_LOG_ERROR << "DOMAINS REMOVE fail to finish in "
<< k_MAX_WAIT_SECONDS_AT_DOMAIN_REMOVE
<< " seconds. rc: " << rc << ".";
return rc;
}

// 6. Mark DOMAINS REMOVE command first round as complete
domainSp->removeDomainComplete();
// Refer to `DomainManager::stop` to see why we need to invalidate
// `self` explicitly.
self.invalidate();
return rc; // RETURN
}
// Second pass
// Second round
else {
DomainSp domainSp;

Expand All @@ -802,7 +794,7 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,

if (!domainSp->isRemoveComplete()) {
bmqu::MemOutStream os;
os << "First pass of DOMAINS REMOVE '" << name
os << "First round of DOMAINS REMOVE '" << name
<< "' is not completed.";
result->makeError().message() = os.str();
return -1; // RETURN
Expand All @@ -823,8 +815,6 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,
result->makeSuccess();
return 0; // RETURN
}

return 0;
}

bmqu::MemOutStream os;
Expand Down
66 changes: 31 additions & 35 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3665,65 +3665,61 @@ void Cluster::loadClusterStatus(mqbcmd::ClusterResult* result)
storageResult.clusterStorageSummary();
}

int Cluster::gcQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName)
void Cluster::purgeAndGCQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName)
{
// exected by *ANY* thread

dispatcher()->execute(
bdlf::BindUtil::bind(&Cluster::gcQueueOnDomainDispatched,
bdlf::BindUtil::bind(&Cluster::purgeAndGCQueueOnDomainDispatched,
this,
result,
domainName),
this);

dispatcher()->synchronize(this);

return 0;
}

void Cluster::gcQueueOnDomainDispatched(mqbcmd::ClusterResult* result,
const bsl::string& domainName)
void Cluster::purgeAndGCQueueOnDomainDispatched(mqbcmd::ClusterResult* result,
const bsl::string& domainName)
{
// executed by the *DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this));

// 'true' implies immediate
// Check if there's any live connection to a queue
if (d_clusterOrchestrator.queueHelper().hasActiveQueue(domainName)) {
BALL_LOG_ERROR << "Trying to remove the domain '" << domainName
<< "' while there are queues opened or opening";
result->makeError().message() =
"Trying to remove the domain '" + domainName +
"' while there are queues opened or opening";
return; // RETURN
}

// Purge queues on the given domain
mqbcmd::StorageResult storageResult;
d_storageManager_mp->purgeQueueOnDomain(&storageResult, domainName);
result->makeStorageResult(storageResult);

if (result->isErrorValue()) {
result->makeError(result->error());
return; // RETURN
}

// GC queues on the given domain
const int rc =
d_clusterOrchestrator.queueHelper().gcExpiredQueues(true, domainName);
if (rc == -1 || rc == -3) {
// TBD: We allow the node to not be an active primary for *any*
// partition; this has to be changed once we allow leader != primary
BALL_LOG_ERROR << "Failed to execute force GC queues command (rc: "
<< rc << ")";
result->makeError().message() = "Failed to execute command (rc: " +
bsl::to_string(rc) + ")";
BALL_LOG_ERROR << "Failed to force GC queues on domain '" << domainName
<< "' (rc: " << rc << ")";
result->makeError().message() =
"Failed to force GC queues on domain '" + domainName +
"' (rc: " + bsl::to_string(rc) + ")";
}
else {
// Otherwise the command succeeded.
result->makeSuccess();
}
}

void Cluster::purgeQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName)
{
// exected by *ANY* thread

mqbcmd::StorageResult storageResult;

dispatcher()->execute(
bdlf::BindUtil::bind(&mqbi::StorageManager::purgeQueueOnDomain,
d_storageManager_mp.get(),
&storageResult,
domainName),
this);

dispatcher()->synchronize(this);

result->makeStorageResult(storageResult);
}

void Cluster::printClusterStateSummary(bsl::ostream& out,
Expand Down
16 changes: 6 additions & 10 deletions src/groups/mqb/mqbblp/mqbblp_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -673,18 +673,14 @@ class Cluster : public mqbi::Cluster,
/// Load the cluster state to the specified `out` object.
void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE;

/// Purge queues in this cluster on a given domain.
void
purgeQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE;

/// Force GC queues in this cluster on a given domain.
int gcQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE;
/// Purge and force GC queues in this cluster on a given domain.
void purgeAndGCQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName)
BSLS_KEYWORD_OVERRIDE;

/// Executed by dispatcher thread.
void gcQueueOnDomainDispatched(mqbcmd::ClusterResult* result,
const bsl::string& domainName);
void purgeAndGCQueueOnDomainDispatched(mqbcmd::ClusterResult* result,
const bsl::string& domainName);

// MANIPULATORS
// (virtual: mqbnet::SessionEventProcessor)
Expand Down
17 changes: 2 additions & 15 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1342,27 +1342,14 @@ void ClusterProxy::loadClusterStatus(mqbcmd::ClusterResult* out)
loadQueuesInfo(&clusterProxyStatus.queuesInfo());
}

void ClusterProxy::purgeQueueOnDomain(
void ClusterProxy::purgeAndGCQueueOnDomain(
mqbcmd::ClusterResult* result,
BSLS_ANNOTATION_UNUSED const bsl::string& domainName)
{
bmqu::MemOutStream os;
os << "MockCluster::gcQueueOnDomain not implemented!";
result->makeError().message() = os.str();
}

int ClusterProxy::gcQueueOnDomain(
mqbcmd::ClusterResult* result,
BSLS_ANNOTATION_UNUSED const bsl::string& domainName)
{
// exected by *ANY* thread

bdlma::LocalSequentialAllocator<256> localAllocator(d_allocator_p);
bmqu::MemOutStream os(&localAllocator);
os << "GC Queue not supported on a Proxy.";
os << "Purge and GC queue not supported on a Proxy.";
result->makeError().message() = os.str();

return 0;
}

// MANIPULATORS
Expand Down
12 changes: 4 additions & 8 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -539,14 +539,10 @@ class ClusterProxy : public mqbc::ClusterStateObserver,
/// Load the cluster state in the specified `out` object.
void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE;

/// Purge queues in this cluster on a given domain.
void
purgeQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE;

/// Force GC queues in this cluster on a given domain.
int gcQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE;
/// Purge and force GC queues in this cluster on a given domain.
void purgeAndGCQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName)
BSLS_KEYWORD_OVERRIDE;

void getPrimaryNodes(int* rc,
bsl::ostream& errorDescription,
Expand Down
41 changes: 41 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6222,6 +6222,47 @@ int ClusterQueueHelper::gcExpiredQueues(bool immediate,
return rc_SUCCESS; // RETURN
}

bool ClusterQueueHelper::hasActiveQueue(const bsl::string& domainName)
{
// executed by the cluster *DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(
d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p));

const mqbc::ClusterState::DomainStates& domainStates =
emelialei88 marked this conversation as resolved.
Show resolved Hide resolved
d_clusterState_p->domainStates();

DomainStatesCIter domCit = domainStates.find(domainName);

if (domCit == domainStates.end()) {
return false; // RETURN
}

const UriToQueueInfoMap& queuesInfoPerDomain =
domCit->second->queuesInfo();

for (UriToQueueInfoMapCIter qCit = queuesInfoPerDomain.cbegin();
qCit != queuesInfoPerDomain.cend();
++qCit) {
QueueContextMapConstIter queueContextCIt = d_queues.find(
qCit->second->uri());

if (queueContextCIt == d_queues.end()) {
continue;
}

if (queueContextCIt->second->d_liveQInfo.d_inFlight != 0 ||
queueContextCIt->second->d_liveQInfo
.d_numHandleCreationsInProgress != 0 ||
queueContextCIt->second->d_liveQInfo.d_numQueueHandles != 0) {
return true; // RETURN
}
}

return false; // RETURN
}

void ClusterQueueHelper::loadQueuesInfo(mqbcmd::StorageContent* out) const
{
// executed by the cluster *DISPATCHER* thread
Expand Down
2 changes: 2 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -1089,6 +1089,8 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL
int gcExpiredQueues(bool immediate = false,
const bsl::string& domainName = "");

bool hasActiveQueue(const bsl::string& domainName);

/// Start executing multi-step processing of StopRequest or CLOSING node
/// advisory received from the specified `clusterNode`. In the case of
/// StopRequest the specified `request` references the request; in the
Expand Down
Loading
Loading