Skip to content

Commit

Permalink
fix: proxy detects non-consecutive duplicates
Browse files Browse the repository at this point in the history
Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo committed Dec 18, 2024
1 parent 5a6670d commit a6dcd0d
Show file tree
Hide file tree
Showing 14 changed files with 267 additions and 183 deletions.
55 changes: 38 additions & 17 deletions src/groups/mqb/mqbblp/mqbblp_pushstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,16 @@ struct PushStream {
struct App {
Elements d_elements;
bsl::shared_ptr<RelayQueueEngine_AppState> d_app;
/// Replica deduplicates PUSH for the same App in the same batch.
bmqt::MessageGUID d_lastGUID;

App(const bsl::shared_ptr<RelayQueueEngine_AppState>& app);
void add(Element* element);
void remove(Element* element);

/// Return 'true'
bool setLastPush(const bmqt::MessageGUID& lastGUID);

const Element* last() const;
};

Expand All @@ -144,9 +150,10 @@ struct PushStream {
const Apps::iterator d_iteratorApp;

public:
Element(const bmqp::SubQueueInfo& subscription,
const iterator& iterator,
const Apps::iterator& iteratorApp);
Element(const bmqp::RdaInfo& rda,
unsigned int subscriptionId,
const iterator& iterator,
const Apps::iterator& iteratorApp);

/// Return a modifiable reference to the App state associated with this
/// Element.
Expand Down Expand Up @@ -215,11 +222,13 @@ struct PushStream {
/// Remove all Elements, Apps, and GUIDs.
unsigned int removeAll();

/// Create new Element associated with the specified `info`,
// `upstreamSubQueueId`, and `iterator`.
Element* create(const bmqp::SubQueueInfo& info,
const iterator& iterator,
const Apps::iterator& iteratorApp);
/// Create new Element associated with the specified `rda`,
/// 'subscriptionId`, `iterator` pointing to the corresponding GUID, and
/// `iteratorApp` pointing to the corresponding App.
Element* create(const bmqp::RdaInfo& rda,
unsigned int subscriptionId,
const iterator& iterator,
const Apps::iterator& iteratorApp);
};

// ========================
Expand Down Expand Up @@ -430,14 +439,15 @@ inline PushStream::ElementBase::ElementBase()
// NOTHING
}

inline PushStream::Element::Element(const bmqp::SubQueueInfo& subscription,
const iterator& iterator,
const Apps::iterator& iteratorApp)
: d_app(subscription.rdaInfo())
inline PushStream::Element::Element(const bmqp::RdaInfo& rda,
unsigned int subscriptionId,
const iterator& iterator,
const Apps::iterator& iteratorApp)
: d_app(rda)
, d_iteratorGuid(iterator)
, d_iteratorApp(iteratorApp)
{
d_app.d_subscriptionId = subscription.id();
d_app.d_subscriptionId = subscriptionId;
}

inline void PushStream::Element::eraseGuid(PushStream::Stream& stream)
Expand Down Expand Up @@ -610,6 +620,16 @@ inline void PushStream::App::remove(Element* element)
d_elements.remove(element, e_APP);
}

inline bool PushStream::App::setLastPush(const bmqt::MessageGUID& lastGUID)
{
if (d_lastGUID == lastGUID) {
return false;
}
d_lastGUID = lastGUID;

return true;
}

