Skip to content

Commit

Permalink
Review fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Evgeny Malygin <[email protected]>
  • Loading branch information
678098 committed Nov 14, 2024
1 parent f536640 commit d330e16
Show file tree
Hide file tree
Showing 69 changed files with 468 additions and 637 deletions.
2 changes: 1 addition & 1 deletion src/groups/bmq/bmqa/bmqa_confirmeventbuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ const bdlbb::Blob& ConfirmEventBuilder::blob() const
// PRECONDITIONS
BSLS_ASSERT(d_impl.d_builder_p);

return d_impl.d_builder_p->blob();
return *d_impl.d_builder_p->blob();
}

} // close package namespace
Expand Down
18 changes: 9 additions & 9 deletions src/groups/bmq/bmqa/bmqa_message.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ static void test2_validPushMessagePrint()
bmqp::PushEventBuilder peb(&blobSpPool, s_allocator_p);
ASSERT_EQ(sizeof(bmqp::EventHeader), static_cast<size_t>(peb.eventSize()));
ASSERT_EQ(sizeof(bmqp::EventHeader),
static_cast<size_t>(peb.blob().length()));
static_cast<size_t>(peb.blob()->length()));
ASSERT_EQ(0, peb.messageCount());

// Add SubQueueInfo option
Expand All @@ -199,7 +199,7 @@ static void test2_validPushMessagePrint()
ASSERT_EQ(sizeof(bmqp::EventHeader), static_cast<size_t>(peb.eventSize()));
// 'eventSize()' excludes unpacked messages
ASSERT_LT(sizeof(bmqp::EventHeader),
static_cast<size_t>(peb.blob().length()));
static_cast<size_t>(peb.blob()->length()));
// But the option is written to the underlying blob
rc = peb.packMessage(payload,
queueId,
Expand All @@ -211,7 +211,7 @@ static void test2_validPushMessagePrint()
ASSERT_LT(payload.length(), peb.eventSize());
ASSERT_EQ(1, peb.messageCount());

bmqp::Event bmqpEvent(&peb.blob(), s_allocator_p, true);
bmqp::Event bmqpEvent(peb.blob().get(), s_allocator_p, true);
implPtr->configureAsMessageEvent(bmqpEvent);

implPtr->addCorrelationId(bmqt::CorrelationId());
Expand Down Expand Up @@ -303,7 +303,7 @@ static void test3_messageProperties()
queue->setId(queueId);
implPtr->insertQueue(subQueueId, queue);

bmqp::Event bmqpEvent(&peb.blob(), s_allocator_p, true);
bmqp::Event bmqpEvent(peb.blob().get(), s_allocator_p, true);

implPtr->configureAsMessageEvent(bmqpEvent);
implPtr->addCorrelationId(bmqt::CorrelationId());
Expand Down Expand Up @@ -443,7 +443,7 @@ static void test4_subscriptionHandle()
static_cast<size_t>(peb.eventSize()));
// 'eventSize()' excludes unpacked messages
ASSERT_LT(sizeof(bmqp::EventHeader),
static_cast<size_t>(peb.blob().length()));
static_cast<size_t>(peb.blob()->length()));
// But the option is written to the underlying blob

// Add message
Expand All @@ -457,7 +457,7 @@ static void test4_subscriptionHandle()
ASSERT_LT(payload.length(), peb.eventSize());
ASSERT_EQ(1, peb.messageCount());

bmqp::Event bmqpEvent(&peb.blob(), s_allocator_p, true);
bmqp::Event bmqpEvent(peb.blob().get(), s_allocator_p, true);
implPtr->configureAsMessageEvent(bmqpEvent);

implPtr->insertQueue(sId, queueSp);
Expand Down Expand Up @@ -506,7 +506,7 @@ static void test4_subscriptionHandle()
ASSERT_LT(payload.length(), peb.eventSize());
ASSERT_EQ(1, peb.messageCount());

bmqp::Event bmqpEvent(&peb.blob(), s_allocator_p, true);
bmqp::Event bmqpEvent(peb.blob().get(), s_allocator_p, true);
implPtr->configureAsMessageEvent(bmqpEvent);

implPtr->insertQueue(defaultSubscriptionId, queueSp);
Expand Down Expand Up @@ -548,7 +548,7 @@ static void test4_subscriptionHandle()
ASSERT_EQ(rc, bmqt::EventBuilderResult::e_SUCCESS);
ASSERT_EQ(1, builder.messageCount());

