From c658247086ec4e2a9741854f9b293ada11230f89 Mon Sep 17 00:00:00 2001 From: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> Date: Sun, 15 Dec 2024 10:06:37 -0500 Subject: [PATCH] fix: proxy detects non-consecutive duplicates Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> --- src/groups/mqb/mqbblp/mqbblp_pushstream.h | 55 ++++++--- .../mqb/mqbblp/mqbblp_relayqueueengine.cpp | 113 ++++++++++++------ .../mqb/mqbblp/mqbblp_relayqueueengine.h | 19 +-- src/groups/mqb/mqbi/mqbi_storage.h | 54 ++++++++- .../mqb/mqbs/mqbs_filebackedstorage.cpp | 18 +-- src/groups/mqb/mqbs/mqbs_filebackedstorage.h | 7 +- src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp | 25 ++-- src/groups/mqb/mqbs/mqbs_inmemorystorage.h | 7 +- src/groups/mqb/mqbs/mqbs_virtualstorage.cpp | 12 +- src/groups/mqb/mqbs/mqbs_virtualstorage.h | 57 +-------- .../mqb/mqbs/mqbs_virtualstoragecatalog.cpp | 21 ++-- .../mqb/mqbs/mqbs_virtualstoragecatalog.h | 12 +- 12 files changed, 240 insertions(+), 160 deletions(-) diff --git a/src/groups/mqb/mqbblp/mqbblp_pushstream.h b/src/groups/mqb/mqbblp/mqbblp_pushstream.h index 4a39a99f0..60c966913 100644 --- a/src/groups/mqb/mqbblp/mqbblp_pushstream.h +++ b/src/groups/mqb/mqbblp/mqbblp_pushstream.h @@ -120,10 +120,16 @@ struct PushStream { struct App { Elements d_elements; bsl::shared_ptr d_app; + /// Replica deduplicates PUSH for the same App in the same batch. + bmqt::MessageGUID d_lastGUID; App(const bsl::shared_ptr& app); void add(Element* element); void remove(Element* element); + + /// Return 'true' + bool setLastPush(const bmqt::MessageGUID& lastGUID); + const Element* last() const; }; @@ -144,9 +150,10 @@ struct PushStream { const Apps::iterator d_iteratorApp; public: - Element(const bmqp::SubQueueInfo& subscription, - const iterator& iterator, - const Apps::iterator& iteratorApp); + Element(const bmqp::RdaInfo& rda, + unsigned int subscriptionId, + const iterator& iterator, + const Apps::iterator& iteratorApp); /// Return a modifiable reference to the App state associated with this /// Element. @@ -215,11 +222,13 @@ struct PushStream { /// Remove all Elements, Apps, and GUIDs. unsigned int removeAll(); - /// Create new Element associated with the specified `info`, - // `upstreamSubQueueId`, and `iterator`. - Element* create(const bmqp::SubQueueInfo& info, - const iterator& iterator, - const Apps::iterator& iteratorApp); + /// Create new Element associated with the specified `rda`, + /// 'subscriptionId`, `iterator` pointing to the corresponding GUID, and + /// `iteratorApp` pointing to the corresponding App. + Element* create(const bmqp::RdaInfo& rda, + unsigned int subscriptionId, + const iterator& iterator, + const Apps::iterator& iteratorApp); }; // ======================== @@ -430,14 +439,15 @@ inline PushStream::ElementBase::ElementBase() // NOTHING } -inline PushStream::Element::Element(const bmqp::SubQueueInfo& subscription, - const iterator& iterator, - const Apps::iterator& iteratorApp) -: d_app(subscription.rdaInfo()) +inline PushStream::Element::Element(const bmqp::RdaInfo& rda, + unsigned int subscriptionId, + const iterator& iterator, + const Apps::iterator& iteratorApp) +: d_app(rda) , d_iteratorGuid(iterator) , d_iteratorApp(iteratorApp) { - d_app.d_subscriptionId = subscription.id(); + d_app.d_subscriptionId = subscriptionId; } inline void PushStream::Element::eraseGuid(PushStream::Stream& stream) @@ -610,6 +620,16 @@ inline void PushStream::App::remove(Element* element) d_elements.remove(element, e_APP); } +inline bool PushStream::App::setLastPush(const bmqt::MessageGUID& lastGUID) +{ + if (d_lastGUID == lastGUID) { + return false; + } + d_lastGUID = lastGUID; + + return true; +} + inline const PushStream::Element* PushStream::App::last() const { return d_elements.back(); @@ -620,14 +640,15 @@ inline const PushStream::Element* PushStream::App::last() const // ----------------- inline PushStream::Element* -PushStream::create(const bmqp::SubQueueInfo& subscription, - const iterator& it, - const Apps::iterator& iteratorApp) +PushStream::create(const bmqp::RdaInfo& rda, + unsigned int subscriptionId, + const iterator& it, + const Apps::iterator& iteratorApp) { BSLS_ASSERT_SAFE(it != d_stream.end()); Element* element = new (d_pushElementsPool_sp->allocate()) - Element(subscription, it, iteratorApp); + Element(rda, subscriptionId, it, iteratorApp); return element; } diff --git a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp index 5171c0b95..b1e1581a6 100644 --- a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp @@ -590,6 +590,7 @@ void RelayQueueEngine::deliverMessages() App_State* app = element->app().d_app.get(); BSLS_ASSERT_SAFE(app); + if (!app->isAuthorized()) { // This App got the PUSH (recorded in the PushStream) BMQ_LOGTHROTTLE_ERROR() @@ -602,13 +603,18 @@ void RelayQueueEngine::deliverMessages() d_storageIter_mp->removeCurrentElement(); } - else if (d_appsDeliveryContext.processApp(*app, i)) { - // The current element has made it either to delivery or - // putAside and it can be removed - d_storageIter_mp->removeCurrentElement(); + else if (element->app().setLastPush(d_storageIter_mp->guid())) { + if (d_appsDeliveryContext.processApp(*app, i)) { + // The current element has made it either to delivery or + // putAside and it can be removed + d_storageIter_mp->removeCurrentElement(); + } + else { + // The current element has made it to resumePoint and it + // cannot be removed. + element->app().setLastPush(bmqt::MessageGUID()); + } } - // Else, the current element has made it to resumePoint and it - // cannot be removed. } d_appsDeliveryContext.deliverMessage(); } @@ -1725,22 +1731,28 @@ bool RelayQueueEngine::subscriptionId2upstreamSubQueueId( } unsigned int -RelayQueueEngine::push(mqbi::StorageMessageAttributes* attributes, - const bmqt::MessageGUID& msgGUID, - const bsl::shared_ptr& appData, - const bmqp::Protocol::SubQueueInfosArray& subscriptions, - bool isOutOfOrder) +RelayQueueEngine::push(mqbi::StorageMessageAttributes* attributes, + const bmqt::MessageGUID& msgGUID, + const bsl::shared_ptr& appData, + bmqp::Protocol::SubQueueInfosArray& subscriptions, + bool isOutOfOrder) { if (isOutOfOrder) { BSLS_ASSERT_SAFE(subscriptions.size() == 1); // No guarantee of uniqueness. Cannot use PushStream. - unsigned int upstreamSubQueueId; + unsigned int subQueueId; + + unsigned int subscriptionId = subscriptions.begin()->id(); + unsigned int ordinalPlusOne = 0; // Invalid value + + // Reusing 'subscriptions' to 'setPushState()' below. + subscriptions.begin()->setId(ordinalPlusOne); if (subscriptionId2upstreamSubQueueId(msgGUID, - &upstreamSubQueueId, - subscriptions.begin()->id())) { - App_State* app = findApp(upstreamSubQueueId); + &subQueueId, + subscriptionId)) { + App_State* app = findApp(subQueueId); if (app == 0) { BMQ_LOGTHROTTLE_ERROR() @@ -1748,7 +1760,7 @@ RelayQueueEngine::push(mqbi::StorageMessageAttributes* attributes, << "Remote queue: " << d_queueState_p->uri() << " (id: " << d_queueState_p->id() << ") discarding a PUSH message for guid " << msgGUID - << ", with unknown App Id " << upstreamSubQueueId; + << ", with unknown App Id " << subQueueId; return 0; // RETURN } @@ -1761,10 +1773,14 @@ RelayQueueEngine::push(mqbi::StorageMessageAttributes* attributes, attributes->setRefCount(1); - storePush(attributes, msgGUID, appData, true); + // Reusing 'subscriptions' to 'setPushState()' below. + ordinalPlusOne = 1 + app->ordinal(); + subscriptions.begin()->setId(ordinalPlusOne); + + storePush(attributes, msgGUID, appData, subscriptions, true); // Attempt to deliver - processAppRedelivery(upstreamSubQueueId, app); + processAppRedelivery(subQueueId, app); return 1; // RETURN } @@ -1777,17 +1793,21 @@ RelayQueueEngine::push(mqbi::StorageMessageAttributes* attributes, PushStream::iterator itGuid = d_pushStream.findOrAppendMessage(msgGUID); unsigned int count = 0; - for (bmqp::Protocol::SubQueueInfosArray::const_iterator cit = + for (bmqp::Protocol::SubQueueInfosArray::iterator it = subscriptions.begin(); - cit != subscriptions.end(); - ++cit) { - const bmqp::SubQueueInfo& subscription = *cit; - + it != subscriptions.end(); + ++it) { unsigned int subQueueId; + unsigned int subscriptionId = it->id(); + unsigned int ordinalPlusOne = 0; // Invalid value + + // Reusing 'subscriptions' to 'setPushState()' below. + it->setId(ordinalPlusOne); + if (!subscriptionId2upstreamSubQueueId(msgGUID, &subQueueId, - subscription.id())) { + subscriptionId)) { continue; // CONTINUE } @@ -1802,7 +1822,7 @@ RelayQueueEngine::push(mqbi::StorageMessageAttributes* attributes, << "Remote queue: " << d_queueState_p->uri() << " (id: " << d_queueState_p->id() << ") discarding a PUSH message for guid " << msgGUID - << ", with unknown App Id " << subscription.id(); + << ", with unknown App Id " << subscriptionId; continue; // CONTINUE } @@ -1827,9 +1847,12 @@ RelayQueueEngine::push(mqbi::StorageMessageAttributes* attributes, } } - PushStream::Element* element = d_pushStream.create(subscription, - itGuid, - itApp); + PushStream::Element* element = + d_pushStream.create(it->rdaInfo(), subscriptionId, itGuid, itApp); + + // Reusing 'subscriptions' to 'setPushState()' below. + ordinalPlusOne = 1 + itApp->second.d_app->ordinal(); + it->setId(ordinalPlusOne); d_pushStream.add(element); ++count; @@ -1838,7 +1861,7 @@ RelayQueueEngine::push(mqbi::StorageMessageAttributes* attributes, if (count) { // Pass correct ref count attributes->setRefCount(count); - storePush(attributes, msgGUID, appData, false); + storePush(attributes, msgGUID, appData, subscriptions, false); } return count; } @@ -1862,10 +1885,7 @@ bool RelayQueueEngine::checkForDuplicate(const App_State* app, mqbi::AppMessage& appState = d_realStorageIter_mp->appMessageState( app->ordinal()); - if (!appState.isPushing()) { - appState.setPushState(); - } - else { + if (appState.isPushing()) { BMQ_LOGTHROTTLE_INFO() << "Remote queue: " << d_queueState_p->uri() << " (id: " << d_queueState_p->id() << ", App '" @@ -1878,10 +1898,13 @@ bool RelayQueueEngine::checkForDuplicate(const App_State* app, return true; } -void RelayQueueEngine::storePush(mqbi::StorageMessageAttributes* attributes, - const bmqt::MessageGUID& msgGUID, - const bsl::shared_ptr& appData, - bool isOutOfOrder) +void RelayQueueEngine::storePush( + mqbi::StorageMessageAttributes* attributes, + const bmqt::MessageGUID& msgGUID, + const bsl::shared_ptr& appData, + const bmqp::Protocol::SubQueueInfosArray& subscriptions, + + bool isOutOfOrder) { if (d_queueState_p->domain()->cluster()->isRemote()) { // Save the message along with the subIds in the storage. Note that @@ -1889,11 +1912,14 @@ void RelayQueueEngine::storePush(mqbi::StorageMessageAttributes* attributes, // in 'options' is subQueueInfos, and we won't store the specified // 'options' in the storage. + mqbi::DataStreamMessage* dataStreamMessage = 0; + mqbi::StorageResult::Enum result = storage()->put( attributes, msgGUID, appData, - bsl::shared_ptr()); // No options + bsl::shared_ptr(), + &dataStreamMessage); // No options if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY( result != mqbi::StorageResult::e_SUCCESS)) { @@ -1908,6 +1934,17 @@ void RelayQueueEngine::storePush(mqbi::StorageMessageAttributes* attributes, // A redelivery PUSH for one App in the presence of another App // can result in 'e_GUID_NOT_UNIQUE'. } + else { + // Reusing previously cached ordinals. + for (bmqp::Protocol::SubQueueInfosArray::const_iterator cit = + subscriptions.begin(); + cit != subscriptions.end(); + ++cit) { + if (cit->id() > 0) { + dataStreamMessage->app(cit->id() - 1).setPushState(); + } + } + } } } diff --git a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.h b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.h index 710e13182..2435a9ad5 100644 --- a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.h +++ b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.h @@ -351,10 +351,11 @@ class RelayQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine { bool checkForDuplicate(const App_State* app, const bmqt::MessageGUID& msgGUID); - void storePush(mqbi::StorageMessageAttributes* attributes, - const bmqt::MessageGUID& msgGUID, - const bsl::shared_ptr& appData, - bool isOutOfOrder); + void storePush(mqbi::StorageMessageAttributes* attributes, + const bmqt::MessageGUID& msgGUID, + const bsl::shared_ptr& appData, + const bmqp::Protocol::SubQueueInfosArray& subscriptions, + bool isOutOfOrder); void beforeOneAppRemoved(unsigned int upstreamSubQueueId); @@ -541,11 +542,11 @@ class RelayQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine { // (`mqbi::AppMessage`, `upstreamSubQueueId`) pairs for each recognized App /// in the specified `subscriptions`. /// Return number of inserted PushStream Elements. - unsigned int push(mqbi::StorageMessageAttributes* attributes, - const bmqt::MessageGUID& msgGUID, - const bsl::shared_ptr& appData, - const bmqp::Protocol::SubQueueInfosArray& subscriptions, - bool isOutOfOrder); + unsigned int push(mqbi::StorageMessageAttributes* attributes, + const bmqt::MessageGUID& msgGUID, + const bsl::shared_ptr& appData, + bmqp::Protocol::SubQueueInfosArray& subscriptions, + bool isOutOfOrder); // ACCESSORS /// Return the reference count that should be applied to a message diff --git a/src/groups/mqb/mqbi/mqbi_storage.h b/src/groups/mqb/mqbi/mqbi_storage.h index 95267c8af..076de2a7c 100644 --- a/src/groups/mqb/mqbi/mqbi_storage.h +++ b/src/groups/mqb/mqbi/mqbi_storage.h @@ -292,6 +292,26 @@ struct AppMessage { bool isPushing() const; }; +struct DataStreamMessage { + // VST to track the state associated with a GUID (for all Apps). + + int d_size; + // The message size + + bsl::vector d_apps; + // App states for the message + + DataStreamMessage(int size, bslma::Allocator* allocator); + + /// Return reference to the modifiable state of the App corresponding + /// to the specified 'ordinal. + mqbi::AppMessage& app(unsigned int appOrdinal); + + /// Return reference to the non-modifiable state of the App + /// corresponding to the specified 'ordinal. + const mqbi::AppMessage& app(unsigned int appOrdinal) const; +}; + // ===================== // class StorageIterator // ===================== @@ -437,13 +457,16 @@ class Storage { /// Save the message contained in the specified `appData`, `options` and /// the associated `attributes` and `msgGUID` into this storage and the /// associated virtual storage. The `attributes` is an in/out parameter - /// and storage layer can populate certain fields of that struct. + /// and storage layer can populate certain fields of that struct. If the + /// optionally specified `out` is not zero, load the created + /// `DataStreamMessage` into the 'out'. /// Return 0 on success or an non-zero error code on failure. virtual StorageResult::Enum put(StorageMessageAttributes* attributes, const bmqt::MessageGUID& msgGUID, const bsl::shared_ptr& appData, - const bsl::shared_ptr& options) = 0; + const bsl::shared_ptr& options, + mqbi::DataStreamMessage** out = 0) = 0; /// Update the App state corresponding to the specified `msgGUID` and the /// specified `appKey` in the DataStream. Decrement the reference count of @@ -698,6 +721,33 @@ inline bool AppMessage::isPushing() const return d_state == e_PUSH; } +// ----------------------- +// class DataStreamMessage +// ----------------------- + +inline DataStreamMessage::DataStreamMessage(int size, + bslma::Allocator* allocator) +: d_size(size) +, d_apps(allocator) +{ + // NOTHING +} + +inline mqbi::AppMessage& DataStreamMessage::app(unsigned int appOrdinal) +{ + BSLS_ASSERT_SAFE(appOrdinal < d_apps.size()); + + return d_apps[appOrdinal]; +} + +inline const mqbi::AppMessage& +DataStreamMessage::app(unsigned int appOrdinal) const +{ + BSLS_ASSERT_SAFE(appOrdinal < d_apps.size()); + + return d_apps[appOrdinal]; +} + // ------------------------------ // class StorageMessageAttributes // ------------------------------ diff --git a/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp b/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp index abe445cfa..5ea7fd38c 100644 --- a/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp +++ b/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp @@ -289,7 +289,8 @@ mqbi::StorageResult::Enum FileBackedStorage::put(mqbi::StorageMessageAttributes* attributes, const bmqt::MessageGUID& msgGUID, const bsl::shared_ptr& appData, - const bsl::shared_ptr& options) + const bsl::shared_ptr& options, + mqbi::DataStreamMessage** out) { const int msgSize = appData->length(); @@ -340,19 +341,21 @@ FileBackedStorage::put(mqbi::StorageMessageAttributes* attributes, // if we keep `irc` (like we keep 'DataStoreRecordHandle'). if (d_autoConfirms.empty()) { - d_virtualStorageCatalog.put(msgGUID, msgSize); + d_virtualStorageCatalog.put(msgGUID, msgSize, out); } else { - VirtualStorage::DataStreamMessage* dataStreamMessage = 0; - d_virtualStorageCatalog.put(msgGUID, msgSize, &dataStreamMessage); + mqbi::DataStreamMessage* dataStreamMessage = 0; + if (out == 0) { + out = &dataStreamMessage; + } + d_virtualStorageCatalog.put(msgGUID, msgSize, out); // Move auto confirms to the data record for (AutoConfirms::const_iterator it = d_autoConfirms.begin(); it != d_autoConfirms.end(); ++it) { irc.first->second.d_array.push_back(it->d_confirmRecordHandle); - d_virtualStorageCatalog.autoConfirm(dataStreamMessage, - it->d_appKey); + d_virtualStorageCatalog.autoConfirm(*out, it->d_appKey); } d_autoConfirms.clear(); } @@ -776,7 +779,7 @@ void FileBackedStorage::processMessageRecord( else { if (!d_currentlyAutoConfirming.isUnset()) { if (d_currentlyAutoConfirming == guid) { - VirtualStorage::DataStreamMessage* dataStreamMessage = 0; + mqbi::DataStreamMessage* dataStreamMessage = 0; d_virtualStorageCatalog.put(guid, msgLen, &dataStreamMessage); @@ -795,7 +798,6 @@ void FileBackedStorage::processMessageRecord( else { clearSelection(); } - d_currentlyAutoConfirming = bmqt::MessageGUID(); } d_autoConfirms.clear(); } diff --git a/src/groups/mqb/mqbs/mqbs_filebackedstorage.h b/src/groups/mqb/mqbs/mqbs_filebackedstorage.h index 382bf7170..cb67cc36d 100644 --- a/src/groups/mqb/mqbs/mqbs_filebackedstorage.h +++ b/src/groups/mqb/mqbs/mqbs_filebackedstorage.h @@ -381,13 +381,16 @@ class FileBackedStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage { /// Save the message contained in the specified 'appData', 'options' and /// the associated 'attributes' and 'msgGUID' into this storage and the /// associated virtual storage. The 'attributes' is an in/out parameter - /// and storage layer can populate certain fields of that struct. + /// and storage layer can populate certain fields of that struct. If the + /// optionally specified `out` is not zero, load the created + /// `DataStreamMessage` into the 'out'. /// Return 0 on success or an non-zero error code on failure. mqbi::StorageResult::Enum put(mqbi::StorageMessageAttributes* attributes, const bmqt::MessageGUID& msgGUID, const bsl::shared_ptr& appData, - const bsl::shared_ptr& options) BSLS_KEYWORD_OVERRIDE; + const bsl::shared_ptr& options, + mqbi::DataStreamMessage** out = 0) BSLS_KEYWORD_OVERRIDE; /// Get an iterator for data stored in the virtual storage identified by /// the specified 'appKey'. diff --git a/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp b/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp index 191d0fb2b..82a06a3e3 100644 --- a/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp +++ b/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp @@ -166,7 +166,8 @@ mqbi::StorageResult::Enum InMemoryStorage::put(mqbi::StorageMessageAttributes* attributes, const bmqt::MessageGUID& msgGUID, const bsl::shared_ptr& appData, - const bsl::shared_ptr& options) + const bsl::shared_ptr& options, + mqbi::DataStreamMessage** out) { const int msgSize = appData->length(); @@ -201,18 +202,20 @@ InMemoryStorage::put(mqbi::StorageMessageAttributes* attributes, attributes->arrivalTimepoint()); if (d_autoConfirms.empty()) { - d_virtualStorageCatalog.put(msgGUID, msgSize); + d_virtualStorageCatalog.put(msgGUID, msgSize, out); } else { - VirtualStorage::DataStreamMessage* dataStreamMessage = 0; - d_virtualStorageCatalog.put(msgGUID, msgSize, &dataStreamMessage); + mqbi::DataStreamMessage* dataStreamMessage = 0; + if (out == 0) { + out = &dataStreamMessage; + } + d_virtualStorageCatalog.put(msgGUID, msgSize, out); // Move auto confirms to the data record for (AutoConfirms::const_iterator it = d_autoConfirms.begin(); it != d_autoConfirms.end(); ++it) { - d_virtualStorageCatalog.autoConfirm(dataStreamMessage, - it->d_appKey); + d_virtualStorageCatalog.autoConfirm(*out, it->d_appKey); } d_autoConfirms.clear(); } @@ -245,12 +248,20 @@ InMemoryStorage::put(mqbi::StorageMessageAttributes* attributes, mqbi::StorageMessageAttributes& existing = it->second.attributes(); existing.setRefCount(existing.refCount() + attributes->refCount()); // Bump up + + if (out) { + // Proxy can detect duplicates by inspecting 'DataStreamMessage'. + VirtualStorage::DataStreamIterator data = + d_virtualStorageCatalog.get(msgGUID); + BSLS_ASSERT_SAFE(data != d_virtualStorageCatalog.end()); + *out = &data->second; + } } else { d_items.insert(bsl::make_pair(msgGUID, Item(appData, options, *attributes)), attributes->arrivalTimepoint()); - d_virtualStorageCatalog.put(msgGUID, msgSize); + d_virtualStorageCatalog.put(msgGUID, msgSize, out); } // We don't verify uniqueness of the insertion because in the case of a diff --git a/src/groups/mqb/mqbs/mqbs_inmemorystorage.h b/src/groups/mqb/mqbs/mqbs_inmemorystorage.h index 5ccbac8be..9aa83c5ed 100644 --- a/src/groups/mqb/mqbs/mqbs_inmemorystorage.h +++ b/src/groups/mqb/mqbs/mqbs_inmemorystorage.h @@ -302,13 +302,16 @@ class InMemoryStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage { /// Save the message contained in the specified 'appData', 'options' and /// the associated 'attributes' and 'msgGUID' into this storage and the /// associated virtual storage. The 'attributes' is an in/out parameter - /// and storage layer can populate certain fields of that struct. + /// and storage layer can populate certain fields of that struct. If the + /// optionally specified `out` is not zero, load the created + /// `DataStreamMessage` into the 'out'. /// Return 0 on success or an non-zero error code on failure. mqbi::StorageResult::Enum put(mqbi::StorageMessageAttributes* attributes, const bmqt::MessageGUID& msgGUID, const bsl::shared_ptr& appData, - const bsl::shared_ptr& options) BSLS_KEYWORD_OVERRIDE; + const bsl::shared_ptr& options, + mqbi::DataStreamMessage** out = 0) BSLS_KEYWORD_OVERRIDE; /// Update the App state corresponding to the specified 'msgGUID' and the /// specified 'appKey' in the DataStream. Decrement the reference count of diff --git a/src/groups/mqb/mqbs/mqbs_virtualstorage.cpp b/src/groups/mqb/mqbs/mqbs_virtualstorage.cpp index dc9b9594c..defd3899d 100644 --- a/src/groups/mqb/mqbs/mqbs_virtualstorage.cpp +++ b/src/groups/mqb/mqbs/mqbs_virtualstorage.cpp @@ -58,7 +58,7 @@ VirtualStorage::~VirtualStorage() // MANIPULATORS mqbi::StorageResult::Enum -VirtualStorage::confirm(DataStreamMessage* dataStreamMessage) +VirtualStorage::confirm(mqbi::DataStreamMessage* dataStreamMessage) { mqbi::AppMessage& appMessage = dataStreamMessage->app(ordinal()); @@ -77,7 +77,7 @@ VirtualStorage::confirm(DataStreamMessage* dataStreamMessage) } mqbi::StorageResult::Enum -VirtualStorage::remove(DataStreamMessage* dataStreamMessage) +VirtualStorage::remove(mqbi::DataStreamMessage* dataStreamMessage) { mqbi::AppMessage& appMessage = dataStreamMessage->app(ordinal()); @@ -95,7 +95,7 @@ VirtualStorage::remove(DataStreamMessage* dataStreamMessage) } } -void VirtualStorage::onGC(const DataStreamMessage& dataStreamMessage) +void VirtualStorage::onGC(const mqbi::DataStreamMessage& dataStreamMessage) { if (!dataStreamMessage.d_apps.empty()) { const mqbi::AppMessage& appMessage = dataStreamMessage.app(ordinal()); @@ -191,7 +191,6 @@ void StorageIterator::reset(const bmqt::MessageGUID& where) { clear(); - // Reset iterator to beginning d_iterator = d_owner_p->begin(where); } @@ -210,8 +209,7 @@ StorageIterator::appMessageView(unsigned int appOrdinal) const // PRECONDITIONS BSLS_ASSERT_SAFE(!atEnd()); - const VirtualStorage::DataStreamMessage& dataStreamMessage = - d_iterator->second; + const mqbi::DataStreamMessage& dataStreamMessage = d_iterator->second; if (dataStreamMessage.d_apps.size() > appOrdinal) { return d_iterator->second.app(appOrdinal); @@ -224,7 +222,7 @@ mqbi::AppMessage& StorageIterator::appMessageState(unsigned int appOrdinal) // PRECONDITIONS BSLS_ASSERT_SAFE(!atEnd()); - VirtualStorage::DataStreamMessage* dataStreamMessage = &d_iterator->second; + mqbi::DataStreamMessage* dataStreamMessage = &d_iterator->second; d_owner_p->setup(dataStreamMessage); diff --git a/src/groups/mqb/mqbs/mqbs_virtualstorage.h b/src/groups/mqb/mqbs/mqbs_virtualstorage.h index c8eb10cf6..37d1dccfa 100644 --- a/src/groups/mqb/mqbs/mqbs_virtualstorage.h +++ b/src/groups/mqb/mqbs/mqbs_virtualstorage.h @@ -71,29 +71,11 @@ class VirtualStorage { // This Mechanism represents one App in a Storage (FileBased or InMemory) public: - struct DataStreamMessage { - int d_size; - // The message size - - bsl::vector d_apps; - // App states for the message - - DataStreamMessage(int size, bslma::Allocator* allocator); - - /// Return reference to the modifiable state of the App corresponding - /// to the specified 'ordinal. - mqbi::AppMessage& app(unsigned int appOrdinal); - - /// Return reference to the non-modifiable state of the App - /// corresponding to the specified 'ordinal. - const mqbi::AppMessage& app(unsigned int appOrdinal) const; - }; - /// msgGUID -> MessageContext /// Must be a container in which iteration order is same as insertion /// order. typedef bmqc::OrderedHashMap > DataStream; @@ -178,15 +160,17 @@ class VirtualStorage { /// Change the state of this App in the specified 'dataStreamMessage' to /// indicate CONFIRM. - mqbi::StorageResult::Enum confirm(DataStreamMessage* dataStreamMessage); + mqbi::StorageResult::Enum + confirm(mqbi::DataStreamMessage* dataStreamMessage); /// Change the state of this App in the specified 'dataStreamMessage' to /// indicate removal (by a purge or unregistration). - mqbi::StorageResult::Enum remove(DataStreamMessage* dataStreamMessage); + mqbi::StorageResult::Enum + remove(mqbi::DataStreamMessage* dataStreamMessage); /// Observe removal of this App from the specified 'dataStreamMessage' by /// GC and update bytes and messages counts if needed. - void onGC(const DataStreamMessage& dataStreamMessage); + void onGC(const mqbi::DataStreamMessage& dataStreamMessage); /// Reset bytes and messages counts as in the case of purging all Apps. void resetStats(); @@ -360,35 +344,6 @@ class VirtualStorageIterator : public StorageIterator { // INLINE DEFINITIONS // ============================================================================ -// --------------------------------------- -// class VirtualStorage::DataStreamMessage -// --------------------------------------- - -inline VirtualStorage::DataStreamMessage::DataStreamMessage( - int size, - bslma::Allocator* allocator) -: d_size(size) -, d_apps(allocator) -{ - // NOTHING -} - -inline mqbi::AppMessage& -VirtualStorage::DataStreamMessage::app(unsigned int appOrdinal) -{ - BSLS_ASSERT_SAFE(appOrdinal < d_apps.size()); - - return d_apps[appOrdinal]; -} - -inline const mqbi::AppMessage& -VirtualStorage::DataStreamMessage::app(unsigned int appOrdinal) const -{ - BSLS_ASSERT_SAFE(appOrdinal < d_apps.size()); - - return d_apps[appOrdinal]; -} - // -------------------- // class VirtualStorage // -------------------- diff --git a/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.cpp b/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.cpp index fabab605c..07c77d43c 100644 --- a/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.cpp +++ b/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.cpp @@ -145,7 +145,7 @@ VirtualStorageCatalog::get(const bmqt::MessageGUID& msgGUID) return it; } -void VirtualStorageCatalog::setup(VirtualStorage::DataStreamMessage* data) +void VirtualStorageCatalog::setup(mqbi::DataStreamMessage* data) { // The only case for subsequent resize is proxy receiving subsequent PUSH // messages for the same GUID and different apps @@ -155,14 +155,14 @@ void VirtualStorageCatalog::setup(VirtualStorage::DataStreamMessage* data) } mqbi::StorageResult::Enum -VirtualStorageCatalog::put(const bmqt::MessageGUID& msgGUID, - int msgSize, - VirtualStorage::DataStreamMessage** out) +VirtualStorageCatalog::put(const bmqt::MessageGUID& msgGUID, + int msgSize, + mqbi::DataStreamMessage** out) { bsl::pair insertResult = - d_dataStream.insert(bsl::make_pair( - msgGUID, - VirtualStorage::DataStreamMessage(msgSize, d_allocator_p))); + d_dataStream.insert( + bsl::make_pair(msgGUID, + mqbi::DataStreamMessage(msgSize, d_allocator_p))); if (!insertResult.second) { // Duplicate GUID @@ -270,7 +270,6 @@ VirtualStorageCatalog::confirm(const bmqt::MessageGUID& msgGUID, VirtualStoragesIter it = d_virtualStorages.findByKey2(appKey); BSLS_ASSERT_SAFE(it != d_virtualStorages.end()); - setup(&data->second); const mqbi::StorageResult::Enum rc = it->value()->confirm(&data->second); if (queue() && mqbi::StorageResult::e_SUCCESS == rc) { queue()->stats()->onEvent( @@ -328,7 +327,7 @@ VirtualStorageCatalog::removeAll(const mqbu::StorageKey& appKey) itData != d_dataStream.end();) { mqbi::StorageResult::Enum result = mqbi::StorageResult::e_SUCCESS; - VirtualStorage::DataStreamMessage* data = &itData->second; + mqbi::DataStreamMessage* data = &itData->second; setup(data); if (itVs->value()->remove(data) == @@ -503,8 +502,8 @@ VirtualStorageCatalog::virtualStorage(const mqbu::StorageKey& appKey) } void VirtualStorageCatalog::autoConfirm( - VirtualStorage::DataStreamMessage* dataStreamMessage, - const mqbu::StorageKey& appKey) + mqbi::DataStreamMessage* dataStreamMessage, + const mqbu::StorageKey& appKey) { // PRECONDITIONS BSLS_ASSERT_SAFE(dataStreamMessage); diff --git a/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.h b/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.h index 1b37dfc6a..1b7824328 100644 --- a/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.h +++ b/src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.h @@ -171,14 +171,14 @@ class VirtualStorageCatalog { DataStreamIterator get(const bmqt::MessageGUID& msgGUID); /// Allocate space for all Apps states in the specified 'data' if needed. - void setup(VirtualStorage::DataStreamMessage* data); + void setup(mqbi::DataStreamMessage* data); /// Save the message having the specified 'msgGUID' and 'msgSize' to the /// DataStream. If the specified 'out' is not '0', allocate space for all /// Apps states and load the created object into the 'out'. - mqbi::StorageResult::Enum put(const bmqt::MessageGUID& msgGUID, - int msgSize, - VirtualStorage::DataStreamMessage** out = 0); + mqbi::StorageResult::Enum put(const bmqt::MessageGUID& msgGUID, + int msgSize, + mqbi::DataStreamMessage** out = 0); /// Get an iterator for items stored in the DataStream identified by the /// specified 'appKey'. @@ -245,8 +245,8 @@ class VirtualStorageCatalog { /// (Auto)Confirm the specified 'msgGUID' for the specified 'appKey'. /// Behavior is undefined unless there is an App with the 'appKey'. - void autoConfirm(VirtualStorage::DataStreamMessage* dataStreamMessage, - const mqbu::StorageKey& appKey); + void autoConfirm(mqbi::DataStreamMessage* dataStreamMessage, + const mqbu::StorageKey& appKey); /// Set the default RDA according to the specified 'maxDeliveryAttempts'. void setDefaultRda(int maxDeliveryAttempts);