Skip to content

Commit

Permalink
Lookup by appId (bloomberg#309)
Browse files Browse the repository at this point in the history
* Lookup by appId

Signed-off-by: dorjesinpo <[email protected]>

* VirtualStorageCatalog::put returns error

Signed-off-by: dorjesinpo <[email protected]>

---------

Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo authored and alexander-e1off committed Oct 24, 2024
1 parent 7c6050a commit 5e2961b
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 49 deletions.
84 changes: 42 additions & 42 deletions src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,24 +61,28 @@ VirtualStorageCatalog::put(const bmqt::MessageGUID& msgGUID,
const mqbu::StorageKey& appKey)
{
if (!appKey.isNull()) {
VirtualStoragesIter it = d_virtualStorages.find(appKey);
VirtualStoragesIter it = d_virtualStorages.findByKey2(appKey);
BSLS_ASSERT_SAFE(it != d_virtualStorages.end());

return it->second->put(msgGUID,
msgSize,
rdaInfo,
subScriptionId); // RETURN
return it->value()->put(msgGUID,
msgSize,
rdaInfo,
subScriptionId); // RETURN
}

// Add guid to all virtual storages.

mqbi::StorageResult::Enum lastRc = mqbi::StorageResult::e_SUCCESS;
for (VirtualStoragesIter it = d_virtualStorages.begin();
it != d_virtualStorages.end();
++it) {
it->second->put(msgGUID, msgSize, rdaInfo, subScriptionId);
mqbi::StorageResult::Enum rc =
it->value()->put(msgGUID, msgSize, rdaInfo, subScriptionId);
if (rc != mqbi::StorageResult::e_SUCCESS) {
lastRc = rc;
}
}

return mqbi::StorageResult::e_SUCCESS; // RETURN
return lastRc; // RETURN
}

bslma::ManagedPtr<mqbi::StorageIterator>
Expand All @@ -87,9 +91,9 @@ VirtualStorageCatalog::getIterator(const mqbu::StorageKey& appKey)
// PRECONDITIONS
BSLS_ASSERT_SAFE(!appKey.isNull());

VirtualStoragesIter it = d_virtualStorages.find(appKey);
VirtualStoragesIter it = d_virtualStorages.findByKey2(appKey);
BSLS_ASSERT_SAFE(it != d_virtualStorages.end());
return it->second->getIterator(appKey);
return it->value()->getIterator(appKey);
}

