Skip to content

Commit

Permalink
mqbc::StorageUtil, mqbi::StorageMgr: updateQueue -> updateQueuePrimary (
Browse files Browse the repository at this point in the history
#466)

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>
  • Loading branch information
kaikulimu authored Oct 16, 2024
1 parent 71b4dab commit 32046aa
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 140 deletions.
8 changes: 4 additions & 4 deletions src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1913,7 +1913,7 @@ void RootQueueEngine::afterAppIdRegistered(

BSLS_ASSERT_SAFE(!key.isNull());

d_queueState_p->storageManager()->updateQueue(
d_queueState_p->storageManager()->updateQueuePrimary(
d_queueState_p->uri(),
d_queueState_p->key(),
d_queueState_p->partitionId(),
Expand Down Expand Up @@ -1966,16 +1966,16 @@ void RootQueueEngine::afterAppIdUnregistered(
}
}

d_queueState_p->storageManager()->updateQueue(
d_queueState_p->storageManager()->updateQueuePrimary(
d_queueState_p->uri(),
d_queueState_p->key(),
d_queueState_p->partitionId(),
mqbi::Storage::AppIdKeyPairs(),
mqbi::Storage::AppIdKeyPairs(1,
mqbi::Storage::AppIdKeyPair(appId,
appKey)));
// No need to log in case of failure because 'updateQueue' does it (even in
// case of success FTM).
// No need to log in case of failure because 'updateQueuePrimary' does it
// (even in case of success FTM).

d_consumptionMonitor.unregisterSubStream(appKey);
}
Expand Down
12 changes: 6 additions & 6 deletions src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1110,11 +1110,11 @@ void StorageManager::unregisterQueue(const bmqt::Uri& uri, int partitionId)
d_fileStores[partitionId]->dispatchEvent(queueEvent);
}

int StorageManager::updateQueue(const bmqt::Uri& uri,
const mqbu::StorageKey& queueKey,
int partitionId,
const AppIdKeyPairs& addedIdKeyPairs,
const AppIdKeyPairs& removedIdKeyPairs)
int StorageManager::updateQueuePrimary(const bmqt::Uri& uri,
const mqbu::StorageKey& queueKey,
int partitionId,
const AppIdKeyPairs& addedIdKeyPairs,
const AppIdKeyPairs& removedIdKeyPairs)
{
// executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId'

Expand All @@ -1123,7 +1123,7 @@ int StorageManager::updateQueue(const bmqt::Uri& uri,
partitionId < static_cast<int>(d_fileStores.size()));
BSLS_ASSERT_SAFE(d_fileStores[partitionId]->inDispatcherThread());

return mqbc::StorageUtil::updateQueue(
return mqbc::StorageUtil::updateQueuePrimary(
&d_storages[partitionId],
&d_storagesLock,
d_fileStores[partitionId].get(),
Expand Down
12 changes: 6 additions & 6 deletions src/groups/mqb/mqbblp/mqbblp_storagemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -540,12 +540,12 @@ class StorageManager : public mqbi::StorageManager {
/// queue is configured in fanout mode.
///
/// THREAD: Executed by the Queue's dispatcher thread.
virtual int
updateQueue(const bmqt::Uri& uri,
const mqbu::StorageKey& queueKey,
int partitionId,
const AppIdKeyPairs& addedIdKeyPairs,
const AppIdKeyPairs& removedIdKeyPairs) BSLS_KEYWORD_OVERRIDE;
virtual int updateQueuePrimary(const bmqt::Uri& uri,
const mqbu::StorageKey& queueKey,
int partitionId,
const AppIdKeyPairs& addedIdKeyPairs,
const AppIdKeyPairs& removedIdKeyPairs)
BSLS_KEYWORD_OVERRIDE;

virtual void
registerQueueReplica(int partitionId,
Expand Down
35 changes: 18 additions & 17 deletions src/groups/mqb/mqbc/mqbc_storagemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3718,11 +3718,11 @@ void StorageManager::unregisterQueue(const bmqt::Uri& uri, int partitionId)
d_fileStores[partitionId]->dispatchEvent(queueEvent);
}

int StorageManager::updateQueue(const bmqt::Uri& uri,
const mqbu::StorageKey& queueKey,
int partitionId,
const AppIdKeyPairs& addedIdKeyPairs,
const AppIdKeyPairs& removedIdKeyPairs)
int StorageManager::updateQueuePrimary(const bmqt::Uri& uri,
const mqbu::StorageKey& queueKey,
int partitionId,
const AppIdKeyPairs& addedIdKeyPairs,
const AppIdKeyPairs& removedIdKeyPairs)
{
// executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId'

Expand All @@ -3731,18 +3731,19 @@ int StorageManager::updateQueue(const bmqt::Uri& uri,
partitionId < static_cast<int>(d_fileStores.size()));
BSLS_ASSERT_SAFE(d_fileStores[partitionId]->inDispatcherThread());

return StorageUtil::updateQueue(&d_storages[partitionId],
&d_storagesLock,
d_fileStores[partitionId].get(),
&d_appKeysVec[partitionId],
&d_appKeysLock,
d_clusterData_p->identity().description(),
uri,
queueKey,
partitionId,
addedIdKeyPairs,
removedIdKeyPairs,
true); // isCSLMode
return StorageUtil::updateQueuePrimary(
&d_storages[partitionId],
&d_storagesLock,
d_fileStores[partitionId].get(),
&d_appKeysVec[partitionId],
&d_appKeysLock,
d_clusterData_p->identity().description(),
uri,
queueKey,
partitionId,
addedIdKeyPairs,
removedIdKeyPairs,
true); // isCSLMode
}

void StorageManager::registerQueueReplica(int partitionId,
Expand Down
12 changes: 6 additions & 6 deletions src/groups/mqb/mqbc/mqbc_storagemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -875,12 +875,12 @@ class StorageManager
/// queue is configured in fanout mode.
///
/// THREAD: Executed by the Queue's dispatcher thread.
virtual int
updateQueue(const bmqt::Uri& uri,
const mqbu::StorageKey& queueKey,
int partitionId,
const AppIdKeyPairs& addedIdKeyPairs,
const AppIdKeyPairs& removedIdKeyPairs) BSLS_KEYWORD_OVERRIDE;
virtual int updateQueuePrimary(const bmqt::Uri& uri,
const mqbu::StorageKey& queueKey,
int partitionId,
const AppIdKeyPairs& addedIdKeyPairs,
const AppIdKeyPairs& removedIdKeyPairs)
BSLS_KEYWORD_OVERRIDE;

virtual void
registerQueueReplica(int partitionId,
Expand Down
108 changes: 54 additions & 54 deletions src/groups/mqb/mqbc/mqbc_storageutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ void StorageUtil::registerQueueDispatched(
<< storage->queueKey() << "] with the storage as primary.";
}

void StorageUtil::updateQueueDispatched(
void StorageUtil::updateQueuePrimaryDispatched(
BSLS_ANNOTATION_UNUSED const mqbi::Dispatcher::ProcessorHandle& processor,
mqbs::ReplicatedStorage* storage,
bslmt::Mutex* storagesLock,
Expand All @@ -277,29 +277,29 @@ void StorageUtil::updateQueueDispatched(

bslmt::LockGuard<bslmt::Mutex> guard(storagesLock); // LOCK

// Simply forward to 'updateQueueRaw'.
updateQueueRaw(storage,
fs,
appKeys,
appKeysLock,
clusterDescription,
partitionId,
addedIdKeyPairs,
removedIdKeyPairs,
isFanout,
isCSLMode);
// Simply forward to 'updateQueuePrimaryRaw'.
updateQueuePrimaryRaw(storage,
fs,
appKeys,
appKeysLock,
clusterDescription,
partitionId,
addedIdKeyPairs,
removedIdKeyPairs,
isFanout,
isCSLMode);
}

int StorageUtil::updateQueueRaw(mqbs::ReplicatedStorage* storage,
mqbs::FileStore* fs,
AppKeys* appKeys,
bslmt::Mutex* appKeysLock,
const bsl::string& clusterDescription,
int partitionId,
const AppIdKeyPairs& addedIdKeyPairs,
const AppIdKeyPairs& removedIdKeyPairs,
bool isFanout,
bool isCSLMode)
int StorageUtil::updateQueuePrimaryRaw(mqbs::ReplicatedStorage* storage,
mqbs::FileStore* fs,
AppKeys* appKeys,
bslmt::Mutex* appKeysLock,
const bsl::string& clusterDescription,
int partitionId,
const AppIdKeyPairs& addedIdKeyPairs,
const AppIdKeyPairs& removedIdKeyPairs,
bool isFanout,
bool isCSLMode)
{
// executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId'

Expand Down Expand Up @@ -2474,16 +2474,16 @@ void StorageUtil::registerQueue(
}

// Some AppId/Key pairs need to be updated. Invoke
// 'updateQueueDispatched' in the right thread to carry out the
// addition/removal of those pairs.
// 'updateQueuePrimaryDispatched' in the right thread to carry out
// the addition/removal of those pairs.

mqbi::DispatcherEvent* queueEvent = dispatcher->getEvent(
mqbi::DispatcherClientType::e_QUEUE);

(*queueEvent)
.setType(mqbi::DispatcherEventType::e_DISPATCHER)
.setCallback(bdlf::BindUtil::bind(
updateQueueDispatched,
updateQueuePrimaryDispatched,
bdlf::PlaceHolders::_1, // processor
storageSp.get(),
storagesLock,
Expand All @@ -2501,13 +2501,13 @@ void StorageUtil::registerQueue(
mqbi::DispatcherClientType::e_QUEUE,
processor);

// Wait for 'updateQueueDispatched' operation to complete. We need
// to wait because 'updateQueueDispatched' creates virtual storages
// corresponding to 'addedAppIdKeyPairs' (if any), and the caller
// of 'registerQueue' expects these virtual storages to be created
// this routine or its caller returns. Before waiting, release the
// 'storagesLock' guard and unlock it to avoid any deadlock b/w
// cluster and partition dispatcher threads.
// Wait for 'updateQueuePrimaryDispatched' operation to complete.
// We need to wait because 'updateQueuePrimaryDispatched' creates
// virtual storages corresponding to 'addedAppIdKeyPairs' (if any),
// and the caller of 'registerQueue' expects these virtual storages
// to be created this routine or its caller returns. Before
// waiting, release the 'storagesLock' guard and unlock it to avoid
// any deadlock b/w cluster and partition dispatcher threads.

guard.release()->unlock();

Expand Down Expand Up @@ -2755,18 +2755,18 @@ void StorageUtil::unregisterQueueDispatched(
fs->dispatcherFlush(true, false);
}

int StorageUtil::updateQueue(StorageSpMap* storageMap,
bslmt::Mutex* storagesLock,
mqbs::FileStore* fs,
AppKeys* appKeys,
bslmt::Mutex* appKeysLock,
const bsl::string& clusterDescription,
const bmqt::Uri& uri,
const mqbu::StorageKey& queueKey,
int partitionId,
const AppIdKeyPairs& addedIdKeyPairs,
const AppIdKeyPairs& removedIdKeyPairs,
bool isCSLMode)
int StorageUtil::updateQueuePrimary(StorageSpMap* storageMap,
bslmt::Mutex* storagesLock,
mqbs::FileStore* fs,
AppKeys* appKeys,
bslmt::Mutex* appKeysLock,
const bsl::string& clusterDescription,
const bmqt::Uri& uri,
const mqbu::StorageKey& queueKey,
int partitionId,
const AppIdKeyPairs& addedIdKeyPairs,
const AppIdKeyPairs& removedIdKeyPairs,
bool isCSLMode)
{
// executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId'

Expand Down Expand Up @@ -2801,16 +2801,16 @@ int StorageUtil::updateQueue(StorageSpMap* storageMap,
BSLS_ASSERT_SAFE(storageSp->partitionId() == partitionId);
BSLS_ASSERT_SAFE(storageSp->queueKey() == queueKey);

return updateQueueRaw(storageSp.get(),
fs,
appKeys,
appKeysLock,
clusterDescription,
partitionId,
addedIdKeyPairs,
removedIdKeyPairs,
true, // isFanout
isCSLMode);
return updateQueuePrimaryRaw(storageSp.get(),
fs,
appKeys,
appKeysLock,
clusterDescription,
partitionId,
addedIdKeyPairs,
removedIdKeyPairs,
true, // isFanout
isCSLMode);
}

void StorageUtil::registerQueueReplicaDispatched(
Expand Down
70 changes: 35 additions & 35 deletions src/groups/mqb/mqbc/mqbc_storageutil.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,34 +212,34 @@ struct StorageUtil {
const AppIdKeyPairs& appIdKeyPairs);

/// THREAD: This method is called from the Queue's dispatcher thread.
static void
updateQueueDispatched(const mqbi::Dispatcher::ProcessorHandle& processor,
mqbs::ReplicatedStorage* storage,
bslmt::Mutex* storagesLock,
mqbs::FileStore* fs,
AppKeys* appKeys,
bslmt::Mutex* appKeysLock,
const bsl::string& clusterDescription,
int partitionId,
const AppIdKeyPairs& addedIdKeyPairs,
const AppIdKeyPairs& removedIdKeyPairs,
bool isFanout,
bool isCSLMode);
static void updateQueuePrimaryDispatched(
const mqbi::Dispatcher::ProcessorHandle& processor,
mqbs::ReplicatedStorage* storage,
bslmt::Mutex* storagesLock,
mqbs::FileStore* fs,
AppKeys* appKeys,
bslmt::Mutex* appKeysLock,
const bsl::string& clusterDescription,
int partitionId,
const AppIdKeyPairs& addedIdKeyPairs,
const AppIdKeyPairs& removedIdKeyPairs,
bool isFanout,
bool isCSLMode);

/// StorageManager's storages lock must be locked before calling this
/// method.
///
/// THREAD: Executed by the Queue's dispatcher thread.
static int updateQueueRaw(mqbs::ReplicatedStorage* storage,
mqbs::FileStore* fs,
AppKeys* appKeys,
bslmt::Mutex* appKeysLock,
const bsl::string& clusterDescription,
int partitionId,
const AppIdKeyPairs& addedIdKeyPairs,
const AppIdKeyPairs& removedIdKeyPairs,
bool isFanout,
bool isCSLMode);
static int updateQueuePrimaryRaw(mqbs::ReplicatedStorage* storage,
mqbs::FileStore* fs,
AppKeys* appKeys,
bslmt::Mutex* appKeysLock,
const bsl::string& clusterDescription,
int partitionId,
const AppIdKeyPairs& addedIdKeyPairs,
const AppIdKeyPairs& removedIdKeyPairs,
bool isFanout,
bool isCSLMode);

static int
addVirtualStoragesInternal(mqbs::ReplicatedStorage* storage,
Expand Down Expand Up @@ -663,18 +663,18 @@ struct StorageUtil {
/// queue is configured in fanout mode.
///
/// THREAD: Executed by the Queue's dispatcher thread.
static int updateQueue(StorageSpMap* storageMap,
bslmt::Mutex* storagesLock,
mqbs::FileStore* fs,
AppKeys* appKeys,
bslmt::Mutex* appKeysLock,
const bsl::string& clusterDescription,
const bmqt::Uri& uri,
const mqbu::StorageKey& queueKey,
int partitionId,
const AppIdKeyPairs& addedIdKeyPairs,
const AppIdKeyPairs& removedIdKeyPairs,
bool isCSLMode);
static int updateQueuePrimary(StorageSpMap* storageMap,
bslmt::Mutex* storagesLock,
mqbs::FileStore* fs,
AppKeys* appKeys,
bslmt::Mutex* appKeysLock,
const bsl::string& clusterDescription,
const bmqt::Uri& uri,
const mqbu::StorageKey& queueKey,
int partitionId,
const AppIdKeyPairs& addedIdKeyPairs,
const AppIdKeyPairs& removedIdKeyPairs,
bool isCSLMode);

static void
registerQueueReplicaDispatched(int* status,
Expand Down
Loading

0 comments on commit 32046aa

Please sign in to comment.