Skip to content

Commit

Permalink
fix: proxy detects non-consecutive duplicates
Browse files Browse the repository at this point in the history
Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo committed Dec 16, 2024
1 parent 5a6670d commit c658247
Show file tree
Hide file tree
Showing 12 changed files with 240 additions and 160 deletions.
55 changes: 38 additions & 17 deletions src/groups/mqb/mqbblp/mqbblp_pushstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,16 @@ struct PushStream {
struct App {
Elements d_elements;
bsl::shared_ptr<RelayQueueEngine_AppState> d_app;
/// Replica deduplicates PUSH for the same App in the same batch.
bmqt::MessageGUID d_lastGUID;

App(const bsl::shared_ptr<RelayQueueEngine_AppState>& app);
void add(Element* element);
void remove(Element* element);

/// Return 'true'
bool setLastPush(const bmqt::MessageGUID& lastGUID);

const Element* last() const;
};

Expand All @@ -144,9 +150,10 @@ struct PushStream {
const Apps::iterator d_iteratorApp;

public:
Element(const bmqp::SubQueueInfo& subscription,
const iterator& iterator,
const Apps::iterator& iteratorApp);
Element(const bmqp::RdaInfo& rda,
unsigned int subscriptionId,
const iterator& iterator,
const Apps::iterator& iteratorApp);

/// Return a modifiable reference to the App state associated with this
/// Element.
Expand Down Expand Up @@ -215,11 +222,13 @@ struct PushStream {
/// Remove all Elements, Apps, and GUIDs.
unsigned int removeAll();

/// Create new Element associated with the specified `info`,
// `upstreamSubQueueId`, and `iterator`.
Element* create(const bmqp::SubQueueInfo& info,
const iterator& iterator,
const Apps::iterator& iteratorApp);
/// Create new Element associated with the specified `rda`,
/// 'subscriptionId`, `iterator` pointing to the corresponding GUID, and
/// `iteratorApp` pointing to the corresponding App.
Element* create(const bmqp::RdaInfo& rda,
unsigned int subscriptionId,
const iterator& iterator,
const Apps::iterator& iteratorApp);
};

// ========================
Expand Down Expand Up @@ -430,14 +439,15 @@ inline PushStream::ElementBase::ElementBase()
// NOTHING
}

inline PushStream::Element::Element(const bmqp::SubQueueInfo& subscription,
const iterator& iterator,
const Apps::iterator& iteratorApp)
: d_app(subscription.rdaInfo())
inline PushStream::Element::Element(const bmqp::RdaInfo& rda,
unsigned int subscriptionId,
const iterator& iterator,
const Apps::iterator& iteratorApp)
: d_app(rda)
, d_iteratorGuid(iterator)
, d_iteratorApp(iteratorApp)
{
d_app.d_subscriptionId = subscription.id();
d_app.d_subscriptionId = subscriptionId;
}

inline void PushStream::Element::eraseGuid(PushStream::Stream& stream)
Expand Down Expand Up @@ -610,6 +620,16 @@ inline void PushStream::App::remove(Element* element)
d_elements.remove(element, e_APP);
}

inline bool PushStream::App::setLastPush(const bmqt::MessageGUID& lastGUID)
{
if (d_lastGUID == lastGUID) {
return false;
}
d_lastGUID = lastGUID;

return true;
}

inline const PushStream::Element* PushStream::App::last() const
{
return d_elements.back();
Expand All @@ -620,14 +640,15 @@ inline const PushStream::Element* PushStream::App::last() const
// -----------------

inline PushStream::Element*
PushStream::create(const bmqp::SubQueueInfo& subscription,
const iterator& it,
const Apps::iterator& iteratorApp)
PushStream::create(const bmqp::RdaInfo& rda,
unsigned int subscriptionId,
const iterator& it,
const Apps::iterator& iteratorApp)
{
BSLS_ASSERT_SAFE(it != d_stream.end());

Element* element = new (d_pushElementsPool_sp->allocate())
Element(subscription, it, iteratorApp);
Element(rda, subscriptionId, it, iteratorApp);
return element;
}

Expand Down
113 changes: 75 additions & 38 deletions src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,7 @@ void RelayQueueEngine::deliverMessages()
App_State* app = element->app().d_app.get();

BSLS_ASSERT_SAFE(app);

if (!app->isAuthorized()) {
// This App got the PUSH (recorded in the PushStream)
BMQ_LOGTHROTTLE_ERROR()
Expand All @@ -602,13 +603,18 @@ void RelayQueueEngine::deliverMessages()

d_storageIter_mp->removeCurrentElement();
}
else if (d_appsDeliveryContext.processApp(*app, i)) {
// The current element has made it either to delivery or
// putAside and it can be removed
d_storageIter_mp->removeCurrentElement();
else if (element->app().setLastPush(d_storageIter_mp->guid())) {
if (d_appsDeliveryContext.processApp(*app, i)) {
// The current element has made it either to delivery or
// putAside and it can be removed
d_storageIter_mp->removeCurrentElement();
}
else {
// The current element has made it to resumePoint and it
// cannot be removed.
element->app().setLastPush(bmqt::MessageGUID());
}
}
// Else, the current element has made it to resumePoint and it
// cannot be removed.
}
d_appsDeliveryContext.deliverMessage();
}
Expand Down Expand Up @@ -1725,30 +1731,36 @@ bool RelayQueueEngine::subscriptionId2upstreamSubQueueId(
}

