Skip to content

Commit

Permalink
Perf[MQB]: inline stats onEvent (#542)
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Petukhov <[email protected]>
  • Loading branch information
tepamid authored Dec 16, 2024
1 parent 171d7cc commit 5a6670d
Show file tree
Hide file tree
Showing 21 changed files with 655 additions and 535 deletions.
22 changes: 12 additions & 10 deletions src/groups/mqb/mqba/mqba_clientsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -584,9 +584,11 @@ void ClientSession::sendAck(bmqt::AckResult::Enum status,

// If queue is found, report locally generated NACK
if (isSelfGenerated) {
queueState->d_handle_p->queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_NACK,
1);
queueState->d_handle_p->queue()
->stats()
->onEvent<mqbstat::QueueStatsDomain::EventType::e_NACK>(

1);
}
}

Expand Down Expand Up @@ -1658,9 +1660,9 @@ void ClientSession::onAckEvent(const mqbi::DispatcherAckEvent& event)
// Calculate time delta between PUT and ACK
const bsls::Types::Int64 timeDelta =
bmqsys::Time::highResolutionTimer() - cit->second.d_timeStamp;
queue->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_ACK_TIME,
timeDelta);
queue->stats()
->onEvent<mqbstat::QueueStatsDomain::EventType::e_ACK_TIME>(
timeDelta);

if (!d_isClientGeneratingGUIDs) {
// Legacy client.
Expand Down Expand Up @@ -2693,8 +2695,8 @@ ClientSession::ClientSession(
this,
bdlf::PlaceHolders::_1)); // type

mqbstat::BrokerStats::instance().onEvent(
mqbstat::BrokerStats::EventType::e_CLIENT_CREATED);
mqbstat::BrokerStats::instance()
.onEvent<mqbstat::BrokerStats::EventType::e_CLIENT_CREATED>();

BALL_LOG_INFO << description() << ": created "
<< "[dispatcherProcessor: " << processor
Expand All @@ -2714,8 +2716,8 @@ ClientSession::~ClientSession()

BALL_LOG_INFO << description() << ": destructor";

mqbstat::BrokerStats::instance().onEvent(
mqbstat::BrokerStats::EventType::e_CLIENT_DESTROYED);
mqbstat::BrokerStats::instance()
.onEvent<mqbstat::BrokerStats::EventType::e_CLIENT_DESTROYED>();

