Skip to content

Commit

Permalink
Move printing logic to the PrinterManager
Browse files Browse the repository at this point in the history
Signed-off-by: Anton Pryakhin <[email protected]>
  • Loading branch information
waldgange committed Nov 24, 2024
1 parent 03264da commit 0db568f
Show file tree
Hide file tree
Showing 11 changed files with 706 additions and 297 deletions.
6 changes: 6 additions & 0 deletions src/applications/bmqstoragetool/bmqstoragetool.m.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ parseArgs(CommandLineArguments& arguments, int argc, const char* argv[])
"path to a .bmq_csl file",
balcl::TypeInfo(&arguments.d_cslFile),
balcl::OccurrenceInfo::e_OPTIONAL},
{"print-mode",
"print mode",
"can be one of the following: [HUMAN, JSON_PRETTY, JSON_LINE]. "
"Defailt value is HUMAN",
balcl::TypeInfo(&arguments.d_printMode),
balcl::OccurrenceInfo::e_OPTIONAL},
{"guid",
"guid",
"message guid",
Expand Down
240 changes: 46 additions & 194 deletions src/applications/bmqstoragetool/m_bmqstoragetool_messagedetails.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,59 +53,11 @@ class AppKeyMatcher {
}
};

/// Print the specified message record `rec` and QueueInfo pointed by the
/// specified `queueInfo_p` to the specified `stream`, using the specified
/// `allocator` for memory allocation.
void printRecord(bsl::ostream& stream,
const mqbs::MessageRecord& rec,
const bmqp_ctrlmsg::QueueInfo* queueInfo_p,
bslma::Allocator* allocator)
{
bsl::vector<const char*> fields(allocator);
fields.push_back("PrimaryLeaseId");
fields.push_back("SequenceNumber");
fields.push_back("Timestamp");
fields.push_back("Epoch");
fields.push_back("FileKey");
fields.push_back("QueueKey");
if (queueInfo_p)
fields.push_back("QueueUri");
fields.push_back("RefCount");
fields.push_back("MsgOffsetDwords");
fields.push_back("GUID");
fields.push_back("Crc32c");

bmqu::AlignedPrinter printer(stream, &fields);
printer << rec.header().primaryLeaseId() << rec.header().sequenceNumber();

bsls::Types::Uint64 epochValue = rec.header().timestamp();
bdlt::Datetime datetime;
const int rc = bdlt::EpochUtil::convertFromTimeT64(&datetime, epochValue);
if (0 != rc) {
printer << 0;
}
else {
printer << datetime;
}

bmqu::MemOutStream fileKeyStr(allocator), queueKeyStr(allocator);
fileKeyStr << rec.fileKey();
queueKeyStr << rec.queueKey();

printer << epochValue << fileKeyStr.str() << queueKeyStr.str();
if (queueInfo_p)
printer << queueInfo_p->uri();
printer << rec.refCount() << rec.messageOffsetDwords() << rec.messageGUID()
<< rec.crc32c();

stream << "\n";
}

