From 493b459d4c2cf442e4455323919495af4df94261 Mon Sep 17 00:00:00 2001 From: Aleksandr Ivanov Date: Thu, 12 Sep 2024 18:23:39 +0300 Subject: [PATCH] Add logging of oldest message in put aside queue and its properties Signed-off-by: Aleksandr Ivanov --- .github/workflows/build.yaml | 1 + .../mqbblp/mqbblp_queueconsumptionmonitor.cpp | 168 +----------------- .../mqbblp/mqbblp_queueconsumptionmonitor.h | 7 +- .../mqb/mqbblp/mqbblp_rootqueueengine.cpp | 158 ++++++---------- .../mqb/mqbblp/mqbblp_rootqueueengine.h | 3 +- 5 files changed, 73 insertions(+), 264 deletions(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index dc9dc65b38..8233734e5b 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -4,6 +4,7 @@ on: push: branches: - main + - enhance-alarm-log pull_request: branches: - main diff --git a/src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.cpp b/src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.cpp index d5ce680575..55ba6df51b 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.cpp @@ -20,12 +20,9 @@ // MBQ #include #include -// #include #include #include #include -// #include -// #include // BMQ #include @@ -151,7 +148,7 @@ QueueConsumptionMonitor::SubStreamInfo::SubStreamInfo( // CREATORS QueueConsumptionMonitor::QueueConsumptionMonitor(QueueState* queueState, - const LoggingCb& loggingCb, + const LoggingCb& loggingCb, bslma::Allocator* allocator) : d_queueState_p(queueState) , d_maxIdleTime(0) @@ -161,7 +158,7 @@ QueueConsumptionMonitor::QueueConsumptionMonitor(QueueState* queueState, { // PRECONDITIONS BSLS_ASSERT_SAFE(d_queueState_p); - BALL_LOG_WARN << "QueueConsumptionMonitor::QueueConsumptionMonitor"; + BSLS_ASSERT_SAFE(d_loggingCb); } // MANIPULATORS @@ -174,8 +171,6 @@ QueueConsumptionMonitor::setMaxIdleTime(bsls::Types::Int64 value) // PRECONDITIONS BSLS_ASSERT_SAFE(value >= 0); - BALL_LOG_WARN << "setMaxIdleTime 
" << value; - d_maxIdleTime = value; for (SubStreamInfoMapIter iter = d_subStreamInfos.begin(), @@ -202,8 +197,6 @@ void QueueConsumptionMonitor::registerSubStream(const mqbu::StorageKey& key, d_subStreamInfos.end()); BSLS_ASSERT_SAFE(d_subStreamInfos.find(key) == d_subStreamInfos.end()); - BALL_LOG_WARN << "registerSubStream"; - d_subStreamInfos.insert(bsl::make_pair(key, SubStreamInfo(headCb))); } @@ -225,8 +218,6 @@ void QueueConsumptionMonitor::reset() // Should always be called from the queue thread, but will be invoked from // the cluster thread once upon queue creation. - BALL_LOG_WARN << "reset"; - d_maxIdleTime = 0; d_currentTimer = 0; d_subStreamInfos.clear(); @@ -240,8 +231,6 @@ void QueueConsumptionMonitor::onTimer(bsls::Types::Int64 currentTimer) BSLS_ASSERT_SAFE(d_queueState_p->queue()->dispatcher()->inDispatcherThread( d_queueState_p->queue())); - BALL_LOG_WARN << "onTimer"; - if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(d_maxIdleTime == 0)) { // monitoring is disabled BSLS_PERFORMANCEHINT_UNLIKELY_HINT; @@ -266,10 +255,7 @@ void QueueConsumptionMonitor::onTimer(bsls::Types::Int64 currentTimer) ++iter) { SubStreamInfo& info = iter->second; BSLS_ASSERT_SAFE(info.d_headCb); - bslma::ManagedPtr head = info.d_headCb(); - - BALL_LOG_WARN << "Inside FOR head: " << (head ? "true" : "false"); if (head) { if (head->atEnd()) { @@ -286,40 +272,24 @@ void QueueConsumptionMonitor::onTimer(bsls::Types::Int64 currentTimer) if (info.d_state == State::e_IDLE) { // object was in idle state - BALL_LOG_WARN << "object was in idle state"; onTransitionToAlive(&(iter->second), iter->first); continue; // CONTINUE } - BALL_LOG_WARN << "info.d_messageSent || !head, d_messageSent: " << info.d_messageSent; + BALL_LOG_WARN << "info.d_messageSent || !head, d_messageSent: " + << info.d_messageSent; BSLS_ASSERT_SAFE(info.d_state == State::e_ALIVE); - - // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! 
- mqbi::QueueEngine* qEngine_p = d_queueState_p->queue()->queueEngine(); - // QueueEngineUtil_AppState& app = d_queueState_p->subQueues()[1]; //*subQueue(subQueueId); - const QueueState::SubQueues& sq = d_queueState_p->subQueues(); - for (auto& as : sq) { - // size_t size = as->d_putAsideList.size(); - // const mqbu::StorageKey appKey = as->d_appKey; - // const bsl::string appId = as->d_appId; - BALL_LOG_WARN << "d_putAsideList.size(): " << as->d_putAsideList.size() << "d_redeliveryList.size(): " << as->d_redeliveryList.size() << " d_appKey: " << as->d_appKey << " d_appId: " << as->d_appId; - } - // bsl::shared_ptr as = sq[0]; continue; // CONTINUE } if (info.d_state == State::e_IDLE) { // state was already idle, nothing more to do - BALL_LOG_WARN << "state was already idle, nothing more to do"; continue; // CONTINUE } - BALL_LOG_WARN << "info.d_state: " << info.d_state; - BSLS_ASSERT_SAFE(info.d_state == State::e_ALIVE); if (d_currentTimer - info.d_lastKnownGoodTimer > d_maxIdleTime) { - BALL_LOG_WARN << "No delivered messages in the last 'maxIdleTime'"; // No delivered messages in the last 'maxIdleTime'. 
onTransitionToIdle(&(iter->second), iter->first, head); continue; // CONTINUE @@ -365,136 +335,12 @@ void QueueConsumptionMonitor::onTransitionToIdle( // PRECONDITIONS BSLS_ASSERT_SAFE(d_queueState_p->queue()->dispatcher()->inDispatcherThread( d_queueState_p->queue())); - - d_loggingCb(appKey, head); + BSLS_ASSERT_SAFE(d_loggingCb); subStreamInfo->d_state = State::e_IDLE; - // bdlma::LocalSequentialAllocator<2048> localAllocator(0); - // bsl::vector handles(&localAllocator); - // d_queueState_p->handleCatalog().loadHandles(&handles); - - // bmqt::UriBuilder uriBuilder(d_queueState_p->uri(), &localAllocator); - // bsl::string appId; - - // if (appKey.isNull()) { - // appId = bmqp::ProtocolUtil::k_DEFAULT_APP_ID; - // } - // else if (d_queueState_p->storage()->hasVirtualStorage(appKey, &appId)) { - // uriBuilder.setId(appId); - // } - - // bmqt::Uri uri(&localAllocator); - // uriBuilder.uri(&uri); - - // mwcu::MemOutStream ss(&localAllocator); - - // int idx = 1; - // int numConsumers = 0; - - // const bool isFanoutValue = - // d_queueState_p->queue()->hasMultipleSubStreams(); - - // for (bsl::vector::const_iterator it = handles.begin(), - // last = handles.end(); - // it != last; - // ++it) { - // const mqbi::QueueHandle::SubStreams& subStreamInfos = - // (*it)->subStreamInfos(); - - // for (mqbi::QueueHandle::SubStreams::const_iterator infoCiter = - // subStreamInfos.begin(); - // infoCiter != subStreamInfos.end(); - // ++infoCiter) { - // const bsl::string& itemAppId = infoCiter->first; - - // bool isReader = !isFanoutValue && - // bmqt::QueueFlagsUtil::isReader( - // (*it)->handleParameters().flags()); - // // Non-fanout mode consumer in the default subStream ? - // isReader |= isFanoutValue && !itemAppId.empty(); - - - // // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! 
- // mqbi::QueueEngine* qEngine_p = d_queueState_p->queue()->queueEngine(); - // // QueueEngineUtil_AppState& app = d_queueState_p->subQueues()[1]; //*subQueue(subQueueId); - // const QueueState::SubQueues& sq = d_queueState_p->subQueues(); - - // BALL_LOG_WARN << "SubQueues size" << sq.size(); - // for (auto& as : sq) { - // // size_t size = as->d_putAsideList.size(); - // // const mqbu::StorageKey appKey = as->d_appKey; - // // const bsl::string appId = as->d_appId; - // BALL_LOG_WARN << "d_putAsideList.size(): " << as->d_putAsideList.size() << "d_redeliveryList.size(): " << as->d_redeliveryList.size() << " d_appKey: " << as->d_appKey << " d_appId: " << as->d_appId; - // } - // // QueueEngineUtil_AppState& appState = bsl::find_if(sq.begin(), sq.end(), [&](auto as){ as->d_appId == appId }); - - - // if (!isReader) { - // continue; // CONTINUE - // } - - // if (itemAppId != appId) { - // continue; // CONTINUE - // } - - // numConsumers += infoCiter->second.d_counts.d_readCount; - - // const int level = 2, spacesPerLevel = 2; - - // ss << "\n " << idx++ << ". " << (*it)->client()->description() - // << mwcu::PrintUtil::newlineAndIndent(level, spacesPerLevel) - // << "Handle Parameters .....: " << (*it)->handleParameters() - // << mwcu::PrintUtil::newlineAndIndent(level, spacesPerLevel) - // << "UnconfirmedMonitors ....:"; - - // const bsl::vector monitors = - // (*it)->unconfirmedMonitors(appId); - // for (size_t i = 0; i < monitors.size(); ++i) { - // ss << "\n " << monitors[i]; - // } - // } - // } - - // mwcu::MemOutStream out; - // out << "Queue '" << uri << "' "; - // d_queueState_p->storage()->capacityMeter()->printShortSummary(out); - // out << ", max idle time " - // << mwcu::PrintUtil::prettyTimeInterval(d_maxIdleTime) - // << " appears to be stuck. It currently has " << numConsumers - // << " consumers." 
<< ss.str() << "\n"; - - // // Print the 10 oldest messages in the queue - // static const int k_NUM_MSGS = 10; - // const int level = 0, spacesPerLevel = 2; - - // out << mwcu::PrintUtil::newlineAndIndent(level, spacesPerLevel) - // << k_NUM_MSGS << " oldest messages in the queue:\n"; - - // mqbcmd::Result result; - // mqbs::StoragePrintUtil::listMessages(&result.makeQueueContents(), - // appId, - // 0, - // k_NUM_MSGS, - // d_queueState_p->storage()); - // mqbcmd::HumanPrinter::print(out, result); - - // if (!head) { - // return; // RETURN - // } - - // // Print the current head of the queue - // mqbi::Storage* const storage = d_queueState_p->storage(); - // out << mwcu::PrintUtil::newlineAndIndent(level, spacesPerLevel) - // << "Current head of the queue:\n"; - - // mqbs::StoragePrintUtil::listMessage(&result.makeMessage(), storage, *head); - - // mqbcmd::HumanPrinter::print(out, result); - // out << "\n"; - - // MWCTSK_ALARMLOG_ALARM("QUEUE_CONSUMER_MONITOR") - // << out.str() << MWCTSK_ALARMLOG_END; + // Call logging callback to log alarm info. + d_loggingCb(appKey, head); } } // close package namespace diff --git a/src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.h b/src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.h index 9619e430f3..63323fad6c 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.h +++ b/src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.h @@ -213,7 +213,9 @@ class QueueConsumptionMonitor { typedef bsl::function(void)> HeadCb; - typedef bsl::function& head)> + typedef bsl::function& head)> LoggingCb; private: @@ -269,6 +271,7 @@ class QueueConsumptionMonitor { SubStreamInfoMap d_subStreamInfos; LoggingCb d_loggingCb; + // Callback to log alarm info. // NOT IMPLEMENTED QueueConsumptionMonitor(const QueueConsumptionMonitor&) BSLS_CPP11_DELETED; @@ -311,7 +314,7 @@ class QueueConsumptionMonitor { /// `basicAllocator` to supply memory. If `basicAllocator` is 0, the /// currently installed default allocator is used. 
QueueConsumptionMonitor(QueueState* queueState, - const LoggingCb& loggingCb, + const LoggingCb& loggingCb, bslma::Allocator* allocator); // MANIPULATORS diff --git a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp index 3eea975c42..33b1ae2739 100644 --- a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #include @@ -249,7 +250,7 @@ RootQueueEngine::RootQueueEngine(QueueState* queueState, bdlf::BindUtil::bind(&RootQueueEngine::logAlarmCb, this, bdlf::PlaceHolders::_1, // appKey - bdlf::PlaceHolders::_2), // host + bdlf::PlaceHolders::_2), // head allocator) , d_apps(allocator) , d_nullKeyCount(0) @@ -296,8 +297,6 @@ int RootQueueEngine::configure(bsl::ostream& errorDescription) rc_AUTO_SUBSCRIPTIONS_ERROR = -3 // Wrong number of auto subscriptions }; - BALL_LOG_WARN << "RootQueueEngine::configure"; - // Populate map of appId to appKey for statically registered consumers size_t numApps = 0; @@ -388,9 +387,6 @@ int RootQueueEngine::configure(bsl::ostream& errorDescription) } if (!QueueEngineUtil::isBroadcastMode(d_queueState_p->queue())) { - BALL_LOG_WARN - << "setMaxIdleTime: " - << d_queueState_p->queue()->domain()->config().maxIdleTime(); d_consumptionMonitor.setMaxIdleTime( d_queueState_p->queue()->domain()->config().maxIdleTime() * bdlt::TimeUnitRatio::k_NANOSECONDS_PER_SECOND); @@ -1572,6 +1568,10 @@ void RootQueueEngine::logAlarmCb( { // executed by the *QUEUE DISPATCHER* thread + // PRECONDITIONS + BSLS_ASSERT_SAFE(d_queueState_p->queue()->dispatcher()->inDispatcherThread( + d_queueState_p->queue())); + bdlma::LocalSequentialAllocator<2048> localAllocator(0); bsl::vector handles(&localAllocator); d_queueState_p->handleCatalog().loadHandles(&handles); @@ -1649,118 +1649,79 @@ void RootQueueEngine::logAlarmCb( d_queueState_p->queue()->domain()->config().maxIdleTime() * 
bdlt::TimeUnitRatio::k_NANOSECONDS_PER_SECOND) << " appears to be stuck. It currently has " << numConsumers - << " consumers." << ss.str() << "\n"; + << " consumers." << ss.str() << '\n'; + + mqbi::Storage* const storage = d_queueState_p->storage(); - // TODO: move to some helper class Apps::const_iterator cItApp = d_apps.findByKey1(appId); if (cItApp != d_apps.end()) { const AppStateSp& app = cItApp->value(); out << "\nPut aside list size: " << app->putAsideListSize() << '\n'; - out << "Redelivery list size: " << app->redeliveryListSize() << '\n'; - out << "Not delivered number of messages: " - << d_queueState_p->storage()->numMessages(app->d_appKey) << '\n'; + out << "Redelivery list size: " << app->redeliveryListSize() << "\n\n"; // Log consumer subscriptions mqbblp::Routers::QueueRoutingContext& routingContext = app->d_routing_sp->d_queue; mqbcmd::Routing routing; routingContext.loadInternals(&routing); - const bsl::vector& subscriptionGroups = + const bsl::vector& subscrGroups = routing.subscriptionGroups(); - // Limit to log only 100 expressions - static const size_t k_EXPR_NUM_LIMIT = 100; - const size_t exprCountLimit = bsl::min(subscriptionGroups.size(), - k_EXPR_NUM_LIMIT); - if (subscriptionGroups.size() > exprCountLimit) { - out << exprCountLimit << " of " << subscriptionGroups.size() - << " consumer subscriptions: " << '\n'; + + static const size_t k_EXPR_NUM_LIMIT = 50; + if (subscrGroups.size() > k_EXPR_NUM_LIMIT) { + out << k_EXPR_NUM_LIMIT << " of " << subscrGroups.size() + << " consumer subscription expressions: " << '\n'; } else { - out << "Consumer subscriptions: " << '\n'; + out << "Consumer subscription expressions: " << '\n'; } - // TODO: use LimitedPrinter + // Limit to log only k_EXPR_NUM_LIMIT expressions + size_t currNum = 0; for (bsl::vector::const_iterator cIt = - subscriptionGroups.begin(); - bsl::distance(subscriptionGroups.begin(), cIt) < - static_cast(exprCountLimit); - ++cIt) { + subscrGroups.begin(); + cIt != subscrGroups.end() && 
currNum < k_EXPR_NUM_LIMIT; + ++cIt, ++currNum) { out << cIt->expression() << '\n'; } - - out << "Put aside list GUIDS: " << '\n'; - for (RedeliveryList::iterator it = app->d_putAsideList.begin(); !app->d_putAsideList.isEnd(it); app->d_putAsideList.next(&it)) - { - out << *it << '\n'; - } - out << "First GUID in Put aside list: " << app->d_putAsideList.first() << '\n'; - out << "Has message: " << d_queueState_p->storage()->hasMessage(app->d_putAsideList.first()) << '\n'; - mqbi::StorageMessageAttributes attributes; - mqbi::StorageResult::Enum rc = d_queueState_p->storage()->get(&attributes, app->d_putAsideList.first()); - if (rc == mqbi::StorageResult::Enum::e_SUCCESS) - { - out << "Attributes: " << attributes << '\n'; - } else { - out << "Get Attributes failed: rc= " << rc << '\n'; - } - - const bmqp::MessagePropertiesInfo& logic = attributes.messagePropertiesInfo(); - bslma::ManagedPtr storageIterator; - rc = d_queueState_p->storage()->getIterator(&storageIterator, appKey, app->d_putAsideList.first()); - if (rc != mqbi::StorageResult::Enum::e_SUCCESS) - { - out << "getIterator failed: rc= " << rc << '\n'; - } - - { - const bsl::shared_ptr& appData = storageIterator->appData(); - + out << '\n'; + + // Log the first (oldest) message in Put aside list and its properties + bslma::ManagedPtr storageIt_mp; + mqbi::StorageResult::Enum rc = storage->getIterator( + &storageIt_mp, + appKey, + app->d_putAsideList.first()); + if (rc == mqbi::StorageResult::Enum::e_SUCCESS) { + // Log timestamp + out << "Oldest message in a 'Put aside' list:\n"; + mqbcmd::Result result; + mqbs::StoragePrintUtil::listMessage(&result.makeMessage(), + storage, + *storageIt_mp); + mqbcmd::HumanPrinter::print(out, result); + out << '\n'; + // Log message properties + const bsl::shared_ptr& appData = + storageIt_mp->appData(); + const bmqp::MessagePropertiesInfo& logic = + storageIt_mp->attributes().messagePropertiesInfo(); bmqp::MessageProperties properties; - const bdlbb::Blob& blob = 
*appData.get(); - int rc = properties.streamIn(blob, logic.isExtended()); - if (rc) { - out << "streamIn failed: rc= " << rc << '\n'; + int ret = properties.streamIn(*appData, logic.isExtended()); + if (!ret) { + out << "Message Properties: " << properties << '\n'; + } + else { + BALL_LOG_WARN << "Failed to streamIn MessageProperties, rc = " + << ret; + } } - out << "Message Properties: " << properties << '\n'; - - - // bmqp::MessagePropertiesIterator iter(&properties); - // while (iter.hasNext()) { - // out << " Name [" << iter.name() << "], Type [" << iter.type() - // << "], Value [" << iter << "]\n"; - // } - - // const char* appData = 0; - // unsigned int appDataLen = 0; - // unsigned int propertiesAreaLen = 0; - // it->loadApplicationData(&appData, &appDataLen); - // int rc = mqbs::FileStoreProtocolPrinter::printMessageProperties( - // &propertiesAreaLen, - // propsOsstr, - // appData, - // bmqp::MessagePropertiesInfo(dh)); - } - - - - //StorageIterator - // /// Return a reference offering non-modifiable access to the application - // /// data associated with the item currently pointed at by this iterator. - // /// The behavior is undefined unless `atEnd` returns `false`. - // virtual const bsl::shared_ptr& appData() const = 0; - - // /// Return a reference offering non-modifiable access to the options - // /// associated with the item currently pointed at by this iterator. The - // /// behavior is undefined unless `atEnd` returns `false`. - // virtual const bsl::shared_ptr& options() const = 0; - - // /// Return a reference offering non-modifiable access to the attributes - // /// associated with the message currently pointed at by this iterator. - // /// The behavior is undefined unless `atEnd` returns `false`. 
- // virtual const StorageMessageAttributes& attributes() const = 0; - - - + else { + BALL_LOG_WARN << "Failed to get storage iterator for GUID: " + << app->d_putAsideList.first() << ", rc = " << rc; + } + } + else { + BALL_LOG_WARN << "No App found for appId: " << appId; } // Print the 10 oldest messages in the queue @@ -1775,7 +1736,7 @@ void RootQueueEngine::logAlarmCb( appId, 0, k_NUM_MSGS, - d_queueState_p->storage()); + storage); mqbcmd::HumanPrinter::print(out, result); if (!head) { @@ -1783,7 +1744,6 @@ void RootQueueEngine::logAlarmCb( } // Print the current head of the queue - mqbi::Storage* const storage = d_queueState_p->storage(); out << mwcu::PrintUtil::newlineAndIndent(level, spacesPerLevel) << "Current head of the queue:\n"; diff --git a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h index 48c1de4510..babe6a8c15 100644 --- a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h +++ b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h @@ -34,7 +34,6 @@ #include #include #include -#include #include #include @@ -208,7 +207,7 @@ class RootQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine { /// It logs queue data for the specified `appKey` and `head`. void logAlarmCb(const mqbu::StorageKey& appKey, - const bslma::ManagedPtr& head) const; + const bslma::ManagedPtr& head) const; public: // TRAITS