inline const PushStream::Element* PushStream::App::last() const
{
return d_elements.back();
Expand All @@ -620,14 +640,15 @@ inline const PushStream::Element* PushStream::App::last() const
// -----------------

inline PushStream::Element*
PushStream::create(const bmqp::SubQueueInfo& subscription,
const iterator& it,
const Apps::iterator& iteratorApp)
PushStream::create(const bmqp::RdaInfo& rda,
unsigned int subscriptionId,
const iterator& it,
const Apps::iterator& iteratorApp)
{
BSLS_ASSERT_SAFE(it != d_stream.end());

Element* element = new (d_pushElementsPool_sp->allocate())
Element(subscription, it, iteratorApp);
Element(rda, subscriptionId, it, iteratorApp);
return element;
}

Expand Down
17 changes: 10 additions & 7 deletions src/groups/mqb/mqbblp/mqbblp_pushstream.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ static void test1_basic()
mqbblp::PushStream::Apps::iterator itApp =
ps.d_apps.emplace(subQueueId, app).first;

mqbblp::PushStream::Element* element = ps.create(subscription,
itGuid,
itApp);
mqbblp::PushStream::Element* element =
ps.create(subscription.rdaInfo(), subscription.id(), itGuid, itApp);

ps.add(element);
ps.remove(element, true);
Expand Down Expand Up @@ -85,7 +84,8 @@ static void test2_iterations()
mqbblp::PushStream::Apps::iterator itApp1 =
ps.d_apps.emplace(subQueueId1, unused).first;

mqbblp::PushStream::Element* element1 = ps.create(subscription1,
mqbblp::PushStream::Element* element1 = ps.create(subscription1.rdaInfo(),
subscription1.id(),
itGuid1,
itApp1);

Expand All @@ -97,19 +97,22 @@ static void test2_iterations()
mqbblp::PushStream::Apps::iterator itApp2 =
ps.d_apps.emplace(subQueueId2, unused).first;

mqbblp::PushStream::Element* element2 = ps.create(subscription2,
mqbblp::PushStream::Element* element2 = ps.create(subscription2.rdaInfo(),
subscription2.id(),
itGuid2,
itApp2);

ps.add(element2);

mqbblp::PushStream::Element* element3 = ps.create(subscription2,
mqbblp::PushStream::Element* element3 = ps.create(subscription2.rdaInfo(),
subscription2.id(),
itGuid1,
itApp2);

ps.add(element3);

mqbblp::PushStream::Element* element4 = ps.create(subscription1,
mqbblp::PushStream::Element* element4 = ps.create(subscription1.rdaInfo(),
subscription1.id(),
itGuid2,
itApp1);

Expand Down
33 changes: 17 additions & 16 deletions src/groups/mqb/mqbblp/mqbblp_queueenginetester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -916,25 +916,26 @@ void QueueEngineTester::post(const bslstl::StringRef& messages,
bsl::vector<bsl::string> msgs(d_allocator_p);
parseMessages(&msgs, messages);

bmqp::Protocol::SubQueueInfosArray subscriptions(d_allocator_p);
for (unsigned int i = 0; i < msgs.size(); ++i) {
// Each message must have its own 'subscriptions'.
bmqp::Protocol::SubQueueInfosArray subscriptions(d_allocator_p);

if (d_subIds.empty()) {
subscriptions.push_back(
bmqp::SubQueueInfo(bmqp::Protocol::k_DEFAULT_SUBSCRIPTION_ID));
}
else {
// Assume, RelayQueueEngine will use upstreamSubQueueIds as the
// subscriptionIds.
// This needs to be in accord with the 'configureHandle' logic.

for (SubIdsMap::const_iterator cit = d_subIds.cbegin();
cit != d_subIds.cend();
++cit) {
subscriptions.push_back(bmqp::SubQueueInfo(cit->second));
if (d_subIds.empty()) {
subscriptions.push_back(
bmqp::SubQueueInfo(bmqp::Protocol::k_DEFAULT_SUBSCRIPTION_ID));
}
else {
// Assume, RelayQueueEngine will use upstreamSubQueueIds as the
// subscriptionIds.
// This needs to be in accord with the 'configureHandle' logic.

for (SubIdsMap::const_iterator cit = d_subIds.cbegin();
cit != d_subIds.cend();
++cit) {
subscriptions.push_back(bmqp::SubQueueInfo(cit->second));
}
}
}

for (unsigned int i = 0; i < msgs.size(); ++i) {
// Put in storage
bmqt::MessageGUID msgGUID;
mqbi::StorageMessageAttributes msgAttributes;
Expand Down
Loading

0 comments on commit a6dcd0d

Please sign in to comment.