mqbi::StorageResult::Enum VirtualStorageCatalog::getIterator(
Expand All @@ -100,26 +104,26 @@ mqbi::StorageResult::Enum VirtualStorageCatalog::getIterator(
// PRECONDITIONS
BSLS_ASSERT_SAFE(!appKey.isNull());

VirtualStoragesIter it = d_virtualStorages.find(appKey);
VirtualStoragesIter it = d_virtualStorages.findByKey2(appKey);
BSLS_ASSERT_SAFE(it != d_virtualStorages.end());
return it->second->getIterator(out, appKey, msgGUID);
return it->value()->getIterator(out, appKey, msgGUID);
}

mqbi::StorageResult::Enum
VirtualStorageCatalog::remove(const bmqt::MessageGUID& msgGUID,
const mqbu::StorageKey& appKey)
{
if (!appKey.isNull()) {
VirtualStoragesIter it = d_virtualStorages.find(appKey);
VirtualStoragesIter it = d_virtualStorages.findByKey2(appKey);
BSLS_ASSERT_SAFE(it != d_virtualStorages.end());
return it->second->remove(msgGUID); // RETURN
return it->value()->remove(msgGUID); // RETURN
}

// Remove guid from all virtual storages.
for (VirtualStoragesIter it = d_virtualStorages.begin();
it != d_virtualStorages.end();
++it) {
it->second->remove(msgGUID); // ignore rc
it->value()->remove(msgGUID); // ignore rc
}

return mqbi::StorageResult::e_SUCCESS;
Expand All @@ -129,16 +133,16 @@ mqbi::StorageResult::Enum
VirtualStorageCatalog::removeAll(const mqbu::StorageKey& appKey)
{
if (!appKey.isNull()) {
VirtualStoragesIter it = d_virtualStorages.find(appKey);
VirtualStoragesIter it = d_virtualStorages.findByKey2(appKey);
BSLS_ASSERT_SAFE(it != d_virtualStorages.end());
return it->second->removeAll(appKey); // RETURN
return it->value()->removeAll(appKey); // RETURN
}

// Clear all virtual storages.
for (VirtualStoragesIter it = d_virtualStorages.begin();
it != d_virtualStorages.end();
++it) {
it->second->removeAll(it->first); // ignore rc
it->value()->removeAll(it->key2()); // ignore rc
}

return mqbi::StorageResult::e_SUCCESS;
Expand All @@ -152,9 +156,9 @@ int VirtualStorageCatalog::addVirtualStorage(bsl::ostream& errorDescription,
BSLS_ASSERT_SAFE(!appId.empty());
BSLS_ASSERT_SAFE(!appKey.isNull());

VirtualStoragesConstIter cit = d_virtualStorages.find(appKey);
VirtualStoragesConstIter cit = d_virtualStorages.findByKey2(appKey);
if (cit != d_virtualStorages.end()) {
const VirtualStorage* vs = cit->second.get();
const VirtualStorage* vs = cit->value().get();
BSLS_ASSERT_SAFE(!vs->appKey().isNull());

errorDescription << "Virtual storage exists with same appKey. "
Expand All @@ -170,7 +174,7 @@ int VirtualStorageCatalog::addVirtualStorage(bsl::ostream& errorDescription,
appId,
appKey,
d_allocator_p);
d_virtualStorages.insert(bsl::make_pair(appKey, vsp));
d_virtualStorages.insert(appId, appKey, vsp);

return 0;
}
Expand All @@ -184,7 +188,7 @@ bool VirtualStorageCatalog::removeVirtualStorage(
return true; // RETURN
}

VirtualStoragesConstIter it = d_virtualStorages.find(appKey);
VirtualStoragesConstIter it = d_virtualStorages.findByKey2(appKey);
if (it != d_virtualStorages.end()) {
d_virtualStorages.erase(it);
return true; // RETURN
Expand All @@ -199,9 +203,9 @@ VirtualStorageCatalog::virtualStorage(const mqbu::StorageKey& appKey)
// PRECONDITIONS
BSLS_ASSERT_SAFE(!appKey.isNull());

VirtualStoragesIter it = d_virtualStorages.find(appKey);
VirtualStoragesIter it = d_virtualStorages.findByKey2(appKey);
BSLS_ASSERT_SAFE(it != d_virtualStorages.end());
return it->second.get();
return it->value().get();
}

void VirtualStorageCatalog::autoConfirm(const bmqt::MessageGUID& msgGUID,
Expand All @@ -210,10 +214,10 @@ void VirtualStorageCatalog::autoConfirm(const bmqt::MessageGUID& msgGUID,
// PRECONDITIONS
BSLS_ASSERT_SAFE(!appKey.isNull());

VirtualStoragesIter it = d_virtualStorages.find(appKey);
VirtualStoragesIter it = d_virtualStorages.findByKey2(appKey);
BSLS_ASSERT_SAFE(it != d_virtualStorages.end());

it->second->autoConfirm(msgGUID);
it->value()->autoConfirm(msgGUID);
}

// ACCESSORS
Expand All @@ -223,12 +227,12 @@ bool VirtualStorageCatalog::hasVirtualStorage(const mqbu::StorageKey& appKey,
// PRECONDITIONS
BSLS_ASSERT_SAFE(!appKey.isNull());

VirtualStoragesConstIter cit = d_virtualStorages.find(appKey);
VirtualStoragesConstIter cit = d_virtualStorages.findByKey2(appKey);
const bool hasVs = (cit != d_virtualStorages.end());

if (appId) {
if (hasVs) {
*appId = cit->second->appId();
*appId = cit->value()->appId();
return true; // RETURN
}

Expand All @@ -244,30 +248,27 @@ bool VirtualStorageCatalog::hasVirtualStorage(const bsl::string& appId,
// PRECONDITIONS
BSLS_ASSERT_SAFE(!appId.empty());

for (VirtualStoragesConstIter it = d_virtualStorages.begin();
it != d_virtualStorages.end();
++it) {
if (it->second->appId() == appId) {
if (appKey) {
*appKey = it->first;
}
VirtualStoragesConstIter cit = d_virtualStorages.findByKey1(appId);
const bool hasVs = (cit != d_virtualStorages.end());

if (appKey) {
if (hasVs) {
*appKey = cit->key2();
return true; // RETURN
}
}

if (appKey) {
*appKey = mqbu::StorageKey::k_NULL_KEY;
}

return false;
return hasVs;
}

bool VirtualStorageCatalog::hasMessage(const bmqt::MessageGUID& msgGUID) const
{
for (VirtualStoragesConstIter it = d_virtualStorages.begin();
it != d_virtualStorages.end();
++it) {
if (it->second->hasMessage(msgGUID)) {
if (it->value()->hasMessage(msgGUID)) {
return true; // RETURN
}
}
Expand All @@ -284,9 +285,8 @@ void VirtualStorageCatalog::loadVirtualStorageDetails(
for (VirtualStoragesConstIter cit = d_virtualStorages.begin();
cit != d_virtualStorages.end();
++cit) {
BSLS_ASSERT_SAFE(cit->first == cit->second->appKey());
buffer->push_back(
bsl::make_pair(cit->second->appId(), cit->second->appKey()));
BSLS_ASSERT_SAFE(cit->key2() == cit->value()->appKey());
buffer->push_back(bsl::make_pair(cit->key1(), cit->key2()));
}
}

Expand Down
18 changes: 11 additions & 7 deletions src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
#include <mqbs_virtualstorage.h>
#include <mqbu_storagekey.h>

// MWC
#include <mwcc_twokeyhashmap.h>

// BMQ
#include <bmqt_messageguid.h>

Expand All @@ -57,9 +60,10 @@ class VirtualStorageCatalog {
// PRIVATE TYPES
typedef bsl::shared_ptr<VirtualStorage> VirtualStorageSp;

/// appKey -> virtualStorage
typedef bsl::unordered_map<mqbu::StorageKey, VirtualStorageSp>
VirtualStorages;
/// Any(appId, appKey) -> virtualStorage
typedef mwcc::
TwoKeyHashMap<bsl::string, mqbu::StorageKey, VirtualStorageSp>
VirtualStorages;

typedef VirtualStorages::iterator VirtualStoragesIter;

Expand Down Expand Up @@ -234,17 +238,17 @@ inline int VirtualStorageCatalog::numVirtualStorages() const
inline bsls::Types::Int64
VirtualStorageCatalog::numMessages(const mqbu::StorageKey& appKey) const
{
VirtualStoragesConstIter cit = d_virtualStorages.find(appKey);
VirtualStoragesConstIter cit = d_virtualStorages.findByKey2(appKey);
BSLS_ASSERT_SAFE(cit != d_virtualStorages.end());
return cit->second->numMessages(appKey);
return cit->value()->numMessages(appKey);
}

inline bsls::Types::Int64
VirtualStorageCatalog::numBytes(const mqbu::StorageKey& appKey) const
{
VirtualStoragesConstIter cit = d_virtualStorages.find(appKey);
VirtualStoragesConstIter cit = d_virtualStorages.findByKey2(appKey);
BSLS_ASSERT_SAFE(cit != d_virtualStorages.end());
return cit->second->numBytes(appKey);
return cit->value()->numBytes(appKey);
}

} // close package namespace
Expand Down

0 comments on commit 5e2961b

Please sign in to comment.