Skip to content

Commit

Permalink
Fix mqbblp_queueconsumptionmonitor.t
Browse files Browse the repository at this point in the history
Signed-off-by: Aleksandr Ivanov <[email protected]>
  • Loading branch information
alexander-e1off committed Sep 13, 2024
1 parent 493b459 commit b972874
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 90 deletions.
3 changes: 0 additions & 3 deletions src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
#include <bmqt_uri.h>

// MWC
#include <mwctsk_alarmlog.h>
#include <mwcu_memoutstream.h>
#include <mwcu_printutil.h>

Expand Down Expand Up @@ -276,8 +275,6 @@ void QueueConsumptionMonitor::onTimer(bsls::Types::Int64 currentTimer)
continue; // CONTINUE
}

BALL_LOG_WARN << "info.d_messageSent || !head, d_messageSent: "
<< info.d_messageSent;
BSLS_ASSERT_SAFE(info.d_state == State::e_ALIVE);
continue; // CONTINUE
}
Expand Down
56 changes: 16 additions & 40 deletions src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include <bmqt_queueflags.h>

// MWC
#include <mwctsk_alarmlog.h>
#include <mwctst_scopedlogobserver.h>
#include <mwctst_testhelper.h>
#include <mwcu_memoutstream.h>
Expand Down Expand Up @@ -93,6 +94,18 @@ ClientContext::~ClientContext()
// NOTHING
}

static void loggingCb(BSLS_ANNOTATION_UNUSED const mqbu::StorageKey& appKey,
BSLS_ANNOTATION_UNUSED const
bslma::ManagedPtr<mqbi::StorageIterator>& head)
{
BALL_LOG_SET_CATEGORY("MQBBLP.QUEUECONSUMPTIONMONITORTEST");

mwcu::MemOutStream out(s_allocator_p);
out << "Test Alarm";
MWCTSK_ALARMLOG_ALARM("QUEUE_CONSUMER_MONITOR")
<< out.str() << MWCTSK_ALARMLOG_END;
}

struct Test : mwctst::Test {
typedef bsl::vector<
bsl::pair<mqbi::QueueHandle*, bmqp_ctrlmsg::QueueHandleParameters> >
Expand Down Expand Up @@ -152,7 +165,7 @@ Test::Test()
d_partitionId,
&d_domain,
s_allocator_p)
, d_monitor(&d_queueState, s_allocator_p)
, d_monitor(&d_queueState, &loggingCb, s_allocator_p)
, d_storage(d_queue.uri(),
mqbu::StorageKey::k_NULL_KEY,
mqbs::DataStore::k_INVALID_PARTITION_ID,
Expand Down Expand Up @@ -388,10 +401,6 @@ TEST_F(Test, logFormat)
logObserver.records().back(),
"ALARM \\[QUEUE_CONSUMER_MONITOR\\]",
s_allocator_p));
ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch(
logObserver.records().back(),
"Queue '.*'",
s_allocator_p));
}

TEST_F(Test, putAliveIdleSendAlive)
Expand Down Expand Up @@ -442,10 +451,6 @@ TEST_F(Test, putAliveIdleSendAlive)
logObserver.records().back(),
"ALARM \\[QUEUE_CONSUMER_MONITOR\\]",
s_allocator_p));
ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch(
logObserver.records().back(),
"0 consumers",
s_allocator_p));

d_monitor.onTimer(2 * k_MAX_IDLE_TIME + 2);
ASSERT_EQ(d_monitor.state(mqbu::StorageKey::k_NULL_KEY),
Expand Down Expand Up @@ -503,19 +508,7 @@ TEST_F(Test, putAliveIdleWithConsumer)
ASSERT_EQ(logObserver.records().size(), ++expectedLogRecords);
ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch(
logObserver.records().back(),
"ALARM \\[QUEUE_CONSUMER_MONITOR\\].*It currently has 2 consumers",
s_allocator_p));
ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch(
logObserver.records().back(),
"test consumer 1",
s_allocator_p));
ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch(
logObserver.records().back(),
"test consumer 2",
s_allocator_p));
ASSERT(!mwctst::ScopedLogObserverUtil::recordMessageMatch(
logObserver.records().back(),
"test producer",
"ALARM \\[QUEUE_CONSUMER_MONITOR\\]",
s_allocator_p));
}

Expand Down Expand Up @@ -691,10 +684,6 @@ TEST_F(Test, putAliveIdleSendAliveTwoSubstreams)
logObserver.records().rbegin()[i],
"ALARM \\[QUEUE_CONSUMER_MONITOR\\]",
s_allocator_p));
ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch(
logObserver.records().rbegin()[i],
"0 consumers",
s_allocator_p));
}

d_monitor.onTimer(2 * k_MAX_IDLE_TIME + 2);
Expand Down Expand Up @@ -797,21 +786,8 @@ TEST_F(Test, putAliveIdleSendAliveTwoSubstreamsTwoConsumers)
++iter) {
ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch(
*iter,
"ALARM \\[QUEUE_CONSUMER_MONITOR\\] Queue "
"'bmq://bmq.test.local/test_queue\\?id=app\\d'",
s_allocator_p));
ASSERT(
mwctst::ScopedLogObserverUtil::recordMessageMatch(*iter,
"1 consumers",
s_allocator_p));
ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch(
*iter,
"test consumer \\d",
"ALARM \\[QUEUE_CONSUMER_MONITOR\\]",
s_allocator_p));
ASSERT(
!mwctst::ScopedLogObserverUtil::recordMessageMatch(*iter,
"test producer",
s_allocator_p));
}