// Unregister from the dispatcher
dispatcher()->unregisterClient(this);
Expand Down
35 changes: 19 additions & 16 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -418,9 +418,9 @@ void Cluster::sendAck(bmqt::AckResult::Enum status,

// If queue exists, report self generated NACK
if (isSelfGenerated) {
it->second.d_handle_p->queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_NACK,
1);
it->second.d_handle_p->queue()
->stats()
->onEvent<mqbstat::QueueStatsDomain::EventType::e_NACK>(1);
}
}
else if (!isSelfGenerated) {
Expand Down Expand Up @@ -467,9 +467,9 @@ void Cluster::sendAck(bmqt::AckResult::Enum status,
cit->second.d_subQueueInfosMap.findBySubIdSafe(
bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID);
if (subQueueCiter != cit->second.d_subQueueInfosMap.end()) {
subQueueCiter->value().d_clientStats->onEvent(
mqbstat::ClusterNodeStats::EventType::e_ACK,
1);
subQueueCiter->value()
.d_clientStats
->onEvent<mqbstat::ClusterNodeStats::EventType::e_ACK>(1);
}
// In the case of Strong Consistency, a Receipt can arrive and trigger
// an ACK after Producer closes subStream.
Expand Down Expand Up @@ -548,7 +548,7 @@ void Cluster::generateNack(bmqt::AckResult::Enum status,
options);

// Report locally generated NACK
queue->stats()->onEvent(mqbstat::QueueStatsDomain::EventType::e_NACK, 1);
queue->stats()->onEvent<mqbstat::QueueStatsDomain::EventType::e_NACK>(1);

bmqu::MemOutStream os;
os << description() << ": Failed to relay PUT message "
Expand Down Expand Up @@ -1206,9 +1206,11 @@ void Cluster::onPutEvent(const mqbi::DispatcherPutEvent& event)
queueState.d_subQueueInfosMap.findBySubId(
bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID);

subQueueCiter->value().d_clientStats->onEvent(
mqbstat::ClusterNodeStats::EventType::e_PUT,
appDataSp->length());
subQueueCiter->value()
.d_clientStats
->onEvent<mqbstat::ClusterNodeStats::EventType::e_PUT>(

appDataSp->length());

// TBD: groupId: Similar to 'appDataSp' above, load 'optionsSp' here,
// using something like PutMessageIterator::loadOptions().
Expand Down Expand Up @@ -1644,9 +1646,9 @@ Cluster::validateMessage(mqbi::QueueHandle** queueHandle,

if (eventType == bmqp::EventType::e_CONFIRM) {
// Update client stats
subQueueIt->value().d_clientStats->onEvent(
mqbstat::ClusterNodeStats::EventType::e_CONFIRM,
1);
subQueueIt->value()
.d_clientStats
->onEvent<mqbstat::ClusterNodeStats::EventType::e_CONFIRM>(1);
}

return ValidationResult::k_SUCCESS;
Expand Down Expand Up @@ -1912,9 +1914,10 @@ void Cluster::onPushEvent(const mqbi::DispatcherPushEvent& event)
queueState.d_subQueueInfosMap.findBySubscriptionId(
event.subQueueInfos()[i].id());

subQueueCiter->value().d_clientStats->onEvent(
mqbstat::ClusterNodeStats::EventType::e_PUSH,
event.blob() ? event.blob()->length() : 0);
subQueueCiter->value()
.d_clientStats
->onEvent<mqbstat::ClusterNodeStats::EventType::e_PUSH>(
event.blob() ? event.blob()->length() : 0);
}

bmqt::GenericResult::Enum rc = bmqt::GenericResult::e_SUCCESS;
Expand Down
8 changes: 4 additions & 4 deletions src/groups/mqb/mqbblp/mqbblp_domain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -430,10 +430,10 @@ int Domain::configure(bsl::ostream& errorDescription,
d_capacityMeter.setLimits(limits.messages(), limits.bytes())
.setWatermarkThresholds(limits.messagesWatermarkRatio(),
limits.bytesWatermarkRatio());
d_domainsStats.onEvent(mqbstat::DomainStats::EventType::e_CFG_MSGS,
limits.messages());
d_domainsStats.onEvent(mqbstat::DomainStats::EventType::e_CFG_BYTES,
limits.bytes());
d_domainsStats.onEvent<mqbstat::DomainStats::EventType::e_CFG_MSGS>(
limits.messages());
d_domainsStats.onEvent<mqbstat::DomainStats::EventType::e_CFG_BYTES>(
limits.bytes());

if (isReconfigure) {
BSLS_ASSERT_OPT(oldConfig.has_value());
Expand Down
44 changes: 21 additions & 23 deletions src/groups/mqb/mqbblp/mqbblp_localqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,17 +173,17 @@ int LocalQueue::configure(bsl::ostream& errorDescription, bool isReconfigure)
d_state_p->uri(),
d_state_p->partitionId());

d_state_p->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_CHANGE_ROLE,
mqbstat::QueueStatsDomain::Role::e_PRIMARY);
d_state_p->stats()
->onEvent<mqbstat::QueueStatsDomain::EventType::e_CHANGE_ROLE>(
mqbstat::QueueStatsDomain::Role::e_PRIMARY);

d_state_p->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_CFG_MSGS,
domainCfg.storage().queueLimits().messages());
d_state_p->stats()
->onEvent<mqbstat::QueueStatsDomain::EventType::e_CFG_MSGS>(
domainCfg.storage().queueLimits().messages());

d_state_p->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_CFG_BYTES,
domainCfg.storage().queueLimits().bytes());
d_state_p->stats()
->onEvent<mqbstat::QueueStatsDomain::EventType::e_CFG_BYTES>(
domainCfg.storage().queueLimits().bytes());

