Skip to content

Commit

Permalink
Perf[MQB]: do not build temporary functors for every routed message (#…
Browse files Browse the repository at this point in the history
…477)

Signed-off-by: Evgeny Malygin <[email protected]>
  • Loading branch information
678098 authored Oct 28, 2024
1 parent c140945 commit 9a9b79f
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 18 deletions.
41 changes: 25 additions & 16 deletions src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,17 @@ QueueEngineUtil_AppsDeliveryContext::QueueEngineUtil_AppsDeliveryContext(
, d_currentMessage(0)
, d_queue_p(queue)
, d_timeDelta()
, d_currentAppView_p(0)
, d_visitVisitor(
bdlf::BindUtil::bindS(allocator,
&QueueEngineUtil_AppsDeliveryContext::visit,
this,
bdlf::PlaceHolders::_1))
, d_broadcastVisitor(bdlf::BindUtil::bindS(
allocator,
&QueueEngineUtil_AppsDeliveryContext::visitBroadcast,
this,
bdlf::PlaceHolders::_1))
{
BSLS_ASSERT_SAFE(queue);
}
Expand Down Expand Up @@ -670,12 +681,7 @@ bool QueueEngineUtil_AppsDeliveryContext::processApp(

if (d_queue_p->isDeliverAll()) {
// collect all handles
app.routing()->iterateConsumers(
bdlf::BindUtil::bind(
&QueueEngineUtil_AppsDeliveryContext::visitBroadcast,
this,
bdlf::PlaceHolders::_1),
d_currentMessage);
app.routing()->iterateConsumers(d_broadcastVisitor, d_currentMessage);

d_isReady = true;

Expand Down Expand Up @@ -708,13 +714,14 @@ bool QueueEngineUtil_AppsDeliveryContext::processApp(
// mapped file, which can delay the unmapping of files in case this
// message has a huge TTL and there are no consumers for this message.

Routers::Result result = app.selectConsumer(
bdlf::BindUtil::bind(&QueueEngineUtil_AppsDeliveryContext::visit,
this,
bdlf::PlaceHolders::_1,
appView),
d_currentMessage,
ordinal);
d_currentAppView_p = &appView;
Routers::Result result = app.selectConsumer(d_visitVisitor,
d_currentMessage,
ordinal);
// We use this pointer only from `d_visitVisitor`, so not cleaning it is
// okay, but we clean it to keep contract that `d_currentAppView_p` only
// points at the actual AppView.
d_currentAppView_p = NULL;

if (result == Routers::e_SUCCESS) {
// RootQueueEngine makes stat reports
Expand Down Expand Up @@ -749,14 +756,16 @@ bool QueueEngineUtil_AppsDeliveryContext::processApp(
}

bool QueueEngineUtil_AppsDeliveryContext::visit(
const Routers::Subscription* subscription,
const mqbi::AppMessage& appView)
const Routers::Subscription* subscription)
{
BSLS_ASSERT_SAFE(subscription);
BSLS_ASSERT_SAFE(
d_currentAppView_p &&
"`d_currentAppView_p` must be assigned before calling this function");

d_consumers[subscription->handle()].push_back(
bmqp::SubQueueInfo(subscription->d_downstreamSubscriptionId,
appView.d_rdaInfo));
d_currentAppView_p->d_rdaInfo));

return true;
}
Expand Down
19 changes: 17 additions & 2 deletions src/groups/mqb/mqbblp/mqbblp_queueengineutil.h
Original file line number Diff line number Diff line change
Expand Up @@ -603,10 +603,26 @@ struct QueueEngineUtil_AppsDeliveryContext {
mqbi::StorageIterator* d_currentMessage;
mqbi::Queue* d_queue_p;
bsl::optional<bsls::Types::Int64> d_timeDelta;

/// Mutable additional argument used in `visit()`, when it is called from
/// `d_visitVisitor` functor.
const mqbi::AppMessage* d_currentAppView_p;

/// Cached functor to `QueueEngineUtil_AppsDeliveryContext::visit`
const Routers::Visitor d_visitVisitor;

/// Cached functor to `QueueEngineUtil_AppsDeliveryContext::visitBroadcast`
const Routers::Visitor d_broadcastVisitor;

// Avoid reading the attributes if not necessary. Get timeDelta on demand.
// See comment in `QueueEngineUtil_AppsDeliveryContext::processApp`.

public:
// TRAITS
BSLMF_NESTED_TRAIT_DECLARATION(QueueEngineUtil_AppsDeliveryContext,
bslma::UsesBslmaAllocator)

// CREATORS
QueueEngineUtil_AppsDeliveryContext(mqbi::Queue* queue,
bslma::Allocator* allocator);

Expand Down Expand Up @@ -637,8 +653,7 @@ struct QueueEngineUtil_AppsDeliveryContext {
bool processApp(QueueEngineUtil_AppState& app, unsigned int ordina);

/// Collect and prepare data for the subsequent `deliverMessage` call.
bool visit(const Routers::Subscription* subscription,
const mqbi::AppMessage& appView);
bool visit(const Routers::Subscription* subscription);
bool visitBroadcast(const Routers::Subscription* subscription);

/// Deliver message to the previously processed handles.
Expand Down

0 comments on commit 9a9b79f

Please sign in to comment.