diff --git a/src/groups/bmq/bmqa/bmqa_confirmeventbuilder.cpp b/src/groups/bmq/bmqa/bmqa_confirmeventbuilder.cpp index 572604506..32dee1c88 100644 --- a/src/groups/bmq/bmqa/bmqa_confirmeventbuilder.cpp +++ b/src/groups/bmq/bmqa/bmqa_confirmeventbuilder.cpp @@ -100,7 +100,7 @@ const bdlbb::Blob& ConfirmEventBuilder::blob() const // PRECONDITIONS BSLS_ASSERT(d_impl.d_builder_p); - return d_impl.d_builder_p->blob(); + return *d_impl.d_builder_p->blob(); } } // close package namespace diff --git a/src/groups/bmq/bmqa/bmqa_message.t.cpp b/src/groups/bmq/bmqa/bmqa_message.t.cpp index 8a9a18785..77e627aee 100644 --- a/src/groups/bmq/bmqa/bmqa_message.t.cpp +++ b/src/groups/bmq/bmqa/bmqa_message.t.cpp @@ -188,7 +188,7 @@ static void test2_validPushMessagePrint() bmqp::PushEventBuilder peb(&blobSpPool, s_allocator_p); ASSERT_EQ(sizeof(bmqp::EventHeader), static_cast(peb.eventSize())); ASSERT_EQ(sizeof(bmqp::EventHeader), - static_cast(peb.blob().length())); + static_cast(peb.blob()->length())); ASSERT_EQ(0, peb.messageCount()); // Add SubQueueInfo option @@ -199,7 +199,7 @@ static void test2_validPushMessagePrint() ASSERT_EQ(sizeof(bmqp::EventHeader), static_cast(peb.eventSize())); // 'eventSize()' excludes unpacked messages ASSERT_LT(sizeof(bmqp::EventHeader), - static_cast(peb.blob().length())); + static_cast(peb.blob()->length())); // But the option is written to the underlying blob rc = peb.packMessage(payload, queueId, @@ -211,7 +211,7 @@ static void test2_validPushMessagePrint() ASSERT_LT(payload.length(), peb.eventSize()); ASSERT_EQ(1, peb.messageCount()); - bmqp::Event bmqpEvent(&peb.blob(), s_allocator_p, true); + bmqp::Event bmqpEvent(peb.blob().get(), s_allocator_p, true); implPtr->configureAsMessageEvent(bmqpEvent); implPtr->addCorrelationId(bmqt::CorrelationId()); @@ -303,7 +303,7 @@ static void test3_messageProperties() queue->setId(queueId); implPtr->insertQueue(subQueueId, queue); - bmqp::Event bmqpEvent(&peb.blob(), s_allocator_p, true); + bmqp::Event bmqpEvent(peb.blob().get(), s_allocator_p, true); implPtr->configureAsMessageEvent(bmqpEvent); implPtr->addCorrelationId(bmqt::CorrelationId()); @@ -443,7 +443,7 @@ static void test4_subscriptionHandle() static_cast(peb.eventSize())); // 'eventSize()' excludes unpacked messages ASSERT_LT(sizeof(bmqp::EventHeader), - static_cast(peb.blob().length())); + static_cast(peb.blob()->length())); // But the option is written to the underlying blob // Add message @@ -457,7 +457,7 @@ static void test4_subscriptionHandle() ASSERT_LT(payload.length(), peb.eventSize()); ASSERT_EQ(1, peb.messageCount()); - bmqp::Event bmqpEvent(&peb.blob(), s_allocator_p, true); + bmqp::Event bmqpEvent(peb.blob().get(), s_allocator_p, true); implPtr->configureAsMessageEvent(bmqpEvent); implPtr->insertQueue(sId, queueSp); @@ -506,7 +506,7 @@ static void test4_subscriptionHandle() ASSERT_LT(payload.length(), peb.eventSize()); ASSERT_EQ(1, peb.messageCount()); - bmqp::Event bmqpEvent(&peb.blob(), s_allocator_p, true); + bmqp::Event bmqpEvent(peb.blob().get(), s_allocator_p, true); implPtr->configureAsMessageEvent(bmqpEvent); implPtr->insertQueue(defaultSubscriptionId, queueSp); @@ -548,7 +548,7 @@ static void test4_subscriptionHandle() ASSERT_EQ(rc, bmqt::EventBuilderResult::e_SUCCESS); ASSERT_EQ(1, builder.messageCount()); - bmqp::Event bmqpEvent(&builder.blob(), s_allocator_p); + bmqp::Event bmqpEvent(builder.blob().get(), s_allocator_p); implPtr->configureAsMessageEvent(bmqpEvent); implPtr->insertQueue(queueSp); @@ -582,7 +582,7 @@ static void test4_subscriptionHandle() ASSERT_EQ(rc, bmqt::EventBuilderResult::e_SUCCESS); ASSERT_EQ(1, builder.messageCount()); - bmqp::Event bmqpEvent(&builder.blob(), s_allocator_p); + bmqp::Event bmqpEvent(builder.blob().get(), s_allocator_p); implPtr->configureAsMessageEvent(bmqpEvent); implPtr->insertQueue(queueSp); diff --git a/src/groups/bmq/bmqa/bmqa_messageevent.t.cpp b/src/groups/bmq/bmqa/bmqa_messageevent.t.cpp index 6a44cd160..ff1e53f45 100644 --- a/src/groups/bmq/bmqa/bmqa_messageevent.t.cpp +++ b/src/groups/bmq/bmqa/bmqa_messageevent.t.cpp @@ -173,7 +173,7 @@ static void test2_ackMesageIteratorTest() PVV("Appending messages"); appendMessages(&builder, &messages, k_NUM_MSGS); - bmqp::Event rawEvent(&builder.blob(), s_allocator_p); + bmqp::Event rawEvent(builder.blob().get(), s_allocator_p); bsl::shared_ptr eventImpl; eventImpl.createInplace(s_allocator_p, &bufferFactory, s_allocator_p); @@ -239,7 +239,7 @@ static void test3_putMessageIteratorTest() ASSERT_EQ(rc, bmqt::EventBuilderResult::e_SUCCESS); } - bmqp::Event rawEvent(&builder.blob(), s_allocator_p); + bmqp::Event rawEvent(builder.blob().get(), s_allocator_p); bsl::shared_ptr eventImpl; eventImpl.createInplace(s_allocator_p, &bufferFactory, s_allocator_p); diff --git a/src/groups/bmq/bmqa/bmqa_mocksession.cpp b/src/groups/bmq/bmqa/bmqa_mocksession.cpp index cfa1f5907..753c5e46c 100644 --- a/src/groups/bmq/bmqa/bmqa_mocksession.cpp +++ b/src/groups/bmq/bmqa/bmqa_mocksession.cpp @@ -300,7 +300,7 @@ Event MockSessionUtil::createAckEvent(const bsl::vector& acks, // TODO: deprecate `createAckEvent` with bufferFactory arg and introduce // another function with BlobSpPool arg. - bmqa::Session::BlobSpPool blobSpPool( + BlobSpPool blobSpPool( bmqp::BlobPoolUtil::createBlobPool(bufferFactory, allocator)); bmqp::AckEventBuilder ackBuilder(&blobSpPool, alloc); @@ -318,7 +318,7 @@ Event MockSessionUtil::createAckEvent(const bsl::vector& acks, } implPtr->configureAsMessageEvent( - bmqp::Event(&ackBuilder.blob(), alloc, true)); + bmqp::Event(ackBuilder.blob().get(), alloc, true)); for (size_t i = 0; i != acks.size(); ++i) { implPtr->addCorrelationId(acks[i].d_correlationId); } @@ -343,7 +343,7 @@ Event MockSessionUtil::createPushEvent( // TODO: deprecate `createPushEvent` with bufferFactory arg and introduce // another function with BlobSpPool arg. - bmqa::Session::BlobSpPool blobSpPool( + BlobSpPool blobSpPool( bmqp::BlobPoolUtil::createBlobPool(bufferFactory, allocator)); bmqp::PushEventBuilder pushBuilder(&blobSpPool, alloc); @@ -379,7 +379,7 @@ Event MockSessionUtil::createPushEvent( implPtr->addCorrelationId(bmqt::CorrelationId()); } - bmqp::Event bmqpEvent(&pushBuilder.blob(), alloc, true); + bmqp::Event bmqpEvent(pushBuilder.blob().get(), alloc, true); implPtr->configureAsMessageEvent(bmqpEvent); return event; diff --git a/src/groups/bmq/bmqa/bmqa_mocksession.h b/src/groups/bmq/bmqa/bmqa_mocksession.h index 4a598d46b..9604e26f9 100644 --- a/src/groups/bmq/bmqa/bmqa_mocksession.h +++ b/src/groups/bmq/bmqa/bmqa_mocksession.h @@ -556,6 +556,7 @@ #include #include #include +#include #include #include #include @@ -607,6 +608,13 @@ struct MockSessionUtil { private: // PRIVATE TYPES + /// Pool of shared pointers to Blobs + typedef bdlcc::SharedObjectPool< + bdlbb::Blob, + bdlcc::ObjectPoolFunctors::DefaultCreator, + bdlcc::ObjectPoolFunctors::RemoveAll > + BlobSpPool; + /// Event impl shared pointer to access /// the pimpl of `bmqa::Event`. typedef bsl::shared_ptr EventImplSp; @@ -751,9 +759,6 @@ class MockSession : public AbstractSession { public: // TYPES - /// Pool of shared pointers to Blobs - typedef bmqa::Session::BlobSpPool BlobSpPool; - // CLASS METHODS /// Perform a one time initialization needed by components used in @@ -803,6 +808,13 @@ class MockSession : public AbstractSession { // PRIVATE TYPES + /// Pool of shared pointers to Blobs + typedef bdlcc::SharedObjectPool< + bdlbb::Blob, + bdlcc::ObjectPoolFunctors::DefaultCreator, + bdlcc::ObjectPoolFunctors::RemoveAll > + BlobSpPool; + /// Aligned buffer holding the two key hash map typedef bsls::AlignedBuffer TwoKeyHashMapBuffer; diff --git a/src/groups/bmq/bmqa/bmqa_session.h b/src/groups/bmq/bmqa/bmqa_session.h index 5b55556f1..6e1423b1e 100644 --- a/src/groups/bmq/bmqa/bmqa_session.h +++ b/src/groups/bmq/bmqa/bmqa_session.h @@ -565,7 +565,6 @@ // BDE #include -#include #include #include #include @@ -680,13 +679,6 @@ class Session : public AbstractSession { public: // TYPES - /// Pool of shared pointers to Blobs - typedef bdlcc::SharedObjectPool< - bdlbb::Blob, - bdlcc::ObjectPoolFunctors::DefaultCreator, - bdlcc::ObjectPoolFunctors::RemoveAll > - BlobSpPool; - /// Invoked as a response to an asynchronous open queue operation, /// `OpenQueueCallback` is an alias for a callback function object /// (functor) that takes as an argument the specified `result`, diff --git a/src/groups/bmq/bmqimp/bmqimp_application.cpp b/src/groups/bmq/bmqimp/bmqimp_application.cpp index 9690adbde..4d6dd304d 100644 --- a/src/groups/bmq/bmqimp/bmqimp_application.cpp +++ b/src/groups/bmq/bmqimp/bmqimp_application.cpp @@ -584,7 +584,6 @@ Application::Application( NegotiatedChannelFactoryConfig(&d_statChannelFactory, negotiationMessage, sessionOptions.connectTimeout(), - &d_blobBufferFactory, &d_blobSpPool, allocator), allocator) diff --git a/src/groups/bmq/bmqimp/bmqimp_brokersession.cpp b/src/groups/bmq/bmqimp/bmqimp_brokersession.cpp index d8273e233..4ea0dd7ae 100644 --- a/src/groups/bmq/bmqimp/bmqimp_brokersession.cpp +++ b/src/groups/bmq/bmqimp/bmqimp_brokersession.cpp @@ -4530,7 +4530,7 @@ void BrokerSession::transferAckEvent(bmqp::AckEventBuilder* ackBuilder, // Our event is full at this point so send this ack event to the user and // reset the builder to append the ack that was rejected. - bmqp::Event event(&ackBuilder->blob(), d_allocator_p, true); + bmqp::Event event(ackBuilder->blob().get(), d_allocator_p, true); // clone = true if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY( d_messageDumper.isEventDumpEnabled())) { @@ -4820,7 +4820,7 @@ bool BrokerSession::appendOrSend( if (result == bmqt::EventBuilderResult::e_PAYLOAD_TOO_BIG) { // Send the current event, reset the builder. bmqt::GenericResult::Enum res = writeOrBuffer( - builder.blob(), + *builder.blob(), d_sessionOptions.channelHighWatermark()); if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY( @@ -4943,7 +4943,7 @@ void BrokerSession::retransmitPendingMessages() // Send the final PUT event if there are any messages in the builder if (putBuilder.messageCount()) { bmqt::GenericResult::Enum res = writeOrBuffer( - putBuilder.blob(), + *putBuilder.blob(), d_sessionOptions.channelHighWatermark()); if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY( @@ -7330,7 +7330,7 @@ int BrokerSession::confirmMessage(const bsl::shared_ptr& queue, << rc; return rc; // RETURN } - bool isAccepted = acceptUserEvent(builder.blob(), timeout); + bool isAccepted = acceptUserEvent(*builder.blob(), timeout); if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(!isAccepted)) { BSLS_PERFORMANCEHINT_UNLIKELY_HINT; diff --git a/src/groups/bmq/bmqimp/bmqimp_brokersession.t.cpp b/src/groups/bmq/bmqimp/bmqimp_brokersession.t.cpp index 5fd334e0a..08de29779 100644 --- a/src/groups/bmq/bmqimp/bmqimp_brokersession.t.cpp +++ b/src/groups/bmq/bmqimp/bmqimp_brokersession.t.cpp @@ -2426,7 +2426,7 @@ void TestSession::sendControlMessage( int rc = builder.setMessage(message, bmqp::EventType::e_CONTROL); ASSERT_EQ(0, rc); - const bdlbb::Blob& packet = builder.blob(); + const bdlbb::Blob& packet = *builder.blob(); PVVV_SAFE("Send control message"); d_brokerSession.processPacket(packet); @@ -4020,7 +4020,7 @@ static void test11_disconnect() bmqp::EventType::e_CONTROL); ASSERT_EQ(rc, 0); - obj.processPacket(builder.blob()); + obj.processPacket(*builder.blob()); // Reset the channel obj.setChannel(bsl::shared_ptr()); @@ -4120,7 +4120,7 @@ static void test11_disconnect() bmqp::EventType::e_CONTROL); ASSERT_EQ(rc, 0); - obj.processPacket(builder.blob()); + obj.processPacket(*builder.blob()); // Reset the channel obj.setChannel(bsl::shared_ptr()); @@ -4671,7 +4671,7 @@ static void test21_post_Limit() ASSERT_EQ(bmqt::EventBuilderResult::e_SUCCESS, builder.packMessage(pQueue->id())); - int rc = obj.session().post(builder.blob(), postTimeout); + int rc = obj.session().post(*builder.blob(), postTimeout); PVV_SAFE("Step 5. Ensure the PUT message is accepted"); ASSERT_EQ(rc, bmqt::PostResult::e_SUCCESS); @@ -4686,7 +4686,7 @@ static void test21_post_Limit() ASSERT(rawEvent.isPutEvent()); PVV_SAFE("Step 6. Ensure e_BW_LIMIT is returned for the second post"); - rc = obj.session().post(builder.blob(), postTimeout); + rc = obj.session().post(*builder.blob(), postTimeout); ASSERT_EQ(rc, bmqt::PostResult::e_BW_LIMIT); ASSERT_EQ(obj.channel().writeCalls().size(), 0u); @@ -4700,7 +4700,7 @@ static void test21_post_Limit() // Call post with a bigger timeout so that LWM event arrives before it // expires. - rc = obj.session().post(builder.blob(), timeout); + rc = obj.session().post(*builder.blob(), timeout); PVV_SAFE("Step 8. Ensure e_SUCCESS is returned"); ASSERT_EQ(rc, bmqt::PostResult::e_SUCCESS); @@ -4720,7 +4720,7 @@ static void test21_post_Limit() PVV_SAFE("Step 9. Set the channel to return e_GENERIC_ERROR on write"); obj.channel().setWriteStatus(bmqio::StatusCategory::e_GENERIC_ERROR); - rc = obj.session().post(builder.blob(), postTimeout); + rc = obj.session().post(*builder.blob(), postTimeout); PVV_SAFE("Step 10. Ensure e_SUCCESS is returned"); ASSERT_EQ(rc, bmqt::PostResult::e_SUCCESS); @@ -4802,7 +4802,7 @@ static void test22_confirm_Limit() bmqt::MessageGUID()); ASSERT_EQ(rc, bmqt::EventBuilderResult::e_SUCCESS); - rc = obj.session().confirmMessages(builder.blob(), confirmTimeout); + rc = obj.session().confirmMessages(*builder.blob(), confirmTimeout); PVV_SAFE("Step 5. Ensure e_SUCCESS is returned"); ASSERT_EQ(rc, bmqt::GenericResult::e_SUCCESS); @@ -4826,7 +4826,7 @@ static void test22_confirm_Limit() // Call confirm with a bigger timeout so that LWM event arrives before it // expires. - rc = obj.session().confirmMessages(builder.blob(), timeout); + rc = obj.session().confirmMessages(*builder.blob(), timeout); PVV_SAFE("Step 7. Ensure e_SUCCESS is returned"); ASSERT_EQ(rc, bmqt::GenericResult::e_SUCCESS); @@ -4844,7 +4844,7 @@ static void test22_confirm_Limit() PVV_SAFE("Step 8. Set the channel to return e_GENERIC_ERROR on write"); obj.channel().setWriteStatus(bmqio::StatusCategory::e_GENERIC_ERROR); - rc = obj.session().confirmMessages(builder.blob(), confirmTimeout); + rc = obj.session().confirmMessages(*builder.blob(), confirmTimeout); PVV_SAFE("Step 9. Ensure e_SUCCESS"); @@ -6637,7 +6637,7 @@ static void test33_queueNackTest() ASSERT_EQ(rc, bmqt::EventBuilderResult::e_SUCCESS); // Post the event using just event blob - int res = obj.session().post(eventBuilder.blob(), timeout); + int res = obj.session().post(*eventBuilder.blob(), timeout); ASSERT_EQ(res, bmqt::PostResult::e_SUCCESS); @@ -8998,7 +8998,7 @@ static void test50_putRetransmittingTest() ASSERT_EQ(rc, bmqt::EventBuilderResult::e_SUCCESS); // Post the event using event blob - int res = obj.session().post(putEventBuilder.blob(), timeout); + int res = obj.session().post(*putEventBuilder.blob(), timeout); ASSERT_EQ(res, bmqt::PostResult::e_SUCCESS); @@ -9013,7 +9013,7 @@ static void test50_putRetransmittingTest() guidFirst, pQueue->id()); - obj.session().processPacket(ackEventBuilder.blob()); + obj.session().processPacket(*ackEventBuilder.blob()); bsl::shared_ptr ackEvent = obj.waitAckEvent(); @@ -9044,7 +9044,7 @@ static void test50_putRetransmittingTest() ASSERT_EQ(rc, bmqt::EventBuilderResult::e_SUCCESS); // Post the event using event blob - res = obj.session().post(putEventBuilder.blob(), timeout); + res = obj.session().post(*putEventBuilder.blob(), timeout); ASSERT_EQ(res, bmqt::PostResult::e_SUCCESS); @@ -9117,7 +9117,7 @@ static void test50_putRetransmittingTest() ASSERT_EQ(rc, bmqt::EventBuilderResult::e_SUCCESS); // Post the event using event blob - res = obj.session().post(putEventBuilder.blob(), timeout); + res = obj.session().post(*putEventBuilder.blob(), timeout); ASSERT_EQ(res, bmqt::PostResult::e_SUCCESS); @@ -9273,7 +9273,7 @@ static void test51_putRetransmittingNoAckTest() ASSERT_EQ(rc, bmqt::EventBuilderResult::e_SUCCESS); // Post the event using event blob - int res = obj.session().post(putEventBuilder.blob(), timeout); + int res = obj.session().post(*putEventBuilder.blob(), timeout); ASSERT_EQ(res, bmqt::PostResult::e_SUCCESS); @@ -9298,7 +9298,7 @@ static void test51_putRetransmittingNoAckTest() ASSERT_EQ(rc, bmqt::EventBuilderResult::e_SUCCESS); // Post the event using event blob - res = obj.session().post(putEventBuilder.blob(), timeout); + res = obj.session().post(*putEventBuilder.blob(), timeout); ASSERT_EQ(res, bmqt::PostResult::e_SUCCESS); diff --git a/src/groups/bmq/bmqimp/bmqimp_event.cpp b/src/groups/bmq/bmqimp/bmqimp_event.cpp index 99fe0ab24..fdb469f36 100644 --- a/src/groups/bmq/bmqimp/bmqimp_event.cpp +++ b/src/groups/bmq/bmqimp/bmqimp_event.cpp @@ -320,7 +320,7 @@ Event& Event::downgradeMessageEventModeToRead() d_msgEventMode = MessageEventMode::e_READ; - d_rawEvent.reset(&(d_putEventBuilderBuffer.object().blob())); + d_rawEvent.reset(d_putEventBuilderBuffer.object().blob().get()); BSLS_ASSERT(d_rawEvent.isPutEvent()); // Only PUT events can be built via SDK diff --git a/src/groups/bmq/bmqimp/bmqimp_event.t.cpp b/src/groups/bmq/bmqimp/bmqimp_event.t.cpp index 560920c53..ba13993a2 100644 --- a/src/groups/bmq/bmqimp/bmqimp_event.t.cpp +++ b/src/groups/bmq/bmqimp/bmqimp_event.t.cpp @@ -1178,8 +1178,7 @@ static void test8_putEventBuilder() // Get blob and use bmqp iterator to test. Note that bmqp event and // bmqp iterators are lower than bmqp builders, and thus, can be used // to test them. - const bdlbb::Blob& eventBlob = builder.blob(); - bmqp::Event rawEvent(&eventBlob, s_allocator_p); + bmqp::Event rawEvent(builder.blob().get(), s_allocator_p); ASSERT(rawEvent.isValid()); ASSERT(rawEvent.isPutEvent()); diff --git a/src/groups/bmq/bmqimp/bmqimp_eventqueue.t.cpp b/src/groups/bmq/bmqimp/bmqimp_eventqueue.t.cpp index 6f9d1faac..f910fd995 100644 --- a/src/groups/bmq/bmqimp/bmqimp_eventqueue.t.cpp +++ b/src/groups/bmq/bmqimp/bmqimp_eventqueue.t.cpp @@ -278,7 +278,7 @@ static void test2_capacityTest() s_allocator_p); builder.startMessage(); - const bdlbb::Blob& eventBlob = builder.blob(); + const bdlbb::Blob& eventBlob = *builder.blob(); bmqp::Event rawEvent(&eventBlob, s_allocator_p); bsl::shared_ptr event; @@ -606,7 +606,7 @@ static void test6_workingStatsTest() obj.initializeStats(&rootStatContext, start, end); builder.startMessage(); - const bdlbb::Blob& eventBlob = builder.blob(); + const bdlbb::Blob& eventBlob = *builder.blob(); bmqp::Event rawEvent(&eventBlob, s_allocator_p); bsl::shared_ptr event; diff --git a/src/groups/bmq/bmqimp/bmqimp_messagedumper.t.cpp b/src/groups/bmq/bmqimp/bmqimp_messagedumper.t.cpp index d84f722a5..b01bf81c7 100644 --- a/src/groups/bmq/bmqimp/bmqimp_messagedumper.t.cpp +++ b/src/groups/bmq/bmqimp/bmqimp_messagedumper.t.cpp @@ -658,7 +658,7 @@ bmqp::Event& Tester::pushEvent(bmqp::Event* event) const // PRECONDITIONS BSLS_ASSERT_OPT(event && "'event' must be provided"); - event->reset(&d_pushEventBuilder.blob(), true); + event->reset(d_pushEventBuilder.blob().get(), true); return *event; } @@ -668,7 +668,7 @@ bmqp::Event& Tester::ackEvent(bmqp::Event* event) const // PRECONDITIONS BSLS_ASSERT_OPT(event && "'event' must be provided"); - event->reset(&d_ackEventBuilder.blob(), true); + event->reset(d_ackEventBuilder.blob().get(), true); return *event; } @@ -678,7 +678,7 @@ bmqp::Event& Tester::putEvent(bmqp::Event* event) const // PRECONDITIONS BSLS_ASSERT_OPT(event && "'event' must be provided"); - event->reset(&d_putEventBuilder.blob(), true); + event->reset(d_putEventBuilder.blob().get(), true); return *event; } @@ -688,7 +688,7 @@ bmqp::Event& Tester::confirmEvent(bmqp::Event* event) const // PRECONDITIONS BSLS_ASSERT_OPT(event && "'event' must be provided"); - event->reset(&d_confirmEventBuilder.blob(), true); + event->reset(d_confirmEventBuilder.blob().get(), true); return *event; } diff --git a/src/groups/bmq/bmqimp/bmqimp_negotiatedchannelfactory.cpp b/src/groups/bmq/bmqimp/bmqimp_negotiatedchannelfactory.cpp index 5351307dd..af83af556 100644 --- a/src/groups/bmq/bmqimp/bmqimp_negotiatedchannelfactory.cpp +++ b/src/groups/bmq/bmqimp/bmqimp_negotiatedchannelfactory.cpp @@ -66,19 +66,16 @@ NegotiatedChannelFactoryConfig::NegotiatedChannelFactoryConfig( bmqio::ChannelFactory* base, const bmqp_ctrlmsg::NegotiationMessage& negotiationMessage, const bsls::TimeInterval& negotiationTimeout, - bdlbb::BlobBufferFactory* bufferFactory, BlobSpPool* blobSpPool_p, bslma::Allocator* basicAllocator) : d_baseFactory_p(base) , d_negotiationMessage(negotiationMessage, basicAllocator) , d_negotiationTimeout(negotiationTimeout) -, d_bufferFactory_p(bufferFactory) , d_blobSpPool_p(blobSpPool_p) , d_allocator_p(bslma::Default::allocator(basicAllocator)) { // PRECONDITIONS BSLS_ASSERT(base); - BSLS_ASSERT(bufferFactory); } NegotiatedChannelFactoryConfig::NegotiatedChannelFactoryConfig( @@ -87,7 +84,6 @@ NegotiatedChannelFactoryConfig::NegotiatedChannelFactoryConfig( : d_baseFactory_p(original.d_baseFactory_p) , d_negotiationMessage(original.d_negotiationMessage, basicAllocator) , d_negotiationTimeout(original.d_negotiationTimeout) -, d_bufferFactory_p(original.d_bufferFactory_p) , d_blobSpPool_p(original.d_blobSpPool_p) , d_allocator_p(bslma::Default::allocator(basicAllocator)) { @@ -146,8 +142,8 @@ void NegotiatedChannelFactory::negotiate( << "': " << d_config.d_negotiationMessage; bmqio::Status status; BALL_LOG_TRACE << "Sending blob:\n" - << bmqu::BlobStartHexDumper(&builder.blob()); - channel->write(&status, builder.blob()); + << bmqu::BlobStartHexDumper(builder.blob().get()); + channel->write(&status, *builder.blob()); if (!status) { BALL_LOG_ERROR << "Negotiation failed [reason: 'failed sending packet'" << ", status: " << status << "]"; diff --git a/src/groups/bmq/bmqimp/bmqimp_negotiatedchannelfactory.h b/src/groups/bmq/bmqimp/bmqimp_negotiatedchannelfactory.h index 7e34f0d30..05cbf2ad5 100644 --- a/src/groups/bmq/bmqimp/bmqimp_negotiatedchannelfactory.h +++ b/src/groups/bmq/bmqimp/bmqimp_negotiatedchannelfactory.h @@ -67,7 +67,6 @@ class NegotiatedChannelFactoryConfig { bmqio::ChannelFactory* d_baseFactory_p; bmqp_ctrlmsg::NegotiationMessage d_negotiationMessage; bsls::TimeInterval d_negotiationTimeout; - bdlbb::BlobBufferFactory* d_bufferFactory_p; BlobSpPool* d_blobSpPool_p; bslma::Allocator* d_allocator_p; @@ -84,7 +83,6 @@ class NegotiatedChannelFactoryConfig { bmqio::ChannelFactory* base, const bmqp_ctrlmsg::NegotiationMessage& negotiationMessage, const bsls::TimeInterval& negotiationTimeout, - bdlbb::BlobBufferFactory* bufferFactory, BlobSpPool* blobSpPool_p, bslma::Allocator* basicAllocator = 0); diff --git a/src/groups/bmq/bmqimp/bmqimp_queuemanager.t.cpp b/src/groups/bmq/bmqimp/bmqimp_queuemanager.t.cpp index 5749f8588..3dbab13c7 100644 --- a/src/groups/bmq/bmqimp/bmqimp_queuemanager.t.cpp +++ b/src/groups/bmq/bmqimp/bmqimp_queuemanager.t.cpp @@ -608,8 +608,7 @@ static void test9_pushStatsTest() BSLS_ASSERT_SAFE(rc == bmqt::EventBuilderResult::e_SUCCESS); - const bdlbb::Blob& eventBlob = peb.blob(); - bmqp::Event rawEvent(&eventBlob, s_allocator_p); + bmqp::Event rawEvent(peb.blob().get(), s_allocator_p); BSLS_ASSERT_SAFE(true == rawEvent.isValid()); BSLS_ASSERT_SAFE(true == rawEvent.isPushEvent()); @@ -710,8 +709,7 @@ static void test10_putStatsTest() BSLS_ASSERT_SAFE(rc == bmqt::EventBuilderResult::e_SUCCESS); - const bdlbb::Blob& eventBlob = peb.blob(); - bmqp::Event rawEvent(&eventBlob, s_allocator_p); + bmqp::Event rawEvent(peb.blob().get(), s_allocator_p); BSLS_ASSERT_SAFE(true == rawEvent.isValid()); BSLS_ASSERT_SAFE(true == rawEvent.isPutEvent()); diff --git a/src/groups/bmq/bmqp/bmqp_ackeventbuilder.cpp b/src/groups/bmq/bmqp/bmqp_ackeventbuilder.cpp index 5837c4437..13caadc78 100644 --- a/src/groups/bmq/bmqp/bmqp_ackeventbuilder.cpp +++ b/src/groups/bmq/bmqp/bmqp_ackeventbuilder.cpp @@ -38,11 +38,19 @@ AckEventBuilder::AckEventBuilder(BlobSpPool* blobSpPool_p, bslma::Allocator* allocator) : d_blobSpPool_p(blobSpPool_p) , d_blob_sp(0, allocator) // initialized in `reset()` +, d_emptyBlob_sp(blobSpPool_p->getObject()) , d_msgCount(0) { // PRECONDITIONS BSLS_ASSERT_SAFE(blobSpPool_p); + // Assume that items built with the given `blobSpPool_p` either all have or + // all don't have buffer factory, and check it once for `d_emptyBlob_sp`. + // We require this since we do `Blob::setLength`: + BSLS_ASSERT_SAFE( + NULL != d_emptyBlob_sp->factory() && + "Passed BlobSpPool must build Blobs with set BlobBufferFactory"); + reset(); } @@ -50,11 +58,6 @@ void AckEventBuilder::reset() { d_blob_sp = d_blobSpPool_p->getObject(); - // The following prerequisite is necessary since we do `Blob::setLength`: - BSLS_ASSERT_SAFE( - NULL != d_blob_sp->factory() && - "Passed BlobSpPool must build Blobs with set BlobBufferFactory"); - d_msgCount = 0; // NOTE: Since AckEventBuilder owns the blob and we just reset it, we have @@ -114,27 +117,7 @@ AckEventBuilder::appendMessage(int status, return bmqt::EventBuilderResult::e_SUCCESS; } -const bdlbb::Blob& AckEventBuilder::blob() const -{ - // PRECONDITIONS - BSLS_ASSERT_SAFE(d_blob_sp->length() <= EventHeader::k_MAX_SIZE_SOFT); - - // Empty event - if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(messageCount() == 0)) { - BSLS_PERFORMANCEHINT_UNLIKELY_HINT; - return ProtocolUtil::emptyBlob(); // RETURN - } - - // Fix packet's length in header now that we know it. Following is valid - // (see comment in reset). - EventHeader& eh = *reinterpret_cast( - d_blob_sp->buffer(0).data()); - eh.setLength(d_blob_sp->length()); - - return *d_blob_sp; -} - -bsl::shared_ptr AckEventBuilder::blob_sp() const +const bsl::shared_ptr& AckEventBuilder::blob() const { // PRECONDITIONS BSLS_ASSERT_SAFE(d_blob_sp->length() <= EventHeader::k_MAX_SIZE_SOFT); @@ -142,7 +125,7 @@ bsl::shared_ptr AckEventBuilder::blob_sp() const // Empty event if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(messageCount() == 0)) { BSLS_PERFORMANCEHINT_UNLIKELY_HINT; - return bsl::shared_ptr(); // RETURN + return d_emptyBlob_sp; // RETURN } // Fix packet's length in header now that we know it. Following is valid diff --git a/src/groups/bmq/bmqp/bmqp_ackeventbuilder.h b/src/groups/bmq/bmqp/bmqp_ackeventbuilder.h index 619b0a029..c2f573e78 100644 --- a/src/groups/bmq/bmqp/bmqp_ackeventbuilder.h +++ b/src/groups/bmq/bmqp/bmqp_ackeventbuilder.h @@ -41,18 +41,22 @@ /// Usage ///----- //.. -// bdlbb::BlobPooledBlobBufferFactory bufferFactory(1024, d_allocator_p); -// bmqp::AckEventBuilder builder(&bufferFactory, d_allocator_p); +// bdlbb::PooledBlobBufferFactory bufferFactory(1024, s_allocator_p); +// bmqp::BlobPoolUtil::BlobSpPool blobSpPool( +// bmqp::BlobPoolUtil::createBlobPool(&bufferFactory, s_allocator_p)); +// bmqp::AckEventBuilder builder(&blobSpPool, d_allocator_p); // // // Append multiple messages // builder.appendMessage(0, 1, bmqt::MessageGUID(), 1); // builder.appendMessage(-1, 2, bmqt::MessageGUID(), 1); // -// const bdlbb::Blob& eventBlob = builder.blob(); +// const bsl::shared_ptr& eventBlob = builder.blob(); // // Send the blob ... // // // We can reset the builder to reuse it; note that this invalidates the -// // 'eventBlob' retrieved above +// // 'eventBlob' shared pointer reference retrieved above. To keep the +// // bdlbb::Blob valid the shared pointer should be copied, and the copy +// // should be passed and kept in IO components. // builder.reset(); //.. @@ -93,6 +97,9 @@ class AckEventBuilder BSLS_CPP11_FINAL { /// `mutable` to skip writing the length until the blob is retrieved. mutable bsl::shared_ptr d_blob_sp; + /// Empty blob to be returned when no messages were added to this builder. + bsl::shared_ptr d_emptyBlob_sp; + int d_msgCount; // number of messages currently in the // event @@ -110,7 +117,7 @@ class AckEventBuilder BSLS_CPP11_FINAL { public: // CREATORS - /// Create a new `PushEventBuilder` using the specified `blobSpPool_p` and + /// Create a new `AckEventBuilder` using the specified `blobSpPool_p` and /// `allocator` for the blob. We require BlobSpPool to build Blobs with /// set BlobBufferFactory since we might want to expand the built Blob /// dynamically. @@ -145,17 +152,13 @@ class AckEventBuilder BSLS_CPP11_FINAL { /// were added, this will return 0. int eventSize() const; - /// Return a reference not offering modifiable access to the blob built - /// by this event. If no messages were added, this will return an empty - /// blob, i.e., a blob with length == 0. - const bdlbb::Blob& blob() const; - - /// Return a shared pointer to the built Blob. If no messages were added, - /// this will return an empty shared pointer. - /// Note that a shared pointer is returned by value, so the user holds to - /// the copy of a pointer. The Blob in that copy will be valid even if we - /// `reset` this builder and modify the internal shared pointer. - bsl::shared_ptr blob_sp() const; + /// Return a reference to the shared pointer to the built Blob. If no + /// messages were added, the Blob object under this reference will be + /// empty. + /// Note that this accessor exposes an internal shared pointer object, and + /// it is the user's responsibility to make a copy of it if it needs to be + /// passed and kept in another thread while this builder object is used. + const bsl::shared_ptr& blob() const; }; // ============================================================================ diff --git a/src/groups/bmq/bmqp/bmqp_ackeventbuilder.t.cpp b/src/groups/bmq/bmqp/bmqp_ackeventbuilder.t.cpp index f5b2de7d4..0819ad2d7 100644 --- a/src/groups/bmq/bmqp/bmqp_ackeventbuilder.t.cpp +++ b/src/groups/bmq/bmqp/bmqp_ackeventbuilder.t.cpp @@ -97,10 +97,10 @@ static void verifyContent(const bmqp::AckEventBuilder& builder, (data.size() * sizeof(bmqp::AckMessage)); ASSERT_EQ(static_cast(builder.messageCount()), data.size()); ASSERT_EQ(static_cast(builder.eventSize()), expectedSize); - ASSERT_EQ(static_cast(builder.blob().length()), expectedSize); + ASSERT_EQ(static_cast(builder.blob()->length()), expectedSize); PVV("Iterating over messages"); - bmqp::Event event(&builder.blob(), s_allocator_p); + bmqp::Event event(builder.blob().get(), s_allocator_p); ASSERT_EQ(event.isValid(), true); ASSERT_EQ(event.isAckEvent(), true); @@ -147,7 +147,7 @@ static void test1_breathingTest() ASSERT_EQ(obj.messageCount(), 0); ASSERT_NE(obj.maxMessageCount(), 0); ASSERT_EQ(obj.eventSize(), 0); - ASSERT_EQ(obj.blob().length(), 0); + ASSERT_EQ(obj.blob()->length(), 0); PVV("Appending one message"); appendMessages(&obj, &messages, 1); @@ -196,7 +196,7 @@ static void test3_reset() PV("Verifying accessors"); ASSERT_EQ(obj.messageCount(), 0); ASSERT_EQ(obj.eventSize(), 0); - ASSERT_EQ(obj.blob().length(), 0); + ASSERT_EQ(obj.blob()->length(), 0); PV("Appending another message"); messages.clear(); @@ -298,7 +298,7 @@ static void testN1_decodeFromFile() BSLS_ASSERT(ofile.good() == true); - bdlbb::BlobUtil::copy(buf, obj.blob(), 0, obj.blob().length()); + bdlbb::BlobUtil::copy(buf, *obj.blob(), 0, obj.blob()->length()); ofile.write(buf, k_SIZE); ofile.close(); bsl::memset(buf, 0, k_SIZE); @@ -317,9 +317,9 @@ static void testN1_decodeFromFile() bdlbb::BlobBuffer dataBlobBuffer(dataBufferSp, k_SIZE); outBlob.appendDataBuffer(dataBlobBuffer); - outBlob.setLength(obj.blob().length()); + outBlob.setLength(obj.blob()->length()); - ASSERT_EQ(bdlbb::BlobUtil::compare(obj.blob(), outBlob), 0); + ASSERT_EQ(bdlbb::BlobUtil::compare(*obj.blob(), outBlob), 0); // Decode event bmqp::Event event(&outBlob, s_allocator_p); diff --git a/src/groups/bmq/bmqp/bmqp_confirmeventbuilder.cpp b/src/groups/bmq/bmqp/bmqp_confirmeventbuilder.cpp index ed55aeb86..854b713aa 100644 --- a/src/groups/bmq/bmqp/bmqp_confirmeventbuilder.cpp +++ b/src/groups/bmq/bmqp/bmqp_confirmeventbuilder.cpp @@ -38,11 +38,19 @@ ConfirmEventBuilder::ConfirmEventBuilder(BlobSpPool* blobSpPool_p, bslma::Allocator* allocator) : d_blobSpPool_p(blobSpPool_p) , d_blob_sp(0, allocator) // initialized in `reset()` +, d_emptyBlob_sp(blobSpPool_p->getObject()) , d_msgCount(0) { // PRECONDITIONS BSLS_ASSERT_SAFE(blobSpPool_p); + // Assume that items built with the given `blobSpPool_p` either all have or + // all don't have buffer factory, and check it once for `d_emptyBlob_sp`. + // We require this since we do `Blob::setLength`: + BSLS_ASSERT_SAFE( + NULL != d_emptyBlob_sp->factory() && + "Passed BlobSpPool must build Blobs with set BlobBufferFactory"); + reset(); } @@ -50,11 +58,6 @@ void ConfirmEventBuilder::reset() { d_blob_sp = d_blobSpPool_p->getObject(); - // The following prerequisite is necessary since we do `Blob::setLength`: - BSLS_ASSERT_SAFE( - NULL != d_blob_sp->factory() && - "Passed BlobSpPool must build Blobs with set BlobBufferFactory"); - d_msgCount = 0; // NOTE: Since ConfirmEventBuilder owns the blob and we just reset it, we @@ -116,27 +119,7 @@ ConfirmEventBuilder::appendMessage(int queueId, return bmqt::EventBuilderResult::e_SUCCESS; } -const bdlbb::Blob& ConfirmEventBuilder::blob() const -{ - // PRECONDITIONS - BSLS_ASSERT_SAFE(d_blob_sp->length() <= EventHeader::k_MAX_SIZE_SOFT); - - // Empty event - if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(messageCount() == 0)) { - BSLS_PERFORMANCEHINT_UNLIKELY_HINT; - return ProtocolUtil::emptyBlob(); // RETURN - } - - // Fix packet's length in header now that we know it. Following is valid - // (see comment in reset). - EventHeader& eh = *reinterpret_cast( - d_blob_sp->buffer(0).data()); - eh.setLength(d_blob_sp->length()); - - return *d_blob_sp; -} - -bsl::shared_ptr ConfirmEventBuilder::blob_sp() const +const bsl::shared_ptr& ConfirmEventBuilder::blob() const { // PRECONDITIONS BSLS_ASSERT_SAFE(d_blob_sp->length() <= EventHeader::k_MAX_SIZE_SOFT); @@ -144,7 +127,7 @@ bsl::shared_ptr ConfirmEventBuilder::blob_sp() const // Empty event if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(messageCount() == 0)) { BSLS_PERFORMANCEHINT_UNLIKELY_HINT; - return bsl::shared_ptr(); // RETURN + return d_emptyBlob_sp; // RETURN } // Fix packet's length in header now that we know it. Following is valid diff --git a/src/groups/bmq/bmqp/bmqp_confirmeventbuilder.h b/src/groups/bmq/bmqp/bmqp_confirmeventbuilder.h index c4efe6d38..8d24bae8c 100644 --- a/src/groups/bmq/bmqp/bmqp_confirmeventbuilder.h +++ b/src/groups/bmq/bmqp/bmqp_confirmeventbuilder.h @@ -41,18 +41,22 @@ /// Usage ///----- //.. -// bdlbb::BlobPooledBlobBufferFactory bufferFactory(1024, d_allocator_p); -// bmqp::ConfirmEventBuilder builder(&bufferFactory, d_allocator_p); +// bdlbb::PooledBlobBufferFactory bufferFactory(1024, s_allocator_p); +// bmqp::BlobPoolUtil::BlobSpPool blobSpPool( +// bmqp::BlobPoolUtil::createBlobPool(&bufferFactory, s_allocator_p)); +// bmqp::ConfirmEventBuilder builder(&blobSpPool, d_allocator_p); // // // Append multiple messages, from same or different queue // builder.appendMessage(k_QUEUEID1, k_SUBQUEUEID1, bmqt::MessageGUID()); // builder.appendMessage(k_QUEUEID2, k_SUBQUEUEID2, bmqt::MessageGUID()); // -// const bdlbb::Blob& eventBlob = builder.blob(); +// const bsl::shared_ptr& eventBlob = builder.blob(); // // Send the blob ... // // // We can reset the builder to reuse it; note that this invalidates the -// // 'eventBlob' retrieved above +// // 'eventBlob' shared pointer reference retrieved above. To keep the +// // bdlbb::Blob valid the shared pointer should be copied, and the copy +// // should be passed and kept in IO components. // builder.reset(); // //.. @@ -92,6 +96,9 @@ class ConfirmEventBuilder { /// `mutable` to skip writing the length until the blob is retrieved. mutable bsl::shared_ptr d_blob_sp; + /// Empty blob to be returned when no messages were added to this builder. + bsl::shared_ptr d_emptyBlob_sp; + int d_msgCount; // number of messages currently in the // event @@ -111,9 +118,9 @@ class ConfirmEventBuilder { public: // CREATORS - /// Create a new `PushEventBuilder` using the specified `blobSpPool_p` and - /// `allocator` for the blob. We require BlobSpPool to build Blobs with - /// set BlobBufferFactory since we might want to expand the built Blob + /// Create a new `ConfirmEventBuilder` using the specified `blobSpPool_p` + /// and `allocator` for the blob. We require BlobSpPool to build Blobs + /// with set BlobBufferFactory since we might want to expand the built Blob /// dynamically. ConfirmEventBuilder(BlobSpPool* blobSpPool_p, bslma::Allocator* allocator); @@ -144,17 +151,13 @@ class ConfirmEventBuilder { /// were added, this will return 0. int eventSize() const; - /// Return a reference not offering modifiable access to the blob built - /// by this event. If no messages were added, this will return an empty - /// blob, i.e., a blob with length == 0. - const bdlbb::Blob& blob() const; - - /// Return a shared pointer to the built Blob. If no messages were added, - /// this will return an empty shared pointer. - /// Note that a shared pointer is returned by value, so the user holds to - /// the copy of a pointer. The Blob in that copy will be valid even if we - /// `reset` this builder and modify the internal shared pointer. - bsl::shared_ptr blob_sp() const; + /// Return a reference to the shared pointer to the built Blob. If no + /// messages were added, the Blob object under this reference will be + /// empty. + /// Note that this accessor exposes an internal shared pointer object, and + /// it is the user's responsibility to make a copy of it if it needs to be + /// passed and kept in another thread while this builder object is used. + const bsl::shared_ptr& blob() const; }; // ============================================================================ diff --git a/src/groups/bmq/bmqp/bmqp_confirmeventbuilder.t.cpp b/src/groups/bmq/bmqp/bmqp_confirmeventbuilder.t.cpp index ff144cfb1..92633fd5d 100644 --- a/src/groups/bmq/bmqp/bmqp_confirmeventbuilder.t.cpp +++ b/src/groups/bmq/bmqp/bmqp_confirmeventbuilder.t.cpp @@ -95,10 +95,10 @@ static void verifyContent(const bmqp::ConfirmEventBuilder& builder, (data.size() * sizeof(bmqp::ConfirmMessage)); ASSERT_EQ(static_cast(builder.messageCount()), data.size()); ASSERT_EQ(static_cast(builder.eventSize()), expectedSize); - ASSERT_EQ(static_cast(builder.blob().length()), expectedSize); + ASSERT_EQ(static_cast(builder.blob()->length()), expectedSize); PVV("Iterating over messages"); - bmqp::Event event(&builder.blob(), s_allocator_p); + bmqp::Event event(builder.blob().get(), s_allocator_p); ASSERT_EQ(event.isValid(), true); ASSERT_EQ(event.isConfirmEvent(), true); @@ -144,7 +144,7 @@ static void test1_breathingTest() ASSERT_EQ(obj.messageCount(), 0); ASSERT_NE(obj.maxMessageCount(), 0); ASSERT_EQ(obj.eventSize(), 0); - ASSERT_EQ(obj.blob().length(), 0); + ASSERT_EQ(obj.blob()->length(), 0); PVV("Appending one message"); appendMessages(&obj, &messages, 1); @@ -193,7 +193,7 @@ static void test3_reset() PV("Verifying accessors"); ASSERT_EQ(obj.messageCount(), 0); ASSERT_EQ(obj.eventSize(), 0); - ASSERT_EQ(obj.blob().length(), 0); + ASSERT_EQ(obj.blob()->length(), 0); PV("Appending another message"); messages.clear(); @@ -277,11 +277,11 @@ static void testN1_decodeFromFile() PVV("Appending messages"); appendMessages(&obj, &messages, k_NUM_MSGS); - ASSERT_NE(obj.blob().length(), 0); + ASSERT_NE(obj.blob()->length(), 0); os << "msg_confirm_" << guid << ".bin" << bsl::ends; - const int blobLen = obj.blob().length(); + const int blobLen = obj.blob()->length(); char* buf = new char[blobLen]; /// Functor invoked to delete the file at the specified `filePath` @@ -301,7 +301,7 @@ static void testN1_decodeFromFile() BSLS_ASSERT(ofile.good() == true); - bdlbb::BlobUtil::copy(buf, obj.blob(), 0, blobLen); + bdlbb::BlobUtil::copy(buf, *obj.blob(), 0, blobLen); ofile.write(buf, blobLen); ofile.close(); bsl::memset(buf, 0, blobLen); @@ -322,7 +322,7 @@ static void testN1_decodeFromFile() outBlob.appendDataBuffer(dataBlobBuffer); outBlob.setLength(blobLen); - ASSERT_EQ(bdlbb::BlobUtil::compare(obj.blob(), outBlob), 0); + ASSERT_EQ(bdlbb::BlobUtil::compare(*obj.blob(), outBlob), 0); // Decode event bmqp::Event event(&outBlob, s_allocator_p); diff --git a/src/groups/bmq/bmqp/bmqp_event.t.cpp b/src/groups/bmq/bmqp/bmqp_event.t.cpp index 8ecbf3dd1..eba120a2d 100644 --- a/src/groups/bmq/bmqp/bmqp_event.t.cpp +++ b/src/groups/bmq/bmqp/bmqp_event.t.cpp @@ -538,7 +538,7 @@ static void test4_eventLoading() test.d_encodingType, s_allocator_p); - BSLS_ASSERT_OPT(obj.blob().length() == 0); + BSLS_ASSERT_OPT(obj.blob()->length() == 0); { PVV(test.d_line << ": Create a control message"); @@ -550,12 +550,12 @@ static void test4_eventLoading() // Encode the message rc = obj.setMessage(ctrlMessage, bmqp::EventType::e_CONTROL); BSLS_ASSERT_OPT(rc == 0); - BSLS_ASSERT_OPT(obj.blob().length() > 0); - BSLS_ASSERT_OPT(obj.blob().length() % 4 == 0); + BSLS_ASSERT_OPT(obj.blob()->length() > 0); + BSLS_ASSERT_OPT(obj.blob()->length() % 4 == 0); PVV(test.d_line << ": Decode and compare message"); - bmqp::Event ctrlEvent(&obj.blob(), s_allocator_p); + bmqp::Event ctrlEvent(obj.blob().get(), s_allocator_p); ASSERT_EQ(ctrlEvent.isValid(), true); ASSERT_EQ(ctrlEvent.isControlEvent(), true); @@ -573,14 +573,14 @@ static void test4_eventLoading() PVV(test.d_line << ": Reset"); obj.reset(); - BSLS_ASSERT_OPT(obj.blob().length() == 0); + BSLS_ASSERT_OPT(obj.blob()->length() == 0); } { bmqp::SchemaEventBuilder obj(&blobSpPool, bmqp::EncodingType::e_BER, s_allocator_p); - BSLS_ASSERT_OPT(obj.blob().length() == 0); + BSLS_ASSERT_OPT(obj.blob()->length() == 0); PVV(L_ << ": Create an elector message"); @@ -591,12 +591,12 @@ static void test4_eventLoading() rc = obj.setMessage(electorMsg, bmqp::EventType::e_ELECTOR); BSLS_ASSERT_OPT(rc == 0); - BSLS_ASSERT_OPT(obj.blob().length() > 0); - BSLS_ASSERT_OPT(obj.blob().length() % 4 == 0); + BSLS_ASSERT_OPT(obj.blob()->length() > 0); + BSLS_ASSERT_OPT(obj.blob()->length() % 4 == 0); PVV(L_ << ": Decode and compare message"); - bmqp::Event electorEvent(&obj.blob(), s_allocator_p); + bmqp::Event electorEvent(obj.blob().get(), s_allocator_p); BSLS_ASSERT_OPT(electorEvent.isValid()); BSLS_ASSERT_OPT(electorEvent.isElectorEvent()); @@ -610,7 +610,7 @@ static void test4_eventLoading() PVV(L_ << ": Reset"); obj.reset(); - BSLS_ASSERT_OPT(obj.blob().length() == 0); + BSLS_ASSERT_OPT(obj.blob()->length() == 0); } bmqp::ProtocolUtil::shutdown(); diff --git a/src/groups/bmq/bmqp/bmqp_eventutil.cpp b/src/groups/bmq/bmqp/bmqp_eventutil.cpp index f982fd8a6..415b32203 100644 --- a/src/groups/bmq/bmqp/bmqp_eventutil.cpp +++ b/src/groups/bmq/bmqp/bmqp_eventutil.cpp @@ -394,7 +394,7 @@ void Flattener::advanceEvent() BSLS_ASSERT_SAFE(d_builder.messageCount() > 0); BSLS_ASSERT_SAFE(!d_currEventInfo.d_ids.empty()); - d_eventInfos_p->emplace_back(d_builder.blob(), d_currEventInfo.d_ids); + d_eventInfos_p->emplace_back(*d_builder.blob(), d_currEventInfo.d_ids); d_currEventInfo.d_ids.clear(); d_builder.reset(); diff --git a/src/groups/bmq/bmqp/bmqp_eventutil.t.cpp b/src/groups/bmq/bmqp/bmqp_eventutil.t.cpp index ecd7d74a7..ffeea0f45 100644 --- a/src/groups/bmq/bmqp/bmqp_eventutil.t.cpp +++ b/src/groups/bmq/bmqp/bmqp_eventutil.t.cpp @@ -305,7 +305,7 @@ static void test1_breathingTest() // Create event appendMessages(&pushEventBuilder, data); - bmqp::Event event(&(pushEventBuilder.blob()), s_allocator_p); + bmqp::Event event(pushEventBuilder.blob().get(), s_allocator_p); // 2) Flatten the event bsl::vector eventInfos(s_allocator_p); @@ -467,7 +467,7 @@ static void test2_flattenExplodesEvent() // Create event appendMessages(&pushEventBuilder, data); - bmqp::Event event(&(pushEventBuilder.blob()), s_allocator_p); + bmqp::Event event(pushEventBuilder.blob().get(), s_allocator_p); // 2) Flatten the event bsl::vector eventInfos(s_allocator_p); @@ -722,7 +722,7 @@ static void test3_flattenWithMessageProperties() logic); BSLS_ASSERT_OPT(result == bmqt::EventBuilderResult::e_SUCCESS); - bmqp::Event event(&(pushEventBuilder.blob()), s_allocator_p); + bmqp::Event event(pushEventBuilder.blob().get(), s_allocator_p); // 2) Flatten the event. bsl::vector eventInfos(s_allocator_p); diff --git a/src/groups/bmq/bmqp/bmqp_protocolutil.cpp b/src/groups/bmq/bmqp/bmqp_protocolutil.cpp index b429b5f96..79e567695 100644 --- a/src/groups/bmq/bmqp/bmqp_protocolutil.cpp +++ b/src/groups/bmq/bmqp/bmqp_protocolutil.cpp @@ -59,13 +59,11 @@ const char k_PADDING_DATA[9][8] = { /// Array of all potential padding buffers used for word and dword padding. bsls::ObjectBuffer g_paddingBlobBuffer[9]; +/// Static prefilled blobs respectively containing a heartbeat request and +/// a heartbeat response. bsls::ObjectBuffer g_heartbeatReqBlob; bsls::ObjectBuffer g_heartbeatRspBlob; -/// Static prefilled blobs respectively containing a heartbeat request, -/// heartbeat response and an empty blob. -bsls::ObjectBuffer g_emptyBlob; - /// Integer to keep track of the number of calls to `initialize` for the /// `ProtocolUtil`. If the value is non-zero, then it has already been /// initialized, otherwise it can be initialized. Each call to `initialize` @@ -160,9 +158,6 @@ void ProtocolUtil::initialize(bslma::Allocator* allocator) new (g_heartbeatRspBlob.buffer()) bdlbb::Blob(alloc); g_heartbeatRspBlob.object().appendDataBuffer(buffer); } - - // Create empty blob - new (g_emptyBlob.buffer()) bdlbb::Blob(alloc); } void ProtocolUtil::shutdown() @@ -178,7 +173,6 @@ void ProtocolUtil::shutdown() g_heartbeatRspBlob.object().bdlbb::Blob::~Blob(); g_heartbeatReqBlob.object().bdlbb::Blob::~Blob(); - g_emptyBlob.object().bdlbb::Blob::~Blob(); for (int i = 0; i < 9; ++i) { g_paddingBlobBuffer[i].object().reset(); @@ -292,11 +286,6 @@ const bdlbb::Blob& ProtocolUtil::heartbeatRspBlob() return g_heartbeatRspBlob.object(); } -const bdlbb::Blob& ProtocolUtil::emptyBlob() -{ - return g_emptyBlob.object(); -} - void ProtocolUtil::hexToBinary(char* buffer, int length, const char* hex) { for (int i = 0; i < length; ++i) { diff --git a/src/groups/bmq/bmqp/bmqp_protocolutil.h b/src/groups/bmq/bmqp/bmqp_protocolutil.h index b8c2af80f..38090bbad 100644 --- a/src/groups/bmq/bmqp/bmqp_protocolutil.h +++ b/src/groups/bmq/bmqp/bmqp_protocolutil.h @@ -284,13 +284,6 @@ struct ProtocolUtil { /// event. static const bdlbb::Blob& heartbeatRspBlob(); - /// Return const reference to the pre-allocated empty blob. If you need - /// an empty blob and not going to modify it use this method instead of - /// creating another blob. Heap allocate it to prevent 'exit-time- - /// destructor needed' compiler warning. Causes valgrind-reported - /// memory leak. - static const bdlbb::Blob& emptyBlob(); - /// Hex/Binary conversion ///--------------------- diff --git a/src/groups/bmq/bmqp/bmqp_protocolutil.t.cpp b/src/groups/bmq/bmqp/bmqp_protocolutil.t.cpp index 124b392fd..bfc055ed3 100644 --- a/src/groups/bmq/bmqp/bmqp_protocolutil.t.cpp +++ b/src/groups/bmq/bmqp/bmqp_protocolutil.t.cpp @@ -517,13 +517,6 @@ static void test6_heartbeatAndEmptyBlobs() sizeof(bmqp::EventHeader))); } - PV("Verifying emtpy blob") - { - const bdlbb::Blob& blob = bmqp::ProtocolUtil::emptyBlob(); - ASSERT_EQ(0, blob.length()); - ASSERT_EQ(0, blob.numDataBuffers()); - } - bmqp::ProtocolUtil::shutdown(); } @@ -860,7 +853,7 @@ static void test11_parseMessageProperties() ASSERT_EQ(bmqt::EventBuilderResult::e_SUCCESS, builderResult); bmqp::PutMessageIterator putIt(&bufferFactory, s_allocator_p, true); - bmqp::Event rawEvent(&peb.blob(), s_allocator_p); + bmqp::Event rawEvent(peb.blob().get(), s_allocator_p); BSLS_ASSERT_SAFE(rawEvent.isPutEvent()); rawEvent.loadPutMessageIterator(&putIt); diff --git a/src/groups/bmq/bmqp/bmqp_pusheventbuilder.cpp b/src/groups/bmq/bmqp/bmqp_pusheventbuilder.cpp index f08d34eff..398cd1a94 100644 --- a/src/groups/bmq/bmqp/bmqp_pusheventbuilder.cpp +++ b/src/groups/bmq/bmqp/bmqp_pusheventbuilder.cpp @@ -151,6 +151,7 @@ PushEventBuilder::PushEventBuilder(BlobSpPool* blobSpPool_p, : d_allocator_p(bslma::Default::allocator(allocator)) , d_blobSpPool_p(blobSpPool_p) , d_blob_sp(0, allocator) // initialized in `reset()` +, d_emptyBlob_sp(blobSpPool_p->getObject()) , d_msgCount(0) , d_options() , d_currPushHeader() @@ -158,6 +159,13 @@ PushEventBuilder::PushEventBuilder(BlobSpPool* blobSpPool_p, // PRECONDITIONS BSLS_ASSERT_SAFE(blobSpPool_p); + // Assume that items built with the given `blobSpPool_p` either all have or + // all don't have buffer factory, and check it once for `d_emptyBlob_sp`. + // We require this since we do `Blob::setLength`: + BSLS_ASSERT_SAFE( + NULL != d_emptyBlob_sp->factory() && + "Passed BlobSpPool must build Blobs with set BlobBufferFactory"); + reset(); } @@ -170,11 +178,6 @@ int PushEventBuilder::reset() d_blob_sp = d_blobSpPool_p->getObject(); - // The following prerequisite is necessary since we do `Blob::setLength`: - BSLS_ASSERT_SAFE( - NULL != d_blob_sp->factory() && - "Passed BlobSpPool must build Blobs with set BlobBufferFactory"); - d_msgCount = 0; d_options.reset(); @@ -401,19 +404,14 @@ PushEventBuilder::addMsgGroupIdOption(const Protocol::MsgGroupId& msgGroupId) } // ACCESSORS -const bdlbb::Blob& PushEventBuilder::blob() const +const bsl::shared_ptr& PushEventBuilder::blob() const { - // Fix packet's length in header now that we know it .. Following is valid - // (see comment in reset) - EventHeader& eh = *reinterpret_cast( - d_blob_sp->buffer(0).data()); - eh.setLength(d_blob_sp->length()); - - return *d_blob_sp; -} + // Empty event + if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(messageCount() == 0)) { + BSLS_PERFORMANCEHINT_UNLIKELY_HINT; + return d_emptyBlob_sp; // RETURN + } -bsl::shared_ptr PushEventBuilder::blob_sp() const -{ // Fix packet's length in header now that we know it .. Following is valid // (see comment in reset) EventHeader& eh = *reinterpret_cast( diff --git a/src/groups/bmq/bmqp/bmqp_pusheventbuilder.h b/src/groups/bmq/bmqp/bmqp_pusheventbuilder.h index d35507334..f54f355fb 100644 --- a/src/groups/bmq/bmqp/bmqp_pusheventbuilder.h +++ b/src/groups/bmq/bmqp/bmqp_pusheventbuilder.h @@ -40,18 +40,22 @@ /// Usage ///----- //.. -// bdlbb::PooledBlobBufferFactory bufferFactory(1024, d_allocator_p); -// bmqp::PushEventBuilder builder(&bufferFactory, d_allocator_p); +// bdlbb::PooledBlobBufferFactory bufferFactory(1024, s_allocator_p); +// bmqp::BlobPoolUtil::BlobSpPool blobSpPool( +// bmqp::BlobPoolUtil::createBlobPool(&bufferFactory, s_allocator_p)); +// bmqp::PushEventBuilder builder(&blobSpPool, d_allocator_p); // // // Append multiple messages // builder.packMessage("hello", 5, 0, bmqt::MessageGUID()); // builder.packMessage(myBlob, 0, bmqt::MessageGUID()); // -// const bdlbb::Blob& eventBlob = builder.blob(); +// const bsl::shared_ptr& eventBlob = builder.blob(); // // Send the blob ... // // // We can reset the builder to reuse it; note that this invalidates the -// // 'eventBlob' retrieved above +// // 'eventBlob' shared pointer reference retrieved above. To keep the +// // bdlbb::Blob valid the shared pointer should be copied, and the copy +// // should be passed and kept in IO components. // builder.reset(); //.. @@ -99,6 +103,9 @@ class PushEventBuilder { /// `mutable` to skip writing the length until the blob is retrieved. mutable bsl::shared_ptr d_blob_sp; + /// Empty blob to be returned when no messages were added to this builder. + bsl::shared_ptr d_emptyBlob_sp; + int d_msgCount; // number of messages currently in // the event. @@ -245,17 +252,13 @@ class PushEventBuilder { /// Return the number of messages currently in the event being built. int messageCount() const; - /// Return a reference not offering modifiable access to the blob built - /// by this event. If no messages were added, this will return a blob - /// composed only of an `EventHeader`. - const bdlbb::Blob& blob() const; - - /// Return a shared pointer to the built Blob. If no messages were added, - /// this will return an empty shared pointer. - /// Note that a shared pointer is returned by value, so the user holds to - /// the copy of a pointer. The Blob in that copy will be valid even if we - /// `reset` this builder and modify the internal shared pointer. - bsl::shared_ptr blob_sp() const; + /// Return a reference to the shared pointer to the built Blob. If no + /// messages were added, the Blob object under this reference will be + /// empty. + /// Note that this accessor exposes an internal shared pointer object, and + /// it is the user's responsibility to make a copy of it if it needs to be + /// passed and kept in another thread while this builder object is used. + const bsl::shared_ptr& blob() const; }; // ============================================================================ diff --git a/src/groups/bmq/bmqp/bmqp_pusheventbuilder.t.cpp b/src/groups/bmq/bmqp/bmqp_pusheventbuilder.t.cpp index 8610d9bb7..a0d44e9a8 100644 --- a/src/groups/bmq/bmqp/bmqp_pusheventbuilder.t.cpp +++ b/src/groups/bmq/bmqp/bmqp_pusheventbuilder.t.cpp @@ -294,7 +294,7 @@ static void test1_breathingTest() bmqp::PushEventBuilder peb(&blobSpPool, s_allocator_p); ASSERT_EQ(sizeof(bmqp::EventHeader), static_cast(peb.eventSize())); ASSERT_EQ(sizeof(bmqp::EventHeader), - static_cast(peb.blob().length())); + static_cast(peb.blob()->length())); ASSERT_EQ(0, peb.messageCount()); // Add SubQueueInfo option @@ -306,7 +306,7 @@ static void test1_breathingTest() ASSERT_EQ(sizeof(bmqp::EventHeader), static_cast(peb.eventSize())); // 'eventSize()' excludes unpacked messages ASSERT_LT(sizeof(bmqp::EventHeader), - static_cast(peb.blob().length())); + static_cast(peb.blob()->length())); // But the option is written to the underlying blob rc = peb.packMessage(payload, queueId, @@ -321,8 +321,7 @@ static void test1_breathingTest() // Get blob and use bmqp iterator to test. Note that bmqp event and // bmqp iterators are lower than bmqp builders, and thus, can be used // to test them. - const bdlbb::Blob& eventBlob = peb.blob(); - bmqp::Event rawEvent(&eventBlob, s_allocator_p); + bmqp::Event rawEvent(peb.blob().get(), s_allocator_p); BSLS_ASSERT_SAFE(true == rawEvent.isValid()); BSLS_ASSERT_SAFE(true == rawEvent.isPushEvent()); @@ -410,7 +409,7 @@ static void test2_buildEventBackwardsCompatibility() bmqp::PushEventBuilder peb(&blobSpPool, s_allocator_p); ASSERT_EQ(sizeof(bmqp::EventHeader), static_cast(peb.eventSize())); ASSERT_EQ(sizeof(bmqp::EventHeader), - static_cast(peb.blob().length())); + static_cast(peb.blob()->length())); ASSERT_EQ(0, peb.messageCount()); // Add SubQueueInfo option @@ -422,7 +421,7 @@ static void test2_buildEventBackwardsCompatibility() ASSERT_EQ(sizeof(bmqp::EventHeader), static_cast(peb.eventSize())); // 'eventSize()' excludes unpacked messages ASSERT_LT(sizeof(bmqp::EventHeader), - static_cast(peb.blob().length())); + static_cast(peb.blob()->length())); // But the option is written to the underlying blob rc = peb.packMessage(payload, queueId, @@ -437,8 +436,7 @@ static void test2_buildEventBackwardsCompatibility() // Get blob and use bmqp iterator to test. Note that bmqp event and // bmqp iterators are lower than bmqp builders, and thus, can be used // to test them. - const bdlbb::Blob& eventBlob = peb.blob(); - bmqp::Event rawEvent(&eventBlob, s_allocator_p); + bmqp::Event rawEvent(peb.blob().get(), s_allocator_p); BSLS_ASSERT_SAFE(true == rawEvent.isValid()); BSLS_ASSERT_SAFE(true == rawEvent.isPushEvent()); @@ -523,7 +521,7 @@ static void test3_buildEventWithPackedOption() bmqp::PushEventBuilder peb(&blobSpPool, s_allocator_p); ASSERT_EQ(sizeof(bmqp::EventHeader), static_cast(peb.eventSize())); ASSERT_EQ(sizeof(bmqp::EventHeader), - static_cast(peb.blob().length())); + static_cast(peb.blob()->length())); ASSERT_EQ(0, peb.messageCount()); // Add SubQueueInfo option @@ -538,7 +536,7 @@ static void test3_buildEventWithPackedOption() ASSERT_EQ(sizeof(bmqp::EventHeader), static_cast(peb.eventSize())); // 'eventSize()' excludes unpacked messages ASSERT_LT(sizeof(bmqp::EventHeader), - static_cast(peb.blob().length())); + static_cast(peb.blob()->length())); // But the option is written to the underlying blob rc = peb.packMessage(payload, queueId, @@ -553,8 +551,7 @@ static void test3_buildEventWithPackedOption() // Get blob and use bmqp iterator to test. Note that bmqp event and // bmqp iterators are lower than bmqp builders, and thus, can be used // to test them. - const bdlbb::Blob& eventBlob = peb.blob(); - bmqp::Event rawEvent(&eventBlob, s_allocator_p); + bmqp::Event rawEvent(peb.blob().get(), s_allocator_p); BSLS_ASSERT_SAFE(true == rawEvent.isValid()); BSLS_ASSERT_SAFE(true == rawEvent.isPushEvent()); @@ -624,8 +621,7 @@ static void test4_buildEventWithMultipleMessages() } // Iterate and check - const bdlbb::Blob& eventBlob = peb.blob(); - bmqp::Event rawEvent(&eventBlob, s_allocator_p); + bmqp::Event rawEvent(peb.blob().get(), s_allocator_p); BSLS_ASSERT_SAFE(true == rawEvent.isValid()); BSLS_ASSERT_SAFE(true == rawEvent.isPushEvent()); @@ -733,7 +729,7 @@ static void test5_buildEventWithPayloadTooBig() bmqp::PushEventBuilder peb(&blobSpPool, s_allocator_p); ASSERT_EQ(sizeof(bmqp::EventHeader), static_cast(peb.eventSize())); ASSERT_EQ(sizeof(bmqp::EventHeader), - static_cast(peb.blob().length())); + static_cast(peb.blob()->length())); ASSERT_EQ(0, peb.messageCount()); // Add a valid size option @@ -743,7 +739,7 @@ static void test5_buildEventWithPayloadTooBig() ASSERT_EQ(sizeof(bmqp::EventHeader), static_cast(peb.eventSize())); ASSERT_LT(sizeof(bmqp::EventHeader), - static_cast(peb.blob().length())); + static_cast(peb.blob()->length())); // We expect 'addSubQueueInfosOption' to write directly to the underlying // blob ASSERT_EQ(0, peb.messageCount()); @@ -758,7 +754,7 @@ static void test5_buildEventWithPayloadTooBig() ASSERT_EQ(rc, bmqt::EventBuilderResult::e_PAYLOAD_TOO_BIG); ASSERT_EQ(sizeof(bmqp::EventHeader), static_cast(peb.eventSize())); ASSERT_EQ(sizeof(bmqp::EventHeader), - static_cast(peb.blob().length())); + static_cast(peb.blob()->length())); // Already-written options have to be removed if packing a message // fails ASSERT_EQ(0, peb.messageCount()); @@ -785,8 +781,7 @@ static void test5_buildEventWithPayloadTooBig() // Get blob and use bmqp iterator to test. Note that bmqp event and // bmqp iterators are lower than bmqp builders, and thus, can be used // to test them. - const bdlbb::Blob& eventBlob = peb.blob(); - bmqp::Event rawEvent(&eventBlob, s_allocator_p); + bmqp::Event rawEvent(peb.blob().get(), s_allocator_p); BSLS_ASSERT_SAFE(true == rawEvent.isValid()); BSLS_ASSERT_SAFE(true == rawEvent.isPushEvent()); @@ -846,8 +841,8 @@ static void test6_buildEventWithImplicitPayload() // Get blob and use bmqp iterator to test. Note that bmqp event and bmqp // iterators are lower than bmqp builders, and thus, can be used to test // them. - const bdlbb::Blob& eventBlob = peb.blob(); - bmqp::Event rawEvent(&eventBlob, s_allocator_p); + bmqp::Event rawEvent(peb.blob().get(), s_allocator_p); + BSLS_ASSERT_SAFE(true == rawEvent.isValid()); BSLS_ASSERT_SAFE(true == rawEvent.isPushEvent()); @@ -951,7 +946,7 @@ static void test7_buildEventOptionTooBig() ASSERT_EQ(sizeof(bmqp::EventHeader), static_cast(peb.eventSize())); ASSERT_EQ(sizeof(bmqp::EventHeader), - static_cast(peb.blob().length())); + static_cast(peb.blob()->length())); ASSERT_EQ(0, peb.messageCount()); // Add option of valid size @@ -974,7 +969,7 @@ static void test7_buildEventOptionTooBig() ASSERT_EQ(sizeof(bmqp::EventHeader), static_cast(peb.eventSize())); ASSERT_LE(sizeof(bmqp::EventHeader) + sizeof(bmqp::PushHeader) + optionSize, - static_cast(peb.blob().length())); + static_cast(peb.blob()->length())); // Less than or equal due to possible padding ASSERT_EQ(0, peb.messageCount()); @@ -1002,15 +997,14 @@ static void test7_buildEventOptionTooBig() regularPayload.length(); ASSERT_LE(evtSizeUnpadded, peb.eventSize()); // Less than or equal due to possible padding - ASSERT_LE(evtSizeUnpadded, peb.blob().length()); + ASSERT_LE(evtSizeUnpadded, peb.blob()->length()); // Less than or equal due to possible padding ASSERT_EQ(1, peb.messageCount()); // Get blob and use bmqp iterator to test. Note that bmqp event and bmqp // iterators are lower than bmqp builders, and thus, can be used to test // them. - const bdlbb::Blob& eventBlob = peb.blob(); - bmqp::Event rawEvent(&eventBlob, s_allocator_p); + bmqp::Event rawEvent(peb.blob().get(), s_allocator_p); BSLS_ASSERT_SAFE(true == rawEvent.isValid()); BSLS_ASSERT_SAFE(true == rawEvent.isPushEvent()); @@ -1197,7 +1191,7 @@ static void testN1_decodeFromFile() BSLS_ASSERT(ofile.good() == true); - bdlbb::BlobUtil::copy(buf, obj.blob(), 0, obj.blob().length()); + bdlbb::BlobUtil::copy(buf, *obj.blob(), 0, obj.blob()->length()); ofile.write(buf, k_SIZE); ofile.close(); bsl::memset(buf, 0, k_SIZE); @@ -1216,9 +1210,9 @@ static void testN1_decodeFromFile() bdlbb::BlobBuffer dataBlobBuffer(dataBufferSp, k_SIZE); outBlob.appendDataBuffer(dataBlobBuffer); - outBlob.setLength(obj.blob().length()); + outBlob.setLength(obj.blob()->length()); - ASSERT_EQ(bdlbb::BlobUtil::compare(obj.blob(), outBlob), 0); + ASSERT_EQ(bdlbb::BlobUtil::compare(*obj.blob(), outBlob), 0); // Decode event bmqp::Event rawEvent(&outBlob, s_allocator_p); diff --git a/src/groups/bmq/bmqp/bmqp_puteventbuilder.cpp b/src/groups/bmq/bmqp/bmqp_puteventbuilder.cpp index 47f2aa0c6..9da9d8d1a 100644 --- a/src/groups/bmq/bmqp/bmqp_puteventbuilder.cpp +++ b/src/groups/bmq/bmqp/bmqp_puteventbuilder.cpp @@ -428,18 +428,7 @@ bmqt::EventBuilderResult::Enum PutEventBuilder::packMessageRaw(int queueId) return packMessageInternal(*d_blobPayload_p, queueId); } -const bdlbb::Blob& PutEventBuilder::blob() const -{ - // Fix packet's length in header now that we know it .. Following is valid - // (see comment in reset) - EventHeader& eh = *reinterpret_cast( - d_blob_sp->buffer(0).data()); - eh.setLength(d_blob_sp->length()); - - return *d_blob_sp; -} - -bsl::shared_ptr PutEventBuilder::blob_sp() const +const bsl::shared_ptr& PutEventBuilder::blob() const { // Fix packet's length in header now that we know it .. Following is valid // (see comment in reset) diff --git a/src/groups/bmq/bmqp/bmqp_puteventbuilder.h b/src/groups/bmq/bmqp/bmqp_puteventbuilder.h index 939478fd4..ef8a7d4e2 100644 --- a/src/groups/bmq/bmqp/bmqp_puteventbuilder.h +++ b/src/groups/bmq/bmqp/bmqp_puteventbuilder.h @@ -40,8 +40,10 @@ /// Usage ///----- //.. -// bdlbb::PooledBlobBufferFactory bufferFactory(1024, d_allocator_p); -// bmqp::PutEventBuilder builder(&bufferFactory, d_allocator_p); +// bdlbb::PooledBlobBufferFactory bufferFactory(1024, s_allocator_p); +// bmqp::BlobPoolUtil::BlobSpPool blobSpPool( +// bmqp::BlobPoolUtil::createBlobPool(&bufferFactory, s_allocator_p)); +// bmqp::PutEventBuilder builder(&blobSpPool, d_allocator_p); // // // Append multiple messages // // (Error handling omitted below for brevity) @@ -51,11 +53,13 @@ // // // Repeat above steps if adding more messages to this event is desired. // -// const bdlbb::Blob& eventBlob = builder.blob(); +// const bsl::shared_ptr& eventBlob = builder.blob(); // // Send the blob ... // // // We can reset the builder to reuse it; note that this invalidates the -// // 'eventBlob' retrieved above +// // 'eventBlob' shared pointer reference retrieved above. To keep the +// // bdlbb::Blob valid the shared pointer should be copied, and the copy +// // should be passed and kept in IO components. // builder.reset(); //.. // @@ -339,17 +343,13 @@ class PutEventBuilder { /// message was not compressed, a value of 1 is returned. double lastPackedMesageCompressionRatio() const; - /// Return a reference not offering modifiable access to the blob built - /// by this event. If no messages were added, this will return a blob - /// composed only of an `EventHeader`. - const bdlbb::Blob& blob() const; - - /// Return a shared pointer to the built Blob. If no messages were added, - /// this will return an empty shared pointer. - /// Note that a shared pointer is returned by value, so the user holds to - /// the copy of a pointer. The Blob in that copy will be valid even if we - /// `reset` this builder and modify the internal shared pointer. - bsl::shared_ptr blob_sp() const; + /// Return a reference to the shared pointer to the built Blob. If no + /// messages were added, the Blob object under this reference will be + /// empty. + /// Note that this accessor exposes an internal shared pointer object, and + /// it is the user's responsibility to make a copy of it if it needs to be + /// passed and kept in another thread while this builder object is used. + const bsl::shared_ptr& blob() const; const bmqp::MessageProperties* messageProperties() const; }; diff --git a/src/groups/bmq/bmqp/bmqp_puteventbuilder.t.cpp b/src/groups/bmq/bmqp/bmqp_puteventbuilder.t.cpp index 76665aba3..58f284bd6 100644 --- a/src/groups/bmq/bmqp/bmqp_puteventbuilder.t.cpp +++ b/src/groups/bmq/bmqp/bmqp_puteventbuilder.t.cpp @@ -335,8 +335,7 @@ static void test1_breathingTest() // Get blob and use bmqp iterator to test. Note that bmqp event and // bmqp iterators are lower than bmqp builders, and thus, can be used // to test them. - const bdlbb::Blob& eventBlob = obj.blob(); - bmqp::Event rawEvent(&eventBlob, s_allocator_p); + bmqp::Event rawEvent(obj.blob().get(), s_allocator_p); BSLS_ASSERT_OPT(rawEvent.isValid()); BSLS_ASSERT_OPT(rawEvent.isPutEvent()); @@ -435,7 +434,7 @@ static void test1_breathingTest() ASSERT_GT(obj.eventSize(), k_PAYLOAD_BIGGER_LEN); ASSERT_EQ(obj.messageCount(), 1); - rawEvent.reset(&obj.blob()); + rawEvent.reset(obj.blob().get()); rawEvent.loadPutMessageIterator(&putIter, true); ASSERT_EQ(1, putIter.next()); @@ -559,8 +558,7 @@ static void test1_breathingTest() // Get blob and use bmqp iterator to test. Note that bmqp event and // bmqp iterators are lower than bmqp builders, and thus, can be used // to test them. - const bdlbb::Blob& eventBlob = obj.blob(); - bmqp::Event rawEvent(&eventBlob, s_allocator_p); + bmqp::Event rawEvent(obj.blob().get(), s_allocator_p); BSLS_ASSERT_OPT(rawEvent.isValid()); BSLS_ASSERT_OPT(rawEvent.isPutEvent()); @@ -665,7 +663,7 @@ static void test1_breathingTest() ASSERT_LT(obj.eventSize(), k_PAYLOAD_BIGGER_LEN); ASSERT_EQ(obj.messageCount(), 1); - rawEvent.reset(&obj.blob()); + rawEvent.reset(obj.blob().get()); rawEvent.loadPutMessageIterator(&putIter, true); ASSERT_EQ(1, putIter.next()); @@ -797,8 +795,7 @@ static void test1_breathingTest() // Get blob and use bmqp iterator to test. Note that bmqp event and // bmqp iterators are lower than bmqp builders, and thus, can be used // to test them. - const bdlbb::Blob& eventBlob = obj.blob(); - bmqp::Event rawEvent(&eventBlob, s_allocator_p); + bmqp::Event rawEvent(obj.blob().get(), s_allocator_p); BSLS_ASSERT_OPT(rawEvent.isValid()); BSLS_ASSERT_OPT(rawEvent.isPutEvent()); @@ -909,7 +906,7 @@ static void test1_breathingTest() ASSERT_LT(obj.eventSize(), k_PAYLOAD_BIGGER_LEN); ASSERT_EQ(obj.messageCount(), 1); - rawEvent.reset(&obj.blob()); + rawEvent.reset(obj.blob().get()); rawEvent.loadPutMessageIterator(&putIter, true); ASSERT_EQ(1, putIter.next()); @@ -1040,8 +1037,7 @@ static void test1_breathingTest() // Get blob and use bmqp iterator to test. Note that bmqp event and // bmqp iterators are lower than bmqp builders, and thus, can be used // to test them. - const bdlbb::Blob& eventBlob = obj.blob(); - bmqp::Event rawEvent(&eventBlob, s_allocator_p); + bmqp::Event rawEvent(obj.blob().get(), s_allocator_p); BSLS_ASSERT_OPT(rawEvent.isValid()); BSLS_ASSERT_OPT(rawEvent.isPutEvent()); @@ -1142,7 +1138,7 @@ static void test1_breathingTest() ASSERT_GT(obj.eventSize(), k_PAYLOAD_LEN); ASSERT_EQ(obj.messageCount(), 1); - rawEvent.reset(&obj.blob()); + rawEvent.reset(obj.blob().get()); rawEvent.loadPutMessageIterator(&putIter, true); ASSERT_EQ(1, putIter.next()); @@ -1267,8 +1263,7 @@ static void test1_breathingTest() // Get blob and use bmqp iterator to test. Note that bmqp event and // bmqp iterators are lower than bmqp builders, and thus, can be used // to test them. - const bdlbb::Blob& eventBlob = obj.blob(); - bmqp::Event rawEvent(&eventBlob, s_allocator_p); + bmqp::Event rawEvent(obj.blob().get(), s_allocator_p); BSLS_ASSERT_OPT(rawEvent.isValid()); BSLS_ASSERT_OPT(rawEvent.isPutEvent()); @@ -1368,7 +1363,7 @@ static void test1_breathingTest() ASSERT_GT(obj.eventSize(), k_PAYLOAD_BIGGER_LEN); ASSERT_EQ(obj.messageCount(), 1); - rawEvent.reset(&obj.blob()); + rawEvent.reset(obj.blob().get()); rawEvent.loadPutMessageIterator(&putIter, true); ASSERT_EQ(1, putIter.next()); @@ -1471,8 +1466,7 @@ static void test1_breathingTest() // Get blob and use bmqp iterator to test. Note that bmqp event and // bmqp iterators are lower than bmqp builders, and thus, can be used // to test them. - const bdlbb::Blob& eventBlob = obj.blob(); - bmqp::Event rawEvent(&eventBlob, s_allocator_p); + bmqp::Event rawEvent(obj.blob().get(), s_allocator_p); BSLS_ASSERT_OPT(rawEvent.isValid()); BSLS_ASSERT_OPT(rawEvent.isPutEvent()); @@ -1553,7 +1547,7 @@ static void test1_breathingTest() ASSERT_GT(obj.eventSize(), payload.length()); ASSERT_EQ(obj.messageCount(), 1); - rawEvent.reset(&obj.blob()); + rawEvent.reset(obj.blob().get()); rawEvent.loadPutMessageIterator(&putIter, true); ASSERT_EQ(1, putIter.next()); @@ -1711,8 +1705,7 @@ static void test2_manipulators_one() } // Iterate and check - const bdlbb::Blob& eventBlob = obj.blob(); - bmqp::Event rawEvent(&eventBlob, s_allocator_p); + bmqp::Event rawEvent(obj.blob().get(), s_allocator_p); BSLS_ASSERT_OPT(true == rawEvent.isValid()); BSLS_ASSERT_OPT(true == rawEvent.isPutEvent()); @@ -1842,8 +1835,7 @@ static void test3_eventTooBig() // Get blob and use bmqp iterator to test. Note that bmqp event and bmqp // iterators are lower than bmqp builders, and thus, can be used to test // them. - const bdlbb::Blob& eventBlob = obj.blob(); - bmqp::Event rawEvent(&eventBlob, s_allocator_p); + bmqp::Event rawEvent(obj.blob().get(), s_allocator_p); BSLS_ASSERT(true == rawEvent.isValid()); BSLS_ASSERT(true == rawEvent.isPutEvent()); @@ -1921,8 +1913,7 @@ static void test4_manipulators_two() } // Iterate and check - const bdlbb::Blob& eventBlob = obj.blob(); - bmqp::Event rawEvent(&eventBlob, s_allocator_p); + bmqp::Event rawEvent(obj.blob().get(), s_allocator_p); BSLS_ASSERT(true == rawEvent.isValid()); BSLS_ASSERT(true == rawEvent.isPutEvent()); @@ -1995,8 +1986,7 @@ static void test5_putEventWithZeroLengthMessage() ASSERT_EQ_D(0, rc, bmqt::EventBuilderResult::e_SUCCESS); // Iterate and check - const bdlbb::Blob& eventBlob = obj.blob(); - bmqp::Event rawEvent(&eventBlob, s_allocator_p); + bmqp::Event rawEvent(obj.blob().get(), s_allocator_p); BSLS_ASSERT(true == rawEvent.isValid()); BSLS_ASSERT(true == rawEvent.isPutEvent()); @@ -2210,8 +2200,7 @@ static void test7_multiplePackMessage() // Get blob and use bmqp iterator to test. Note that bmqp event and // bmqp iterators are lower than bmqp builders, and thus, can be used // to test them. - const bdlbb::Blob& eventBlob = obj.blob(); - bmqp::Event rawEvent(&eventBlob, s_allocator_p); + bmqp::Event rawEvent(obj.blob().get(), s_allocator_p); ASSERT(rawEvent.isValid()); ASSERT(rawEvent.isPutEvent()); @@ -2301,7 +2290,7 @@ static void test7_multiplePackMessage() #endif ASSERT_EQ(obj.compressionAlgorithmType(), bmqt::CompressionAlgorithmType::e_NONE); - rawEvent.reset(&obj.blob()); + rawEvent.reset(obj.blob().get()); rawEvent.loadPutMessageIterator(&putIter, true); // we want to test the 3rd message so we call next thrice @@ -2435,7 +2424,7 @@ static void testN1_decodeFromFile() BSLS_ASSERT(ofile.good() == true); - bdlbb::BlobUtil::copy(buf, obj.blob(), 0, obj.blob().length()); + bdlbb::BlobUtil::copy(buf, *obj.blob(), 0, obj.blob()->length()); ofile.write(buf, k_SIZE); ofile.close(); bsl::memset(buf, 0, k_SIZE); @@ -2454,9 +2443,9 @@ static void testN1_decodeFromFile() bdlbb::BlobBuffer dataBlobBuffer(dataBufferSp, k_SIZE); outBlob.appendDataBuffer(dataBlobBuffer); - outBlob.setLength(obj.blob().length()); + outBlob.setLength(obj.blob()->length()); - ASSERT_EQ(bdlbb::BlobUtil::compare(obj.blob(), outBlob), 0); + ASSERT_EQ(bdlbb::BlobUtil::compare(*obj.blob(), outBlob), 0); // Decode event bmqp::Event rawEvent(&outBlob, s_allocator_p); diff --git a/src/groups/bmq/bmqp/bmqp_recoveryeventbuilder.cpp b/src/groups/bmq/bmqp/bmqp_recoveryeventbuilder.cpp index 650579bbd..637351a76 100644 --- a/src/groups/bmq/bmqp/bmqp_recoveryeventbuilder.cpp +++ b/src/groups/bmq/bmqp/bmqp_recoveryeventbuilder.cpp @@ -43,11 +43,19 @@ RecoveryEventBuilder::RecoveryEventBuilder(BlobSpPool* blobSpPool_p, bslma::Allocator* allocator) : d_blobSpPool_p(blobSpPool_p) , d_blob_sp(0, allocator) // initialized in `reset()` +, d_emptyBlob_sp(blobSpPool_p->getObject()) , d_msgCount(0) { // PRECONDITIONS BSLS_ASSERT_SAFE(blobSpPool_p); + // Assume that items built with the given `blobSpPool_p` either all have or + // all don't have buffer factory, and check it once for `d_emptyBlob_sp`. + // We require this since we do `Blob::setLength`: + BSLS_ASSERT_SAFE( + NULL != d_emptyBlob_sp->factory() && + "Passed BlobSpPool must build Blobs with set BlobBufferFactory"); + reset(); } @@ -56,11 +64,6 @@ void RecoveryEventBuilder::reset() { d_blob_sp = d_blobSpPool_p->getObject(); - // The following prerequisite is necessary since we do `Blob::setLength`: - BSLS_ASSERT_SAFE( - NULL != d_blob_sp->factory() && - "Passed BlobSpPool must build Blobs with set BlobBufferFactory"); - d_msgCount = 0; // NOTE: Since RecoveryEventBuilder owns the blob and we just reset it, we @@ -151,27 +154,11 @@ RecoveryEventBuilder::packMessage(unsigned int partitionId, return bmqt::EventBuilderResult::e_SUCCESS; } -const bdlbb::Blob& RecoveryEventBuilder::blob() const -{ - if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(messageCount() == 0)) { - BSLS_PERFORMANCEHINT_UNLIKELY_HINT; - return ProtocolUtil::emptyBlob(); // RETURN - } - - // Fix packet's length in header now that we know it .. Following is valid - // (see comment in reset) - EventHeader& eh = *reinterpret_cast( - d_blob_sp->buffer(0).data()); - eh.setLength(d_blob_sp->length()); - - return *d_blob_sp; -} - -bsl::shared_ptr RecoveryEventBuilder::blob_sp() const +const bsl::shared_ptr& RecoveryEventBuilder::blob() const { if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(messageCount() == 0)) { BSLS_PERFORMANCEHINT_UNLIKELY_HINT; - return bsl::shared_ptr(); // RETURN + return d_emptyBlob_sp; // RETURN } // Fix packet's length in header now that we know it .. Following is valid diff --git a/src/groups/bmq/bmqp/bmqp_recoveryeventbuilder.h b/src/groups/bmq/bmqp/bmqp_recoveryeventbuilder.h index 5a74b0815..e75398b4e 100644 --- a/src/groups/bmq/bmqp/bmqp_recoveryeventbuilder.h +++ b/src/groups/bmq/bmqp/bmqp_recoveryeventbuilder.h @@ -42,18 +42,22 @@ /// Usage ///----- //.. -// bdlbb::PooledBlobBufferFactory bufferFactory(1024, d_allocator_p); -// bmqp::RecoveryEventBuilder builder(&bufferFactory, d_allocator_p); +// bdlbb::PooledBlobBufferFactory bufferFactory(1024, s_allocator_p); +// bmqp::BlobPoolUtil::BlobSpPool blobSpPool( +// bmqp::BlobPoolUtil::createBlobPool(&bufferFactory, s_allocator_p)); +// bmqp::RecoveryEventBuilder builder(&blobSpPool, d_allocator_p); // // // Append multiple messages // builder.appendMessage(0, 1, bmqt::MessageGUID(), 1); // builder.appendMessage(-1, 2, bmqt::MessageGUID(), 1); // -// const bdlbb::Blob& eventBlob = builder.blob(); +// const bsl::shared_ptr& eventBlob = builder.blob(); // // Send the blob ... // // // We can reset the builder to reuse it; note that this invalidates the -// // 'eventBlob' retrieved above +// // 'eventBlob' shared pointer reference retrieved above. To keep the +// // bdlbb::Blob valid the shared pointer should be copied, and the copy +// // should be passed and kept in IO components. // builder.reset(); // //.. @@ -95,6 +99,9 @@ class RecoveryEventBuilder BSLS_CPP11_FINAL { /// `mutable` to skip writing the length until the blob is retrieved. mutable bsl::shared_ptr d_blob_sp; + /// Empty blob to be returned when no messages were added to this builder. + bsl::shared_ptr d_emptyBlob_sp; + int d_msgCount; // number of messages currently in the // event @@ -149,17 +156,13 @@ class RecoveryEventBuilder BSLS_CPP11_FINAL { /// Return the number of messages currently in the event being built. int messageCount() const; - /// Return a reference not offering modifiable access to the blob built - /// by this event. If no messages were added, this will return an empty - /// blob, i.e., a blob with length == 0. - const bdlbb::Blob& blob() const; - - /// Return a shared pointer to the built Blob. If no messages were added, - /// this will return an empty shared pointer. - /// Note that a shared pointer is returned by value, so the user holds to - /// the copy of a pointer. The Blob in that copy will be valid even if we - /// `reset` this builder and modify the internal shared pointer. - bsl::shared_ptr blob_sp() const; + /// Return a reference to the shared pointer to the built Blob. If no + /// messages were added, the Blob object under this reference will be + /// empty. + /// Note that this accessor exposes an internal shared pointer object, and + /// it is the user's responsibility to make a copy of it if it needs to be + /// passed and kept in another thread while this builder object is used. + const bsl::shared_ptr& blob() const; }; // ============================================================================ diff --git a/src/groups/bmq/bmqp/bmqp_recoveryeventbuilder.t.cpp b/src/groups/bmq/bmqp/bmqp_recoveryeventbuilder.t.cpp index 5784d74d8..54842f55d 100644 --- a/src/groups/bmq/bmqp/bmqp_recoveryeventbuilder.t.cpp +++ b/src/groups/bmq/bmqp/bmqp_recoveryeventbuilder.t.cpp @@ -148,9 +148,7 @@ static void test1_breathingTest() // Get blob and use bmqp iterator to test. Note that bmqp event and bmqp // iterators are lower than bmqp builders, and thus, can be used to test // them. - - const bdlbb::Blob& eventBlob = reb.blob(); - bmqp::Event rawEvent(&eventBlob, s_allocator_p); + bmqp::Event rawEvent(reb.blob().get(), s_allocator_p); BSLS_ASSERT(true == rawEvent.isValid()); BSLS_ASSERT(true == rawEvent.isRecoveryEvent()); @@ -178,7 +176,7 @@ static void test1_breathingTest() ASSERT_EQ(recoveryIter.loadChunkPosition(&position), 0); int res, compareResult; res = bmqu::BlobUtil::compareSection(&compareResult, - eventBlob, + *reb.blob(), position, CHUNK, CHUNK_LEN); @@ -219,8 +217,7 @@ static void test2_multipleMessagesTest() } // Iterate and check - const bdlbb::Blob& eventBlob = reb.blob(); - bmqp::Event rawEvent(&eventBlob, s_allocator_p); + bmqp::Event rawEvent(reb.blob().get(), s_allocator_p); BSLS_ASSERT(true == rawEvent.isValid()); BSLS_ASSERT(true == rawEvent.isRecoveryEvent()); @@ -257,7 +254,7 @@ static void test2_multipleMessagesTest() int res, compareResult; res = bmqu::BlobUtil::compareSection(&compareResult, - eventBlob, + *reb.blob(), chunkPosition, D.d_chunk.c_str(), D.d_chunk.size()); @@ -327,8 +324,7 @@ static void test3_eventTooBigTest() static_cast(reb.eventSize())); ASSERT_EQ(reb.messageCount(), 1); - const bdlbb::Blob& eventBlob = reb.blob(); - bmqp::Event rawEvent(&eventBlob, s_allocator_p); + bmqp::Event rawEvent(reb.blob().get(), s_allocator_p); BSLS_ASSERT(true == rawEvent.isValid()); BSLS_ASSERT(true == rawEvent.isRecoveryEvent()); @@ -353,7 +349,7 @@ static void test3_eventTooBigTest() ASSERT_EQ(recoveryIter.loadChunkPosition(&position), 0); int res, compareResult; res = bmqu::BlobUtil::compareSection(&compareResult, - eventBlob, + *reb.blob(), position, k_SMALL_CHUNK, k_SMALL_CHUNK_LEN); @@ -395,8 +391,7 @@ static void test4_emptyPayloadTest() // static_cast(reb.eventSize())); ASSERT_EQ(reb.messageCount(), 1); - const bdlbb::Blob& eventBlob = reb.blob(); - bmqp::Event rawEvent(&eventBlob, s_allocator_p); + bmqp::Event rawEvent(reb.blob().get(), s_allocator_p); BSLS_ASSERT(true == rawEvent.isValid()); BSLS_ASSERT(true == rawEvent.isRecoveryEvent()); diff --git a/src/groups/bmq/bmqp/bmqp_rejecteventbuilder.cpp b/src/groups/bmq/bmqp/bmqp_rejecteventbuilder.cpp index a4d707766..97446aa37 100644 --- a/src/groups/bmq/bmqp/bmqp_rejecteventbuilder.cpp +++ b/src/groups/bmq/bmqp/bmqp_rejecteventbuilder.cpp @@ -38,11 +38,19 @@ RejectEventBuilder::RejectEventBuilder(BlobSpPool* blobSpPool_p, bslma::Allocator* allocator) : d_blobSpPool_p(blobSpPool_p) , d_blob_sp(0, allocator) // initialized in `reset()` +, d_emptyBlob_sp(blobSpPool_p->getObject()) , d_msgCount(0) { // PRECONDITIONS BSLS_ASSERT_SAFE(blobSpPool_p); + // Assume that items built with the given `blobSpPool_p` either all have or + // all don't have buffer factory, and check it once for `d_emptyBlob_sp`. + // We require this since we do `Blob::setLength`: + BSLS_ASSERT_SAFE( + NULL != d_emptyBlob_sp->factory() && + "Passed BlobSpPool must build Blobs with set BlobBufferFactory"); + reset(); } @@ -50,11 +58,6 @@ void RejectEventBuilder::reset() { d_blob_sp = d_blobSpPool_p->getObject(); - // The following prerequisite is necessary since we do `Blob::setLength`: - BSLS_ASSERT_SAFE( - NULL != d_blob_sp->factory() && - "Passed BlobSpPool must build Blobs with set BlobBufferFactory"); - d_msgCount = 0; // NOTE: Since RejectEventBuilder owns the blob and we just reset it, we @@ -117,27 +120,7 @@ RejectEventBuilder::appendMessage(int queueId, return bmqt::EventBuilderResult::e_SUCCESS; } -const bdlbb::Blob& RejectEventBuilder::blob() const -{ - // PRECONDITIONS - BSLS_ASSERT_SAFE(d_blob_sp->length() <= EventHeader::k_MAX_SIZE_SOFT); - - // Empty event - if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(messageCount() == 0)) { - BSLS_PERFORMANCEHINT_UNLIKELY_HINT; - return ProtocolUtil::emptyBlob(); // RETURN - } - - // Fix packet's length in header now that we know it. Following is valid - // (see comment in reset). - EventHeader& eh = *reinterpret_cast( - d_blob_sp->buffer(0).data()); - eh.setLength(d_blob_sp->length()); - - return *d_blob_sp; -} - -bsl::shared_ptr RejectEventBuilder::blob_sp() const +const bsl::shared_ptr& RejectEventBuilder::blob() const { // PRECONDITIONS BSLS_ASSERT_SAFE(d_blob_sp->length() <= EventHeader::k_MAX_SIZE_SOFT); @@ -145,7 +128,7 @@ bsl::shared_ptr RejectEventBuilder::blob_sp() const // Empty event if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(messageCount() == 0)) { BSLS_PERFORMANCEHINT_UNLIKELY_HINT; - return bsl::shared_ptr(); // RETURN + return d_emptyBlob_sp; // RETURN } // Fix packet's length in header now that we know it. Following is valid diff --git a/src/groups/bmq/bmqp/bmqp_rejecteventbuilder.h b/src/groups/bmq/bmqp/bmqp_rejecteventbuilder.h index cf4448611..46e814a33 100644 --- a/src/groups/bmq/bmqp/bmqp_rejecteventbuilder.h +++ b/src/groups/bmq/bmqp/bmqp_rejecteventbuilder.h @@ -41,18 +41,22 @@ /// Usage ///----- //.. -// bdlbb::BlobPooledBlobBufferFactory bufferFactory(1024, d_allocator_p); -// bmqp::RejectEventBuilder builder(&bufferFactory, d_allocator_p); +// bdlbb::PooledBlobBufferFactory bufferFactory(1024, s_allocator_p); +// bmqp::BlobPoolUtil::BlobSpPool blobSpPool( +// bmqp::BlobPoolUtil::createBlobPool(&bufferFactory, s_allocator_p)); +// bmqp::RejectEventBuilder builder(&blobSpPool, d_allocator_p); // // // Append multiple messages, from same or different queue // builder.appendMessage(k_QUEUEID1, k_SUBQUEUEID1, bmqt::MessageGUID(), 5); // builder.appendMessage(k_QUEUEID2, k_SUBQUEUEID2, bmqt::MessageGUID(), 16); // -// const bdlbb::Blob& eventBlob = builder.blob(); +// const bsl::shared_ptr& eventBlob = builder.blob(); // // Send the blob ... // // // We can reset the builder to reuse it; note that this invalidates the -// // 'eventBlob' retrieved above +// // 'eventBlob' shared pointer reference retrieved above. To keep the +// // bdlbb::Blob valid the shared pointer should be copied, and the copy +// // should be passed and kept in IO components. // builder.reset(); // //.. @@ -92,6 +96,9 @@ class RejectEventBuilder { /// `mutable` to skip writing the length until the blob is retrieved. mutable bsl::shared_ptr d_blob_sp; + /// Empty blob to be returned when no messages were added to this builder. + bsl::shared_ptr d_emptyBlob_sp; + int d_msgCount; // number of messages currently in the // event @@ -111,9 +118,9 @@ class RejectEventBuilder { public: // CREATORS - /// Create a new `PushEventBuilder` using the specified `blobSpPool_p` and - /// `allocator` for the blob. We require BlobSpPool to build Blobs with - /// set BlobBufferFactory since we might want to expand the built Blob + /// Create a new `RejectEventBuilder` using the specified `blobSpPool_p` + /// and `allocator` for the blob. We require BlobSpPool to build Blobs + /// with set BlobBufferFactory since we might want to expand the built Blob /// dynamically. RejectEventBuilder(BlobSpPool* blobSpPool_p, bslma::Allocator* allocator); @@ -144,17 +151,13 @@ class RejectEventBuilder { /// were added, this will return 0. int eventSize() const; - /// Return a reference not offering modifiable access to the blob built - /// by this event. If no messages were added, this will return an empty - /// blob, i.e., a blob with length == 0.*/ - const bdlbb::Blob& blob() const; - - /// Return a shared pointer to the built Blob. If no messages were added, - /// this will return an empty shared pointer. - /// Note that a shared pointer is returned by value, so the user holds to - /// the copy of a pointer. The Blob in that copy will be valid even if we - /// `reset` this builder and modify the internal shared pointer. - bsl::shared_ptr blob_sp() const; + /// Return a reference to the shared pointer to the built Blob. If no + /// messages were added, the Blob object under this reference will be + /// empty. + /// Note that this accessor exposes an internal shared pointer object, and + /// it is the user's responsibility to make a copy of it if it needs to be + /// passed and kept in another thread while this builder object is used. + const bsl::shared_ptr& blob() const; }; // ============================================================================ diff --git a/src/groups/bmq/bmqp/bmqp_rejecteventbuilder.t.cpp b/src/groups/bmq/bmqp/bmqp_rejecteventbuilder.t.cpp index 55f498cab..802c8a099 100644 --- a/src/groups/bmq/bmqp/bmqp_rejecteventbuilder.t.cpp +++ b/src/groups/bmq/bmqp/bmqp_rejecteventbuilder.t.cpp @@ -95,10 +95,10 @@ static void verifyContent(const bmqp::RejectEventBuilder& builder, (data.size() * sizeof(bmqp::RejectMessage)); ASSERT_EQ(static_cast(builder.messageCount()), data.size()); ASSERT_EQ(static_cast(builder.eventSize()), expectedSize); - ASSERT_EQ(static_cast(builder.blob().length()), expectedSize); + ASSERT_EQ(static_cast(builder.blob()->length()), expectedSize); PVV("Iterating over messages"); - bmqp::Event event(&builder.blob(), s_allocator_p); + bmqp::Event event(builder.blob().get(), s_allocator_p); ASSERT(event.isValid()); ASSERT(event.isRejectEvent()); @@ -144,7 +144,7 @@ static void test1_breathingTest() ASSERT_EQ(obj.messageCount(), 0); ASSERT_NE(obj.maxMessageCount(), 0); ASSERT_EQ(obj.eventSize(), 0); - ASSERT_EQ(obj.blob().length(), 0); + ASSERT_EQ(obj.blob()->length(), 0); PVV("Appending one message"); appendMessages(&obj, &messages, 1); @@ -193,7 +193,7 @@ static void test3_reset() PV("Verifying accessors"); ASSERT_EQ(obj.messageCount(), 0); ASSERT_EQ(obj.eventSize(), 0); - ASSERT_EQ(obj.blob().length(), 0); + ASSERT_EQ(obj.blob()->length(), 0); PV("Appending another message"); messages.clear(); @@ -277,11 +277,11 @@ static void testN1_decodeFromFile() PVV("Appending messages"); appendMessages(&obj, &messages, k_NUM_MSGS); - ASSERT_NE(obj.blob().length(), 0); + ASSERT_NE(obj.blob()->length(), 0); os << "msg_reject_" << guid << ".bin" << bsl::ends; - const int blobLen = obj.blob().length(); + const int blobLen = obj.blob()->length(); char* buf = new char[blobLen]; /// Functor invoked to delete the file at the specified `filePath` @@ -301,7 +301,7 @@ static void testN1_decodeFromFile() bsl::ofstream ofile(os.str().data(), bsl::ios::binary); BSLS_ASSERT(ofile.good()); - bdlbb::BlobUtil::copy(buf, obj.blob(), 0, blobLen); + bdlbb::BlobUtil::copy(buf, *obj.blob(), 0, blobLen); ofile.write(buf, blobLen); ofile.close(); bsl::memset(buf, 0, blobLen); @@ -321,7 +321,7 @@ static void testN1_decodeFromFile() outBlob.appendDataBuffer(dataBlobBuffer); outBlob.setLength(blobLen); - ASSERT_EQ(bdlbb::BlobUtil::compare(obj.blob(), outBlob), 0); + ASSERT_EQ(bdlbb::BlobUtil::compare(*obj.blob(), outBlob), 0); // Decode event bmqp::Event event(&outBlob, s_allocator_p); diff --git a/src/groups/bmq/bmqp/bmqp_requestmanager.h b/src/groups/bmq/bmqp/bmqp_requestmanager.h index c5e949f62..3888cc306 100644 --- a/src/groups/bmq/bmqp/bmqp_requestmanager.h +++ b/src/groups/bmq/bmqp/bmqp_requestmanager.h @@ -1365,7 +1365,7 @@ bmqt::GenericResult::Enum RequestManager::sendRequest( // Send the request request->d_sendTime = bmqsys::Time::highResolutionTimer(); - bmqt::GenericResult::Enum sendRc = sendFn(d_schemaEventBuilder.blob_sp()); + bmqt::GenericResult::Enum sendRc = sendFn(d_schemaEventBuilder.blob()); if (sendRc != bmqt::GenericResult::e_SUCCESS) { bmqu::MemOutStream errorDesc; errorDesc << "WRITE_FAILED, status: " << sendRc; diff --git a/src/groups/bmq/bmqp/bmqp_schemaeventbuilder.h b/src/groups/bmq/bmqp/bmqp_schemaeventbuilder.h index 7d5f9bafe..e19932a04 100644 --- a/src/groups/bmq/bmqp/bmqp_schemaeventbuilder.h +++ b/src/groups/bmq/bmqp/bmqp_schemaeventbuilder.h @@ -163,13 +163,7 @@ class SchemaEventBuilder { /// Note that if `setMessage` has not been called on this /// SchemaEventBuilder, or if `reset` has been called since, the blob /// returned will be an empty one. - const bdlbb::Blob& blob() const; - - /// Return the fully formatted blob corresponding to the message built. - /// Note that if `setMessage` has not been called on this - /// SchemaEventBuilder, or if `reset` has been called since, the blob - /// returned will be an empty one. - bsl::shared_ptr blob_sp() const; + const bsl::shared_ptr& blob() const; }; // ============================= @@ -291,16 +285,7 @@ int SchemaEventBuilder::setMessage(const TYPE& message, EventType::Enum type) return 0; } -inline const bdlbb::Blob& SchemaEventBuilder::blob() const -{ - // PRECONDITIONS - BSLS_ASSERT_SAFE(d_blob_sp->length() <= EventHeader::k_MAX_SIZE_SOFT); - BSLS_ASSERT_SAFE(d_blob_sp->length() % 4 == 0); - - return *d_blob_sp; -} - -inline bsl::shared_ptr SchemaEventBuilder::blob_sp() const +inline const bsl::shared_ptr& SchemaEventBuilder::blob() const { // PRECONDITIONS BSLS_ASSERT_SAFE(d_blob_sp->length() <= EventHeader::k_MAX_SIZE_SOFT); diff --git a/src/groups/bmq/bmqp/bmqp_schemaeventbuilder.t.cpp b/src/groups/bmq/bmqp/bmqp_schemaeventbuilder.t.cpp index daf515e59..170e2db6b 100644 --- a/src/groups/bmq/bmqp/bmqp_schemaeventbuilder.t.cpp +++ b/src/groups/bmq/bmqp/bmqp_schemaeventbuilder.t.cpp @@ -79,7 +79,7 @@ static void test1_breathingTest() s_allocator_p); PVV(test.d_line << ": Verifying accessors"); - ASSERT_EQ(obj.blob().length(), 0); + ASSERT_EQ(obj.blob()->length(), 0); PVV(test.d_line << ": Create a message"); bmqp_ctrlmsg::ControlMessage message(s_allocator_p); @@ -90,11 +90,11 @@ static void test1_breathingTest() // Encode the message rc = obj.setMessage(message, bmqp::EventType::e_CONTROL); ASSERT_EQ(rc, 0); - ASSERT_NE(obj.blob().length(), 0); - ASSERT_EQ(obj.blob().length() % 4, 0); + ASSERT_NE(obj.blob()->length(), 0); + ASSERT_EQ(obj.blob()->length() % 4, 0); PVV(test.d_line << ": Decode and compare message"); - bmqp::Event event(&obj.blob(), s_allocator_p); + bmqp::Event event(obj.blob().get(), s_allocator_p); ASSERT_EQ(event.isValid(), true); ASSERT_EQ(event.isControlEvent(), true); @@ -109,7 +109,7 @@ static void test1_breathingTest() PVV("Reset"); obj.reset(); - ASSERT_EQ(obj.blob().length(), 0); + ASSERT_EQ(obj.blob()->length(), 0); } } @@ -131,7 +131,7 @@ void testDecodeFromFileHelper(bmqp::SchemaEventBuilder* obj, // Encode the message rc = obj->setMessage(message, bmqp::EventType::e_CONTROL); ASSERT_EQ(rc, 0); - ASSERT_NE(obj->blob().length(), 0); + ASSERT_NE(obj->blob()->length(), 0); bmqu::MemOutStream os(s_allocator_p); bdlb::Guid guid = bdlb::GuidUtil::generate(); @@ -154,15 +154,15 @@ void testDecodeFromFileHelper(bmqp::SchemaEventBuilder* obj, BSLS_ASSERT(ofile.good() == true); - const int blobLen = obj->blob().length(); + const int blobLen = obj->blob()->length(); char* buf = new char[blobLen]; - bdlbb::BlobUtil::copy(buf, obj->blob(), 0, blobLen); + bdlbb::BlobUtil::copy(buf, *obj->blob(), 0, blobLen); ofile.write(buf, blobLen); ofile.close(); bsl::memset(buf, 0, blobLen); obj->reset(); - ASSERT_EQ(obj->blob().length(), 0); + ASSERT_EQ(obj->blob()->length(), 0); // Read blob from file bsl::ifstream ifile(os.str().data(), bsl::ios::binary); diff --git a/src/groups/bmq/bmqp/bmqp_storageeventbuilder.cpp b/src/groups/bmq/bmqp/bmqp_storageeventbuilder.cpp index a9d178ba6..762b5ff75 100644 --- a/src/groups/bmq/bmqp/bmqp_storageeventbuilder.cpp +++ b/src/groups/bmq/bmqp/bmqp_storageeventbuilder.cpp @@ -111,12 +111,21 @@ StorageEventBuilder::StorageEventBuilder(int storageProtocolVersion, , d_storageProtocolVersion(storageProtocolVersion) , d_eventType(eventType) , d_blob_sp(0, allocator) // initialized in `reset()` +, d_emptyBlob_sp(blobSpPool_p->getObject()) , d_msgCount(0) { // PRECONDITIONS BSLS_ASSERT_SAFE(blobSpPool_p); BSLS_ASSERT_SAFE(EventType::e_STORAGE == eventType || EventType::e_PARTITION_SYNC == eventType); + + // Assume that items built with the given `blobSpPool_p` either all have or + // all don't have buffer factory, and check it once for `d_emptyBlob_sp`. + // We require this since we do `Blob::setLength`: + BSLS_ASSERT_SAFE( + NULL != d_emptyBlob_sp->factory() && + "Passed BlobSpPool must build Blobs with set BlobBufferFactory"); + reset(); } @@ -181,27 +190,11 @@ StorageEventBuilder::packMessageRaw(const bdlbb::Blob& blob, return bmqt::EventBuilderResult::e_SUCCESS; } -const bdlbb::Blob& StorageEventBuilder::blob() const -{ - if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(messageCount() == 0)) { - BSLS_PERFORMANCEHINT_UNLIKELY_HINT; - return ProtocolUtil::emptyBlob(); // RETURN - } - - // Fix packet's length in header now that we know it .. Following is valid - // (see comment in reset) - EventHeader& eh = *reinterpret_cast( - d_blob_sp->buffer(0).data()); - eh.setLength(d_blob_sp->length()); - - return *d_blob_sp; -} - -bsl::shared_ptr StorageEventBuilder::blob_sp() const +const bsl::shared_ptr& StorageEventBuilder::blob() const { if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(messageCount() == 0)) { BSLS_PERFORMANCEHINT_UNLIKELY_HINT; - return bsl::shared_ptr(); // RETURN + return d_emptyBlob_sp; // RETURN } // Fix packet's length in header now that we know it .. Following is valid diff --git a/src/groups/bmq/bmqp/bmqp_storageeventbuilder.h b/src/groups/bmq/bmqp/bmqp_storageeventbuilder.h index 4132ab149..84b3015f7 100644 --- a/src/groups/bmq/bmqp/bmqp_storageeventbuilder.h +++ b/src/groups/bmq/bmqp/bmqp_storageeventbuilder.h @@ -42,18 +42,22 @@ /// Usage ///----- //.. -// bdlbb::PooledBlobBufferFactory bufferFactory(1024, d_allocator_p); -// bmqp::StorageEventBuilder builder(&bufferFactory, d_allocator_p); +// bdlbb::PooledBlobBufferFactory bufferFactory(1024, s_allocator_p); +// bmqp::BlobPoolUtil::BlobSpPool blobSpPool( +// bmqp::BlobPoolUtil::createBlobPool(&bufferFactory, s_allocator_p)); +// bmqp::StorageEventBuilder builder(&blobSpPool, d_allocator_p); // // // Append multiple messages // builder.packMessage(...); -// builder.packMessage(...; +// builder.packMessage(...); // -// const bdlbb::Blob& eventBlob = builder.blob(); +// const bsl::shared_ptr& eventBlob = builder.blob(); // // Send the blob ... // // // We can reset the builder to reuse it; note that this invalidates the -// // 'eventBlob' retrieved above +// // 'eventBlob' shared pointer reference retrieved above. To keep the +// // bdlbb::Blob valid the shared pointer should be copied, and the copy +// // should be passed and kept in IO components. // builder.reset(); // //.. @@ -105,6 +109,9 @@ class StorageEventBuilder BSLS_CPP11_FINAL { /// `mutable` to skip writing the length until the blob is retrieved. mutable bsl::shared_ptr d_blob_sp; + /// Empty blob to be returned when no messages were added to this builder. + bsl::shared_ptr d_emptyBlob_sp; + int d_msgCount; // number of messages currently in the event @@ -199,17 +206,13 @@ class StorageEventBuilder BSLS_CPP11_FINAL { /// Return the number of messages currently in the event being built. int messageCount() const; - /// Return a reference not offering modifiable access to the blob built - /// by this event. If no messages were added, this will return an empty - /// blob, i.e., a blob with length == 0. - const bdlbb::Blob& blob() const; - - /// Return a shared pointer to the built Blob. If no messages were added, - /// this will return an empty shared pointer. - /// Note that a shared pointer is returned by value, so the user holds to - /// the copy of a pointer. The Blob in that copy will be valid even if we - /// `reset` this builder and modify the internal shared pointer. - bsl::shared_ptr blob_sp() const; + /// Return a reference to the shared pointer to the built Blob. If no + /// messages were added, the Blob object under this reference will be + /// empty. + /// Note that this accessor exposes an internal shared pointer object, and + /// it is the user's responsibility to make a copy of it if it needs to be + /// passed and kept in another thread while this builder object is used. + const bsl::shared_ptr& blob() const; }; // ============================================================================ diff --git a/src/groups/bmq/bmqp/bmqp_storageeventbuilder.t.cpp b/src/groups/bmq/bmqp/bmqp_storageeventbuilder.t.cpp index 39c5cf1e7..2e7ad389d 100644 --- a/src/groups/bmq/bmqp/bmqp_storageeventbuilder.t.cpp +++ b/src/groups/bmq/bmqp/bmqp_storageeventbuilder.t.cpp @@ -229,7 +229,7 @@ static void test1_breathingTest() // Get blob and use bmqp iterator to test // Note that bmqp event and bmqp iterators are lower than bmqp builders, // and thus, can be used to test them. - const bdlbb::Blob& eventBlob = seb.blob(); + const bdlbb::Blob& eventBlob = *seb.blob(); bmqp::Event rawEvent(&eventBlob, s_allocator_p); BSLS_ASSERT(true == rawEvent.isValid()); @@ -305,7 +305,7 @@ static void test2_storageEventHavingMultipleMessages() } // Iterate and check - const bdlbb::Blob& eventBlob = seb.blob(); + const bdlbb::Blob& eventBlob = *seb.blob(); bmqp::Event rawEvent(&eventBlob, s_allocator_p); BSLS_ASSERT(true == rawEvent.isValid()); @@ -443,7 +443,7 @@ static void test3_packMessage_payloadTooBig() static_cast(seb.eventSize())); ASSERT_EQ(seb.messageCount(), 1); - const bdlbb::Blob& eventBlob = seb.blob(); + const bdlbb::Blob& eventBlob = *seb.blob(); bmqp::Event rawEvent(&eventBlob, s_allocator_p); BSLS_ASSERT(true == rawEvent.isValid()); @@ -526,7 +526,7 @@ static void test4_packMessageRaw() ASSERT_EQ_D(dataIdx, rc, bmqt::EventBuilderResult::e_SUCCESS); } - const bdlbb::Blob& eventA = sebA.blob(); + const bdlbb::Blob& eventA = *sebA.blob(); bmqp::Event rawEventA(&eventA, s_allocator_p); BSLS_ASSERT(rawEventA.isValid() == true); BSLS_ASSERT(rawEventA.isStorageEvent() == true); @@ -562,7 +562,7 @@ static void test4_packMessageRaw() ASSERT_EQ(iterA.isValid(), false); // Finally, iterate over event 'B' and verify. - const bdlbb::Blob& eventB = sebB.blob(); + const bdlbb::Blob& eventB = *sebB.blob(); bmqp::Event rawEventB(&eventB, s_allocator_p); BSLS_ASSERT(true == rawEventB.isValid()); @@ -656,7 +656,7 @@ static void test5_packMessageRaw_emptyMessage() // Above we packMessageRaw with 'length' of 10 because we need an // arbitrary 'length > 0' to not trigger an assert and at the same time // ensure that packing an empty blob succeeds. - ASSERT_EQ(bdlbb::BlobUtil::compare(seb.blob(), emptyBlob), 0); + ASSERT_EQ(bdlbb::BlobUtil::compare(*seb.blob(), emptyBlob), 0); ASSERT_EQ(seb.messageCount(), 0); ASSERT_EQ(seb.eventSize(), static_cast(sizeof(bmqp::EventHeader))); } @@ -702,7 +702,7 @@ static void test6_packMessageRaw_invalidPosition() bmqu::BlobPosition invalidPosition(-1, -1); ASSERT_NE(seb.packMessageRaw(message, invalidPosition, message.length()), bmqt::EventBuilderResult::e_SUCCESS); - ASSERT_EQ(bdlbb::BlobUtil::compare(seb.blob(), k_EMPTY_BLOB), 0); + ASSERT_EQ(bdlbb::BlobUtil::compare(*seb.blob(), k_EMPTY_BLOB), 0); ASSERT_EQ(seb.messageCount(), 0); ASSERT_EQ(seb.eventSize(), static_cast(sizeof(bmqp::EventHeader))); } diff --git a/src/groups/bmq/bmqu/bmqu_objectplaceholder.h b/src/groups/bmq/bmqu/bmqu_objectplaceholder.h index 6d4948f1b..a9134ea1e 100644 --- a/src/groups/bmq/bmqu/bmqu_objectplaceholder.h +++ b/src/groups/bmq/bmqu/bmqu_objectplaceholder.h @@ -45,7 +45,7 @@ #if BSLS_COMPILERFEATURES_SIMULATE_CPP11_FEATURES // Include version that can be compiled with C++03 -// Generated on Tue Oct 15 17:38:31 2024 +// Generated on Wed Nov 6 15:02:33 2024 // Command line: sim_cpp11_features.pl bmqu_objectplaceholder.h #define COMPILING_BMQU_OBJECTPLACEHOLDER_H #include diff --git a/src/groups/mqb/mqba/mqba_adminsession.cpp b/src/groups/mqb/mqba/mqba_adminsession.cpp index 5488e6eb6..364f4d193 100644 --- a/src/groups/mqb/mqba/mqba_adminsession.cpp +++ b/src/groups/mqb/mqba/mqba_adminsession.cpp @@ -106,13 +106,11 @@ bmqp_ctrlmsg::ClientIdentity* extractClientIdentity( // struct AdminSessionState // ------------------------- -AdminSessionState::AdminSessionState(BlobSpPool* blobSpPool, - bdlbb::BlobBufferFactory* bufferFactory, - bmqp::EncodingType::Enum encodingType, - bslma::Allocator* allocator) +AdminSessionState::AdminSessionState(BlobSpPool* blobSpPool, + bmqp::EncodingType::Enum encodingType, + bslma::Allocator* allocator) : d_allocator_p(allocator) , d_dispatcherClientData() -, d_bufferFactory_p(bufferFactory) , d_blobSpPool_p(blobSpPool) , d_schemaEventBuilder(blobSpPool, encodingType, allocator) { @@ -135,7 +133,7 @@ void AdminSession::sendPacket() bdlb::ScopeExitAny resetBlobScopeGuard( bdlf::BindUtil::bind(&bmqp::SchemaEventBuilder::reset, &d_state.d_schemaEventBuilder)); - const bdlbb::Blob& blob = d_state.d_schemaEventBuilder.blob(); + const bdlbb::Blob& blob = *d_state.d_schemaEventBuilder.blob(); // This method is the centralized *single* place where we should try to // send data to the client over the channel. @@ -194,7 +192,7 @@ void AdminSession::finalizeAdminCommand( // PRECONDITIONS BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this)); BSLS_ASSERT_SAFE(adminCommandCtrlMsg.choice().isAdminCommandValue()); - BSLS_ASSERT_SAFE(d_state.d_schemaEventBuilder.blob().length() == 0); + BSLS_ASSERT_SAFE(d_state.d_schemaEventBuilder.blob()->length() == 0); // Send success/error response to client bdlma::LocalSequentialAllocator<2048> localAllocator( @@ -274,7 +272,6 @@ AdminSession::AdminSession( const bsl::string& sessionDescription, mqbi::Dispatcher* dispatcher, AdminSessionState::BlobSpPool* blobSpPool, - bdlbb::BlobBufferFactory* bufferFactory, bdlmt::EventScheduler* scheduler, const mqbnet::Session::AdminCommandEnqueueCb& adminCb, bslma::Allocator* allocator) @@ -285,7 +282,6 @@ AdminSession::AdminSession( , d_description(sessionDescription, allocator) , d_channel_sp(channel) , d_state(blobSpPool, - bufferFactory, bmqp::SchemaEventBuilderUtil::bestEncodingSupported( d_clientIdentity_p->features()), allocator) diff --git a/src/groups/mqb/mqba/mqba_adminsession.h b/src/groups/mqb/mqba/mqba_adminsession.h index 135ed3db0..a05a127e5 100644 --- a/src/groups/mqb/mqba/mqba_adminsession.h +++ b/src/groups/mqb/mqba/mqba_adminsession.h @@ -92,9 +92,6 @@ struct AdminSessionState { // Dispatcher client data associated to // this session. - bdlbb::BlobBufferFactory* d_bufferFactory_p; - // Blob buffer factory to use. - BlobSpPool* d_blobSpPool_p; // Pool of shared pointers to blob to // use. @@ -118,14 +115,13 @@ struct AdminSessionState { // CREATORS - /// Constructor of a new session state using the specified `dispatcher`, - /// `blobSpPool` and `bufferFactory`. The specified `encodingType` is - /// the encoding which the schema event builder will use. Memory - /// allocations are performed using the specified `allocator`. - AdminSessionState(BlobSpPool* blobSpPool, - bdlbb::BlobBufferFactory* bufferFactory, - bmqp::EncodingType::Enum encodingType, - bslma::Allocator* allocator); + /// Constructor of a new session state using the specified `dispatcher` and + /// `blobSpPool`. The specified `encodingType` is the encoding which the + /// schema event builder will use. Memory allocations are performed using + /// the specified `allocator`. + AdminSessionState(BlobSpPool* blobSpPool, + bmqp::EncodingType::Enum encodingType, + bslma::Allocator* allocator); }; // ================== @@ -230,20 +226,18 @@ class AdminSession : public mqbnet::Session, public mqbi::DispatcherClient { // CREATORS /// Constructor of a new session associated to the specified `channel` - /// and using the specified `dispatcher`, `blobSpPool`, `bufferFactory` - /// and `scheduler`. The specified `negotiationMessage` represents the - /// identity received from the peer during negotiation, and the - /// specified `sessionDescription` is the short form description of the - /// session. Memory allocations are performed using the specified - /// `allocator`. The specified `adminEnqueueCb` callback is used to - /// enqueue admin commands to entity that is responsible for executing - /// admin commands. + /// and using the specified `dispatcher`, `blobSpPool` and `scheduler`. + /// The specified `negotiationMessage` represents the identity received + /// from the peer during negotiation, and the specified + /// `sessionDescription` is the short form description of the session. + /// Memory allocations are performed using the specified `allocator`. + /// The specified `adminEnqueueCb` callback is used to enqueue admin + /// commands to entity that is responsible for executing admin commands. AdminSession(const bsl::shared_ptr& channel, const bmqp_ctrlmsg::NegotiationMessage& negotiationMessage, const bsl::string& sessionDescription, mqbi::Dispatcher* dispatcher, AdminSessionState::BlobSpPool* blobSpPool, - bdlbb::BlobBufferFactory* bufferFactory, bdlmt::EventScheduler* scheduler, const mqbnet::Session::AdminCommandEnqueueCb& adminEnqueueCb, bslma::Allocator* allocator); diff --git a/src/groups/mqb/mqba/mqba_adminsession.t.cpp b/src/groups/mqb/mqba/mqba_adminsession.t.cpp index 789022b3a..7e63c2c25 100644 --- a/src/groups/mqb/mqba/mqba_adminsession.t.cpp +++ b/src/groups/mqb/mqba/mqba_adminsession.t.cpp @@ -171,7 +171,6 @@ class TestBench { "sessionDescription", setInDispatcherThread(&d_mockDispatcher), &d_blobSpPool, - &d_bufferFactory, &d_scheduler, adminEnqueueCb, allocator) @@ -256,7 +255,7 @@ static void test1_watermark() int rc = builder.setMessage(admin, bmqp::EventType::e_CONTROL); ASSERT_EQ(rc, 0); - bmqp::Event adminEvent(&builder.blob(), s_allocator_p); + bmqp::Event adminEvent(builder.blob().get(), s_allocator_p); BSLS_ASSERT(adminEvent.isValid()); BSLS_ASSERT(adminEvent.isControlEvent()); diff --git a/src/groups/mqb/mqba/mqba_clientsession.cpp b/src/groups/mqb/mqba/mqba_clientsession.cpp index a81103bc3..e29b6b87a 100644 --- a/src/groups/mqb/mqba/mqba_clientsession.cpp +++ b/src/groups/mqb/mqba/mqba_clientsession.cpp @@ -372,7 +372,8 @@ void ClientSession::sendErrorResponse( sendPacketDispatched(d_state.d_schemaEventBuilder.blob(), true); } -void ClientSession::sendPacket(const bdlbb::Blob& blob, bool flushBuilders) +void ClientSession::sendPacket(const bsl::shared_ptr& blob, + bool flushBuilders) { dispatcher()->execute( bdlf::BindUtil::bind(&ClientSession::sendPacketDispatched, @@ -382,8 +383,9 @@ void ClientSession::sendPacket(const bdlbb::Blob& blob, bool flushBuilders) this); } -void ClientSession::sendPacketDispatched(const bdlbb::Blob& blob, - bool flushBuilders) +void ClientSession::sendPacketDispatched( + const bsl::shared_ptr& blob, + bool flushBuilders) { // executed by the *CLIENT* dispatcher thread @@ -434,7 +436,7 @@ void ClientSession::sendPacketDispatched(const bdlbb::Blob& blob, // the message to the channelBufferQueue, and we'll send it later, once we // get the lowWatermark notification. bmqio::Status status; - d_channel_sp->write(&status, blob); + d_channel_sp->write(&status, *blob); if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY( status.category() != bmqio::StatusCategory::e_SUCCESS)) { BSLS_PERFORMANCEHINT_UNLIKELY_HINT; @@ -446,7 +448,7 @@ void ClientSession::sendPacketDispatched(const bdlbb::Blob& blob, if (status.category() == bmqio::StatusCategory::e_LIMIT) { BALL_LOG_WARN << "#CLIENT_SEND_FAILURE " << description() << ": Failed to send data [size: " - << bmqu::PrintUtil::prettyNumber(blob.length()) + << bmqu::PrintUtil::prettyNumber(blob->length()) << " bytes] to client due to channel watermark limit" << "; enqueuing to the ChannelBufferQueue."; d_state.d_channelBufferQueue.push_back(blob); @@ -454,7 +456,7 @@ void ClientSession::sendPacketDispatched(const bdlbb::Blob& blob, else { BALL_LOG_INFO << "#CLIENT_SEND_FAILURE " << description() << ": Failed to send data [size: " - << bmqu::PrintUtil::prettyNumber(blob.length()) + << bmqu::PrintUtil::prettyNumber(blob->length()) << " bytes] to client with status: " << status; } } @@ -477,10 +479,11 @@ void ClientSession::flushChannelBufferQueue() // Try to send as many data as possible while (!d_state.d_channelBufferQueue.empty()) { - const bdlbb::Blob& blob = d_state.d_channelBufferQueue.front(); + const bsl::shared_ptr& blob_sp = + d_state.d_channelBufferQueue.front(); bmqio::Status status; - d_channel_sp->write(&status, blob); + d_channel_sp->write(&status, *blob_sp); if (status.category() == bmqio::StatusCategory::e_LIMIT) { // We are hitting the limit again, can't continue.. stop sending // and we'll resume with the next lowWatermark notification. diff --git a/src/groups/mqb/mqba/mqba_clientsession.h b/src/groups/mqb/mqba/mqba_clientsession.h index 47c9e7ada..7e7d9f762 100644 --- a/src/groups/mqb/mqba/mqba_clientsession.h +++ b/src/groups/mqb/mqba/mqba_clientsession.h @@ -155,7 +155,7 @@ struct ClientSessionState { bslma::Allocator* d_allocator_p; // Allocator to use. - bsl::deque d_channelBufferQueue; + bsl::deque > d_channelBufferQueue; // Queue of data pending being sent to // the client. This should almost // always be empty, and is meant to @@ -183,8 +183,10 @@ struct ClientSessionState { // context for any queue in this // domain. + /// Blob buffer factory to use. + /// TODO: this field should be removed once we retire the code for + /// message properties conversion. bdlbb::BlobBufferFactory* d_bufferFactory_p; - // Blob buffer factory to use. BlobSpPool* d_blobSpPool_p; // Pool of shared pointers to blob to @@ -427,8 +429,10 @@ class ClientSession : public mqbnet::Session, /// ACK) will be flushed and sent before `blob`; this is necessary to /// guarantee strict serialization of events when sending a control /// message. - void sendPacket(const bdlbb::Blob& blob, bool flushBuilders); - void sendPacketDispatched(const bdlbb::Blob& blob, bool flushBuilders); + void sendPacket(const bsl::shared_ptr& blob, + bool flushBuilders); + void sendPacketDispatched(const bsl::shared_ptr& blob, + bool flushBuilders); /// Flush as much as possible of the content of the internal /// `channelBufferQueue`. diff --git a/src/groups/mqb/mqba/mqba_clientsession.t.cpp b/src/groups/mqb/mqba/mqba_clientsession.t.cpp index 0a8f06c00..ee437aa01 100644 --- a/src/groups/mqb/mqba/mqba_clientsession.t.cpp +++ b/src/groups/mqb/mqba/mqba_clientsession.t.cpp @@ -777,7 +777,7 @@ class TestBench { int rc = obj.setMessage(controlMessage, bmqp::EventType::e_CONTROL); ASSERT_EQ(rc, 0); - bmqp::Event event(&obj.blob(), d_allocator_p); + bmqp::Event event(obj.blob().get(), d_allocator_p); d_cs.processEvent(event); } @@ -805,7 +805,7 @@ class TestBench { int rc = obj.setMessage(controlMessage, bmqp::EventType::e_CONTROL); ASSERT_EQ(rc, 0); - bmqp::Event event(&obj.blob(), d_allocator_p); + bmqp::Event event(obj.blob().get(), d_allocator_p); d_cs.processEvent(event); } @@ -1908,7 +1908,7 @@ static void test9_newStylePush() ASSERT_EQ(bmqt::EventBuilderResult::e_SUCCESS, rc); mqbi::DispatcherEvent putEvent(s_allocator_p); - bmqp::Event rawEvent(&peb.blob(), s_allocator_p); + bmqp::Event rawEvent(peb.blob().get(), s_allocator_p); BSLS_ASSERT(rawEvent.isValid()); BSLS_ASSERT(rawEvent.isPutEvent()); @@ -1917,15 +1917,11 @@ static void test9_newStylePush() rawEvent.loadPutMessageIterator(&putIt, false); BSLS_ASSERT(putIt.next()); - bsl::shared_ptr blobSp; - blobSp.createInplace(s_allocator_p, &tb.d_bufferFactory, s_allocator_p); - *blobSp = peb.blob(); - putEvent.setType(mqbi::DispatcherEventType::e_PUT) .setIsRelay(true) // Relay message .setSource(&tb.d_cs) // DispatcherClient *value .setPutHeader(putIt.header()) - .setBlob(blobSp); // const bsl::shared_ptr& value + .setBlob(peb.blob()); // const bsl::shared_ptr& value tb.dispatch(putEvent); @@ -2016,7 +2012,7 @@ static void test10_newStyleCompressedPush() ASSERT_EQ(bmqt::EventBuilderResult::e_SUCCESS, rc); mqbi::DispatcherEvent putEvent(s_allocator_p); - bmqp::Event rawEvent(&peb.blob(), s_allocator_p); + bmqp::Event rawEvent(peb.blob().get(), s_allocator_p); BSLS_ASSERT(rawEvent.isValid()); BSLS_ASSERT(rawEvent.isPutEvent()); @@ -2025,15 +2021,11 @@ static void test10_newStyleCompressedPush() rawEvent.loadPutMessageIterator(&putIt, false); BSLS_ASSERT(putIt.next()); - bsl::shared_ptr blobSp; - blobSp.createInplace(s_allocator_p, &tb.d_bufferFactory, s_allocator_p); - *blobSp = peb.blob(); - putEvent.setType(mqbi::DispatcherEventType::e_PUT) .setIsRelay(true) // Relay message .setSource(&tb.d_cs) // DispatcherClient *value .setPutHeader(putIt.header()) - .setBlob(blobSp) // const bsl::shared_ptr& value + .setBlob(peb.blob()) // const bsl::shared_ptr& value .setCompressionAlgorithmType(bmqt::CompressionAlgorithmType::e_ZLIB); tb.dispatch(putEvent); diff --git a/src/groups/mqb/mqba/mqba_sessionnegotiator.cpp b/src/groups/mqb/mqba/mqba_sessionnegotiator.cpp index dfea62a21..d7186ec05 100644 --- a/src/groups/mqb/mqba/mqba_sessionnegotiator.cpp +++ b/src/groups/mqb/mqba/mqba_sessionnegotiator.cpp @@ -699,7 +699,7 @@ int SessionNegotiator::sendNegotiationMessage( // Send response event bmqio::Status status; - context->d_channelSp->write(&status, builder.blob()); + context->d_channelSp->write(&status, *builder.blob()); if (!status) { errorDescription << "Failed sending NegotiationMessage " << "[status: " << status << ", message: " << message @@ -736,7 +736,6 @@ void SessionNegotiator::createSession(bsl::ostream& errorDescription, description, d_dispatcher_p, d_blobSpPool_p, - d_bufferFactory_p, d_scheduler_p, d_adminCb, d_allocator_p); diff --git a/src/groups/mqb/mqba/mqba_sessionnegotiator.h b/src/groups/mqb/mqba/mqba_sessionnegotiator.h index b4db32e66..adc966fc6 100644 --- a/src/groups/mqb/mqba/mqba_sessionnegotiator.h +++ b/src/groups/mqb/mqba/mqba_sessionnegotiator.h @@ -150,9 +150,10 @@ class SessionNegotiator : public mqbnet::Negotiator { bslma::Allocator* d_allocator_p; // Allocator to use + /// Buffer factory to use in constructed client sessions + /// TODO: this field should be removed once we retire the code for + /// message properties conversion in `mqba::ClientSession`. bdlbb::BlobBufferFactory* d_bufferFactory_p; - // Buffer factory to inject into new client - // sessions mqbi::Dispatcher* d_dispatcher_p; // Dispatcher to inject into new client diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp index 6ae5fe55a..2b465498a 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp @@ -592,7 +592,6 @@ ClusterOrchestrator::ClusterOrchestrator( // Strong d_clusterData_p, clusterState, - &d_clusterData_p->bufferFactory(), &d_clusterData_p->blobSpPool())), k_WATCHDOG_TIMEOUT_DURATION, d_allocators.get("ClusterStateManager"))) @@ -612,7 +611,6 @@ ClusterOrchestrator::ClusterOrchestrator( // Strong d_clusterData_p, clusterState, - &d_clusterData_p->bufferFactory(), &d_clusterData_p->blobSpPool())), d_allocators.get("ClusterStateManager"))), d_allocator_p) diff --git a/src/groups/mqb/mqbblp/mqbblp_recoverymanager.cpp b/src/groups/mqb/mqbblp/mqbblp_recoverymanager.cpp index ebaa4bfe2..e90ef1b40 100644 --- a/src/groups/mqb/mqbblp/mqbblp_recoverymanager.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_recoverymanager.cpp @@ -1552,7 +1552,7 @@ int RecoveryManager::sendFile(RequestContext* context, } bmqt::GenericResult::Enum writeRc = context->requesterNode()->write( - builder.blob_sp(), + builder.blob(), bmqp::EventType::e_RECOVERY); if (bmqt::GenericResult::e_SUCCESS != writeRc) { @@ -1600,7 +1600,7 @@ int RecoveryManager::sendFile(RequestContext* context, } bmqt::GenericResult::Enum writeRc = context->requesterNode()->write( - builder.blob_sp(), + builder.blob(), bmqp::EventType::e_RECOVERY); if (bmqt::GenericResult::e_SUCCESS != writeRc) { @@ -1803,7 +1803,7 @@ int RecoveryManager::replayPartition( .syncConfig() .partitionSyncEventSize() <= builder.eventSize()) { bmqt::GenericResult::Enum writeRc = destination->write( - builder.blob_sp(), + builder.blob(), bmqp::EventType::e_PARTITION_SYNC); if (bmqt::GenericResult::e_SUCCESS != writeRc) { @@ -1827,7 +1827,7 @@ int RecoveryManager::replayPartition( if (0 < builder.messageCount()) { bmqt::GenericResult::Enum writeRc = destination->write( - builder.blob_sp(), + builder.blob(), bmqp::EventType::e_PARTITION_SYNC); if (bmqt::GenericResult::e_SUCCESS != writeRc) { @@ -3297,12 +3297,7 @@ void RecoveryManager::processStorageEvent( // nothing to buffer in this event. if (0 < seb.messageCount()) { - bsl::shared_ptr blobSp; - blobSp.createInplace(d_allocator_p, - &d_clusterData_p->bufferFactory(), - d_allocator_p); - *blobSp = seb.blob(); - recoveryCtx.addStorageEvent(blobSp); + recoveryCtx.addStorageEvent(seb.blob()); } recoveryCtx.setNewSyncPoint(syncPoint); diff --git a/src/groups/mqb/mqbc/mqbc_controlmessagetransmitter.cpp b/src/groups/mqb/mqbc/mqbc_controlmessagetransmitter.cpp index bc2d83f95..99d2c1221 100644 --- a/src/groups/mqb/mqbc/mqbc_controlmessagetransmitter.cpp +++ b/src/groups/mqb/mqbc/mqbc_controlmessagetransmitter.cpp @@ -51,9 +51,8 @@ void ControlMessageTransmitter::sendMessageHelper( return; // RETURN } - bmqt::GenericResult::Enum writeRc = destination->write( - schemaBuilder->blob_sp(), - bmqp::EventType::e_CONTROL); + bmqt::GenericResult::Enum writeRc = + destination->write(schemaBuilder->blob(), bmqp::EventType::e_CONTROL); if (bmqt::GenericResult::e_SUCCESS != writeRc) { BALL_LOG_ERROR << "#CLUSTER_SEND_FAILURE " << "Failed to write schema message: " << message @@ -84,7 +83,7 @@ void ControlMessageTransmitter::broadcastMessageHelper( // Broadcast to cluster, using the unicast channel to ensure ordering of // events. - d_cluster_p->netCluster().writeAll(schemaBuilder->blob_sp(), + d_cluster_p->netCluster().writeAll(schemaBuilder->blob(), bmqp::EventType::e_CONTROL); BALL_LOG_INFO << "Broadcasted message '" << message @@ -115,7 +114,7 @@ void ControlMessageTransmitter::broadcastMessageHelper( bmqp::HighAvailabilityFeatures::k_BROADCAST_TO_PROXIES, negoMsg.clientIdentity().features())) { bmqio::Status status; - sessionSp->channel()->write(&status, schemaBuilder->blob()); + sessionSp->channel()->write(&status, *schemaBuilder->blob()); if (status.category() == bmqio::StatusCategory::e_SUCCESS) { BALL_LOG_INFO << "Sent message '" << message << "' to proxy " << sessionSp->description(); @@ -190,7 +189,7 @@ void ControlMessageTransmitter::sendMessage( } bmqio::Status status; - channel->write(&status, d_schemaBuilder.blob()); + channel->write(&status, *d_schemaBuilder.blob()); if (status.category() != bmqio::StatusCategory::e_SUCCESS) { BALL_LOG_ERROR << "#CLUSTER_SEND_FAILURE " << "Failed to write schema message: " << message diff --git a/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.cpp b/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.cpp index 791183632..16ea3acbd 100644 --- a/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.cpp +++ b/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.cpp @@ -268,12 +268,12 @@ int IncoreClusterStateLedger::onLogRolloverCb(const mqbu::StorageKey& oldLogId, ++advisoryIt) { ClusterMessageInfo& info = advisoryIt->second; - bdlbb::Blob record(d_bufferFactory_p, d_allocator_p); + bsl::shared_ptr record = d_blobSpPool_p->getObject(); ClusterStateRecordType::Enum recordType = info.d_clusterMessage.choice().isLeaderAdvisoryValue() ? ClusterStateRecordType::e_SNAPSHOT : ClusterStateRecordType::e_UPDATE; - rc = ClusterStateLedgerUtil::appendRecord(&record, + rc = ClusterStateLedgerUtil::appendRecord(record.get(), info.d_clusterMessage, advisoryIt->first, currentTime(), @@ -283,9 +283,9 @@ int IncoreClusterStateLedger::onLogRolloverCb(const mqbu::StorageKey& oldLogId, } rc = d_ledger_mp->writeRecord(&(info.d_recordId), - record, + *record, bmqu::BlobPosition(), - record.length()); + record->length()); if (rc != 0) { return 10 * rc + rc_WRITE_RECORD_FAILURE; // RETURN } @@ -343,9 +343,9 @@ int IncoreClusterStateLedger::onLogRolloverCb(const mqbu::StorageKey& oldLogId, ClusterMessageInfo info; info.d_clusterMessage.choice().makeLeaderAdvisory(leaderAdvisory); - bdlbb::Blob record(d_bufferFactory_p, d_allocator_p); - rc = ClusterStateLedgerUtil::appendRecord( - &record, + bsl::shared_ptr record = d_blobSpPool_p->getObject(); + rc = ClusterStateLedgerUtil::appendRecord( + record.get(), info.d_clusterMessage, leaderAdvisory.sequenceNumber(), currentTime(), @@ -355,9 +355,9 @@ int IncoreClusterStateLedger::onLogRolloverCb(const mqbu::StorageKey& oldLogId, } rc = d_ledger_mp->writeRecord(&(info.d_recordId), - record, + *record, bmqu::BlobPosition(), - record.length()); + record->length()); if (rc != 0) { return 10 * rc + rc_WRITE_RECORD_FAILURE; // RETURN } @@ -415,8 +415,8 @@ int IncoreClusterStateLedger::applyAdvisoryInternal( } // Do leader logic: apply and broadcast advisory, then apply its ack - bdlbb::Blob advisoryRecord(d_bufferFactory_p, d_allocator_p); - int rc = ClusterStateLedgerUtil::appendRecord(&advisoryRecord, + bsl::shared_ptr advisoryRecord = d_blobSpPool_p->getObject(); + int rc = ClusterStateLedgerUtil::appendRecord(advisoryRecord.get(), clusterMessage, sequenceNumber, currentTime(), @@ -425,7 +425,7 @@ int IncoreClusterStateLedger::applyAdvisoryInternal( return 10 * rc + rc_CREATE_RECORD_FAILURE; // RETURN } - rc = applyRecordInternal(advisoryRecord, + rc = applyRecordInternal(*advisoryRecord, 0, clusterMessage, sequenceNumber, @@ -545,9 +545,9 @@ int IncoreClusterStateLedger::applyRecordInternal( ackMessage.choice().makeLeaderAdvisoryAck().sequenceNumberAcked() = sequenceNumber; - bdlbb::Blob ackRecord(d_bufferFactory_p, d_allocator_p); + bsl::shared_ptr ackRecord = d_blobSpPool_p->getObject(); rc = ClusterStateLedgerUtil::appendRecord( - &ackRecord, + ackRecord.get(), ackMessage, sequenceNumber, currentTime(), @@ -558,7 +558,7 @@ int IncoreClusterStateLedger::applyRecordInternal( // If leader, apply Ack to self. Else, reply Ack back to leader if (isSelfLeader()) { - rc = applyRecordInternal(ackRecord, + rc = applyRecordInternal(*ackRecord, 0, ackMessage, sequenceNumber, @@ -571,7 +571,7 @@ int IncoreClusterStateLedger::applyRecordInternal( bsl::shared_ptr ackEvent = d_blobSpPool_p->getObject(); - constructEventBlob(ackEvent.get(), ackRecord); + constructEventBlob(ackEvent.get(), *ackRecord); mqbnet::ClusterNode* leaderNode = d_clusterData_p->electorInfo().leaderNode(); @@ -700,9 +700,10 @@ int IncoreClusterStateLedger::applyRecordInternal( << ", creating and applying commit advisory: " << commitMessage << "."; - bdlbb::Blob commitRecord(d_bufferFactory_p, d_allocator_p); + bsl::shared_ptr commitRecord = + d_blobSpPool_p->getObject(); rc = ClusterStateLedgerUtil::appendRecord( - &commitRecord, + commitRecord.get(), commitMessage, commitAdvisory.sequenceNumber(), currentTime(), @@ -711,7 +712,7 @@ int IncoreClusterStateLedger::applyRecordInternal( return 10 * rc + rc_CREATE_COMMIT_FAILURE; // RETURN } - rc = applyRecordInternal(commitRecord, + rc = applyRecordInternal(*commitRecord, 0, commitMessage, commitAdvisory.sequenceNumber(), @@ -1183,13 +1184,11 @@ IncoreClusterStateLedger::IncoreClusterStateLedger( ClusterStateLedgerConsistency::Enum consistencyLevel, ClusterData* clusterData, ClusterState* clusterState, - bdlbb::BlobBufferFactory* bufferFactory, BlobSpPool* blobSpPool_p, bslma::Allocator* allocator) : d_allocator_p(allocator) , d_isFirstLeaderAdvisory(true) , d_isOpen(false) -, d_bufferFactory_p(bufferFactory) , d_blobSpPool_p(blobSpPool_p) , d_description(allocator) , d_commitCb() diff --git a/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.h b/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.h index 8e5b12816..0e30680e9 100644 --- a/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.h +++ b/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.h @@ -166,11 +166,6 @@ class IncoreClusterStateLedger BSLS_KEYWORD_FINAL : public ClusterStateLedger { // Flag to indicate open/close status // of this object - bdlbb::BlobBufferFactory* d_bufferFactory_p; - // Buffer factory for the headers and - // payloads of the messages to be - // written - /// Pool of shared pointers to blobs BlobSpPool* d_blobSpPool_p; @@ -306,7 +301,6 @@ class IncoreClusterStateLedger BSLS_KEYWORD_FINAL : public ClusterStateLedger { ClusterStateLedgerConsistency::Enum consistencyLevel, ClusterData* clusterData, ClusterState* clusterState, - bdlbb::BlobBufferFactory* bufferFactory, BlobSpPool* blobSpPool_p, bslma::Allocator* allocator); diff --git a/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.t.cpp b/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.t.cpp index 0b6d4835a..e406f095e 100644 --- a/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.t.cpp +++ b/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.t.cpp @@ -287,7 +287,6 @@ struct Tester { d_consistencyLevel, d_cluster_mp->_clusterData(), &d_cluster_mp->_state(), - d_cluster_mp->_bufferFactory(), d_cluster_mp->_blobSpPool(), s_allocator_p), s_allocator_p); diff --git a/src/groups/mqb/mqbc/mqbc_recoverymanager.cpp b/src/groups/mqb/mqbc/mqbc_recoverymanager.cpp index 64097eac3..a31c6e5d1 100644 --- a/src/groups/mqb/mqbc/mqbc_recoverymanager.cpp +++ b/src/groups/mqb/mqbc/mqbc_recoverymanager.cpp @@ -117,13 +117,12 @@ void RecoveryManager::ReceiveDataContext::reset() // CREATORS RecoveryManager::RecoveryManager( - BlobSpPool* blobSpPool_p, const mqbcfg::ClusterDefinition& clusterConfig, - const mqbc::ClusterData& clusterData, + mqbc::ClusterData& clusterData, const mqbs::DataStoreConfig& dataStoreConfig, bslma::Allocator* allocator) : d_allocator_p(allocator) -, d_blobSpPool_p(blobSpPool_p) +, d_blobSpPool_p(&clusterData.blobSpPool()) , d_clusterConfig(clusterConfig) , d_dataStoreConfig(dataStoreConfig) , d_clusterData(clusterData) @@ -509,7 +508,7 @@ int RecoveryManager::processSendDataChunks( .syncConfig() .partitionSyncEventSize() <= builder.eventSize()) { const bmqt::GenericResult::Enum writeRc = destination->write( - builder.blob_sp(), + builder.blob(), bmqp::EventType::e_PARTITION_SYNC); if (bmqt::GenericResult::e_SUCCESS != writeRc) { @@ -533,7 +532,7 @@ int RecoveryManager::processSendDataChunks( if (0 < builder.messageCount()) { const bmqt::GenericResult::Enum writeRc = destination->write( - builder.blob_sp(), + builder.blob(), bmqp::EventType::e_PARTITION_SYNC); if (bmqt::GenericResult::e_SUCCESS != writeRc) { diff --git a/src/groups/mqb/mqbc/mqbc_recoverymanager.h b/src/groups/mqb/mqbc/mqbc_recoverymanager.h index 9434ac18e..b21f53290 100644 --- a/src/groups/mqb/mqbc/mqbc_recoverymanager.h +++ b/src/groups/mqb/mqbc/mqbc_recoverymanager.h @@ -299,12 +299,11 @@ class RecoveryManager { // CREATORS - /// Create a `RecoveryManager` object with the specified `bufferFactory`, - /// `clusterConfig`, `dataStoreConfig`, and `clusterData`. Use the - /// specified `allocator` for any memory allocation. - RecoveryManager(BlobSpPool* blobSpPool_p, - const mqbcfg::ClusterDefinition& clusterConfig, - const mqbc::ClusterData& clusterData, + /// Create a `RecoveryManager` object with the specified `clusterConfig`, + /// `clusterData` and `dataStoreConfig`. Use the specified `allocator` + /// for any memory allocation. + RecoveryManager(const mqbcfg::ClusterDefinition& clusterConfig, + mqbc::ClusterData& clusterData, const mqbs::DataStoreConfig& dataStoreConfig, bslma::Allocator* allocator); diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp index 9ac129f20..5ee817fee 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp @@ -3344,7 +3344,6 @@ StorageManager::StorageManager( , d_lowDiskspaceWarning(false) , d_unrecognizedDomainsLock() , d_unrecognizedDomains(allocator) -, d_blobSpPool_p(&clusterData->blobSpPool()) , d_domainFactory_p(domainFactory) , d_dispatcher_p(dispatcher) , d_cluster_p(cluster) @@ -3505,7 +3504,7 @@ int StorageManager::start(bsl::ostream& errorDescription) d_dispatcher_p, partitionCfg, &d_fileStores, - d_blobSpPool_p, + &d_clusterData_p->blobSpPool(), &d_allocators, errorDescription, d_replicationFactor, @@ -3533,8 +3532,7 @@ int StorageManager::start(bsl::ostream& errorDescription) "RecoveryManager"); d_recoveryManager_mp.load(new (*recoveryManagerAllocator) - RecoveryManager(d_blobSpPool_p, - d_clusterConfig, + RecoveryManager(d_clusterConfig, *d_clusterData_p, dsCfg, recoveryManagerAllocator), diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.h b/src/groups/mqb/mqbc/mqbc_storagemanager.h index 5ba16299b..581cde5a9 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.h +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.h @@ -235,9 +235,6 @@ class StorageManager // // THREAD: Protected by 'd_unrecognizedDomainsLock'. - BlobSpPool* d_blobSpPool_p; - // SharedObjectPool of blobs to use - mqbi::DomainFactory* d_domainFactory_p; // Domain factory to use diff --git a/src/groups/mqb/mqbnet/mqbnet_channel.h b/src/groups/mqb/mqbnet/mqbnet_channel.h index 1d76c98fa..4a5df5e84 100644 --- a/src/groups/mqb/mqbnet/mqbnet_channel.h +++ b/src/groups/mqb/mqbnet/mqbnet_channel.h @@ -1058,7 +1058,7 @@ Channel::flushBuilder(Builder& builder, bmqio::Status st; if (builder.messageCount()) { - channel->write(&st, builder.blob()); + channel->write(&st, *builder.blob()); if (BSLS_PERFORMANCEHINT_PREDICT_LIKELY( st.category() == bmqio::StatusCategory::e_SUCCESS)) { diff --git a/src/groups/mqb/mqbnet/mqbnet_channel.t.cpp b/src/groups/mqb/mqbnet/mqbnet_channel.t.cpp index 53cb97b68..e13df7a02 100644 --- a/src/groups/mqb/mqbnet/mqbnet_channel.t.cpp +++ b/src/groups/mqb/mqbnet/mqbnet_channel.t.cpp @@ -99,7 +99,7 @@ struct PseudoBuilder { } int messageCount() const { return d_payload_sp->length() ? 1 : 0; } void reset() { d_payload_sp = d_blobSpPool_p->getObject(); } - bsl::shared_ptr blob_sp() const { return d_payload_sp; } + bsl::shared_ptr blob() const { return d_payload_sp; } }; template struct Iterator { @@ -432,7 +432,7 @@ inline void Tester::test() bmqt::EventBuilderResult::e_OPTION_TOO_BIG == rc)) { BSLS_PERFORMANCEHINT_UNLIKELY_HINT; - d_history.push_back(d_builder.blob_sp()); + d_history.push_back(d_builder.blob()); d_builder.reset(); rc = build(); @@ -630,7 +630,7 @@ inline size_t Tester::verify( const bsl::shared_ptr& testChannel) { if (d_builder.messageCount()) { - d_history.push_back(d_builder.blob_sp()); + d_history.push_back(d_builder.blob()); d_builder.reset(); } diff --git a/src/groups/mqb/mqbnet/mqbnet_elector.cpp b/src/groups/mqb/mqbnet/mqbnet_elector.cpp index ebf01ed27..9d25d1f64 100644 --- a/src/groups/mqb/mqbnet/mqbnet_elector.cpp +++ b/src/groups/mqb/mqbnet/mqbnet_elector.cpp @@ -2091,7 +2091,7 @@ void Elector::emitIOEvent(const ElectorStateMachineOutput& output) } // Retrieve the encoded event - const bsl::shared_ptr blob = builder.blob_sp(); + const bsl::shared_ptr blob = builder.blob(); if (k_ALL_NODES_ID == output.destination()) { // Broadcast to cluster, using the unicast channel to ensure ordering // of events diff --git a/src/groups/mqb/mqbs/mqbs_filestore.cpp b/src/groups/mqb/mqbs/mqbs_filestore.cpp index 790f028d8..042051ae6 100644 --- a/src/groups/mqb/mqbs/mqbs_filestore.cpp +++ b/src/groups/mqb/mqbs/mqbs_filestore.cpp @@ -6903,7 +6903,7 @@ void FileStore::dispatcherFlush(bool storage, bool queues) << d_storageEventBuilder.messageCount() << " STORAGE messages."; const int maxChannelPendingItems = d_cluster_p->broadcast( - d_storageEventBuilder.blob_sp()); + d_storageEventBuilder.blob()); if (maxChannelPendingItems > 0) { if (d_nagglePacketCount < k_NAGLE_PACKET_COUNT) { // back off