if (isReconfigure) {
if (domainCfg.mode().isFanoutValue()) {
Expand Down Expand Up @@ -482,9 +482,9 @@ void LocalQueue::postMessage(const bmqp::PutHeader& putHeader,
// Calculate time delta between PUT and ACK
const bsls::Types::Int64 timeDelta =
bmqsys::Time::highResolutionTimer() - timePoint;
d_state_p->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_ACK_TIME,
timeDelta);
d_state_p->stats()
->onEvent<mqbstat::QueueStatsDomain::EventType::e_ACK_TIME>(
timeDelta);
if (res != mqbi::StorageResult::e_SUCCESS || doAck) {
bmqp::AckMessage ackMessage;
ackMessage
Expand All @@ -509,9 +509,9 @@ void LocalQueue::postMessage(const bmqp::PutHeader& putHeader,
// flushed (which occurs in 'flush' routine). In no case should
// 'afterNewMessage' be called here.

d_state_p->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_PUT,
appData->length());
d_state_p->stats()
->onEvent<mqbstat::QueueStatsDomain::EventType::e_PUT>(
appData->length());
}
else {
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;
Expand All @@ -525,9 +525,8 @@ void LocalQueue::postMessage(const bmqp::PutHeader& putHeader,
}
}
else {
d_state_p->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_NACK,
1);
d_state_p->stats()
->onEvent<mqbstat::QueueStatsDomain::EventType::e_NACK>(1);
}
}
}
Expand All @@ -548,9 +547,8 @@ void LocalQueue::onReceipt(const bmqt::MessageGUID& msgGUID,
const bsls::Types::Int64 timeDelta = bmqsys::Time::highResolutionTimer() -
arrivalTimepoint;

d_state_p->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_ACK_TIME,
timeDelta);
d_state_p->stats()
->onEvent<mqbstat::QueueStatsDomain::EventType::e_ACK_TIME>(timeDelta);

