Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix PushStream & VirtualStorage ordinal #528

Merged
merged 1 commit into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions src/groups/mqb/mqbblp/mqbblp_pushstream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,8 @@ void PushStreamIterator::removeCurrentElement()
d_currentElement = d_currentElement->next();
++d_currentOrdinal;

d_owner_p->remove(del);
d_owner_p->destroy(del, true);
// doKeepGuid because of the d_iterator
d_owner_p->remove(del, false);
// cannot erase the GUID because of the d_iterator

if (d_iterator->second.numElements() == 0) {
BSLS_ASSERT_SAFE(d_currentElement == 0);
Expand Down Expand Up @@ -291,9 +290,8 @@ bool VirtualPushStreamIterator::advance()

d_currentElement = d_currentElement->nextInApp();

d_owner_p->remove(del);
d_owner_p->destroy(del, false);
// do not keep Guid
d_owner_p->remove(del, true);
// can erase GUID

if (atEnd()) {
return false;
Expand Down
44 changes: 19 additions & 25 deletions src/groups/mqb/mqbblp/mqbblp_pushstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,11 @@ struct PushStream {
void add(Element* element);

/// Remove the specified `element` from both GUID and App corresponding to
/// the `element` (and specified when constructing the `element`).
/// Return the number of remaining Elements in the corresponding GUID.
unsigned int remove(Element* element);
/// the `element` (and specified when constructing the `element`). If
/// there are no more elements in the App, erase the App. If the specified
/// `canEraseGuid` is `true` and there are no more elements in the GUID,
/// erase the GUID.
void remove(Element* element, bool canEraseGuid);

/// Remove all PushStream Elements corresponding to the specified
/// `upstreamSubQueueId`. Erase each corresponding GUIDs from the
Expand All @@ -218,9 +220,6 @@ struct PushStream {
Element* create(const bmqp::SubQueueInfo& info,
const iterator& iterator,
const Apps::iterator& iteratorApp);

/// Destroy the specified `element`
void destroy(Element* element, bool doKeepGuid);
};

// ========================
Expand Down Expand Up @@ -605,6 +604,7 @@ inline void PushStream::App::add(Element* element)
{
d_elements.add(element, e_APP);
}

inline void PushStream::App::remove(Element* element)
{
d_elements.remove(element, e_APP);
Expand All @@ -631,19 +631,6 @@ PushStream::create(const bmqp::SubQueueInfo& subscription,
return element;
}

inline void PushStream::destroy(Element* element, bool doKeepGuid)
{
if (element->app().d_elements.numElements() == 0) {
element->eraseApp(d_apps);
}

if (!doKeepGuid && element->guid().numElements() == 0) {
element->eraseGuid(d_stream);
}

d_pushElementsPool_sp->deallocate(element);
}

inline PushStream::iterator
PushStream::findOrAppendMessage(const bmqt::MessageGUID& guid)
{
Expand All @@ -662,7 +649,7 @@ inline void PushStream::add(Element* element)
element->app().add(element);
}

inline unsigned int PushStream::remove(Element* element)
inline void PushStream::remove(Element* element, bool canEraseGuid)
{
BSLS_ASSERT_SAFE(element);
BSLS_ASSERT_SAFE(!element->equal(d_stream.end()));
Expand All @@ -673,7 +660,15 @@ inline unsigned int PushStream::remove(Element* element)
// remove from the guid
element->guid().remove(element, e_GUID);

return element->guid().numElements();
if (element->app().d_elements.numElements() == 0) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Centralizing this bit of logic as here seems right. I've traced through the code paths leading up to this and I think this catches all places we need.

element->eraseApp(d_apps);
}

if (canEraseGuid && element->guid().numElements() == 0) {
element->eraseGuid(d_stream);
}

d_pushElementsPool_sp->deallocate(element);
}

inline unsigned int PushStream::removeApp(unsigned int upstreamSubQueueId)
Expand All @@ -695,10 +690,9 @@ inline unsigned int PushStream::removeApp(Apps::iterator itApp)
for (unsigned int count = 0; count < numElements; ++count) {
Element* element = itApp->second.d_elements.front();

remove(element);

destroy(element, false);
// do not keep Guid
remove(element, true);
// do not keep Guid. This relies on either 'beforeOneAppRemoved' or
// resetting iterator(s).
}

return numElements;
Expand Down
9 changes: 3 additions & 6 deletions src/groups/mqb/mqbblp/mqbblp_pushstream.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ static void test1_basic()
itApp);

ps.add(element);
ps.remove(element);
ps.destroy(element, false);
ps.remove(element, true);
}

static void test2_iterations()
Expand Down Expand Up @@ -183,10 +182,8 @@ static void test2_iterations()
ASSERT(vit.atEnd());
}

