Skip to content

Commit

Permalink
Fix: correctly set d_hasReceipts for FileBackedStorage on configure
Browse files Browse the repository at this point in the history
Signed-off-by: Evgeny Malygin <[email protected]>
  • Loading branch information
678098 committed Nov 25, 2024
1 parent b9f5335 commit c0f8832
Show file tree
Hide file tree
Showing 10 changed files with 41 additions and 2 deletions.
1 change: 1 addition & 0 deletions src/groups/mqb/mqbblp/mqbblp_localqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ int LocalQueue::configure(bsl::ostream& errorDescription, bool isReconfigure)
d_state_p->setStorage(storageMp);
}
else {
d_state_p->storage()->setConsistency(domainCfg.consistency());
rc = d_state_p->storage()->configure(errorDescription,
domainCfg.storage().config(),
domainCfg.storage().queueLimits(),
Expand Down
1 change: 1 addition & 0 deletions src/groups/mqb/mqbblp/mqbblp_queueenginetester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,7 @@ void QueueEngineTester::init(const mqbconfm::Domain& domainConfig,
limits.messages() = bsl::numeric_limits<bsls::Types::Int64>::max();
limits.bytes() = bsl::numeric_limits<bsls::Types::Int64>::max();

storage_p->setConsistency(domainConfig.consistency());
rc = storage_p->configure(errorDescription,
config,
limits,
Expand Down
2 changes: 2 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ int RemoteQueue::configureAsProxy(bsl::ostream& errorDescription,
limits.messages() = bsl::numeric_limits<bsls::Types::Int64>::max();
limits.bytes() = bsl::numeric_limits<bsls::Types::Int64>::max();

storageMp->setConsistency(domainCfg.consistency());
int rc = storageMp->configure(errorDescription,
config,
limits,
Expand Down Expand Up @@ -243,6 +244,7 @@ int RemoteQueue::configureAsClusterMember(bsl::ostream& errorDescription,
d_allocator_p);
}
else {
d_state_p->storage()->setConsistency(domainCfg.consistency());
rc = d_state_p->storage()->configure(
errorDescription,
domainCfg.storage().config(),
Expand Down
2 changes: 2 additions & 0 deletions src/groups/mqb/mqbc/mqbc_storageutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3210,6 +3210,8 @@ int StorageUtil::makeStorage(bsl::ostream& errorDescription,
// Configure the storage. Note that if a queue calls 'makeStorage' twice,
// its storage will be configured twice as things are currently.

// Do not change consistency level of `storageSp`, use the one provided on
// construction instead.
const int rc = storageSp->configure(errorDescription,
storageDef.config(),
storageDef.queueLimits(),
Expand Down
4 changes: 4 additions & 0 deletions src/groups/mqb/mqbi/mqbi_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,10 @@ class Storage {
const bsls::Types::Int64 messageTtl,
const int maxDeliveryAttempts) = 0;

/// Set the consistency level associated to this storage to the specified
/// `value`.
virtual bool setConsistency(const mqbconfm::Consistency& value) = 0;

virtual void setQueue(mqbi::Queue* queue) = 0;

/// Close this storage.
Expand Down
8 changes: 8 additions & 0 deletions src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,14 @@ int FileBackedStorage::configure(
return 0;
}

bool FileBackedStorage::setConsistency(const mqbconfm::Consistency& value)
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(value.isEventualValue() || value.isStrongValue());

d_hasReceipts = value.isEventualValue();
}

void FileBackedStorage::setQueue(mqbi::Queue* queue)
{
d_virtualStorageCatalog.setQueue(queue);
Expand Down
7 changes: 6 additions & 1 deletion src/groups/mqb/mqbs/mqbs_filebackedstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ class FileBackedStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage {
bmqp::SchemaLearner::Context d_schemaLearnerContext;
// Context for replicated data.

const bool d_hasReceipts;
bool d_hasReceipts;

bmqt::MessageGUID d_currentlyAutoConfirming;
// Message being evaluated and possibly auto confirmed.
Expand Down Expand Up @@ -365,6 +365,11 @@ class FileBackedStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage {
const bsls::Types::Int64 messageTtl,
int maxDeliveryAttempts) BSLS_KEYWORD_OVERRIDE;

/// Set the consistency level associated to this storage to the specified
/// `value`.
bool
setConsistency(const mqbconfm::Consistency& value) BSLS_KEYWORD_OVERRIDE;

/// Return the resource capacity meter associated to this storage.
virtual mqbu::CapacityMeter* capacityMeter() BSLS_KEYWORD_OVERRIDE;

Expand Down
11 changes: 11 additions & 0 deletions src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,17 @@ int InMemoryStorage::configure(
return 0;
}

bool InMemoryStorage::setConsistency(const mqbconfm::Consistency& value)
{
BALL_LOG_WARN_BLOCK
{
if (value.isStrongValue()) {
BALL_LOG_OUTPUT_STREAM << "Trying to configure strong consistency "
<< "for in-memory storage";
}
}
}

void InMemoryStorage::setQueue(mqbi::Queue* queue)
{
d_virtualStorageCatalog.setQueue(queue);
Expand Down
5 changes: 5 additions & 0 deletions src/groups/mqb/mqbs/mqbs_inmemorystorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,11 @@ class InMemoryStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage {
const bsls::Types::Int64 messageTtl,
const int maxDeliveryAttempts) BSLS_KEYWORD_OVERRIDE;

/// Set the consistency level associated to this storage to the specified
/// `value`.
bool
setConsistency(const mqbconfm::Consistency& value) BSLS_KEYWORD_OVERRIDE;

virtual void setQueue(mqbi::Queue* queue) BSLS_KEYWORD_OVERRIDE;

/// Close this storage.
Expand Down
2 changes: 1 addition & 1 deletion src/integration-tests/test_strong_consistency.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ def test_change_consistency(self, multi_node: Cluster):
wait_ack=False,
)
# Cannot wait for too long because resumed nodes may disconnect
# because of missed hearbeats and fail to send the receipt.
# because of missed heartbeats and fail to send the receipt.
assert not self.consumer.wait_push_event(timeout=3)

# Verify that message is now delivered.
Expand Down

0 comments on commit c0f8832

Please sign in to comment.