if (d_state_p->handleCatalog().hasHandle(qH)) {
// Send acknowledgement
Expand All @@ -572,8 +570,8 @@ void LocalQueue::onRemoval(const bmqt::MessageGUID& msgGUID,
// TODO: do we need to update NACK stats considering that downstream can
// NACK the same GUID as well?

d_state_p->stats()->onEvent(mqbstat::QueueStatsDomain::EventType::e_NACK,
1);
d_state_p->stats()->onEvent<mqbstat::QueueStatsDomain::EventType::e_NACK>(
1);

if (d_state_p->handleCatalog().hasHandle(qH)) {
// Send negative acknowledgement
Expand Down
6 changes: 3 additions & 3 deletions src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1452,9 +1452,9 @@ void QueueEngineUtil_AppState::reportStats(
message->attributes());

// First report 'queue time' metric for the entire queue
d_queue_p->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_QUEUE_TIME,
timeDelta);
d_queue_p->stats()
->onEvent<mqbstat::QueueStatsDomain::EventType::e_QUEUE_TIME>(
timeDelta);

// Then report 'queue time' metric for appId
d_queue_p->stats()->onEvent(
Expand Down
22 changes: 11 additions & 11 deletions src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,9 @@ QueueHandle::updateMonitor(const bsl::shared_ptr<Downstream>& subStream,
// The message exist in the storage, we likely are in situation
// (3), so it's a legit confirm and therefore, update the domain
// stats.
d_domainStats_p->onEvent(
mqbstat::QueueStatsDomain::EventType::e_CONFIRM,
msgSize);
d_domainStats_p
->onEvent<mqbstat::QueueStatsDomain::EventType::e_CONFIRM>(
msgSize);
}

BALL_LOG_INFO_BLOCK
Expand Down Expand Up @@ -404,15 +404,15 @@ mqbu::ResourceUsageMonitorStateTransition::Enum QueueHandle::updateMonitor(

if (type == bmqp::EventType::e_CONFIRM) {
// Update domain stats
d_domainStats_p->onEvent(
mqbstat::QueueStatsDomain::EventType::e_CONFIRM,
msgSize);
d_domainStats_p
->onEvent<mqbstat::QueueStatsDomain::EventType::e_CONFIRM>(
msgSize);

// Report CONFIRM time only at first hop
// Note that we update metric per entire queue and also per `appId`
if (d_clientContext_sp->isFirstHop()) {
d_domainStats_p->onEvent(
mqbstat::QueueStatsDomain::EventType::e_CONFIRM_TIME,
d_domainStats_p->onEvent<
mqbstat::QueueStatsDomain::EventType::e_CONFIRM_TIME>(
timeDelta);
d_domainStats_p->onEvent(
mqbstat::QueueStatsDomain::EventType::e_CONFIRM_TIME,
Expand Down Expand Up @@ -498,8 +498,8 @@ void QueueHandle::deliverMessageImpl(
BSLS_ASSERT_SAFE(subQueueInfos.size() >= 1 &&
subQueueInfos.size() <= d_subscriptions.size());

d_domainStats_p->onEvent(mqbstat::QueueStatsDomain::EventType::e_PUSH,
msgSize);
d_domainStats_p->onEvent<mqbstat::QueueStatsDomain::EventType::e_PUSH>(
msgSize);

// Create an event to dispatch delivery of the message to the client
mqbi::DispatcherClient* client = d_clientContext_sp->client();
Expand Down Expand Up @@ -1208,7 +1208,7 @@ void QueueHandle::onAckMessage(const bmqp::AckMessage& ackMessage)
ackMsg.setQueueId(id());

client->dispatcher()->dispatchEvent(event, client);
d_domainStats_p->onEvent(mqbstat::QueueStatsDomain::EventType::e_ACK, 1);
d_domainStats_p->onEvent<mqbstat::QueueStatsDomain::EventType::e_ACK>(1);
}

bool QueueHandle::canDeliver(unsigned int downstreamSubscriptionId) const
Expand Down
8 changes: 4 additions & 4 deletions src/groups/mqb/mqbblp/mqbblp_queuestate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,14 @@ QueueState::QueueState(mqbi::Queue* queue,

// NOTE: The 'description' will be set by the owner of this object.

mqbstat::BrokerStats::instance().onEvent(
mqbstat::BrokerStats::EventType::e_QUEUE_CREATED);
mqbstat::BrokerStats::instance()
.onEvent<mqbstat::BrokerStats::EventType::e_QUEUE_CREATED>();
}

QueueState::~QueueState()
{
mqbstat::BrokerStats::instance().onEvent(
mqbstat::BrokerStats::EventType::e_QUEUE_DESTROYED);
mqbstat::BrokerStats::instance()
.onEvent<mqbstat::BrokerStats::EventType::e_QUEUE_DESTROYED>();
}

void QueueState::add(const bmqp_ctrlmsg::QueueHandleParameters& params)
Expand Down
26 changes: 13 additions & 13 deletions src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ int RemoteQueue::configureAsProxy(bsl::ostream& errorDescription,
return 10 * rc + rc_QUEUE_ENGINE_CFG_FAILURE; // RETURN
}

d_state_p->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_CHANGE_ROLE,
mqbstat::QueueStatsDomain::Role::e_PROXY);
d_state_p->stats()
->onEvent<mqbstat::QueueStatsDomain::EventType::e_CHANGE_ROLE>(
mqbstat::QueueStatsDomain::Role::e_PROXY);

BALL_LOG_INFO
<< "Created a ProxyRemoteQueue "
Expand Down Expand Up @@ -276,9 +276,9 @@ int RemoteQueue::configureAsClusterMember(bsl::ostream& errorDescription,
d_state_p->storageManager()->setQueueRaw(queue,
d_state_p->uri(),
d_state_p->partitionId());
d_state_p->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_CHANGE_ROLE,
mqbstat::QueueStatsDomain::Role::e_REPLICA);
d_state_p->stats()
->onEvent<mqbstat::QueueStatsDomain::EventType::e_CHANGE_ROLE>(
mqbstat::QueueStatsDomain::Role::e_REPLICA);

BALL_LOG_INFO << d_state_p->domain()->cluster()->name()
<< ": Created a ClusterMemberRemoteQueue "
Expand Down Expand Up @@ -922,9 +922,8 @@ void RemoteQueue::postMessage(const bmqp::PutHeader& putHeaderIn,
bmqt::AckResult::e_REFUSED));
ackMessage.setMessageGUID(putHeader.messageGUID());

d_state_p->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_NACK,
1);
d_state_p->stats()
->onEvent<mqbstat::QueueStatsDomain::EventType::e_NACK>(1);

// CorrelationId & QueueId are left unset as those fields
// will be filled downstream.
Expand Down Expand Up @@ -996,8 +995,9 @@ void RemoteQueue::postMessage(const bmqp::PutHeader& putHeaderIn,
// the time the message is actually sent upstream, i.e. in
// cluster/clusterProxy) for the most exact accuracy, but doing it here is
// good enough.
d_state_p->stats()->onEvent(mqbstat::QueueStatsDomain::EventType::e_PUT,
appData->length());

d_state_p->stats()->onEvent<mqbstat::QueueStatsDomain::EventType::e_PUT>(
appData->length());
}

void RemoteQueue::confirmMessage(const bmqt::MessageGUID& msgGUID,
Expand Down Expand Up @@ -1496,8 +1496,8 @@ RemoteQueue::Puts::iterator& RemoteQueue::nack(Puts::iterator& it,
{
ackMessage.setMessageGUID(it->first);

d_state_p->stats()->onEvent(mqbstat::QueueStatsDomain::EventType::e_NACK,
1);
d_state_p->stats()->onEvent<mqbstat::QueueStatsDomain::EventType::e_NACK>(
1);

// CorrelationId & QueueId are left unset as those fields
// will be filled downstream.
Expand Down
Loading

0 comments on commit 5a6670d

Please sign in to comment.