bmqp::Event bmqpEvent(&builder.blob(), s_allocator_p);
bmqp::Event bmqpEvent(builder.blob().get(), s_allocator_p);
implPtr->configureAsMessageEvent(bmqpEvent);

implPtr->insertQueue(queueSp);
Expand Down Expand Up @@ -582,7 +582,7 @@ static void test4_subscriptionHandle()
ASSERT_EQ(rc, bmqt::EventBuilderResult::e_SUCCESS);
ASSERT_EQ(1, builder.messageCount());

bmqp::Event bmqpEvent(&builder.blob(), s_allocator_p);
bmqp::Event bmqpEvent(builder.blob().get(), s_allocator_p);
implPtr->configureAsMessageEvent(bmqpEvent);

implPtr->insertQueue(queueSp);
Expand Down
4 changes: 2 additions & 2 deletions src/groups/bmq/bmqa/bmqa_messageevent.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ static void test2_ackMesageIteratorTest()
PVV("Appending messages");
appendMessages(&builder, &messages, k_NUM_MSGS);

bmqp::Event rawEvent(&builder.blob(), s_allocator_p);
bmqp::Event rawEvent(builder.blob().get(), s_allocator_p);

bsl::shared_ptr<bmqimp::Event> eventImpl;
eventImpl.createInplace(s_allocator_p, &bufferFactory, s_allocator_p);
Expand Down Expand Up @@ -239,7 +239,7 @@ static void test3_putMessageIteratorTest()
ASSERT_EQ(rc, bmqt::EventBuilderResult::e_SUCCESS);
}

bmqp::Event rawEvent(&builder.blob(), s_allocator_p);
bmqp::Event rawEvent(builder.blob().get(), s_allocator_p);

