diff --git a/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp b/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp index 8f11d5514..208ad4205 100644 --- a/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp @@ -173,21 +173,21 @@ int LocalQueue::configure(bsl::ostream& errorDescription, bool isReconfigure) d_state_p->uri(), d_state_p->partitionId()); - d_state_p->stats().onEvent( + d_state_p->stats()->onEvent( mqbstat::QueueStatsDomain::EventType::e_CHANGE_ROLE, mqbstat::QueueStatsDomain::Role::e_PRIMARY); - d_state_p->stats().onEvent( + d_state_p->stats()->onEvent( mqbstat::QueueStatsDomain::EventType::e_CFG_MSGS, domainCfg.storage().queueLimits().messages()); - d_state_p->stats().onEvent( + d_state_p->stats()->onEvent( mqbstat::QueueStatsDomain::EventType::e_CFG_BYTES, domainCfg.storage().queueLimits().bytes()); if (isReconfigure) { if (domainCfg.mode().isFanoutValue()) { - d_state_p->stats().updateDomainAppIds( + d_state_p->stats()->updateDomainAppIds( domainCfg.mode().fanout().appIDs()); } } @@ -482,7 +482,7 @@ void LocalQueue::postMessage(const bmqp::PutHeader& putHeader, // Calculate time delta between PUT and ACK const bsls::Types::Int64 timeDelta = bmqsys::Time::highResolutionTimer() - timePoint; - d_state_p->stats().onEvent( + d_state_p->stats()->onEvent( mqbstat::QueueStatsDomain::EventType::e_ACK_TIME, timeDelta); if (res != mqbi::StorageResult::e_SUCCESS || doAck) { @@ -509,8 +509,9 @@ void LocalQueue::postMessage(const bmqp::PutHeader& putHeader, // flushed (which occurs in 'flush' routine). In no case should // 'afterNewMessage' be called here. - d_state_p->stats().onEvent(mqbstat::QueueStatsDomain::EventType::e_PUT, - appData->length()); + d_state_p->stats()->onEvent( + mqbstat::QueueStatsDomain::EventType::e_PUT, + appData->length()); } else { BSLS_PERFORMANCEHINT_UNLIKELY_HINT; @@ -524,7 +525,7 @@ void LocalQueue::postMessage(const bmqp::PutHeader& putHeader, } } else { - d_state_p->stats().onEvent( + d_state_p->stats()->onEvent( mqbstat::QueueStatsDomain::EventType::e_NACK, 1); } @@ -547,7 +548,7 @@ void LocalQueue::onReceipt(const bmqt::MessageGUID& msgGUID, const bsls::Types::Int64 timeDelta = bmqsys::Time::highResolutionTimer() - arrivalTimepoint; - d_state_p->stats().onEvent( + d_state_p->stats()->onEvent( mqbstat::QueueStatsDomain::EventType::e_ACK_TIME, timeDelta); @@ -571,8 +572,8 @@ void LocalQueue::onRemoval(const bmqt::MessageGUID& msgGUID, // TODO: do we need to update NACK stats considering that downstream can // NACK the same GUID as well? - d_state_p->stats().onEvent(mqbstat::QueueStatsDomain::EventType::e_NACK, - 1); + d_state_p->stats()->onEvent(mqbstat::QueueStatsDomain::EventType::e_NACK, + 1); if (d_state_p->handleCatalog().hasHandle(qH)) { // Send negative acknowledgement diff --git a/src/groups/mqb/mqbblp/mqbblp_queue.h b/src/groups/mqb/mqbblp/mqbblp_queue.h index 12a053a80..b3c073445 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queue.h +++ b/src/groups/mqb/mqbblp/mqbblp_queue.h @@ -256,8 +256,9 @@ class Queue BSLS_CPP11_FINAL : public mqbi::Queue { /// Return the queue engine used by this queue. mqbi::QueueEngine* queueEngine() BSLS_KEYWORD_OVERRIDE; - /// Return the stats associated with this queue. - mqbstat::QueueStatsDomain* stats() BSLS_KEYWORD_OVERRIDE; + /// Set the stats associated with this queue. + void setStats(const bsl::shared_ptr& stats) + BSLS_KEYWORD_OVERRIDE; /// Return number of unconfirmed messages across all handles with the /// `specified `subId'. @@ -373,6 +374,10 @@ class Queue BSLS_CPP11_FINAL : public mqbi::Queue { /// Return the storage used by this queue. mqbi::Storage* storage() const BSLS_KEYWORD_OVERRIDE; + /// Return the stats associated with this queue. + const bsl::shared_ptr& + stats() const BSLS_KEYWORD_OVERRIDE; + /// Return the partitionId assigned to this queue. int partitionId() const BSLS_KEYWORD_OVERRIDE; @@ -495,9 +500,15 @@ inline mqbi::QueueEngine* Queue::queueEngine() } } -inline mqbstat::QueueStatsDomain* Queue::stats() +inline const bsl::shared_ptr& Queue::stats() const +{ + return d_state.stats(); +} + +inline void +Queue::setStats(const bsl::shared_ptr& stats) { - return &(d_state.stats()); + d_state.setStats(stats); } inline mqbi::Domain* Queue::domain() const diff --git a/src/groups/mqb/mqbblp/mqbblp_queuestate.cpp b/src/groups/mqb/mqbblp/mqbblp_queuestate.cpp index df4e3e1c0..6070ba067 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queuestate.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_queuestate.cpp @@ -66,7 +66,7 @@ QueueState::QueueState(mqbi::Queue* queue, , d_resources(resources) , d_miscWorkThreadPool_p(0) , d_storage_mp(0) -, d_stats(allocator) +, d_stats_sp(0) , d_messageThrottleConfig() , d_handleCatalog(queue, allocator) , d_context(queue->schemaLearner(), allocator) @@ -79,7 +79,12 @@ QueueState::QueueState(mqbi::Queue* queue, d_handleParameters.qId() = d_id; // Initialize stats - d_stats.initialize(d_uri, d_domain_p); + // There are neither FileBackedStorage nor domain config on proxies + if (d_domain_p->cluster()->isRemote() || + !d_domain_p->config().storage().config().isFileBackedValue()) { + d_stats_sp.createInplace(allocator, allocator); + d_stats_sp->initialize(d_uri, d_domain_p); + } // NOTE: The 'description' will be set by the owner of this object. @@ -134,7 +139,7 @@ QueueState::subtract(const bmqp_ctrlmsg::QueueHandleParameters& params) void QueueState::updateStats() { stats() - .setReaderCount(handleParameters().readCount()) + ->setReaderCount(handleParameters().readCount()) .setWriterCount(handleParameters().writeCount()); } diff --git a/src/groups/mqb/mqbblp/mqbblp_queuestate.h b/src/groups/mqb/mqbblp/mqbblp_queuestate.h index 8a7a9a2b8..74cb10ad2 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queuestate.h +++ b/src/groups/mqb/mqbblp/mqbblp_queuestate.h @@ -160,7 +160,7 @@ class QueueState { // Dispatcher Client Data of the queue // associated to this state. - mqbstat::QueueStatsDomain d_stats; + bsl::shared_ptr d_stats_sp; // Statistics of the queue associated // to this state. @@ -238,9 +238,8 @@ class QueueState { bool removeUpstreamParameters( unsigned int subQueueId = bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID); - /// Return a reference offering modifiable access to the corresponding - /// attribute. - mqbstat::QueueStatsDomain& stats(); + /// Set the corresponding attribute to the specified `stats`. + void setStats(const bsl::shared_ptr& stats); /// Add read, write, and admin counters from the specified `params` to /// cumulative values per queue and per appId. @@ -303,6 +302,10 @@ class QueueState { const mqbcfg::MessageThrottleConfig& messageThrottleConfig() const; const bmqt::Uri& uri() const; + /// Return a reference offering non-modifiable access to the shared pointer + /// to the QueueStatsDomain. + const bsl::shared_ptr& stats() const; + /// Print to the specified `out` object the internal details about this /// queue state. void loadInternals(mqbcmd::QueueState* out) const; @@ -459,9 +462,10 @@ QueueState::getUpstreamParameters(bmqp_ctrlmsg::StreamParameters* value, return true; } -inline mqbstat::QueueStatsDomain& QueueState::stats() +inline void +QueueState::setStats(const bsl::shared_ptr& stats) { - return d_stats; + d_stats_sp = stats; } inline void @@ -600,6 +604,13 @@ inline const bmqt::Uri& QueueState::uri() const return d_uri; } +inline const bsl::shared_ptr& +QueueState::stats() const +{ + BSLS_ASSERT_SAFE(d_stats_sp); + return d_stats_sp; +} + inline const QueueState::SubQueues& QueueState::subQueues() const { return d_subStreams; diff --git a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp index fd300f016..5171c0b95 100644 --- a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp @@ -1036,7 +1036,7 @@ mqbi::QueueHandle* RelayQueueEngine::getHandle( queueHandle = d_queueState_p->handleCatalog().createHandle( clientContext, handleParameters, - &d_queueState_p->stats()); + d_queueState_p->stats().get()); handleCreated = true; BALL_LOG_INFO << "For queue [" << d_queueState_p->uri() diff --git a/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp b/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp index 493b1831f..9c8057c5e 100644 --- a/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp @@ -158,7 +158,7 @@ int RemoteQueue::configureAsProxy(bsl::ostream& errorDescription, return 10 * rc + rc_QUEUE_ENGINE_CFG_FAILURE; // RETURN } - d_state_p->stats().onEvent( + d_state_p->stats()->onEvent( mqbstat::QueueStatsDomain::EventType::e_CHANGE_ROLE, mqbstat::QueueStatsDomain::Role::e_PROXY); @@ -276,7 +276,7 @@ int RemoteQueue::configureAsClusterMember(bsl::ostream& errorDescription, d_state_p->storageManager()->setQueueRaw(queue, d_state_p->uri(), d_state_p->partitionId()); - d_state_p->stats().onEvent( + d_state_p->stats()->onEvent( mqbstat::QueueStatsDomain::EventType::e_CHANGE_ROLE, mqbstat::QueueStatsDomain::Role::e_REPLICA); @@ -529,7 +529,7 @@ int RemoteQueue::configure(bsl::ostream& errorDescription, bool isReconfigure) if (isReconfigure) { const mqbconfm::Domain& domainCfg = d_state_p->domain()->config(); if (domainCfg.mode().isFanoutValue()) { - d_state_p->stats().updateDomainAppIds( + d_state_p->stats()->updateDomainAppIds( domainCfg.mode().fanout().appIDs()); } } @@ -922,7 +922,7 @@ void RemoteQueue::postMessage(const bmqp::PutHeader& putHeaderIn, bmqt::AckResult::e_REFUSED)); ackMessage.setMessageGUID(putHeader.messageGUID()); - d_state_p->stats().onEvent( + d_state_p->stats()->onEvent( mqbstat::QueueStatsDomain::EventType::e_NACK, 1); @@ -996,8 +996,8 @@ void RemoteQueue::postMessage(const bmqp::PutHeader& putHeaderIn, // the time the message is actually sent upstream, i.e. in // cluster/clusterProxy) for the most exact accuracy, but doing it here is // good enough. - d_state_p->stats().onEvent(mqbstat::QueueStatsDomain::EventType::e_PUT, - appData->length()); + d_state_p->stats()->onEvent(mqbstat::QueueStatsDomain::EventType::e_PUT, + appData->length()); } void RemoteQueue::confirmMessage(const bmqt::MessageGUID& msgGUID, @@ -1496,8 +1496,8 @@ RemoteQueue::Puts::iterator& RemoteQueue::nack(Puts::iterator& it, { ackMessage.setMessageGUID(it->first); - d_state_p->stats().onEvent(mqbstat::QueueStatsDomain::EventType::e_NACK, - 1); + d_state_p->stats()->onEvent(mqbstat::QueueStatsDomain::EventType::e_NACK, + 1); // CorrelationId & QueueId are left unset as those fields // will be filled downstream. diff --git a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp index bb4ed3a37..68435607b 100644 --- a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp @@ -751,7 +751,7 @@ mqbi::QueueHandle* RootQueueEngine::getHandle( queueHandle = d_queueState_p->handleCatalog().createHandle( clientContext, handleParameters, - &d_queueState_p->stats()); + d_queueState_p->stats().get()); BSLS_ASSERT_SAFE(queueHandle && "handle creation failed"); handleCreated = true; @@ -1518,7 +1518,7 @@ int RootQueueEngine::onRejectMessage(mqbi::QueueHandle* handle, d_queueState_p, d_allocator_p)); } - d_queueState_p->stats().onEvent( + d_queueState_p->stats()->onEvent( mqbstat::QueueStatsDomain::EventType::e_REJECT, 1); diff --git a/src/groups/mqb/mqbi/mqbi_queue.h b/src/groups/mqb/mqbi/mqbi_queue.h index b1a0e321e..cd3b7dbf6 100644 --- a/src/groups/mqb/mqbi/mqbi_queue.h +++ b/src/groups/mqb/mqbi/mqbi_queue.h @@ -795,8 +795,9 @@ class Queue : public DispatcherClient { /// Return the queue engine used by this queue. virtual QueueEngine* queueEngine() = 0; - /// Return the stats associated with this queue. - virtual mqbstat::QueueStatsDomain* stats() = 0; + /// Set the stats associated with this queue. + virtual void + setStats(const bsl::shared_ptr& stats) = 0; /// Return number of unconfirmed messages across all handles with the /// `specified `subId'. @@ -911,6 +912,10 @@ class Queue : public DispatcherClient { /// Return the storage used by this queue. virtual Storage* storage() const = 0; + /// Return the stats associated with this queue. + virtual const bsl::shared_ptr& + stats() const = 0; + /// Return the partitionId assigned to this queue. virtual int partitionId() const = 0; diff --git a/src/groups/mqb/mqbmock/mqbmock_queue.cpp b/src/groups/mqb/mqbmock/mqbmock_queue.cpp index fda477727..98eaf2ca2 100644 --- a/src/groups/mqb/mqbmock/mqbmock_queue.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_queue.cpp @@ -52,7 +52,7 @@ Queue::Queue(mqbi::Domain* domain, bslma::Allocator* allocator) , d_hasMultipleSubStreams(false) , d_handleParameters(allocator) , d_streamParameters(allocator) -, d_stats(allocator) +, d_stats_sp(0) , d_domain_p(domain) , d_dispatcher_p(0) , d_queueEngine_p(0) @@ -67,7 +67,8 @@ Queue::Queue(mqbi::Domain* domain, bslma::Allocator* allocator) // Initialize stats if (domain) { - d_stats.initialize(d_uri, domain); + d_stats_sp.createInplace(allocator, allocator); + d_stats_sp->initialize(d_uri, domain); } } @@ -238,9 +239,10 @@ mqbi::QueueEngine* Queue::queueEngine() return d_queueEngine_p; } -mqbstat::QueueStatsDomain* Queue::stats() +inline void +Queue::setStats(const bsl::shared_ptr& stats) { - return &d_stats; + d_stats_sp = stats; } bsls::Types::Int64 @@ -466,6 +468,12 @@ mqbi::Storage* Queue::storage() const return d_storage_p; } +const bsl::shared_ptr& Queue::stats() const +{ + BSLS_ASSERT_SAFE(d_stats_sp); + return d_stats_sp; +} + int Queue::partitionId() const { return -1; diff --git a/src/groups/mqb/mqbmock/mqbmock_queue.h b/src/groups/mqb/mqbmock/mqbmock_queue.h index 3f5ff769d..1c80082eb 100644 --- a/src/groups/mqb/mqbmock/mqbmock_queue.h +++ b/src/groups/mqb/mqbmock/mqbmock_queue.h @@ -136,7 +136,7 @@ class Queue : public mqbi::Queue { // Configuration values for message // throttling intervals and thresholds. - mqbstat::QueueStatsDomain d_stats; + bsl::shared_ptr d_stats_sp; // Statistics of the queue mqbi::Domain* d_domain_p; @@ -241,8 +241,9 @@ class Queue : public mqbi::Queue { /// Return the queue engine used by this queue. mqbi::QueueEngine* queueEngine() BSLS_KEYWORD_OVERRIDE; - /// Return the stats associated with this queue. - mqbstat::QueueStatsDomain* stats() BSLS_KEYWORD_OVERRIDE; + /// Set the stats associated with this queue. + void setStats(const bsl::shared_ptr& stats) + BSLS_KEYWORD_OVERRIDE; /// Return number of unconfirmed messages across all handles with the /// `specified `subId'. @@ -384,6 +385,10 @@ class Queue : public mqbi::Queue { /// Return the storage used by this queue. mqbi::Storage* storage() const BSLS_KEYWORD_OVERRIDE; + /// Return the stats associated with this queue. + const bsl::shared_ptr& + stats() const BSLS_KEYWORD_OVERRIDE; + /// Return the partitionId assigned to this queue. int partitionId() const BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp b/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp index 9b5b09019..1dff4b762 100644 --- a/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp +++ b/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp @@ -89,14 +89,11 @@ void FileBackedStorage::purgeCommon(const mqbu::StorageKey& appKey) // Update stats d_capacityMeter.clear(); - if (queue()) { - queue()->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_PURGE, - 0); - queue()->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, - d_handles.historySize()); - } + d_queueStats_sp->onEvent(mqbstat::QueueStatsDomain::EventType::e_PURGE, + 0); + d_queueStats_sp->onEvent( + mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, + d_handles.historySize()); } } @@ -105,8 +102,7 @@ FileBackedStorage::FileBackedStorage( DataStore* dataStore, const bmqt::Uri& queueUri, const mqbu::StorageKey& queueKey, - const mqbconfm::Domain& config, - mqbu::CapacityMeter* parentCapacityMeter, + mqbi::Domain* domain, bslma::Allocator* allocator, bmqma::CountingAllocatorStore* allocatorStore) : d_allocator_p(allocator) @@ -117,24 +113,25 @@ FileBackedStorage::FileBackedStorage( , d_virtualStorageCatalog( this, allocatorStore ? allocatorStore->get("VirtualHandles") : d_allocator_p) -, d_ttlSeconds(config.messageTtl()) +, d_ttlSeconds(domain->config().messageTtl()) , d_capacityMeter( "queue [" + queueUri.asString() + "]", - parentCapacityMeter, + domain->capacityMeter(), allocator, bdlf::BindUtil::bind(&FileBackedStorage::logAppsSubscriptionInfoCb, this, bdlf::PlaceHolders::_1) // stream ) , d_handles(bsls::TimeInterval() - .addMilliseconds(config.deduplicationTimeMs()) + .addMilliseconds(domain->config().deduplicationTimeMs()) .totalNanoseconds(), allocatorStore ? allocatorStore->get("Handles") : d_allocator_p) , d_queueOpRecordHandles(allocator) , d_isEmpty(1) -, d_hasReceipts(!config.consistency().isStrongValue()) +, d_hasReceipts(!domain->config().consistency().isStrongValue()) , d_currentlyAutoConfirming() , d_autoConfirms(d_allocator_p) +, d_queueStats_sp() { BSLS_ASSERT(d_store_p); @@ -148,7 +145,11 @@ FileBackedStorage::FileBackedStorage( // and domain instance will return a zero capacity meter when queries to be // passed to the 'FileBackedStorage' instance. - d_virtualStorageCatalog.setDefaultRda(config.maxDeliveryAttempts()); + d_virtualStorageCatalog.setDefaultRda( + domain->config().maxDeliveryAttempts()); + + d_queueStats_sp.createInplace(d_allocator_p, d_allocator_p); + d_queueStats_sp->initialize(queueUri, domain); } FileBackedStorage::~FileBackedStorage() @@ -262,13 +263,13 @@ void FileBackedStorage::setQueue(mqbi::Queue* queue) // Update queue stats if a queue has been associated with the storage. if (queue) { + queue->setStats(d_queueStats_sp); + const bsls::Types::Int64 numMessage = numMessages( mqbu::StorageKey::k_NULL_KEY); const bsls::Types::Int64 numByte = numBytes( mqbu::StorageKey::k_NULL_KEY); - queue->stats()->setQueueContentRaw(numMessage, numByte); - BALL_LOG_INFO << "Associated queue [" << queue->uri() << "] with key [" << queueKey() << "] and Partition [" << queue->partitionId() << "] with its storage having " @@ -477,10 +478,10 @@ FileBackedStorage::releaseRef(const bmqt::MessageGUID& guid) // and a purge. if (queue()) { queue()->queueEngine()->beforeMessageRemoved(guid); - queue()->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE, - msgLen); } + d_queueStats_sp->onEvent( + mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE, + msgLen); // There is not really a need to remove the guid from all virtual // storages, because we can be here only if guid doesn't exist in @@ -495,11 +496,9 @@ FileBackedStorage::releaseRef(const bmqt::MessageGUID& guid) d_capacityMeter.remove(1, msgLen); d_handles.erase(it); - if (queue()) { - queue()->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, - d_handles.historySize()); - } + d_queueStats_sp->onEvent( + mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, + d_handles.historySize()); return mqbi::StorageResult::e_ZERO_REFERENCES; } @@ -605,11 +604,9 @@ FileBackedStorage::removeAll(const mqbu::StorageKey& appKey) d_isEmpty.storeRelaxed(1); } - if (queue()) { - queue()->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, - d_handles.historySize()); - } + d_queueStats_sp->onEvent( + mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, + d_handles.historySize()); return mqbi::StorageResult::e_SUCCESS; } @@ -696,10 +693,10 @@ int FileBackedStorage::gcExpiredMessages( // The same 'e_DEL_MESSAGE' is about 3 cases: TTL, no SC quorum, purge. if (queue()) { queue()->queueEngine()->beforeMessageRemoved(cit->first); - queue()->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE, - msgLen); } + d_queueStats_sp->onEvent( + mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE, + msgLen); // Remove message from all virtual storages. d_virtualStorageCatalog.gc(cit->first); @@ -716,18 +713,18 @@ int FileBackedStorage::gcExpiredMessages( ++numMsgsDeleted; } - if (queue() && numMsgsDeleted > 0) { + if (numMsgsDeleted > 0) { if (numMsgsDeleted > numMsgsUnreceipted) { - queue()->stats()->onEvent( + d_queueStats_sp->onEvent( mqbstat::QueueStatsDomain::EventType::e_GC_MESSAGE, numMsgsDeleted - numMsgsUnreceipted); } if (numMsgsUnreceipted) { - queue()->stats()->onEvent( + d_queueStats_sp->onEvent( mqbstat::QueueStatsDomain::EventType::e_NO_SC_MESSAGE, numMsgsUnreceipted); } - queue()->stats()->onEvent( + d_queueStats_sp->onEvent( mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, d_handles.historySize()); } @@ -744,11 +741,9 @@ bool FileBackedStorage::gcHistory() bool hasMoreToGc = d_handles.gc(bmqsys::Time::highResolutionTimer(), k_GC_MESSAGES_BATCH_SIZE); - if (queue()) { - queue()->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, - d_handles.historySize()); - } + d_queueStats_sp->onEvent( + mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, + d_handles.historySize()); return hasMoreToGc; } @@ -803,11 +798,9 @@ void FileBackedStorage::processMessageRecord( // Update the messages & bytes monitors, and the stats. d_capacityMeter.forceCommit(1, msgLen); // Return value ignored. - if (queue()) { - queue()->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_ADD_MESSAGE, - msgLen); - } + d_queueStats_sp->onEvent( + mqbstat::QueueStatsDomain::EventType::e_ADD_MESSAGE, + msgLen); d_isEmpty.storeRelaxed(0); } @@ -920,10 +913,10 @@ void FileBackedStorage::processDeletionRecord(const bmqt::MessageGUID& guid) if (queue()) { queue()->queueEngine()->beforeMessageRemoved(guid); - queue()->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE, - msgLen); } + d_queueStats_sp->onEvent( + mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE, + msgLen); // Delete 'guid' from all virtual storages, if any. Note that 'guid' // should have already been removed from each virtual storage when confirm @@ -949,11 +942,9 @@ void FileBackedStorage::processDeletionRecord(const bmqt::MessageGUID& guid) d_isEmpty.storeRelaxed(1); } - if (queue()) { - queue()->stats()->onEvent( - mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, - d_handles.historySize()); - } + d_queueStats_sp->onEvent( + mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY, + d_handles.historySize()); } void FileBackedStorage::addQueueOpRecordHandle( @@ -1021,6 +1012,13 @@ FileBackedStorage::autoConfirm(const mqbu::StorageKey& appKey, return mqbi::StorageResult::e_SUCCESS; } +void FileBackedStorage::setPrimary() +{ + d_queueStats_sp->onEvent( + mqbstat::QueueStatsDomain::EventType::e_CHANGE_ROLE, + mqbstat::QueueStatsDomain::Role::e_PRIMARY); +} + void FileBackedStorage::clearSelection() { for (AutoConfirms::const_iterator it = d_autoConfirms.begin(); diff --git a/src/groups/mqb/mqbs/mqbs_filebackedstorage.h b/src/groups/mqb/mqbs/mqbs_filebackedstorage.h index 58f916468..382bf7170 100644 --- a/src/groups/mqb/mqbs/mqbs_filebackedstorage.h +++ b/src/groups/mqb/mqbs/mqbs_filebackedstorage.h @@ -35,6 +35,7 @@ #include #include #include +#include #include #include @@ -209,6 +210,9 @@ class FileBackedStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage { AutoConfirms d_autoConfirms; // Auto CONFIRMs waiting for 'put' or 'processMessageRecord' + bsl::shared_ptr d_queueStats_sp; + // Statistics of the queue associated to this storage. + private: // NOT IMPLEMENTED FileBackedStorage(const FileBackedStorage&) BSLS_KEYWORD_DELETED; @@ -242,8 +246,7 @@ class FileBackedStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage { FileBackedStorage(DataStore* dataStore, const bmqt::Uri& queueUri, const mqbu::StorageKey& queueKey, - const mqbconfm::Domain& config, - mqbu::CapacityMeter* parentCapacityMeter, + mqbi::Domain* domain, bslma::Allocator* allocator, bmqma::CountingAllocatorStore* allocatorStore = 0); @@ -539,6 +542,8 @@ class FileBackedStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage { /// Any other sequence removes auto CONFIRMs. /// Auto-confirmed Apps do not PUSH the message. + virtual void setPrimary() BSLS_KEYWORD_OVERRIDE; + // ACCESSORS (for mqbs::ReplicatedStorage) int partitionId() const BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbs/mqbs_filebackedstorage.t.cpp b/src/groups/mqb/mqbs/mqbs_filebackedstorage.t.cpp index 004ecdb17..5e67b2ad2 100644 --- a/src/groups/mqb/mqbs/mqbs_filebackedstorage.t.cpp +++ b/src/groups/mqb/mqbs/mqbs_filebackedstorage.t.cpp @@ -510,13 +510,15 @@ struct Tester { domainCfg.deduplicationTimeMs() = 0; // No history domainCfg.messageTtl() = ttlSeconds; + bmqu::MemOutStream errDescription(bmqtst::TestHelperUtil::allocator()); + d_mockDomain.configure(errDescription, domainCfg); + d_replicatedStorage_mp.load( new (*d_allocator_p) mqbs::FileBackedStorage( &d_dataStore, bmqt::Uri(uri, bmqtst::TestHelperUtil::allocator()), queueKey, - domainCfg, - d_mockDomain.capacityMeter(), + &d_mockDomain, d_allocator_p), d_allocator_p); diff --git a/src/groups/mqb/mqbs/mqbs_filestore.cpp b/src/groups/mqb/mqbs/mqbs_filestore.cpp index 6a4298c92..fe429263b 100644 --- a/src/groups/mqb/mqbs/mqbs_filestore.cpp +++ b/src/groups/mqb/mqbs/mqbs_filestore.cpp @@ -5427,8 +5427,7 @@ void FileStore::createStorage(bsl::shared_ptr* storageSp, FileBackedStorage(this, queueUri, queueKey, - domain->config(), - domain->capacityMeter(), + domain, storageAlloc, &d_storageAllocatorStore), storageAlloc); @@ -6722,6 +6721,11 @@ void FileStore::setActivePrimary(mqbnet::ClusterNode* primaryNode, d_config.partitionId(), mqbstat::ClusterStats::PrimaryStatus::e_PRIMARY); + for (StorageMapIter sIt = d_storages.begin(); sIt != d_storages.end(); + ++sIt) { + sIt->second->setPrimary(); + } + // Schedule a sync point issue recurring event every 1 second, starting // after 1 second. d_config.scheduler()->scheduleRecurringEvent( diff --git a/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp b/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp index 9cb8cae34..37bd3450e 100644 --- a/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp +++ b/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp @@ -133,8 +133,6 @@ void InMemoryStorage::setQueue(mqbi::Queue* queue) const bsls::Types::Int64 numByte = numBytes( mqbu::StorageKey::k_NULL_KEY); - queue->stats()->setQueueContentRaw(numMessage, numByte); - BALL_LOG_INFO << "Associated queue [" << queue->uri() << "] with key [" << queueKey() << "] and Partition [" << queue->partitionId() << "] with its storage having [" @@ -600,6 +598,11 @@ void InMemoryStorage::purge( BSLS_ASSERT_OPT(false && "Invalid operation on in-memory storage"); } +void InMemoryStorage::setPrimary() +{ + // NOTHING +} + // ACCESSORS (for mqbs::ReplicatedStorage) const ReplicatedStorage::RecordHandles& InMemoryStorage::queueOpRecordHandles() const diff --git a/src/groups/mqb/mqbs/mqbs_inmemorystorage.h b/src/groups/mqb/mqbs/mqbs_inmemorystorage.h index 9677a1486..5ccbac8be 100644 --- a/src/groups/mqb/mqbs/mqbs_inmemorystorage.h +++ b/src/groups/mqb/mqbs/mqbs_inmemorystorage.h @@ -546,6 +546,8 @@ class InMemoryStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage { void purge(const mqbu::StorageKey& appKey) BSLS_KEYWORD_OVERRIDE; + virtual void setPrimary() BSLS_KEYWORD_OVERRIDE; + // ACCESSORS // (virtual mqbs::ReplicatedStorage) int partitionId() const BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbs/mqbs_replicatedstorage.h b/src/groups/mqb/mqbs/mqbs_replicatedstorage.h index d98e0c12f..670ec4cae 100644 --- a/src/groups/mqb/mqbs/mqbs_replicatedstorage.h +++ b/src/groups/mqb/mqbs/mqbs_replicatedstorage.h @@ -98,6 +98,11 @@ class ReplicatedStorage : public mqbi::Storage { /// replica nodes, and the record will not be replicated to peer nodes. virtual void purge(const mqbu::StorageKey& appKey) = 0; + // Notify the storage of node role set to primary + virtual void setPrimary() = 0; + + // ACCESSORS + /// Return a non-modifiable list of handles of all QUEUEOP records /// associated with this storage. virtual const RecordHandles& queueOpRecordHandles() const = 0; diff --git a/src/groups/mqb/mqbstat/mqbstat_queuestats.cpp b/src/groups/mqb/mqbstat/mqbstat_queuestats.cpp index 2fa99dd62..7943f4b57 100644 --- a/src/groups/mqb/mqbstat/mqbstat_queuestats.cpp +++ b/src/groups/mqb/mqbstat/mqbstat_queuestats.cpp @@ -685,15 +685,6 @@ void QueueStatsDomain::onEvent(EventType::Enum type, }; } -void QueueStatsDomain::setQueueContentRaw(bsls::Types::Int64 messages, - bsls::Types::Int64 bytes) -{ - BSLS_ASSERT_SAFE(d_statContext_mp && "initialize was not called"); - - d_statContext_mp->setValue(DomainQueueStats::e_STAT_BYTES, bytes); - d_statContext_mp->setValue(DomainQueueStats::e_STAT_MESSAGES, messages); -} - void QueueStatsDomain::updateDomainAppIds( const bsl::vector& appIds) { diff --git a/src/groups/mqb/mqbstat/mqbstat_queuestats.h b/src/groups/mqb/mqbstat/mqbstat_queuestats.h index 20043568a..7d4a389b8 100644 --- a/src/groups/mqb/mqbstat/mqbstat_queuestats.h +++ b/src/groups/mqb/mqbstat/mqbstat_queuestats.h @@ -267,11 +267,6 @@ class QueueStatsDomain { bsls::Types::Int64 value, const bsl::string& appId); - /// Force set the stats of the content of the queue to the specified - /// absolute `messages` and `bytes` values. - void setQueueContentRaw(bsls::Types::Int64 messages, - bsls::Types::Int64 bytes); - /// Update subcontexts in case of domain reconfigure with the given list of /// AppIds. void updateDomainAppIds(const bsl::vector& appIds);