Skip to content

Commit

Permalink
Performance[MQBSTAT]: lookup for per-appId metrics O(n) -> O(1) (bloo…
Browse files Browse the repository at this point in the history
…mberg#389)


Signed-off-by: Evgeny Malygin <[email protected]>
  • Loading branch information
678098 authored and alexander-e1off committed Oct 24, 2024
1 parent 423d1b0 commit 189a784
Show file tree
Hide file tree
Showing 7 changed files with 292 additions and 93 deletions.
4 changes: 2 additions & 2 deletions src/groups/mqb/mqbblp/mqbblp_queuestate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ QueueState::QueueState(mqbi::Queue* queue,
, d_scheduler_p(0)
, d_miscWorkThreadPool_p(0)
, d_storage_mp(0)
, d_stats()
, d_stats(allocator)
, d_messageThrottleConfig()
, d_handleCatalog(queue, allocator)
, d_context(queue->schemaLearner(), allocator)
Expand All @@ -80,7 +80,7 @@ QueueState::QueueState(mqbi::Queue* queue,
d_handleParameters.qId() = d_id;

// Initialize stats
d_stats.initialize(d_uri, d_domain_p, allocator);
d_stats.initialize(d_uri, d_domain_p);

// NOTE: The 'description' will be set by the owner of this object.

Expand Down
2 changes: 2 additions & 0 deletions src/groups/mqb/mqbc/mqbc_clusterdata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ mqbc::ClusterDataIdentity clusterIdentity(const bslstl::StringRef& name,
// Create client identity
bmqp_ctrlmsg::ClientIdentity identity(allocator);
if (!isRemote) {
BSLS_ASSERT_SAFE(netCluster->selfNode());

identity.protocolVersion() = bmqp::Protocol::k_VERSION;
identity.sdkVersion() = bmqscm::Version::versionAsInt();
identity.clientType() = bmqp_ctrlmsg::ClientType::E_TCPBROKER;
Expand Down
11 changes: 10 additions & 1 deletion src/groups/mqb/mqbmock/mqbmock_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ void Cluster::_initializeNetcluster()
: k_LEADER_NODE_ID + 1;
dynamic_cast<mqbnet::MockCluster*>(d_netCluster_mp.get())
->_setSelfNodeId(selfNodeId);

if (d_isClusterMember) {
BSLS_ASSERT_OPT(0 != d_netCluster_mp->selfNode());
}
}

void Cluster::_initializeNodeSessions()
Expand Down Expand Up @@ -231,7 +235,12 @@ Cluster::Cluster(bdlbb::BlobBufferFactory* bufferFactory,
, d_processor()
{
// PRECONDITIONS
BSLS_ASSERT_OPT(isClusterMember || !isLeader);
if (isClusterMember) {
BSLS_ASSERT_OPT(!clusterNodeDefs.empty());
}
else {
BSLS_ASSERT_OPT(!isLeader);
}

_initializeClusterDefinition(name,
location,
Expand Down
4 changes: 2 additions & 2 deletions src/groups/mqb/mqbmock/mqbmock_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ Queue::Queue(mqbi::Domain* domain, bslma::Allocator* allocator)
, d_hasMultipleSubStreams(false)
, d_handleParameters(allocator)
, d_streamParameters(allocator)
, d_stats()
, d_stats(allocator)
, d_domain_p(domain)
, d_dispatcher_p(0)
, d_queueEngine_p(0)
Expand All @@ -68,7 +68,7 @@ Queue::Queue(mqbi::Domain* domain, bslma::Allocator* allocator)

// Initialize stats
if (domain) {
d_stats.initialize(d_uri, domain, allocator);
d_stats.initialize(d_uri, domain);
}
}

Expand Down
128 changes: 61 additions & 67 deletions src/groups/mqb/mqbstat/mqbstat_queuestats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,29 +193,6 @@ bool filterDirect(const mwcst::TableRecords::Record& record)
return record.type() == mwcst::StatContext::e_TOTAL_VALUE;
}

/// Functor object returning `true`, i.e., filter out, if the specified 'name'
/// matches context's name
class ContextNameMatcher {
private:
// DATA
const bsl::string& d_name;

public:
// CREATORS
ContextNameMatcher(const bsl::string& name)
: d_name(name)
{
// NOTHING
}

// ACCESSORS
bool
operator()(const bslma::ManagedPtr<mwcst::StatContext>& context_mp) const
{
return (context_mp->name() == d_name);
}
};

} // close unnamed namespace

// -----------------------------
Expand Down Expand Up @@ -439,21 +416,21 @@ QueueStatsDomain::getValue(const mwcst::StatContext& context,
#undef STAT_SINGLE
}

QueueStatsDomain::QueueStatsDomain()
: d_statContext_mp(0)
, d_subContexts_mp(0)
QueueStatsDomain::QueueStatsDomain(bslma::Allocator* allocator)
: d_allocator_p(bslma::Default::allocator(allocator))
, d_statContext_mp(0)
, d_subContextsHolder(d_allocator_p)
, d_subContextsLookup(d_allocator_p)
{
// NOTHING
}

void QueueStatsDomain::initialize(const bmqt::Uri& uri,
mqbi::Domain* domain,
bslma::Allocator* allocator)
void QueueStatsDomain::initialize(const bmqt::Uri& uri, mqbi::Domain* domain)
{
BSLS_ASSERT_SAFE(!d_statContext_mp && "initialize was already called");

// Create subContext
bdlma::LocalSequentialAllocator<2048> localAllocator(allocator);
bdlma::LocalSequentialAllocator<2048> localAllocator(d_allocator_p);

d_statContext_mp = domain->queueStatContext()->addSubcontext(
mwcst::StatContextConfiguration(uri.canonical(), &localAllocator));
Expand Down Expand Up @@ -491,17 +468,16 @@ void QueueStatsDomain::initialize(const bmqt::Uri& uri,
if (!domain->cluster()->isRemote() &&
domain->config().mode().isFanoutValue() &&
domain->config().mode().fanout().publishAppIdMetrics()) {
d_subContexts_mp.load(new (*allocator)
bsl::list<StatSubContextMp>(allocator),
allocator);
const bsl::vector<bsl::string>& appIDs =
domain->config().mode().fanout().appIDs();
for (bsl::vector<bsl::string>::const_iterator cit = appIDs.begin();
cit != appIDs.end();
++cit) {
StatSubContextMp subContext = d_statContext_mp->addSubcontext(
mwcst::StatContextConfiguration(*cit, &localAllocator));
d_subContexts_mp->emplace_back(

d_subContextsLookup.insert(bsl::make_pair(*cit, subContext.get()));
d_subContextsHolder.emplace_back(
bslmf::MovableRefUtil::move(subContext));
}
}
Expand Down Expand Up @@ -609,25 +585,27 @@ void QueueStatsDomain::onEvent(EventType::Enum type,

BALL_LOG_SET_CATEGORY(k_LOG_CATEGORY);

if (!d_subContexts_mp) {
if (d_subContextsLookup.empty()) {
BALL_LOGTHROTTLE_WARN(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE)
<< "[THROTTLED] No built sub contexts";
<< "[THROTTLED] No built sub contexts for domain: "
<< d_statContext_mp->name() << ", appId: " << appId;
return; // RETURN
}

bsl::list<StatSubContextMp>::iterator it = bsl::find_if(
d_subContexts_mp->begin(),
d_subContexts_mp->end(),
ContextNameMatcher(appId));
if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(it == d_subContexts_mp->end())) {
bsl::unordered_map<bsl::string, mwcst::StatContext*>::iterator it =
d_subContextsLookup.find(appId);

if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(it ==
d_subContextsLookup.end())) {
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;

BALL_LOGTHROTTLE_WARN(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE)
<< "[THROTTLED] No matching StatContext for appId: " << appId;
<< "[THROTTLED] No matching StatContext for domain: "
<< d_statContext_mp->name() << ", appId: " << appId;
return; // RETURN
}

mwcst::StatContext* appIdContext = it->get();
mwcst::StatContext* appIdContext = it->second;
BSLS_ASSERT_SAFE(appIdContext);

switch (type) {
Expand Down Expand Up @@ -678,38 +656,54 @@ void QueueStatsDomain::setQueueContentRaw(bsls::Types::Int64 messages,
void QueueStatsDomain::updateDomainAppIds(
const bsl::vector<bsl::string>& appIds)
{
if (!d_subContexts_mp) {
if (appIds.empty()) {
d_subContextsLookup.clear();
d_subContextsHolder.clear();
return; // RETURN
}

bdlma::LocalSequentialAllocator<2048> localAllocator;

// Add subcontexts for appIds that are not already present
for (bsl::vector<bsl::string>::const_iterator cit = appIds.begin();
cit != appIds.end();
++cit) {
if (bsl::find_if(d_subContexts_mp->begin(),
d_subContexts_mp->end(),
ContextNameMatcher(*cit)) ==
d_subContexts_mp->end()) {
StatSubContextMp subContext = d_statContext_mp->addSubcontext(
mwcst::StatContextConfiguration(*cit, &localAllocator));
d_subContexts_mp->emplace_back(
bslmf::MovableRefUtil::move(subContext));
}
}

// Remove subcontexts if appIds are not present in updated AppIds
bsl::list<StatSubContextMp>::iterator it = d_subContexts_mp->begin();
while (it != d_subContexts_mp->end()) {
if (bsl::find(appIds.begin(), appIds.end(), it->get()->name()) ==
appIds.end()) {
it = d_subContexts_mp->erase(it);
bsl::unordered_set<bsl::string> remainingAppIds(appIds.begin(),
appIds.end(),
d_allocator_p);

// 1. Remove subcontexts for unneeded appIds
bsl::list<StatSubContextMp>::iterator it = d_subContextsHolder.begin();
while (it != d_subContextsHolder.end()) {
const bsl::string& ctxAppId = it->get()->name();
bsl::unordered_set<bsl::string>::const_iterator sIt =
remainingAppIds.find(ctxAppId);
if (sIt == remainingAppIds.end()) {
// Subcontext for this appId is no longer needed, remove it from
// the holder and lookup table
d_subContextsLookup.erase(ctxAppId);
it = d_subContextsHolder.erase(it);
}
else {
// This appId is needed, but the stat context is already built for
// it
remainingAppIds.erase(sIt);
++it;
}
}

if (remainingAppIds.empty()) {
return; // RETURN
}

// 2. Add the remaining appIds
bdlma::LocalSequentialAllocator<2048> localAllocator(d_allocator_p);

for (bsl::unordered_set<bsl::string>::const_iterator sIt =
remainingAppIds.begin();
sIt != remainingAppIds.end();
sIt++) {
StatSubContextMp subContext = d_statContext_mp->addSubcontext(
mwcst::StatContextConfiguration(*sIt, &localAllocator));

d_subContextsLookup.insert(bsl::make_pair(*sIt, subContext.get()));
d_subContextsHolder.emplace_back(
bslmf::MovableRefUtil::move(subContext));
}
}

// -----------------------------
Expand Down
43 changes: 33 additions & 10 deletions src/groups/mqb/mqbstat/mqbstat_queuestats.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@
#include <bsl_list.h>
#include <bsl_memory.h>
#include <bsl_string.h>
#include <bsl_unordered_map.h>
#include <bslma_allocator.h>
#include <bslma_managedptr.h>
#include <bslma_usesbslmaallocator.h>
#include <bslmf_nestedtraitdeclaration.h>
#include <bsls_cpp11.h>
#include <bsls_types.h>

Expand All @@ -70,6 +73,9 @@ namespace mqbstat {
/// domain.
class QueueStatsDomain {
public:
// TRAITS
BSLMF_NESTED_TRAIT_DECLARATION(QueueStatsDomain, bslma::UsesBslmaAllocator)

// TYPES

/// Enum representing the various type of events for which statistics
Expand Down Expand Up @@ -182,11 +188,29 @@ class QueueStatsDomain {
typedef bslma::ManagedPtr<mwcst::StatContext> StatSubContextMp;

// PRIVATE DATA
/// Allocator to use
bslma::Allocator* d_allocator_p;

/// StatContext
bslma::ManagedPtr<mwcst::StatContext> d_statContext_mp;
// StatContext
bslma::ManagedPtr<bsl::list<StatSubContextMp> > d_subContexts_mp;
// List of appId subcontexts. It is initialized if domain name is in the
// list of enabled domains in broker's `stats` configuration.

/// List of per-appId subcontexts stored as managed pointers.
/// Note: `mwcst::StatContext` interface allocates subcontexts as
/// managed pointers. We are not able to store managed pointers
/// in a collection that might reallocate and copy its elements,
/// since ManagedPtr implementation on Solaris is constraining.
/// This is why list is used to store managed pointers. But we
/// also want to perform fast lookups to subcontexts, and for this
/// we have `d_subContextsLookup` table that points to raw pointers
/// to subcontexts. These both fields must be kept in sync during
/// reconfiguration.
/// TODO: use one bsl::unordered_map to store and lookup if Solaris support
/// is stopped.
bsl::list<StatSubContextMp> d_subContextsHolder;

/// Lookup table for per-appId subcontexts. Managed pointers to these
/// subcontexts must be held in `d_subContextsHolder`.
bsl::unordered_map<bsl::string, mwcst::StatContext*> d_subContextsLookup;

private:
// NOT IMPLEMENTED
Expand All @@ -212,18 +236,17 @@ class QueueStatsDomain {

// CREATORS

/// Create a new object in an uninitialized state.
QueueStatsDomain();
/// Create a new object in an uninitialized state, using the specified
/// `allocator` for any memory allocations.
explicit QueueStatsDomain(bslma::Allocator* allocator);

// MANIPULATORS

/// Initialize this object for the queue with the specified `uri`, and
/// register it as a subcontext of the specified `domainStatContext`
/// (which correspond to the domain-level stat context this queue is
/// part of), using the specified `allocator`.
void initialize(const bmqt::Uri& uri,
mqbi::Domain* domain,
bslma::Allocator* allocator);
/// part of).
void initialize(const bmqt::Uri& uri, mqbi::Domain* domain);

/// Set the reader count to the specified `readerCount`. Return the
/// `QueueStatsDomain` object.
Expand Down
Loading

0 comments on commit 189a784

Please sign in to comment.