bsl::shared_ptr<bmqimp::Event> eventImpl;
eventImpl.createInplace(s_allocator_p, &bufferFactory, s_allocator_p);
Expand Down
8 changes: 4 additions & 4 deletions src/groups/bmq/bmqa/bmqa_mocksession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ Event MockSessionUtil::createAckEvent(const bsl::vector<AckParams>& acks,

// TODO: deprecate `createAckEvent` with bufferFactory arg and introduce
// another function with BlobSpPool arg.
bmqa::Session::BlobSpPool blobSpPool(
BlobSpPool blobSpPool(
bmqp::BlobPoolUtil::createBlobPool(bufferFactory, allocator));

bmqp::AckEventBuilder ackBuilder(&blobSpPool, alloc);
Expand All @@ -318,7 +318,7 @@ Event MockSessionUtil::createAckEvent(const bsl::vector<AckParams>& acks,
}

implPtr->configureAsMessageEvent(
bmqp::Event(&ackBuilder.blob(), alloc, true));
bmqp::Event(ackBuilder.blob().get(), alloc, true));
for (size_t i = 0; i != acks.size(); ++i) {
implPtr->addCorrelationId(acks[i].d_correlationId);
}
Expand All @@ -343,7 +343,7 @@ Event MockSessionUtil::createPushEvent(

// TODO: deprecate `createPushEvent` with bufferFactory arg and introduce
// another function with BlobSpPool arg.
bmqa::Session::BlobSpPool blobSpPool(
BlobSpPool blobSpPool(
bmqp::BlobPoolUtil::createBlobPool(bufferFactory, allocator));

bmqp::PushEventBuilder pushBuilder(&blobSpPool, alloc);
Expand Down Expand Up @@ -379,7 +379,7 @@ Event MockSessionUtil::createPushEvent(
implPtr->addCorrelationId(bmqt::CorrelationId());
}

bmqp::Event bmqpEvent(&pushBuilder.blob(), alloc, true);
bmqp::Event bmqpEvent(pushBuilder.blob().get(), alloc, true);
implPtr->configureAsMessageEvent(bmqpEvent);

return event;
Expand Down
18 changes: 15 additions & 3 deletions src/groups/bmq/bmqa/bmqa_mocksession.h
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,7 @@
#include <bdlb_variant.h>
#include <bdlbb_blob.h>
#include <bdlbb_pooledblobbufferfactory.h>
#include <bdlcc_sharedobjectpool.h>
#include <bsl_cstddef.h>
#include <bsl_deque.h>
#include <bsl_functional.h>
Expand Down Expand Up @@ -607,6 +608,13 @@ struct MockSessionUtil {
private:
// PRIVATE TYPES

/// Pool of shared pointers to Blobs
typedef bdlcc::SharedObjectPool<
bdlbb::Blob,
bdlcc::ObjectPoolFunctors::DefaultCreator,
bdlcc::ObjectPoolFunctors::RemoveAll<bdlbb::Blob> >
BlobSpPool;

/// Event impl shared pointer to access
/// the pimpl of `bmqa::Event`.
typedef bsl::shared_ptr<bmqimp::Event> EventImplSp;
Expand Down Expand Up @@ -751,9 +759,6 @@ class MockSession : public AbstractSession {
public:
// TYPES

/// Pool of shared pointers to Blobs
typedef bmqa::Session::BlobSpPool BlobSpPool;

// CLASS METHODS

/// Perform a one time initialization needed by components used in
Expand Down Expand Up @@ -803,6 +808,13 @@ class MockSession : public AbstractSession {

// PRIVATE TYPES

/// Pool of shared pointers to Blobs
typedef bdlcc::SharedObjectPool<
bdlbb::Blob,
bdlcc::ObjectPoolFunctors::DefaultCreator,
bdlcc::ObjectPoolFunctors::RemoveAll<bdlbb::Blob> >
BlobSpPool;

/// Aligned buffer holding the two key hash map
typedef bsls::AlignedBuffer<k_MAX_SIZEOF_BMQC_TWOKEYHASHMAP>
TwoKeyHashMapBuffer;
Expand Down
8 changes: 0 additions & 8 deletions src/groups/bmq/bmqa/bmqa_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,6 @@

// BDE
#include <ball_log.h>
#include <bdlcc_sharedobjectpool.h>
#include <bsl_memory.h>
#include <bsl_string.h>
#include <bslma_allocator.h>
Expand Down Expand Up @@ -680,13 +679,6 @@ class Session : public AbstractSession {
public:
// TYPES

/// Pool of shared pointers to Blobs
typedef bdlcc::SharedObjectPool<
bdlbb::Blob,
bdlcc::ObjectPoolFunctors::DefaultCreator,
bdlcc::ObjectPoolFunctors::RemoveAll<bdlbb::Blob> >
BlobSpPool;

/// Invoked as a response to an asynchronous open queue operation,
/// `OpenQueueCallback` is an alias for a callback function object
/// (functor) that takes as an argument the specified `result`,
Expand Down
1 change: 0 additions & 1 deletion src/groups/bmq/bmqimp/bmqimp_application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,6 @@ Application::Application(
NegotiatedChannelFactoryConfig(&d_statChannelFactory,
negotiationMessage,
sessionOptions.connectTimeout(),
&d_blobBufferFactory,
&d_blobSpPool,
allocator),
allocator)
Expand Down
8 changes: 4 additions & 4 deletions src/groups/bmq/bmqimp/bmqimp_brokersession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4530,7 +4530,7 @@ void BrokerSession::transferAckEvent(bmqp::AckEventBuilder* ackBuilder,
// Our event is full at this point so send this ack event to the user and
// reset the builder to append the ack that was rejected.

bmqp::Event event(&ackBuilder->blob(), d_allocator_p, true);
bmqp::Event event(ackBuilder->blob().get(), d_allocator_p, true);
// clone = true
if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(
d_messageDumper.isEventDumpEnabled<bmqp::EventType::e_ACK>())) {
Expand Down Expand Up @@ -4820,7 +4820,7 @@ bool BrokerSession::appendOrSend(
if (result == bmqt::EventBuilderResult::e_PAYLOAD_TOO_BIG) {
// Send the current event, reset the builder.
bmqt::GenericResult::Enum res = writeOrBuffer(
builder.blob(),
*builder.blob(),
d_sessionOptions.channelHighWatermark());

if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(
Expand Down Expand Up @@ -4943,7 +4943,7 @@ void BrokerSession::retransmitPendingMessages()
// Send the final PUT event if there are any messages in the builder
if (putBuilder.messageCount()) {
bmqt::GenericResult::Enum res = writeOrBuffer(
putBuilder.blob(),
*putBuilder.blob(),
d_sessionOptions.channelHighWatermark());

if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(
Expand Down Expand Up @@ -7330,7 +7330,7 @@ int BrokerSession::confirmMessage(const bsl::shared_ptr<bmqimp::Queue>& queue,
<< rc;
return rc; // RETURN
}
bool isAccepted = acceptUserEvent(builder.blob(), timeout);
bool isAccepted = acceptUserEvent(*builder.blob(), timeout);
if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(!isAccepted)) {
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;

Expand Down
Loading

0 comments on commit d330e16

Please sign in to comment.