ps.remove(element2);
ps.destroy(element2, false);
ps.remove(element3);
ps.destroy(element3, false);
ps.remove(element2, true);
ps.remove(element3, true);
}

// ============================================================================
Expand Down
42 changes: 25 additions & 17 deletions src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,8 @@ QueueEngineUtil_AppsDeliveryContext::QueueEngineUtil_AppsDeliveryContext(
mqbi::Queue* queue,
bslma::Allocator* allocator)
: d_consumers(allocator)
, d_isReady(false)
, d_numApps(0)
, d_numStops(0)
, d_currentMessage(0)
, d_queue_p(queue)
, d_timeDelta()
Expand All @@ -652,25 +653,26 @@ QueueEngineUtil_AppsDeliveryContext::QueueEngineUtil_AppsDeliveryContext(
BSLS_ASSERT_SAFE(queue);
}

void QueueEngineUtil_AppsDeliveryContext::start()
{
d_isReady = true;
}

bool QueueEngineUtil_AppsDeliveryContext::reset(
mqbi::StorageIterator* currentMessage)
{
d_consumers.clear();
d_timeDelta.reset();

if (!d_isReady) {
return false; // RETURN
bool result = false;

if (haveProgress() && currentMessage && currentMessage->hasReceipt()) {
d_currentMessage = currentMessage;
result = true;
}
else {
d_currentMessage = 0;
}

d_currentMessage = currentMessage;
d_isReady = false;
d_numApps = 0;
d_numStops = 0;

return d_currentMessage ? d_currentMessage->hasReceipt() : false;
return result;
}

bool QueueEngineUtil_AppsDeliveryContext::processApp(
Expand All @@ -679,12 +681,12 @@ bool QueueEngineUtil_AppsDeliveryContext::processApp(
{
BSLS_ASSERT_SAFE(d_currentMessage->hasReceipt());

++d_numApps;

if (d_queue_p->isDeliverAll()) {
// collect all handles
app.routing()->iterateConsumers(d_broadcastVisitor, d_currentMessage);

d_isReady = true;

// Broadcast does not need stats nor any special per-message treatment.
return false; // RETURN
}
Expand All @@ -695,6 +697,7 @@ bool QueueEngineUtil_AppsDeliveryContext::processApp(
// The queue iterator can advance leaving the 'app' behind.
app.setResumePoint(d_currentMessage->guid());
}
++d_numStops;
// else the existing resumePoint is earlier (if authorized)
return false; // RETURN
}
Expand All @@ -703,7 +706,6 @@ bool QueueEngineUtil_AppsDeliveryContext::processApp(
ordinal);

if (!appView.isNew()) {
d_isReady = true;
return true; // RETURN
}

Expand Down Expand Up @@ -736,7 +738,9 @@ bool QueueEngineUtil_AppsDeliveryContext::processApp(

// Early return.
// If all Apps return 'e_NO_CAPACITY_ALL', stop the iteration
// (d_isReady == false).
// (d_numApps == 0).

++d_numStops;

return false; // RETURN
}
Expand All @@ -750,7 +754,6 @@ bool QueueEngineUtil_AppsDeliveryContext::processApp(
}

// Still making progress (result != Routers::e_NO_CAPACITY_ALL)
d_isReady = true;

return (result == Routers::e_SUCCESS);
}
Expand Down Expand Up @@ -811,7 +814,7 @@ void QueueEngineUtil_AppsDeliveryContext::deliverMessage()
}
}

if (d_isReady) {
if (haveProgress()) {
d_currentMessage->advance();
}

Expand All @@ -823,6 +826,11 @@ bool QueueEngineUtil_AppsDeliveryContext::isEmpty() const
return d_consumers.empty();
}

bool QueueEngineUtil_AppsDeliveryContext::haveProgress() const
{
return (d_numStops < d_numApps || d_numApps == 0);
}

