Skip to content

Commit

Permalink
Add logging of oldest message in put aside queue and its properties
Browse files Browse the repository at this point in the history
Signed-off-by: Aleksandr Ivanov <[email protected]>
  • Loading branch information
alexander-e1off committed Sep 12, 2024
1 parent 69da6c1 commit 493b459
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 264 deletions.
1 change: 1 addition & 0 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ on:
push:
branches:
- main
- enhance-alarm-log
pull_request:
branches:
- main
Expand Down
168 changes: 7 additions & 161 deletions src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,9 @@
// MBQ
#include <mqbblp_queuehandlecatalog.h>
#include <mqbblp_queuestate.h>
// #include <mqbcmd_humanprinter.h>
#include <mqbcmd_messages.h>
#include <mqbi_queueengine.h>
#include <mqbi_storage.h>
// #include <mqbs_storageprintutil.h>
// #include <mqbu_capacitymeter.h>

// BMQ
#include <bmqp_ctrlmsg_messages.h>
Expand Down Expand Up @@ -151,7 +148,7 @@ QueueConsumptionMonitor::SubStreamInfo::SubStreamInfo(

// CREATORS
QueueConsumptionMonitor::QueueConsumptionMonitor(QueueState* queueState,
const LoggingCb& loggingCb,
const LoggingCb& loggingCb,
bslma::Allocator* allocator)
: d_queueState_p(queueState)
, d_maxIdleTime(0)
Expand All @@ -161,7 +158,7 @@ QueueConsumptionMonitor::QueueConsumptionMonitor(QueueState* queueState,
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(d_queueState_p);
BALL_LOG_WARN << "QueueConsumptionMonitor::QueueConsumptionMonitor";
BSLS_ASSERT_SAFE(d_loggingCb);
}

// MANIPULATORS
Expand All @@ -174,8 +171,6 @@ QueueConsumptionMonitor::setMaxIdleTime(bsls::Types::Int64 value)
// PRECONDITIONS
BSLS_ASSERT_SAFE(value >= 0);

BALL_LOG_WARN << "setMaxIdleTime " << value;

d_maxIdleTime = value;

for (SubStreamInfoMapIter iter = d_subStreamInfos.begin(),
Expand All @@ -202,8 +197,6 @@ void QueueConsumptionMonitor::registerSubStream(const mqbu::StorageKey& key,
d_subStreamInfos.end());
BSLS_ASSERT_SAFE(d_subStreamInfos.find(key) == d_subStreamInfos.end());

BALL_LOG_WARN << "registerSubStream";

d_subStreamInfos.insert(bsl::make_pair(key, SubStreamInfo(headCb)));
}

Expand All @@ -225,8 +218,6 @@ void QueueConsumptionMonitor::reset()
// Should always be called from the queue thread, but will be invoked from
// the cluster thread once upon queue creation.

BALL_LOG_WARN << "reset";

d_maxIdleTime = 0;
d_currentTimer = 0;
d_subStreamInfos.clear();
Expand All @@ -240,8 +231,6 @@ void QueueConsumptionMonitor::onTimer(bsls::Types::Int64 currentTimer)
BSLS_ASSERT_SAFE(d_queueState_p->queue()->dispatcher()->inDispatcherThread(
d_queueState_p->queue()));

BALL_LOG_WARN << "onTimer";

if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(d_maxIdleTime == 0)) {
// monitoring is disabled
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;
Expand All @@ -266,10 +255,7 @@ void QueueConsumptionMonitor::onTimer(bsls::Types::Int64 currentTimer)
++iter) {
SubStreamInfo& info = iter->second;
BSLS_ASSERT_SAFE(info.d_headCb);

bslma::ManagedPtr<mqbi::StorageIterator> head = info.d_headCb();

BALL_LOG_WARN << "Inside FOR head: " << (head ? "true" : "false");

if (head) {
if (head->atEnd()) {
Expand All @@ -286,40 +272,24 @@ void QueueConsumptionMonitor::onTimer(bsls::Types::Int64 currentTimer)

if (info.d_state == State::e_IDLE) {
// object was in idle state
BALL_LOG_WARN << "object was in idle state";
onTransitionToAlive(&(iter->second), iter->first);
continue; // CONTINUE
}

BALL_LOG_WARN << "info.d_messageSent || !head, d_messageSent: " << info.d_messageSent;
BALL_LOG_WARN << "info.d_messageSent || !head, d_messageSent: "
<< info.d_messageSent;
BSLS_ASSERT_SAFE(info.d_state == State::e_ALIVE);

// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
mqbi::QueueEngine* qEngine_p = d_queueState_p->queue()->queueEngine();
// QueueEngineUtil_AppState& app = d_queueState_p->subQueues()[1]; //*subQueue(subQueueId);
const QueueState::SubQueues& sq = d_queueState_p->subQueues();
for (auto& as : sq) {
// size_t size = as->d_putAsideList.size();
// const mqbu::StorageKey appKey = as->d_appKey;
// const bsl::string appId = as->d_appId;
BALL_LOG_WARN << "d_putAsideList.size(): " << as->d_putAsideList.size() << "d_redeliveryList.size(): " << as->d_redeliveryList.size() << " d_appKey: " << as->d_appKey << " d_appId: " << as->d_appId;
}
// bsl::shared_ptr<QueueEngineUtil_AppState> as = sq[0];
continue; // CONTINUE
}

if (info.d_state == State::e_IDLE) {
// state was already idle, nothing more to do
BALL_LOG_WARN << "state was already idle, nothing more to do";
continue; // CONTINUE
}

BALL_LOG_WARN << "info.d_state: " << info.d_state;

BSLS_ASSERT_SAFE(info.d_state == State::e_ALIVE);

if (d_currentTimer - info.d_lastKnownGoodTimer > d_maxIdleTime) {
BALL_LOG_WARN << "No delivered messages in the last 'maxIdleTime'";
// No delivered messages in the last 'maxIdleTime'.
onTransitionToIdle(&(iter->second), iter->first, head);
continue; // CONTINUE
Expand Down Expand Up @@ -365,136 +335,12 @@ void QueueConsumptionMonitor::onTransitionToIdle(
// PRECONDITIONS
BSLS_ASSERT_SAFE(d_queueState_p->queue()->dispatcher()->inDispatcherThread(
d_queueState_p->queue()));

d_loggingCb(appKey, head);
BSLS_ASSERT_SAFE(d_loggingCb);

subStreamInfo->d_state = State::e_IDLE;

// bdlma::LocalSequentialAllocator<2048> localAllocator(0);
// bsl::vector<mqbi::QueueHandle*> handles(&localAllocator);
// d_queueState_p->handleCatalog().loadHandles(&handles);

// bmqt::UriBuilder uriBuilder(d_queueState_p->uri(), &localAllocator);
// bsl::string appId;

// if (appKey.isNull()) {
// appId = bmqp::ProtocolUtil::k_DEFAULT_APP_ID;
// }
// else if (d_queueState_p->storage()->hasVirtualStorage(appKey, &appId)) {
// uriBuilder.setId(appId);
// }

// bmqt::Uri uri(&localAllocator);
// uriBuilder.uri(&uri);

// mwcu::MemOutStream ss(&localAllocator);

// int idx = 1;
// int numConsumers = 0;

// const bool isFanoutValue =
// d_queueState_p->queue()->hasMultipleSubStreams();

// for (bsl::vector<mqbi::QueueHandle*>::const_iterator it = handles.begin(),
// last = handles.end();
// it != last;
// ++it) {
// const mqbi::QueueHandle::SubStreams& subStreamInfos =
// (*it)->subStreamInfos();

// for (mqbi::QueueHandle::SubStreams::const_iterator infoCiter =
// subStreamInfos.begin();
// infoCiter != subStreamInfos.end();
// ++infoCiter) {
// const bsl::string& itemAppId = infoCiter->first;

// bool isReader = !isFanoutValue &&
// bmqt::QueueFlagsUtil::isReader(
// (*it)->handleParameters().flags());
// // Non-fanout mode consumer in the default subStream ?
// isReader |= isFanoutValue && !itemAppId.empty();


// // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
// mqbi::QueueEngine* qEngine_p = d_queueState_p->queue()->queueEngine();
// // QueueEngineUtil_AppState& app = d_queueState_p->subQueues()[1]; //*subQueue(subQueueId);
// const QueueState::SubQueues& sq = d_queueState_p->subQueues();

// BALL_LOG_WARN << "SubQueues size" << sq.size();
// for (auto& as : sq) {
// // size_t size = as->d_putAsideList.size();
// // const mqbu::StorageKey appKey = as->d_appKey;
// // const bsl::string appId = as->d_appId;
// BALL_LOG_WARN << "d_putAsideList.size(): " << as->d_putAsideList.size() << "d_redeliveryList.size(): " << as->d_redeliveryList.size() << " d_appKey: " << as->d_appKey << " d_appId: " << as->d_appId;
// }
// // QueueEngineUtil_AppState& appState = bsl::find_if(sq.begin(), sq.end(), [&](auto as){ as->d_appId == appId });


// if (!isReader) {
// continue; // CONTINUE
// }

// if (itemAppId != appId) {
// continue; // CONTINUE
// }

// numConsumers += infoCiter->second.d_counts.d_readCount;

// const int level = 2, spacesPerLevel = 2;

// ss << "\n " << idx++ << ". " << (*it)->client()->description()
// << mwcu::PrintUtil::newlineAndIndent(level, spacesPerLevel)
// << "Handle Parameters .....: " << (*it)->handleParameters()
// << mwcu::PrintUtil::newlineAndIndent(level, spacesPerLevel)
// << "UnconfirmedMonitors ....:";

// const bsl::vector<const mqbu::ResourceUsageMonitor*> monitors =
// (*it)->unconfirmedMonitors(appId);
// for (size_t i = 0; i < monitors.size(); ++i) {
// ss << "\n " << monitors[i];
// }
// }
// }

// mwcu::MemOutStream out;
// out << "Queue '" << uri << "' ";
// d_queueState_p->storage()->capacityMeter()->printShortSummary(out);
// out << ", max idle time "
// << mwcu::PrintUtil::prettyTimeInterval(d_maxIdleTime)
// << " appears to be stuck. It currently has " << numConsumers
// << " consumers." << ss.str() << "\n";

// // Print the 10 oldest messages in the queue
// static const int k_NUM_MSGS = 10;
// const int level = 0, spacesPerLevel = 2;

// out << mwcu::PrintUtil::newlineAndIndent(level, spacesPerLevel)
// << k_NUM_MSGS << " oldest messages in the queue:\n";

// mqbcmd::Result result;
// mqbs::StoragePrintUtil::listMessages(&result.makeQueueContents(),
// appId,
// 0,
// k_NUM_MSGS,
// d_queueState_p->storage());
// mqbcmd::HumanPrinter::print(out, result);

// if (!head) {
// return; // RETURN
// }

// // Print the current head of the queue
// mqbi::Storage* const storage = d_queueState_p->storage();
// out << mwcu::PrintUtil::newlineAndIndent(level, spacesPerLevel)
// << "Current head of the queue:\n";

// mqbs::StoragePrintUtil::listMessage(&result.makeMessage(), storage, *head);

// mqbcmd::HumanPrinter::print(out, result);
// out << "\n";

// MWCTSK_ALARMLOG_ALARM("QUEUE_CONSUMER_MONITOR")
// << out.str() << MWCTSK_ALARMLOG_END;
// Call logging callback to log alarm info.
d_loggingCb(appKey, head);
}

} // close package namespace
Expand Down
7 changes: 5 additions & 2 deletions src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,9 @@ class QueueConsumptionMonitor {
typedef bsl::function<bslma::ManagedPtr<mqbi::StorageIterator>(void)>
HeadCb;

typedef bsl::function<void(const mqbu::StorageKey& appKey, const bslma::ManagedPtr<mqbi::StorageIterator>& head)>
typedef bsl::function<void(
const mqbu::StorageKey& appKey,
const bslma::ManagedPtr<mqbi::StorageIterator>& head)>
LoggingCb;

private:
Expand Down Expand Up @@ -269,6 +271,7 @@ class QueueConsumptionMonitor {
SubStreamInfoMap d_subStreamInfos;

LoggingCb d_loggingCb;
// Callback to log alarm info.

// NOT IMPLEMENTED
QueueConsumptionMonitor(const QueueConsumptionMonitor&) BSLS_CPP11_DELETED;
Expand Down Expand Up @@ -311,7 +314,7 @@ class QueueConsumptionMonitor {
/// `basicAllocator` to supply memory. If `basicAllocator` is 0, the
/// currently installed default allocator is used.
QueueConsumptionMonitor(QueueState* queueState,
const LoggingCb& loggingCb,
const LoggingCb& loggingCb,
bslma::Allocator* allocator);

// MANIPULATORS
Expand Down
Loading

0 comments on commit 493b459

Please sign in to comment.