From fbbb6958355483b5e69db6250f3cd45e100b78a4 Mon Sep 17 00:00:00 2001 From: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> Date: Tue, 25 Jun 2024 19:31:14 -0400 Subject: [PATCH] wait for unconfirmed before buffering confirms Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> --- .../mqb/mqbblp/mqbblp_clusterqueuehelper.cpp | 20 +++++++++++++------ .../test_puts_retransmission.py | 14 +++++++++++-- 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp index 8640c21f3..9d3999b7e 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp @@ -2633,12 +2633,20 @@ void ClusterQueueHelper::notifyQueue(QueueContext* queueContext, } if (isOpen) { - queue->dispatcher()->execute( - bdlf::BindUtil::bind(&mqbi::Queue::onOpenUpstream, - queue, - generationCount, - upstreamSubQueueId), - queue); + if (generationCount == 0) { + BALL_LOG_INFO << d_cluster_p->description() + << ": has deconfigured queue [" + << queueContext->uri() << "], subStream id [" + << upstreamSubQueueId << "]"; + } + else { + queue->dispatcher()->execute( + bdlf::BindUtil::bind(&mqbi::Queue::onOpenUpstream, + queue, + generationCount, + upstreamSubQueueId), + queue); + } } else { queue->dispatcher()->execute( diff --git a/src/integration-tests/test_puts_retransmission.py b/src/integration-tests/test_puts_retransmission.py index 829d9860b..b27b08bd1 100644 --- a/src/integration-tests/test_puts_retransmission.py +++ b/src/integration-tests/test_puts_retransmission.py @@ -508,7 +508,12 @@ def test_shutdown_primary_keep_replica(self, multi_node: Cluster): # If shutting down primary, the replica needs to wait for new primary. self.active_node.wait_status(wait_leader=True, wait_ready=False) - self.inspect_results(allow_duplicates=False) + # Do allow duplicates for the scenario when a CONFIRM had passed Proxy + # but did not reach the replication. New Primary then redelivers and + # the Proxy cannot detect the duplicate because it had removed the GUID + # upon the first CONFIRM + + self.inspect_results(allow_duplicates=True) def test_shutdown_replica(self, multi_node: Cluster): self.setup_cluster_fanout(multi_node) @@ -521,7 +526,12 @@ def test_shutdown_replica(self, multi_node: Cluster): # Because the quorum is 3, cluster is still healthy after shutting down # replica. - self.inspect_results(allow_duplicates=False) + # Do allow duplicates for the scenario when a CONFIRM had passed Proxy + # but did not reach the replication. New Primary then redelivers and + # the Proxy cannot detect the duplicate because it had removed the GUID + # upon the first CONFIRM + + self.inspect_results(allow_duplicates=True) def test_kill_primary_convert_replica(self, multi_node: Cluster): self.setup_cluster_fanout(multi_node)