bsls::Types::Int64 QueueEngineUtil_AppsDeliveryContext::timeDelta()
{
if (!d_timeDelta.has_value()) {
Expand Down
9 changes: 5 additions & 4 deletions src/groups/mqb/mqbblp/mqbblp_queueengineutil.h
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,8 @@ struct QueueEngineUtil_AppsDeliveryContext {

private:
Consumers d_consumers;
bool d_isReady;
int d_numApps;
int d_numStops; // Apps not moving
mqbi::StorageIterator* d_currentMessage;
mqbi::Queue* d_queue_p;
bsl::optional<bsls::Types::Int64> d_timeDelta;
Expand All @@ -626,9 +627,6 @@ struct QueueEngineUtil_AppsDeliveryContext {
QueueEngineUtil_AppsDeliveryContext(mqbi::Queue* queue,
bslma::Allocator* allocator);

/// Start delivery cycle(s).
void start();

/// Prepare the context to process next message.
/// Return `true` if the delivery can continue iterating dataStream
/// The `false` return value indicates either the end of the dataStream or
Expand Down Expand Up @@ -662,6 +660,9 @@ struct QueueEngineUtil_AppsDeliveryContext {
/// Return `true` if there is at least one delivery target selected.
bool isEmpty() const;

/// Return `true` if not all Apps are at capacity or there are no Apps.
bool haveProgress() const;

bsls::Types::Int64 timeDelta();
};

Expand Down
27 changes: 16 additions & 11 deletions src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -580,8 +580,6 @@ void RelayQueueEngine::deliverMessages()
// 1. End of storage; or
// 2. All subStreams return 'e_NO_CAPACITY_ALL'

d_appsDeliveryContext.start();

while (d_appsDeliveryContext.reset(d_storageIter_mp.get())) {
// Assume, all Apps need to deliver (some may be at capacity)
unsigned int numApps = d_storageIter_mp->numApps();
Expand All @@ -604,14 +602,13 @@ void RelayQueueEngine::deliverMessages()

d_storageIter_mp->removeCurrentElement();
}

if (d_appsDeliveryContext.processApp(*app, i)) {
else if (d_appsDeliveryContext.processApp(*app, i)) {
// The current element has made it either to delivery or
// putAside or resumerPoint and it can be removed
// putAside and it can be removed
d_storageIter_mp->removeCurrentElement();
}
// Else, the current element has made it to resumerPoint and
// it cannot be removed
// Else, the current element has made it to resumePoint and it
// cannot be removed.
}
d_appsDeliveryContext.deliverMessage();
}
Expand Down Expand Up @@ -1919,14 +1916,22 @@ void RelayQueueEngine::storePush(mqbi::StorageMessageAttributes* attributes,
void RelayQueueEngine::beforeOneAppRemoved(unsigned int upstreamSubQueueId)
{
while (!d_storageIter_mp->atEnd()) {
if (d_storageIter_mp->numApps() > 1) {
const int numApps = d_storageIter_mp->numApps();
if (numApps > 1) {
// Removal of App's elements will not invalidate 'd_storageIter_mp'
break;
}
if (numApps == 1) {
const PushStream::Element* element = d_storageIter_mp->element(0);
if (element->app().d_app->upstreamSubQueueId() !=
upstreamSubQueueId) {
break;
}
}
else {
BSLS_ASSERT_SAFE(numApps == 0);

const PushStream::Element* element = d_storageIter_mp->element(0);
if (element->app().d_app->upstreamSubQueueId() != upstreamSubQueueId) {
break;
// The case when 'advance' does not follow 'removeCurrentElement'
}

d_storageIter_mp->advance();
Expand Down
1 change: 0 additions & 1 deletion src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1276,7 +1276,6 @@ void RootQueueEngine::afterNewMessage(
d_queueState_p->queue()));

// Deliver new messages to active (alive and capable to deliver) consumers
d_appsDeliveryContext.start();

while (d_appsDeliveryContext.reset(d_storageIter_mp.get())) {
// Assume, all Apps need to deliver (some may be at capacity)
Expand Down
7 changes: 4 additions & 3 deletions src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -422,9 +422,10 @@ int VirtualStorageCatalog::addVirtualStorage(bsl::ostream& errorDescription,
appOrdinal = d_nextOrdinal++;
}
else {
appOrdinal = d_availableOrdinals.front();
AvailableOrdinals::const_iterator first = d_availableOrdinals.cbegin();
appOrdinal = *first;
// There is no conflict because everything 'appOrdinal' was removed.
d_availableOrdinals.pop_front();
d_availableOrdinals.erase(first);
}

BSLS_ASSERT_SAFE(appOrdinal <= d_virtualStorages.size());
Expand Down Expand Up @@ -468,7 +469,7 @@ bool VirtualStorageCatalog::removeVirtualStorage(
removeAll(appKey);

const VirtualStorage& vs = *it->value();
d_availableOrdinals.push_back(vs.ordinal());
d_availableOrdinals.insert(vs.ordinal());

if (d_queue_p) {
BSLS_ASSERT_SAFE(d_queue_p->queueEngine());
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class VirtualStorageCatalog {
typedef bsl::shared_ptr<VirtualStorage> VirtualStorageSp;

/// List of available ordinal values for Virtual Storages.
typedef bsl::list<Ordinal> AvailableOrdinals;
typedef bsl::set<Ordinal> AvailableOrdinals;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems fine. Ensures that we always get the smallest available ordinal, and we're not going to incur the cost of chasing a few more pointers in the critical path.


/// appKey -> virtualStorage
typedef bmqc::
Expand Down
Loading