Skip to content

Commit

Permalink
Perf[MQB/BMQ]: pass shared pointers to Blobs
Browse files Browse the repository at this point in the history
Signed-off-by: Evgeny Malygin <[email protected]>
  • Loading branch information
678098 committed Oct 25, 2024
1 parent bb7d697 commit 093fcc5
Show file tree
Hide file tree
Showing 39 changed files with 634 additions and 449 deletions.
6 changes: 3 additions & 3 deletions src/groups/bmq/bmqimp/bmqimp_brokersession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5609,7 +5609,7 @@ void BrokerSession::actionResumeHealthSensitiveQueues()
bmqt::GenericResult::Enum
BrokerSession::requestWriterCb(const RequestManagerType::RequestSp& context,
const bmqp::QueueId& queueId,
const bdlbb::Blob& blob,
const bsl::shared_ptr<bdlbb::Blob>& blob,
bsls::Types::Int64 watermark)
{
// executed by the FSM thread
Expand All @@ -5621,7 +5621,7 @@ BrokerSession::requestWriterCb(const RequestManagerType::RequestSp& context,

if (isBuffered) {
const bmqt::MessageGUID guid =
d_messageCorrelationIdContainer.add(context, queueId, blob);
d_messageCorrelationIdContainer.add(context, queueId, *blob);
char guidHex[bmqt::MessageGUID::e_SIZE_HEX];
guid.toHex(guidHex);
context->adoptUserData(
Expand All @@ -5640,7 +5640,7 @@ BrokerSession::requestWriterCb(const RequestManagerType::RequestSp& context,
return bmqt::GenericResult::e_SUCCESS; // RETURN
}

bmqt::GenericResult::Enum res = writeOrBuffer(blob, watermark);
bmqt::GenericResult::Enum res = writeOrBuffer(*blob, watermark);

if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(
res != bmqt::GenericResult::e_SUCCESS && isBuffered)) {
Expand Down
2 changes: 1 addition & 1 deletion src/groups/bmq/bmqimp/bmqimp_brokersession.h
Original file line number Diff line number Diff line change
Expand Up @@ -1446,7 +1446,7 @@ class BrokerSession BSLS_CPP11_FINAL {
bmqt::GenericResult::Enum
requestWriterCb(const RequestManagerType::RequestSp& context,
const bmqp::QueueId& queueId,
const bdlbb::Blob& blob,
const bsl::shared_ptr<bdlbb::Blob>& blob,
bsls::Types::Int64 highWatermark);

/// Write the specified `blob` into the channel providing the specified
Expand Down
47 changes: 35 additions & 12 deletions src/groups/bmq/bmqp/bmqp_ackeventbuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,17 @@ namespace bmqp {

AckEventBuilder::AckEventBuilder(bdlbb::BlobBufferFactory* bufferFactory,
bslma::Allocator* allocator)
: d_blob(bufferFactory, allocator)
: d_allocator_p(bslma::Default::allocator(allocator))
, d_bufferFactory_p(bufferFactory)
, d_blob_sp(0, allocator) // initialized in `reset()`
, d_msgCount(0)
{
reset();
}

void AckEventBuilder::reset()
{
d_blob.removeAll();
d_blob_sp.createInplace(d_allocator_p, d_bufferFactory_p, d_allocator_p);
d_msgCount = 0;

// NOTE: Since AckEventBuilder owns the blob and we just reset it, we have
Expand All @@ -56,16 +58,16 @@ void AckEventBuilder::reset()
// Use placement new to create the object directly in the blob buffer,
// while still calling it's constructor (to memset memory and initialize
// some fields).
d_blob.setLength(sizeof(EventHeader) + sizeof(AckHeader));
BSLS_ASSERT_SAFE(d_blob.numDataBuffers() == 1 &&
d_blob_sp->setLength(sizeof(EventHeader) + sizeof(AckHeader));
BSLS_ASSERT_SAFE(d_blob_sp->numDataBuffers() == 1 &&
"The buffers allocated by the supplied bufferFactory "
"are too small");

// EventHeader
new (d_blob.buffer(0).data()) EventHeader(EventType::e_ACK);
new (d_blob_sp->buffer(0).data()) EventHeader(EventType::e_ACK);

// AckHeader
new (d_blob.buffer(0).data() + sizeof(EventHeader)) AckHeader();
new (d_blob_sp->buffer(0).data() + sizeof(EventHeader)) AckHeader();
}

bmqt::EventBuilderResult::Enum
Expand All @@ -82,9 +84,9 @@ AckEventBuilder::appendMessage(int status,

// Resize the blob to have space for an 'AckMessage' at the end ...
bmqu::BlobPosition offset;
bmqu::BlobUtil::reserve(&offset, &d_blob, sizeof(AckMessage));
bmqu::BlobUtil::reserve(&offset, d_blob_sp.get(), sizeof(AckMessage));

bmqu::BlobObjectProxy<AckMessage> ackMessage(&d_blob,
bmqu::BlobObjectProxy<AckMessage> ackMessage(d_blob_sp.get(),
offset,
false, // no read
true); // write mode
Expand All @@ -107,7 +109,7 @@ AckEventBuilder::appendMessage(int status,
const bdlbb::Blob& AckEventBuilder::blob() const
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(d_blob.length() <= EventHeader::k_MAX_SIZE_SOFT);
BSLS_ASSERT_SAFE(d_blob_sp->length() <= EventHeader::k_MAX_SIZE_SOFT);

// Empty event
if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(messageCount() == 0)) {
Expand All @@ -117,10 +119,31 @@ const bdlbb::Blob& AckEventBuilder::blob() const

// Fix packet's length in header now that we know it. Following is valid
// (see comment in reset).
EventHeader& eh = *reinterpret_cast<EventHeader*>(d_blob.buffer(0).data());
eh.setLength(d_blob.length());
EventHeader& eh = *reinterpret_cast<EventHeader*>(
d_blob_sp->buffer(0).data());
eh.setLength(d_blob_sp->length());

return d_blob;
return *d_blob_sp;
}

bsl::shared_ptr<bdlbb::Blob> AckEventBuilder::blob_sp() const
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(d_blob_sp->length() <= EventHeader::k_MAX_SIZE_SOFT);

// Empty event
if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(messageCount() == 0)) {
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;
return bsl::shared_ptr<bdlbb::Blob>(); // RETURN
}

// Fix packet's length in header now that we know it. Following is valid
// (see comment in reset).
EventHeader& eh = *reinterpret_cast<EventHeader*>(
d_blob_sp->buffer(0).data());
eh.setLength(d_blob_sp->length());

return d_blob_sp;
}

} // close package namespace
Expand Down
16 changes: 11 additions & 5 deletions src/groups/bmq/bmqp/bmqp_ackeventbuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,14 @@ namespace bmqp {
class AckEventBuilder BSLS_CPP11_FINAL {
private:
// DATA
mutable bdlbb::Blob d_blob; // blob being built by this object.
// This has been done mutable to be able to
// skip writing the length until the blob
// is retrieved.
bslma::Allocator* d_allocator_p;

bdlbb::BlobBufferFactory* d_bufferFactory_p;

/// Blob being built by this object.
/// `mutable` to skip writing the length until the blob is retrieved.
mutable bsl::shared_ptr<bdlbb::Blob> d_blob_sp;

int d_msgCount; // number of messages currently in the
// event

Expand Down Expand Up @@ -141,6 +145,8 @@ class AckEventBuilder BSLS_CPP11_FINAL {
/// by this event. If no messages were added, this will return an empty
/// blob, i.e., a blob with length == 0.
const bdlbb::Blob& blob() const;

bsl::shared_ptr<bdlbb::Blob> blob_sp() const;
};

// ============================================================================
Expand Down Expand Up @@ -171,7 +177,7 @@ inline int AckEventBuilder::eventSize() const
return 0; // RETURN
}

return d_blob.length();
return d_blob_sp->length();
}

} // close package namespace
Expand Down
47 changes: 35 additions & 12 deletions src/groups/bmq/bmqp/bmqp_confirmeventbuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,17 @@ namespace bmqp {
ConfirmEventBuilder::ConfirmEventBuilder(
bdlbb::BlobBufferFactory* bufferFactory,
bslma::Allocator* allocator)
: d_blob(bufferFactory, allocator)
: d_allocator_p(bslma::Default::allocator(allocator))
, d_bufferFactory_p(bufferFactory)
, d_blob_sp(0, allocator) // initialized in `reset()`
, d_msgCount(0)
{
reset();
}

void ConfirmEventBuilder::reset()
{
d_blob.removeAll();
d_blob_sp.createInplace(d_allocator_p, d_bufferFactory_p, d_allocator_p);

d_msgCount = 0;

Expand All @@ -58,16 +60,16 @@ void ConfirmEventBuilder::reset()
// ConfirmHeader Use placement new to create the object directly in the
// blob buffer, while still calling it's constructor (to memset memory and
// initialize some fields).
d_blob.setLength(sizeof(EventHeader) + sizeof(ConfirmHeader));
BSLS_ASSERT_SAFE(d_blob.numDataBuffers() == 1 &&
d_blob_sp->setLength(sizeof(EventHeader) + sizeof(ConfirmHeader));
BSLS_ASSERT_SAFE(d_blob_sp->numDataBuffers() == 1 &&
"The buffers allocated by the supplied bufferFactory "
"are too small");

// EventHeader
new (d_blob.buffer(0).data()) EventHeader(EventType::e_CONFIRM);
new (d_blob_sp->buffer(0).data()) EventHeader(EventType::e_CONFIRM);

// ConfirmHeader
new (d_blob.buffer(0).data() + sizeof(EventHeader)) ConfirmHeader();
new (d_blob_sp->buffer(0).data() + sizeof(EventHeader)) ConfirmHeader();
}

bmqt::EventBuilderResult::Enum
Expand All @@ -87,9 +89,9 @@ ConfirmEventBuilder::appendMessage(int queueId,

// Resize the blob to have space for an 'ConfirmMessage' at the end ...
bmqu::BlobPosition offset;
bmqu::BlobUtil::reserve(&offset, &d_blob, sizeof(ConfirmMessage));
bmqu::BlobUtil::reserve(&offset, d_blob_sp.get(), sizeof(ConfirmMessage));

bmqu::BlobObjectProxy<ConfirmMessage> confirmMessage(&d_blob,
bmqu::BlobObjectProxy<ConfirmMessage> confirmMessage(d_blob_sp.get(),
offset,
false, // no read
true); // write mode
Expand All @@ -111,7 +113,7 @@ ConfirmEventBuilder::appendMessage(int queueId,
const bdlbb::Blob& ConfirmEventBuilder::blob() const
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(d_blob.length() <= EventHeader::k_MAX_SIZE_SOFT);
BSLS_ASSERT_SAFE(d_blob_sp->length() <= EventHeader::k_MAX_SIZE_SOFT);

// Empty event
if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(messageCount() == 0)) {
Expand All @@ -121,10 +123,31 @@ const bdlbb::Blob& ConfirmEventBuilder::blob() const

// Fix packet's length in header now that we know it. Following is valid
// (see comment in reset).
EventHeader& eh = *reinterpret_cast<EventHeader*>(d_blob.buffer(0).data());
eh.setLength(d_blob.length());
EventHeader& eh = *reinterpret_cast<EventHeader*>(
d_blob_sp->buffer(0).data());
eh.setLength(d_blob_sp->length());

return d_blob;
return *d_blob_sp;
}

bsl::shared_ptr<bdlbb::Blob> ConfirmEventBuilder::blob_sp() const
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(d_blob_sp->length() <= EventHeader::k_MAX_SIZE_SOFT);

// Empty event
if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(messageCount() == 0)) {
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;
return bsl::shared_ptr<bdlbb::Blob>(); // RETURN
}

// Fix packet's length in header now that we know it. Following is valid
// (see comment in reset).
EventHeader& eh = *reinterpret_cast<EventHeader*>(
d_blob_sp->buffer(0).data());
eh.setLength(d_blob_sp->length());

return d_blob_sp;
}

} // close package namespace
Expand Down
18 changes: 13 additions & 5 deletions src/groups/bmq/bmqp/bmqp_confirmeventbuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,16 @@ namespace bmqp {
class ConfirmEventBuilder {
private:
// DATA
mutable bdlbb::Blob d_blob; // blob being built by this object
// This has been done mutable to be able to
// skip writing the length until the blob
// is retrieved.
/// Allocator to use.
bslma::Allocator* d_allocator_p;

/// Buffer factory used for blob construction.
bdlbb::BlobBufferFactory* d_bufferFactory_p;

/// Blob being built by this object.
/// `mutable` to skip writing the length until the blob is retrieved.
mutable bsl::shared_ptr<bdlbb::Blob> d_blob_sp;

int d_msgCount; // number of messages currently in the
// event

Expand Down Expand Up @@ -140,6 +146,8 @@ class ConfirmEventBuilder {
/// by this event. If no messages were added, this will return an empty
/// blob, i.e., a blob with length == 0.
const bdlbb::Blob& blob() const;

bsl::shared_ptr<bdlbb::Blob> blob_sp() const;
};

// ============================================================================
Expand Down Expand Up @@ -170,7 +178,7 @@ inline int ConfirmEventBuilder::eventSize() const
return 0; // RETURN
}

return d_blob.length();
return d_blob_sp->length();
}

} // close package namespace
Expand Down
Loading

0 comments on commit 093fcc5

Please sign in to comment.