/// Find AppId in the specified `appIds` by the specified `appKey` and store
/// the result in the specified `appId`. Return `true` on success and `false
/// otherwise.
bool findQueueAppIdByAppKey(
bsl::string* appId,
static bool findQueueAppIdByAppKey(
bsl::string_view* appId,
const bsl::vector<BloombergLP::bmqp_ctrlmsg::AppIdInfo>& appIds,
const mqbu::StorageKey& appKey)
{
Expand All @@ -125,110 +77,6 @@ bool findQueueAppIdByAppKey(
return false;
}

/// Print the specified confirm record `rec` and QueueInfo pointed by the
/// specified `queueInfo_p` to the specified `stream`, using the specified
/// `allocator` for memory allocation.
void printRecord(bsl::ostream& stream,
const mqbs::ConfirmRecord& rec,
const bmqp_ctrlmsg::QueueInfo* queueInfo_p,
bslma::Allocator* allocator)
{
bsl::vector<const char*> fields(allocator);
fields.push_back("PrimaryLeaseId");
fields.push_back("SequenceNumber");
fields.push_back("Timestamp");
fields.push_back("Epoch");
fields.push_back("QueueKey");
if (queueInfo_p)
fields.push_back("QueueUri");
fields.push_back("AppKey");
if (queueInfo_p)
fields.push_back("AppId");
fields.push_back("GUID");

bmqu::MemOutStream queueKeyStr(allocator), appKeyStr(allocator);
queueKeyStr << rec.queueKey();

if (rec.appKey().isNull()) {
appKeyStr << "** NULL **";
}
else {
appKeyStr << rec.appKey();
}

bsl::string appIdStr;
if (queueInfo_p) {
if (!findQueueAppIdByAppKey(&appIdStr,
queueInfo_p->appIds(),
rec.appKey())) {
appIdStr = "** NULL **";
}
}

bmqu::AlignedPrinter printer(stream, &fields);
printer << rec.header().primaryLeaseId() << rec.header().sequenceNumber();

bsls::Types::Uint64 epochValue = rec.header().timestamp();
bdlt::Datetime datetime;
const int rc = bdlt::EpochUtil::convertFromTimeT64(&datetime, epochValue);
if (0 != rc) {
printer << 0;
}
else {
printer << datetime;
}

printer << epochValue << queueKeyStr.str();
if (queueInfo_p)
printer << queueInfo_p->uri();
printer << appKeyStr.str();
if (queueInfo_p)
printer << appIdStr;
printer << rec.messageGUID();
stream << "\n";
}

/// Print the specified delete record `rec` and QueueInfo pointed by the
/// specified `queueInfo_p` to the specified `stream`, using the specified
/// `allocator` for memory allocation.
void printRecord(bsl::ostream& stream,
const mqbs::DeletionRecord& rec,
const bmqp_ctrlmsg::QueueInfo* queueInfo_p,
bslma::Allocator* allocator)
{
bsl::vector<const char*> fields(allocator);
fields.push_back("PrimaryLeaseId");
fields.push_back("SequenceNumber");
fields.push_back("Timestamp");
fields.push_back("Epoch");
fields.push_back("QueueKey");
if (queueInfo_p)
fields.push_back("QueueUri");
fields.push_back("DeletionFlag");
fields.push_back("GUID");

bmqu::MemOutStream queueKeyStr(allocator);
queueKeyStr << rec.queueKey();

bmqu::AlignedPrinter printer(stream, &fields);
printer << rec.header().primaryLeaseId() << rec.header().sequenceNumber();

bsls::Types::Uint64 epochValue = rec.header().timestamp();
bdlt::Datetime datetime;
const int rc = bdlt::EpochUtil::convertFromTimeT64(&datetime, epochValue);
if (0 != rc) {
printer << 0;
}
else {
printer << datetime;
}
printer << epochValue << queueKeyStr.str();
if (queueInfo_p)
printer << queueInfo_p->uri();
printer << rec.deletionRecordFlag() << rec.messageGUID();
stream << "\n";
}

} // close unnamed namespace

// =====================
Expand All @@ -239,13 +87,22 @@ void printRecord(bsl::ostream& stream,
MessageDetails::MessageDetails(const mqbs::MessageRecord& record,
bsls::Types::Uint64 recordIndex,
bsls::Types::Uint64 recordOffset,
const QueueMap& queueMap,
bslma::Allocator* allocator)
: d_messageRecord(
RecordDetails<mqbs::MessageRecord>(record, recordIndex, recordOffset))
, d_confirmRecords(allocator)
, d_deleteRecord()
, d_queueInfo_p(0)
, d_allocator_p(allocator)
{
// NOTHING
// Check if queueInfo is present for queue key
bmqp_ctrlmsg::QueueInfo queueInfo(d_allocator_p);
if (queueMap.findInfoByKey(&queueInfo,
d_messageRecord.d_record.queueKey())) {
d_queueInfo_p = &queueInfo;
d_messageRecord.d_queueUri = d_queueInfo_p->uri();
}
}

void MessageDetails::addConfirmRecord(const mqbs::ConfirmRecord& record,
Expand All @@ -254,62 +111,57 @@ void MessageDetails::addConfirmRecord(const mqbs::ConfirmRecord& record,
{
d_confirmRecords.push_back(
RecordDetails<mqbs::ConfirmRecord>(record, recordIndex, recordOffset));
if (d_queueInfo_p) {
RecordDetails<mqbs::ConfirmRecord>& details =
*d_confirmRecords.rbegin();
details.d_queueUri = d_queueInfo_p->uri();
if (!findQueueAppIdByAppKey(&details.d_appId,
d_queueInfo_p->appIds(),
record.appKey())) {
details.d_appId = "** NULL **";
}
}
}

void MessageDetails::addDeleteRecord(const mqbs::DeletionRecord& record,
bsls::Types::Uint64 recordIndex,
bsls::Types::Uint64 recordOffset)
{
d_deleteRecord = RecordDetails<mqbs::DeletionRecord>(record,
recordIndex,
recordOffset);
d_deleteRecord.makeValueInplace(
RecordDetails<mqbs::DeletionRecord>(record,
recordIndex,
recordOffset));
if (d_queueInfo_p)
d_deleteRecord->d_queueUri = d_queueInfo_p->uri();
}

void MessageDetails::print(bsl::ostream& os, const QueueMap& queueMap) const
unsigned int MessageDetails::dataRecordOffset() const
{
// Check if queueInfo is present for queue key
bmqp_ctrlmsg::QueueInfo queueInfo(d_allocator_p);
const bool queueInfoPresent = queueMap.findInfoByKey(
&queueInfo,
d_messageRecord.d_record.queueKey());
bmqp_ctrlmsg::QueueInfo* queueInfo_p = queueInfoPresent ? &queueInfo : 0;

// Print message record
bmqu::MemOutStream ss(d_allocator_p);
ss << "MESSAGE Record, index: " << d_messageRecord.d_recordIndex
<< ", offset: " << d_messageRecord.d_recordOffset;
bsl::string delimiter(ss.length(), '=', d_allocator_p);
os << delimiter << '\n' << ss.str() << '\n';

printRecord(os, d_messageRecord.d_record, queueInfo_p, d_allocator_p);
return d_messageRecord.d_record.messageOffsetDwords();
}

// Print confirmations records
if (!d_confirmRecords.empty()) {
bsl::vector<RecordDetails<mqbs::ConfirmRecord> >::const_iterator it =
d_confirmRecords.begin();
for (; it != d_confirmRecords.end(); ++it) {
os << "CONFIRM Record, index: " << it->d_recordIndex
<< ", offset: " << it->d_recordOffset << '\n';
printRecord(os, it->d_record, queueInfo_p, d_allocator_p);
}
}
bsls::Types::Uint64 MessageDetails::messageRecordIndex() const
{
return d_messageRecord.d_recordIndex;
}

// Print deletion record
if (d_deleteRecord.d_isValid) {
os << "DELETE Record, index: " << d_deleteRecord.d_recordIndex
<< ", offset: " << d_deleteRecord.d_recordOffset << '\n';
printRecord(os, d_deleteRecord.d_record, queueInfo_p, d_allocator_p);
}
const MessageDetails::RecordDetails<mqbs::MessageRecord>&
MessageDetails::messageRecord() const
{
return d_messageRecord;
}

unsigned int MessageDetails::dataRecordOffset() const
const bsl::vector<MessageDetails::RecordDetails<mqbs::ConfirmRecord> >&
MessageDetails::confirmRecords() const
{
return d_messageRecord.d_record.messageOffsetDwords();
return d_confirmRecords;
}

bsls::Types::Uint64 MessageDetails::messageRecordIndex() const
const bdlb::NullableValue<
MessageDetails::RecordDetails<mqbs::DeletionRecord> >&
MessageDetails::deleteRecord() const
{
return d_messageRecord.d_recordIndex;
return d_deleteRecord;
}

} // close package namespace
Expand Down
Loading

0 comments on commit 0db568f

Please sign in to comment.