Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance[MQBSTAT]: lookup for per-appId metrics O(n) -> O(1) #389

Merged
merged 5 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/groups/mqb/mqbblp/mqbblp_queuestate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ QueueState::QueueState(mqbi::Queue* queue,
, d_scheduler_p(0)
, d_miscWorkThreadPool_p(0)
, d_storage_mp(0)
, d_stats()
, d_stats(allocator)
, d_messageThrottleConfig()
, d_handleCatalog(queue, allocator)
, d_context(queue->schemaLearner(), allocator)
Expand All @@ -80,7 +80,7 @@ QueueState::QueueState(mqbi::Queue* queue,
d_handleParameters.qId() = d_id;

// Initialize stats
d_stats.initialize(d_uri, d_domain_p, allocator);
d_stats.initialize(d_uri, d_domain_p);

// NOTE: The 'description' will be set by the owner of this object.

Expand Down
2 changes: 2 additions & 0 deletions src/groups/mqb/mqbc/mqbc_clusterdata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ mqbc::ClusterDataIdentity clusterIdentity(const bslstl::StringRef& name,
// Create client identity
bmqp_ctrlmsg::ClientIdentity identity(allocator);
if (!isRemote) {
BSLS_ASSERT_SAFE(netCluster->selfNode());
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the line 65 below, netCluster->selfNode() is dereferenced without checks, which might lead to segfault. This might be achieved by using mock domain/cluster objects.


identity.protocolVersion() = bmqp::Protocol::k_VERSION;
identity.sdkVersion() = bmqscm::Version::versionAsInt();
identity.clientType() = bmqp_ctrlmsg::ClientType::E_TCPBROKER;
Expand Down
11 changes: 10 additions & 1 deletion src/groups/mqb/mqbmock/mqbmock_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ void Cluster::_initializeNetcluster()
: k_LEADER_NODE_ID + 1;
dynamic_cast<mqbnet::MockCluster*>(d_netCluster_mp.get())
->_setSelfNodeId(selfNodeId);

if (d_isClusterMember) {
BSLS_ASSERT_OPT(0 != d_netCluster_mp->selfNode());
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another early safety check for the same problem with dereferencing selfNode with mocks.

}
}

void Cluster::_initializeNodeSessions()
Expand Down Expand Up @@ -231,7 +235,12 @@ Cluster::Cluster(bdlbb::BlobBufferFactory* bufferFactory,
, d_processor()
{
// PRECONDITIONS
BSLS_ASSERT_OPT(isClusterMember || !isLeader);
if (isClusterMember) {
BSLS_ASSERT_OPT(!clusterNodeDefs.empty());
Copy link
Collaborator Author

@678098 678098 Aug 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Early safety check. Providing an empty clusterNodeDefs makes it certain that selfNode will be nullptr.

}
else {
BSLS_ASSERT_OPT(!isLeader);
}

_initializeClusterDefinition(name,
location,
Expand Down
4 changes: 2 additions & 2 deletions src/groups/mqb/mqbmock/mqbmock_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ Queue::Queue(mqbi::Domain* domain, bslma::Allocator* allocator)
, d_hasMultipleSubStreams(false)
, d_handleParameters(allocator)
, d_streamParameters(allocator)
, d_stats()
, d_stats(allocator)
, d_domain_p(domain)
, d_dispatcher_p(0)
, d_queueEngine_p(0)
Expand All @@ -68,7 +68,7 @@ Queue::Queue(mqbi::Domain* domain, bslma::Allocator* allocator)

// Initialize stats
if (domain) {
d_stats.initialize(d_uri, domain, allocator);
d_stats.initialize(d_uri, domain);
}
}

Expand Down
128 changes: 61 additions & 67 deletions src/groups/mqb/mqbstat/mqbstat_queuestats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,29 +193,6 @@ bool filterDirect(const mwcst::TableRecords::Record& record)
return record.type() == mwcst::StatContext::e_TOTAL_VALUE;
}

/// Functor object returning `true`, i.e., filter out, if the specified 'name'
/// matches context's name
class ContextNameMatcher {
private:
// DATA
const bsl::string& d_name;

public:
// CREATORS
ContextNameMatcher(const bsl::string& name)
: d_name(name)
{
// NOTHING
}

// ACCESSORS
bool
operator()(const bslma::ManagedPtr<mwcst::StatContext>& context_mp) const
{
return (context_mp->name() == d_name);
}
};

} // close unnamed namespace

// -----------------------------
Expand Down Expand Up @@ -439,21 +416,21 @@ QueueStatsDomain::getValue(const mwcst::StatContext& context,
#undef STAT_SINGLE
}

QueueStatsDomain::QueueStatsDomain()
: d_statContext_mp(0)
, d_subContexts_mp(0)
QueueStatsDomain::QueueStatsDomain(bslma::Allocator* allocator)
: d_allocator_p(bslma::Default::allocator(allocator))
, d_statContext_mp(0)
, d_subContextsHolder(d_allocator_p)
, d_subContextsLookup(d_allocator_p)
{
// NOTHING
}

void QueueStatsDomain::initialize(const bmqt::Uri& uri,
mqbi::Domain* domain,
bslma::Allocator* allocator)
void QueueStatsDomain::initialize(const bmqt::Uri& uri, mqbi::Domain* domain)
{
BSLS_ASSERT_SAFE(!d_statContext_mp && "initialize was already called");

// Create subContext
bdlma::LocalSequentialAllocator<2048> localAllocator(allocator);
bdlma::LocalSequentialAllocator<2048> localAllocator(d_allocator_p);

d_statContext_mp = domain->queueStatContext()->addSubcontext(
mwcst::StatContextConfiguration(uri.canonical(), &localAllocator));
Expand Down Expand Up @@ -491,17 +468,16 @@ void QueueStatsDomain::initialize(const bmqt::Uri& uri,
if (!domain->cluster()->isRemote() &&
domain->config().mode().isFanoutValue() &&
domain->config().mode().fanout().publishAppIdMetrics()) {
d_subContexts_mp.load(new (*allocator)
bsl::list<StatSubContextMp>(allocator),
allocator);
const bsl::vector<bsl::string>& appIDs =
domain->config().mode().fanout().appIDs();
for (bsl::vector<bsl::string>::const_iterator cit = appIDs.begin();
cit != appIDs.end();
++cit) {
StatSubContextMp subContext = d_statContext_mp->addSubcontext(
mwcst::StatContextConfiguration(*cit, &localAllocator));
d_subContexts_mp->emplace_back(

d_subContextsLookup.insert(bsl::make_pair(*cit, subContext.get()));
d_subContextsHolder.emplace_back(
bslmf::MovableRefUtil::move(subContext));
}
}
Expand Down Expand Up @@ -609,25 +585,27 @@ void QueueStatsDomain::onEvent(EventType::Enum type,

BALL_LOG_SET_CATEGORY(k_LOG_CATEGORY);

if (!d_subContexts_mp) {
if (d_subContextsLookup.empty()) {
BALL_LOGTHROTTLE_WARN(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE)
<< "[THROTTLED] No built sub contexts";
<< "[THROTTLED] No built sub contexts for domain: "
<< d_statContext_mp->name() << ", appId: " << appId;
Copy link
Collaborator Author

@678098 678098 Aug 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New log line looks like
06AUG2024_02:21:59.442 10822:8281000960 WARN /blazingmq/src/groups/mqb/mqbstat/mqbstat_queuestats.cpp:602 MQBSTAT.QUEUESTATS [THROTTLED] No matching StatContext for domain: bmq://mock-domain/abc, appId: bar

return; // RETURN
}

bsl::list<StatSubContextMp>::iterator it = bsl::find_if(
d_subContexts_mp->begin(),
d_subContexts_mp->end(),
ContextNameMatcher(appId));
if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(it == d_subContexts_mp->end())) {
bsl::unordered_map<bsl::string, mwcst::StatContext*>::iterator it =
d_subContextsLookup.find(appId);

if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(it ==
d_subContextsLookup.end())) {
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;

BALL_LOGTHROTTLE_WARN(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE)
<< "[THROTTLED] No matching StatContext for appId: " << appId;
<< "[THROTTLED] No matching StatContext for domain: "
<< d_statContext_mp->name() << ", appId: " << appId;
return; // RETURN
}

mwcst::StatContext* appIdContext = it->get();
mwcst::StatContext* appIdContext = it->second;
BSLS_ASSERT_SAFE(appIdContext);

switch (type) {
Expand Down Expand Up @@ -678,38 +656,54 @@ void QueueStatsDomain::setQueueContentRaw(bsls::Types::Int64 messages,
void QueueStatsDomain::updateDomainAppIds(
const bsl::vector<bsl::string>& appIds)
{
if (!d_subContexts_mp) {
if (appIds.empty()) {
d_subContextsLookup.clear();
d_subContextsHolder.clear();
return; // RETURN
}

bdlma::LocalSequentialAllocator<2048> localAllocator;

// Add subcontexts for appIds that are not already present
for (bsl::vector<bsl::string>::const_iterator cit = appIds.begin();
cit != appIds.end();
++cit) {
if (bsl::find_if(d_subContexts_mp->begin(),
d_subContexts_mp->end(),
ContextNameMatcher(*cit)) ==
d_subContexts_mp->end()) {
StatSubContextMp subContext = d_statContext_mp->addSubcontext(
mwcst::StatContextConfiguration(*cit, &localAllocator));
d_subContexts_mp->emplace_back(
bslmf::MovableRefUtil::move(subContext));
}
}

// Remove subcontexts if appIds are not present in updated AppIds
bsl::list<StatSubContextMp>::iterator it = d_subContexts_mp->begin();
while (it != d_subContexts_mp->end()) {
if (bsl::find(appIds.begin(), appIds.end(), it->get()->name()) ==
appIds.end()) {
it = d_subContexts_mp->erase(it);
bsl::unordered_set<bsl::string> remainingAppIds(appIds.begin(),
appIds.end(),
d_allocator_p);

// 1. Remove subcontexts for unneeded appIds
bsl::list<StatSubContextMp>::iterator it = d_subContextsHolder.begin();
while (it != d_subContextsHolder.end()) {
const bsl::string& ctxAppId = it->get()->name();
bsl::unordered_set<bsl::string>::const_iterator sIt =
remainingAppIds.find(ctxAppId);
if (sIt == remainingAppIds.end()) {
// Subcontext for this appId is no longer needed, remove it from
// the holder and lookup table
d_subContextsLookup.erase(ctxAppId);
it = d_subContextsHolder.erase(it);
}
else {
// This appId is needed, but the stat context is already built for
// it
remainingAppIds.erase(sIt);
++it;
}
}

if (remainingAppIds.empty()) {
return; // RETURN
}

// 2. Add the remaining appIds
bdlma::LocalSequentialAllocator<2048> localAllocator(d_allocator_p);

for (bsl::unordered_set<bsl::string>::const_iterator sIt =
remainingAppIds.begin();
sIt != remainingAppIds.end();
sIt++) {
StatSubContextMp subContext = d_statContext_mp->addSubcontext(
mwcst::StatContextConfiguration(*sIt, &localAllocator));

d_subContextsLookup.insert(bsl::make_pair(*sIt, subContext.get()));
d_subContextsHolder.emplace_back(
bslmf::MovableRefUtil::move(subContext));
}
}

// -----------------------------
Expand Down
43 changes: 33 additions & 10 deletions src/groups/mqb/mqbstat/mqbstat_queuestats.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@
#include <bsl_list.h>
#include <bsl_memory.h>
#include <bsl_string.h>
#include <bsl_unordered_map.h>
#include <bslma_allocator.h>
#include <bslma_managedptr.h>
#include <bslma_usesbslmaallocator.h>
#include <bslmf_nestedtraitdeclaration.h>
#include <bsls_cpp11.h>
#include <bsls_types.h>

Expand All @@ -70,6 +73,9 @@ namespace mqbstat {
/// domain.
class QueueStatsDomain {
public:
// TRAITS
BSLMF_NESTED_TRAIT_DECLARATION(QueueStatsDomain, bslma::UsesBslmaAllocator)

// TYPES

/// Enum representing the various type of events for which statistics
Expand Down Expand Up @@ -182,11 +188,29 @@ class QueueStatsDomain {
typedef bslma::ManagedPtr<mwcst::StatContext> StatSubContextMp;

// PRIVATE DATA
/// Allocator to use
bslma::Allocator* d_allocator_p;

/// StatContext
bslma::ManagedPtr<mwcst::StatContext> d_statContext_mp;
// StatContext
bslma::ManagedPtr<bsl::list<StatSubContextMp> > d_subContexts_mp;
// List of appId subcontexts. It is initialized if domain name is in the
// list of enabled domains in broker's `stats` configuration.

/// List of per-appId subcontexts stored as managed pointers.
/// Note: `mwcst::StatContext` interface allocates subcontexts as
/// managed pointers. We are not able to store managed pointers
/// in a collection that might reallocate and copy its elements,
/// since ManagedPtr implementation on Solaris is constraining.
/// This is why list is used to store managed pointers. But we
/// also want to perform fast lookups to subcontexts, and for this
/// we have `d_subContextsLookup` table that points to raw pointers
/// to subcontexts. These both fields must be kept in sync during
/// reconfiguration.
/// TODO: use one bsl::unordered_map to store and lookup if Solaris support
/// is stopped.
bsl::list<StatSubContextMp> d_subContextsHolder;

/// Lookup table for per-appId subcontexts. Managed pointers to these
/// subcontexts must be held in `d_subContextsHolder`.
bsl::unordered_map<bsl::string, mwcst::StatContext*> d_subContextsLookup;

private:
// NOT IMPLEMENTED
Expand All @@ -212,18 +236,17 @@ class QueueStatsDomain {

// CREATORS

/// Create a new object in an uninitialized state.
QueueStatsDomain();
/// Create a new object in an uninitialized state, using the specified
/// `allocator` for any memory allocations.
explicit QueueStatsDomain(bslma::Allocator* allocator);

// MANIPULATORS

/// Initialize this object for the queue with the specified `uri`, and
/// register it as a subcontext of the specified `domainStatContext`
/// (which correspond to the domain-level stat context this queue is
/// part of), using the specified `allocator`.
void initialize(const bmqt::Uri& uri,
mqbi::Domain* domain,
bslma::Allocator* allocator);
/// part of).
void initialize(const bmqt::Uri& uri, mqbi::Domain* domain);

/// Set the reader count to the specified `readerCount`. Return the
/// `QueueStatsDomain` object.
Expand Down
Loading
Loading