diff --git a/src/groups/bmq/bmqp/bmqp_protocol.h b/src/groups/bmq/bmqp/bmqp_protocol.h index dd3f97877a..1125df8578 100644 --- a/src/groups/bmq/bmqp/bmqp_protocol.h +++ b/src/groups/bmq/bmqp/bmqp_protocol.h @@ -3598,15 +3598,8 @@ inline RdaInfo& RdaInfo::setPotentiallyPoisonous(bool flag) inline RdaInfo& RdaInfo::setCounter(unsigned int counter) { BSLS_ASSERT_SAFE(counter <= k_MAX_COUNTER_VALUE); - if (isUnlimited()) { - d_counter &= ~e_UNLIMITED; - d_counter += counter; - } - else { - d_counter &= ~k_MAX_COUNTER_VALUE; - d_counter += counter; - } - + // Drop e_UNLIMITED bit flag if set, but save e_POISONOUS bit + d_counter = counter | (d_counter & e_POISONOUS); return *this; } diff --git a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp index 34e8945cd9..0ad94651d3 100644 --- a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp @@ -47,6 +47,7 @@ #include // BDE +#include #include #include #include @@ -65,6 +66,17 @@ namespace BloombergLP { namespace mqbblp { +namespace { + +const int k_MAX_INSTANT_MESSAGES = 10; +// Maximum messages logged with throttling in a short period of time. + +const bsls::Types::Int64 k_NS_PER_MESSAGE = + bdlt::TimeUnitRatio::k_NANOSECONDS_PER_MINUTE / k_MAX_INSTANT_MESSAGES; +// Time interval between messages logged with throttling. + +} // close unnamed namespace + // --------------------- // class RootQueueEngine // --------------------- @@ -1356,6 +1368,62 @@ int RootQueueEngine::onRejectMessage(mqbi::QueueHandle* handle, d_queueState_p->storage()->getIterator(&message, appKey, msgGUID); if (storageRc == mqbi::StorageResult::e_SUCCESS) { + // Handle 'maxDeliveryAttempts' parameter reconfigure. + // To do this, we need to take care of the following: + // 1. Change the default 'maxDeliveryAttempts' in message storage + // classes, so it is set as a default for any new message iterator. + // 2. If possible, update the existing messages with the new value + // of 'maxDeliveryAttempts'. + // This code does the 2nd part. We need this fix to be able to get rid + // of poisonous messages already stored in a working cluster, without + // bouncing off. + // + // We use the domain's 'maxDeliveryAttempts' as a baseline to compare + // with each specific message's 'rdaInfo', and there might be a few + // cases: + // +=====================+===========+===============================+ + // | maxDeliveryAttempts | rdaInfo | Action: | + // +=====================+===========+===============================+ + // | Unlimited | Unlimited | Do nothing (same value) | + // +---------------------+-----------+-------------------------------+ + // | Unlimited | Limited | Set 'rdaInfo' to unlimited | + // +---------------------+-----------+-------------------------------+ + // | Limited | Unlimited | Set 'rdaInfo' to limited | + // | | | 'maxDeliveryAttempts' | + // +---------------------+-----------+-------------------------------+ + // | Limited | Limited | Do nothing (not possible to | + // | | | deduce what to do in general) | + // +---------------------+-----------+-------------------------------+ + // So this code handles only the situation when we want to switch + // 'rdaInfo' between limited and unlimited. + // + // See also: mqbblp_relayqueueengine + // Note that RelayQueueEngine doesn't contain a similar code to fix + // 'rdaInfo'. This is because we work with an assumption that if we + // have a poisonous message, all the consumers will crash anyway, so a + // replica/proxy will free the corresponding handles, and all message + // iterators will be recreated with the correct 'rdaInfo' received from + // primary, if a new consumer connects to the replica/proxy. + const int maxDeliveryAttempts = + d_queueState_p->domain()->config().maxDeliveryAttempts(); + const bool domainIsUnlimited = (maxDeliveryAttempts == 0); + if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY( + domainIsUnlimited != message->rdaInfo().isUnlimited())) { + BSLS_PERFORMANCEHINT_UNLIKELY_HINT; + + BALL_LOGTHROTTLE_WARN(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE) + << "[THROTTLED] Mismatch between the message's RdaInfo " + << message->rdaInfo() << " and the domain's " + << "'maxDeliveryAttempts' setting [" << maxDeliveryAttempts + << "], updating message's RdaInfo"; + if (maxDeliveryAttempts > 0) { + message->rdaInfo().setCounter(maxDeliveryAttempts); + } + else { + message->rdaInfo().setUnlimited(); + } + } + counter = message->rdaInfo().counter(); if (d_throttledRejectedMessages.requestPermission()) { diff --git a/src/integration-tests/test_reconfigure_domains.py b/src/integration-tests/test_reconfigure_domains.py index 5ee1ea6ed7..15a2340b38 100644 --- a/src/integration-tests/test_reconfigure_domains.py +++ b/src/integration-tests/test_reconfigure_domains.py @@ -2,6 +2,7 @@ Testing runtime reconfiguration of domains. """ import time +from typing import Optional import blazingmq.dev.it.testconstants as tc from blazingmq.dev.it.fixtures import ( # pylint: disable=unused-import @@ -10,6 +11,7 @@ multi_node, tweak, ) +from blazingmq.dev.it.process.admin import AdminClient from blazingmq.dev.it.process.client import Client pytestmark = order(6) @@ -36,7 +38,8 @@ def setup_cluster(self, cluster: Cluster): # Returns 'True' if all of them succeed, and a false-y value otherwise. def post_n_msgs(self, uri, n): results = ( - self.writer.post(uri, payload=["msg"], wait_ack=True) for _ in range(0, n) + self.writer.post(uri, payload=[f"msg{i}"], wait_ack=True) + for i in range(0, n) ) return all(res == Client.e_SUCCESS for res in results) @@ -296,7 +299,7 @@ def test_reconfigure_message_ttl(self, multi_node: Cluster): @tweak.domain.max_delivery_attempts(0) def test_reconfigure_max_delivery_attempts(self, multi_node: Cluster): - URI = f"bmq://{tc.DOMAIN_PRIORITY}/my-queue" + URI = f"bmq://{tc.DOMAIN_PRIORITY}/reconf-rda" proxy = next(multi_node.proxy_cycle()) # Open the queue through the writer. @@ -306,12 +309,12 @@ def do_test(expect_success): # Write one message to 'URI'. self.post_n_msgs(URI, 1) - # Open, read, and kill three consumers in sequence. + # Open, read, and kill five consumers in sequence. for idx in range(0, 5): client = proxy.create_client(f"reader-unstable-{idx}") client.open(URI, flags=["read"], succeed=True) client.check_exit_code = False - client.wait_push_event() + client.wait_push_event(timeout=5) client.kill() client.wait() @@ -322,7 +325,7 @@ def do_test(expect_success): if expect_success: client.confirm(URI, "+1", succeed=True) else: - assert not client.wait_push_event() + assert not client.wait_push_event(timeout=5) client.stop_session(block=True) # Expect that message will not expire after failed deliveries. @@ -336,3 +339,111 @@ def do_test(expect_success): # Expect that message will expire after failed deliveries. do_test(False) + + @tweak.domain.max_delivery_attempts(0) + def test_reconfigure_max_delivery_attempts_on_existing_messages( + self, multi_node: Cluster + ) -> None: + cluster: Cluster = multi_node + + # Stage 1: data preparation + # On this stage, we open a producer to a queue and post 5 messages + # with serial payloads: ["msg0", "msg1", "msg2", "msg3", "msg4"]. + # We consider the very first message "msg0" poisonous. + URI = f"bmq://{tc.DOMAIN_PRIORITY}/reconf-rda-on-existing-msgs" + proxy = next(multi_node.proxy_cycle()) + + self.writer.open(URI, flags=["write,ack"], succeed=True) + + # Post a sequence of messages with serial payloads: + assert self.post_n_msgs(URI, 5) + + poisoned_message: str = "msg0" + + def try_consume(consumer: Client) -> Optional[int]: + num_confirmed = 0 + while consumer.wait_push_event(timeout=1, quiet=True): + msgs = consumer.list(URI, block=True) + + assert len(msgs) == 1 + + if msgs[0].payload == poisoned_message: + # In this test, do not expect any messages confirmed before the poisoned one + assert num_confirmed == 0 + return None + + consumer.confirm(URI, "+1", succeed=True) + num_confirmed += 1 + return num_confirmed + + def run_consumers(max_attempts: int) -> int: + for idx in range(0, max_attempts): + consumer: Client = proxy.create_client(f"reader-unstable-{idx}") + consumer.open( + URI, flags=["read"], succeed=True, max_unconfirmed_messages=1 + ) + + num_confirmed = try_consume(consumer) + + # Each consumer either crashes on the very first poisoned message + # Or it confirms all the remaining non-poisoned messages and exits + if num_confirmed is None: + consumer.check_exit_code = False + consumer.kill() + consumer.wait() + continue + return num_confirmed + + # We return earlier if any messages confirmed + return 0 + + # Stage 2: try to consume messages without poison pill detection enabled. + # Expect all the attempts to process messages failed, since all the + # consumers will crash on the poisoned message "msg0". + # + # The timeline: + # Before: queue is ["msg0", "msg1", "msg2", "msg3", "msg4"] + # consumer1: ["msg0"] -> crash + # ... ... ... + # consumer10: ["msg0"] -> crash + # After: queue is ["msg0", "msg1", "msg2", "msg3", "msg4"] + num_confirmed = run_consumers(max_attempts=10) + assert 0 == num_confirmed + + # Stage 3: reconfigure maxDeliveryAttempts to enable poison pill detection. + # Use an admin session to validate that the setting change reached the broker. + host, port = cluster.admin_endpoint + + admin = AdminClient() + admin.connect(host, port) + + res = admin.send_admin(f"DOMAINS DOMAIN {tc.DOMAIN_PRIORITY} INFOS") + assert '"maxDeliveryAttempts" : 0' in res + + cluster.config.domains[ + tc.DOMAIN_PRIORITY + ].definition.parameters.max_delivery_attempts = 5 + cluster.reconfigure_domain(tc.DOMAIN_PRIORITY, succeed=True) + + res = admin.send_admin(f"DOMAINS DOMAIN {tc.DOMAIN_PRIORITY} INFOS") + assert '"maxDeliveryAttempts" : 5' in res + + admin.stop() + + # Stage 4: try to consume messages with poison pill detection enabled. + # Expect first 5 consumers to controllably crash on the very first message "msg0", + # and the 6th consumer will not receive message "msg0" anymore, since it was removed + # from the queue as poisonous. As a result, 6th consumer will receive the rest of the + # messages one by one and confirm them. + # + # The timeline: + # Before: queue is ["msg0", "msg1", "msg2", "msg3", "msg4"] + # consumer1: ["msg0"] -> crash + # consumer2: ["msg0"] -> crash + # consumer3: ["msg0"] -> crash + # consumer4: ["msg0"] -> crash + # consumer5: ["msg0"] -> crash -> "msg0" removed as poisonous after 5 attempts + # consumer6: ["msg1", "msg2", "msg3", "msg4"] -> confirm + # After: queue is [] + num_confirmed = run_consumers(max_attempts=6) + assert 4 == num_confirmed diff --git a/src/python/blazingmq/dev/it/cluster.py b/src/python/blazingmq/dev/it/cluster.py index dc294070fd..f51285a78d 100644 --- a/src/python/blazingmq/dev/it/cluster.py +++ b/src/python/blazingmq/dev/it/cluster.py @@ -7,7 +7,7 @@ import logging import shutil import signal -from typing import Any, Dict, List, Optional, Union +from typing import Dict, List, Optional, Tuple, Union from pathlib import Path import blazingmq.dev.it.process.proc @@ -79,7 +79,7 @@ def __init__( logging.getLogger("blazingmq.test"), extra=log_extra ) - self.last_known_leader = None + self.last_known_leader: Optional[Broker] = None self._processes: Dict[str, Union[Broker, Client]] = {} self._nodes: List[Broker] = [] self._virtual_nodes: List[Broker] = [] @@ -101,6 +101,17 @@ def name(self) -> str: return self.config.name + @property + def admin_endpoint(self) -> Tuple[Optional[str], Optional[int]]: + """ + Return a tuple containing (host, port) of an admin endpoint of this cluster, if the + admin endpoint is not decided, return (None, None) tuple + """ + if not self.last_known_leader: + return None, None + + return self.last_known_leader.config.host, int(self.last_known_leader.config.port) + def start(self, wait_leader=True, wait_ready=False): """ Start all the nodes and proxies in the cluster.