unsigned int
RelayQueueEngine::push(mqbi::StorageMessageAttributes* attributes,
const bmqt::MessageGUID& msgGUID,
const bsl::shared_ptr<bdlbb::Blob>& appData,
const bmqp::Protocol::SubQueueInfosArray& subscriptions,
bool isOutOfOrder)
RelayQueueEngine::push(mqbi::StorageMessageAttributes* attributes,
const bmqt::MessageGUID& msgGUID,
const bsl::shared_ptr<bdlbb::Blob>& appData,
bmqp::Protocol::SubQueueInfosArray& subscriptions,
bool isOutOfOrder)
{
if (isOutOfOrder) {
BSLS_ASSERT_SAFE(subscriptions.size() == 1);

// No guarantee of uniqueness. Cannot use PushStream.
unsigned int upstreamSubQueueId;
unsigned int subQueueId;

unsigned int subscriptionId = subscriptions.begin()->id();
unsigned int ordinalPlusOne = 0; // Invalid value

// Reusing 'subscriptions' to 'setPushState()' below.
subscriptions.begin()->setId(ordinalPlusOne);

if (subscriptionId2upstreamSubQueueId(msgGUID,
&upstreamSubQueueId,
subscriptions.begin()->id())) {
App_State* app = findApp(upstreamSubQueueId);
&subQueueId,
subscriptionId)) {
App_State* app = findApp(subQueueId);

if (app == 0) {
BMQ_LOGTHROTTLE_ERROR()
<< "#QUEUE_UNKNOWN_SUBSCRIPTION_ID "
<< "Remote queue: " << d_queueState_p->uri()
<< " (id: " << d_queueState_p->id()
<< ") discarding a PUSH message for guid " << msgGUID
<< ", with unknown App Id " << upstreamSubQueueId;
<< ", with unknown App Id " << subQueueId;

return 0; // RETURN
}
Expand All @@ -1761,10 +1773,14 @@ RelayQueueEngine::push(mqbi::StorageMessageAttributes* attributes,

attributes->setRefCount(1);

storePush(attributes, msgGUID, appData, true);
// Reusing 'subscriptions' to 'setPushState()' below.
ordinalPlusOne = 1 + app->ordinal();
subscriptions.begin()->setId(ordinalPlusOne);

storePush(attributes, msgGUID, appData, subscriptions, true);

// Attempt to deliver
processAppRedelivery(upstreamSubQueueId, app);
processAppRedelivery(subQueueId, app);

return 1; // RETURN
}
Expand All @@ -1777,17 +1793,21 @@ RelayQueueEngine::push(mqbi::StorageMessageAttributes* attributes,
PushStream::iterator itGuid = d_pushStream.findOrAppendMessage(msgGUID);
unsigned int count = 0;

for (bmqp::Protocol::SubQueueInfosArray::const_iterator cit =
for (bmqp::Protocol::SubQueueInfosArray::iterator it =
subscriptions.begin();
cit != subscriptions.end();
++cit) {
const bmqp::SubQueueInfo& subscription = *cit;

it != subscriptions.end();
++it) {
unsigned int subQueueId;

unsigned int subscriptionId = it->id();
unsigned int ordinalPlusOne = 0; // Invalid value

// Reusing 'subscriptions' to 'setPushState()' below.
it->setId(ordinalPlusOne);

if (!subscriptionId2upstreamSubQueueId(msgGUID,
&subQueueId,
subscription.id())) {
subscriptionId)) {
continue; // CONTINUE
}

Expand All @@ -1802,7 +1822,7 @@ RelayQueueEngine::push(mqbi::StorageMessageAttributes* attributes,
<< "Remote queue: " << d_queueState_p->uri()
<< " (id: " << d_queueState_p->id()
<< ") discarding a PUSH message for guid " << msgGUID
<< ", with unknown App Id " << subscription.id();
<< ", with unknown App Id " << subscriptionId;
continue; // CONTINUE
}

Expand All @@ -1827,9 +1847,12 @@ RelayQueueEngine::push(mqbi::StorageMessageAttributes* attributes,
}
}

PushStream::Element* element = d_pushStream.create(subscription,
itGuid,
itApp);
PushStream::Element* element =
d_pushStream.create(it->rdaInfo(), subscriptionId, itGuid, itApp);

// Reusing 'subscriptions' to 'setPushState()' below.
ordinalPlusOne = 1 + itApp->second.d_app->ordinal();
it->setId(ordinalPlusOne);

d_pushStream.add(element);
++count;
Expand All @@ -1838,7 +1861,7 @@ RelayQueueEngine::push(mqbi::StorageMessageAttributes* attributes,
if (count) {
// Pass correct ref count
attributes->setRefCount(count);
storePush(attributes, msgGUID, appData, false);
storePush(attributes, msgGUID, appData, subscriptions, false);
}
return count;
}
Expand All @@ -1862,10 +1885,7 @@ bool RelayQueueEngine::checkForDuplicate(const App_State* app,
mqbi::AppMessage& appState = d_realStorageIter_mp->appMessageState(
app->ordinal());

if (!appState.isPushing()) {
appState.setPushState();
}
else {
if (appState.isPushing()) {
BMQ_LOGTHROTTLE_INFO()
<< "Remote queue: " << d_queueState_p->uri()
<< " (id: " << d_queueState_p->id() << ", App '"
Expand All @@ -1878,22 +1898,28 @@ bool RelayQueueEngine::checkForDuplicate(const App_State* app,
return true;
}

void RelayQueueEngine::storePush(mqbi::StorageMessageAttributes* attributes,
const bmqt::MessageGUID& msgGUID,
const bsl::shared_ptr<bdlbb::Blob>& appData,
bool isOutOfOrder)
void RelayQueueEngine::storePush(
mqbi::StorageMessageAttributes* attributes,
const bmqt::MessageGUID& msgGUID,
const bsl::shared_ptr<bdlbb::Blob>& appData,
const bmqp::Protocol::SubQueueInfosArray& subscriptions,

bool isOutOfOrder)
{
if (d_queueState_p->domain()->cluster()->isRemote()) {
// Save the message along with the subIds in the storage. Note that
// for now, we will assume that in fanout mode, the only option present
// in 'options' is subQueueInfos, and we won't store the specified
// 'options' in the storage.

mqbi::DataStreamMessage* dataStreamMessage = 0;

mqbi::StorageResult::Enum result = storage()->put(
attributes,
msgGUID,
appData,
bsl::shared_ptr<bdlbb::Blob>()); // No options
bsl::shared_ptr<bdlbb::Blob>(),
&dataStreamMessage); // No options

if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(
result != mqbi::StorageResult::e_SUCCESS)) {
Expand All @@ -1908,6 +1934,17 @@ void RelayQueueEngine::storePush(mqbi::StorageMessageAttributes* attributes,
// A redelivery PUSH for one App in the presence of another App
// can result in 'e_GUID_NOT_UNIQUE'.
}
else {
// Reusing previously cached ordinals.
for (bmqp::Protocol::SubQueueInfosArray::const_iterator cit =
subscriptions.begin();
cit != subscriptions.end();
++cit) {
if (cit->id() > 0) {
dataStreamMessage->app(cit->id() - 1).setPushState();
}
}
}
}
}

Expand Down
19 changes: 10 additions & 9 deletions src/groups/mqb/mqbblp/mqbblp_relayqueueengine.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,10 +351,11 @@ class RelayQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine {
bool checkForDuplicate(const App_State* app,
const bmqt::MessageGUID& msgGUID);

void storePush(mqbi::StorageMessageAttributes* attributes,
const bmqt::MessageGUID& msgGUID,
const bsl::shared_ptr<bdlbb::Blob>& appData,
bool isOutOfOrder);
void storePush(mqbi::StorageMessageAttributes* attributes,
const bmqt::MessageGUID& msgGUID,
const bsl::shared_ptr<bdlbb::Blob>& appData,
const bmqp::Protocol::SubQueueInfosArray& subscriptions,
bool isOutOfOrder);

void beforeOneAppRemoved(unsigned int upstreamSubQueueId);

Expand Down Expand Up @@ -541,11 +542,11 @@ class RelayQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine {
// (`mqbi::AppMessage`, `upstreamSubQueueId`) pairs for each recognized App
/// in the specified `subscriptions`.
/// Return number of inserted PushStream Elements.
unsigned int push(mqbi::StorageMessageAttributes* attributes,
const bmqt::MessageGUID& msgGUID,
const bsl::shared_ptr<bdlbb::Blob>& appData,
const bmqp::Protocol::SubQueueInfosArray& subscriptions,
bool isOutOfOrder);
unsigned int push(mqbi::StorageMessageAttributes* attributes,
const bmqt::MessageGUID& msgGUID,
const bsl::shared_ptr<bdlbb::Blob>& appData,
bmqp::Protocol::SubQueueInfosArray& subscriptions,
bool isOutOfOrder);
// ACCESSORS

/// Return the reference count that should be applied to a message
Expand Down
Loading

0 comments on commit c658247

Please sign in to comment.