d_monitor.onTimer(2 * k_MAX_IDLE_TIME + 2);
Expand Down
99 changes: 52 additions & 47 deletions src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1572,7 +1572,7 @@ void RootQueueEngine::logAlarmCb(
BSLS_ASSERT_SAFE(d_queueState_p->queue()->dispatcher()->inDispatcherThread(
d_queueState_p->queue()));

bdlma::LocalSequentialAllocator<2048> localAllocator(0);
bdlma::LocalSequentialAllocator<4096> localAllocator(d_allocator_p);
bsl::vector<mqbi::QueueHandle*> handles(&localAllocator);
d_queueState_p->handleCatalog().loadHandles(&handles);

Expand Down Expand Up @@ -1641,7 +1641,7 @@ void RootQueueEngine::logAlarmCb(
}
}

mwcu::MemOutStream out;
mwcu::MemOutStream out(&localAllocator);
out << "Queue '" << uri << "' ";
d_queueState_p->storage()->capacityMeter()->printShortSummary(out);
out << ", max idle time "
Expand All @@ -1667,58 +1667,63 @@ void RootQueueEngine::logAlarmCb(
const bsl::vector<mqbcmd::SubscriptionGroup>& subscrGroups =
routing.subscriptionGroups();

static const size_t k_EXPR_NUM_LIMIT = 50;
if (subscrGroups.size() > k_EXPR_NUM_LIMIT) {
out << k_EXPR_NUM_LIMIT << " of " << subscrGroups.size()
<< " consumer subscription expressions: " << '\n';
}
else {
out << "Consumer subscription expressions: " << '\n';
}
// Limit to log only k_EXPR_NUM_LIMIT expressions
size_t currNum = 0;
for (bsl::vector<mqbcmd::SubscriptionGroup>::const_iterator cIt =
subscrGroups.begin();
cIt != subscrGroups.end() && currNum < k_EXPR_NUM_LIMIT;
++cIt, ++currNum) {
out << cIt->expression() << '\n';
if (!subscrGroups.empty()) {
static const size_t k_EXPR_NUM_LIMIT = 50;
if (subscrGroups.size() > k_EXPR_NUM_LIMIT) {
out << k_EXPR_NUM_LIMIT << " of " << subscrGroups.size()
<< " consumer subscription expressions: " << '\n';
}
else {
out << "Consumer subscription expressions: " << '\n';
}
// Limit to log only k_EXPR_NUM_LIMIT expressions
size_t currNum = 0;
for (bsl::vector<mqbcmd::SubscriptionGroup>::const_iterator cIt =
subscrGroups.begin();
cIt != subscrGroups.end() && currNum < k_EXPR_NUM_LIMIT;
++cIt, ++currNum) {
out << cIt->expression() << '\n';
}
out << '\n';
}
out << '\n';

// Log the first (oldest) message in Put aside list and its properties
bslma::ManagedPtr<mqbi::StorageIterator> storageIt_mp;
mqbi::StorageResult::Enum rc = storage->getIterator(
&storageIt_mp,
appKey,
app->d_putAsideList.first());
if (rc == mqbi::StorageResult::Enum::e_SUCCESS) {
// Log timestamp
out << "Oldest message in a 'Put aside' list:\n";
mqbcmd::Result result;
mqbs::StoragePrintUtil::listMessage(&result.makeMessage(),
storage,
*storageIt_mp);
mqbcmd::HumanPrinter::print(out, result);
out << '\n';
// Log message properties
const bsl::shared_ptr<bdlbb::Blob>& appData =
storageIt_mp->appData();
const bmqp::MessagePropertiesInfo& logic =
storageIt_mp->attributes().messagePropertiesInfo();
bmqp::MessageProperties properties;
int ret = properties.streamIn(*appData, logic.isExtended());
if (!ret) {
out << "Message Properties: " << properties << '\n';
if (!app->d_putAsideList.empty()) {
bslma::ManagedPtr<mqbi::StorageIterator> storageIt_mp;
mqbi::StorageResult::Enum rc = storage->getIterator(
&storageIt_mp,
appKey,
app->d_putAsideList.first());
if (rc == mqbi::StorageResult::Enum::e_SUCCESS) {
// Log timestamp
out << "Oldest message in a 'Put aside' list:\n";
mqbcmd::Result result;
mqbs::StoragePrintUtil::listMessage(&result.makeMessage(),
storage,
*storageIt_mp);
mqbcmd::HumanPrinter::print(out, result);
out << '\n';
// Log message properties
const bsl::shared_ptr<bdlbb::Blob>& appData =
storageIt_mp->appData();
const bmqp::MessagePropertiesInfo& logic =
storageIt_mp->attributes().messagePropertiesInfo();
bmqp::MessageProperties properties;
int ret = properties.streamIn(*appData, logic.isExtended());
if (!ret) {
out << "Message Properties: " << properties << '\n';
}
else {
BALL_LOG_WARN
<< "Failed to streamIn MessageProperties, rc = " << rc;
}
}
else {
BALL_LOG_WARN << "Failed to streamIn MessageProperties, rc = "
<< rc;
BALL_LOG_WARN << "Failed to get storage iterator for GUID: "
<< app->d_putAsideList.first()
<< ", rc = " << rc;
}
}
else {
BALL_LOG_WARN << "Failed to get storage iterator for GUID: "
<< app->d_putAsideList.first() << ", rc = " << rc;
}
}
else {
BALL_LOG_WARN << "No App found for appId: " << appId;
Expand Down

0 comments on commit b972874

Please sign in to comment.