Fix: reconfigure poison pill on existing messages (bloomberg#256)
Signed-off-by: Evgeny Malygin <[email protected]>
678098 authored and alexander-e1off committed Oct 24, 2024
1 parent ef40daa commit 285d209
Showing 4 changed files with 199 additions and 16 deletions.
11 changes: 2 additions & 9 deletions src/groups/bmq/bmqp/bmqp_protocol.h
@@ -3598,15 +3598,8 @@ inline RdaInfo& RdaInfo::setPotentiallyPoisonous(bool flag)
inline RdaInfo& RdaInfo::setCounter(unsigned int counter)
{
BSLS_ASSERT_SAFE(counter <= k_MAX_COUNTER_VALUE);
if (isUnlimited()) {
d_counter &= ~e_UNLIMITED;
d_counter += counter;
}
else {
d_counter &= ~k_MAX_COUNTER_VALUE;
d_counter += counter;
}

// Drop e_UNLIMITED bit flag if set, but save e_POISONOUS bit
d_counter = counter | (d_counter & e_POISONOUS);
return *this;
}

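As an aside, here is a minimal standalone sketch of the new one-line 'setCounter' logic, using illustrative flag values rather than the actual 'bmqp::RdaInfo' constants: the counter is assumed to occupy the low bits, with 'e_POISONOUS' and 'e_UNLIMITED' as flag bits above them.

#include <cassert>

namespace {
const unsigned int k_MAX_COUNTER_VALUE = (1u << 6) - 1;  // low 6 bits (illustrative)
const unsigned int e_POISONOUS         = 1u << 6;        // preserved by 'setCounter'
const unsigned int e_UNLIMITED         = 1u << 7;        // dropped by 'setCounter'
}  // close unnamed namespace

int main()
{
    // Start from an unlimited, potentially poisonous RDA value.
    unsigned int d_counter = e_UNLIMITED | e_POISONOUS;

    // The one-liner replacing the old if/else: clears 'e_UNLIMITED' and any
    // previous counter bits, keeps only 'e_POISONOUS', and sets the counter.
    const unsigned int counter = 5;
    assert(counter <= k_MAX_COUNTER_VALUE);
    d_counter = counter | (d_counter & e_POISONOUS);

    assert(d_counter == (5u | e_POISONOUS));
    assert(!(d_counter & e_UNLIMITED));
    return 0;
}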
68 changes: 68 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp
@@ -47,6 +47,7 @@
#include <mwcu_outstreamformatsaver.h>

// BDE
#include <ball_logthrottle.h>
#include <bdlb_print.h>
#include <bdlb_scopeexit.h>
#include <bdlb_string.h>
@@ -65,6 +66,17 @@
namespace BloombergLP {
namespace mqbblp {

namespace {

const int k_MAX_INSTANT_MESSAGES = 10;
// Maximum messages logged with throttling in a short period of time.

const bsls::Types::Int64 k_NS_PER_MESSAGE =
bdlt::TimeUnitRatio::k_NANOSECONDS_PER_MINUTE / k_MAX_INSTANT_MESSAGES;
// Time interval between messages logged with throttling.

} // close unnamed namespace
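For concreteness, the arithmetic above works out to at most one throttled warning every 6 seconds (10 per minute). A tiny self-contained check of that claim (a sketch, not part of the commit):

#include <cassert>
#include <cstdint>

int main()
{
    const std::int64_t nsPerMinute  = 60LL * 1000 * 1000 * 1000;  // ~ k_NANOSECONDS_PER_MINUTE
    const std::int64_t maxMessages  = 10;                         // k_MAX_INSTANT_MESSAGES
    const std::int64_t nsPerMessage = nsPerMinute / maxMessages;  // ~ k_NS_PER_MESSAGE

    assert(nsPerMessage == 6LL * 1000 * 1000 * 1000);  // 6 seconds, in nanoseconds
    return 0;
}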

// ---------------------
// class RootQueueEngine
// ---------------------
@@ -1356,6 +1368,62 @@ int RootQueueEngine::onRejectMessage(mqbi::QueueHandle* handle,
d_queueState_p->storage()->getIterator(&message, appKey, msgGUID);

if (storageRc == mqbi::StorageResult::e_SUCCESS) {
// Handle reconfiguration of the 'maxDeliveryAttempts' parameter.
// To do this, we need to take care of the following:
// 1. Change the default 'maxDeliveryAttempts' in message storage
// classes, so it is set as a default for any new message iterator.
// 2. If possible, update the existing messages with the new value
// of 'maxDeliveryAttempts'.
// This code does the 2nd part. We need this fix to be able to get rid
// of poisonous messages already stored in a working cluster without
// bouncing (restarting) brokers.
//
// We use the domain's 'maxDeliveryAttempts' as a baseline to compare
// with each specific message's 'rdaInfo', and there might be a few
// cases:
// +=====================+===========+===============================+
// | maxDeliveryAttempts | rdaInfo | Action: |
// +=====================+===========+===============================+
// | Unlimited | Unlimited | Do nothing (same value) |
// +---------------------+-----------+-------------------------------+
// | Unlimited | Limited | Set 'rdaInfo' to unlimited |
// +---------------------+-----------+-------------------------------+
// | Limited | Unlimited | Set 'rdaInfo' to limited |
// | | | 'maxDeliveryAttempts' |
// +---------------------+-----------+-------------------------------+
// | Limited | Limited | Do nothing (not possible to |
// | | | deduce what to do in general) |
// +---------------------+-----------+-------------------------------+
// So this code handles only the situation when we want to switch
// 'rdaInfo' between limited and unlimited.
//
// See also: mqbblp_relayqueueengine
// Note that RelayQueueEngine doesn't contain similar code to fix
// 'rdaInfo'. This is because we work under the assumption that if we
// have a poisonous message, all the consumers will crash anyway, so a
// replica/proxy will free the corresponding handles, and all message
// iterators will be recreated with the correct 'rdaInfo' received from
// the primary when a new consumer connects to the replica/proxy.
const int maxDeliveryAttempts =
d_queueState_p->domain()->config().maxDeliveryAttempts();
const bool domainIsUnlimited = (maxDeliveryAttempts == 0);
if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(
domainIsUnlimited != message->rdaInfo().isUnlimited())) {
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;

BALL_LOGTHROTTLE_WARN(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE)
<< "[THROTTLED] Mismatch between the message's RdaInfo "
<< message->rdaInfo() << " and the domain's "
<< "'maxDeliveryAttempts' setting [" << maxDeliveryAttempts
<< "], updating message's RdaInfo";
if (maxDeliveryAttempts > 0) {
message->rdaInfo().setCounter(maxDeliveryAttempts);
}
else {
message->rdaInfo().setUnlimited();
}
}

counter = message->rdaInfo().counter();

if (d_throttledRejectedMessages.requestPermission()) {
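Aside: a condensed, self-contained restatement of the decision table from the comment above, written as a hypothetical helper (not part of the change). Given the domain's 'maxDeliveryAttempts' (0 meaning unlimited) and whether the message's RdaInfo is currently unlimited, it returns the action the engine takes:

#include <cassert>

enum Action { e_DO_NOTHING, e_SET_UNLIMITED, e_SET_COUNTER };

Action resolveRdaAction(int maxDeliveryAttempts, bool rdaIsUnlimited)
{
    const bool domainIsUnlimited = (maxDeliveryAttempts == 0);
    if (domainIsUnlimited == rdaIsUnlimited) {
        // Same mode on both sides: either both unlimited, or both limited
        // (in the limited/limited case we cannot deduce the right counter).
        return e_DO_NOTHING;
    }
    return domainIsUnlimited ? e_SET_UNLIMITED : e_SET_COUNTER;
}

int main()
{
    assert(resolveRdaAction(0, true)  == e_DO_NOTHING);     // unlimited / unlimited
    assert(resolveRdaAction(0, false) == e_SET_UNLIMITED);  // unlimited / limited
    assert(resolveRdaAction(5, true)  == e_SET_COUNTER);    // limited   / unlimited
    assert(resolveRdaAction(5, false) == e_DO_NOTHING);     // limited   / limited
    return 0;
}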
121 changes: 116 additions & 5 deletions src/integration-tests/test_reconfigure_domains.py
@@ -2,6 +2,7 @@
Testing runtime reconfiguration of domains.
"""
import time
from typing import Optional

import blazingmq.dev.it.testconstants as tc
from blazingmq.dev.it.fixtures import ( # pylint: disable=unused-import
@@ -10,6 +11,7 @@
multi_node,
tweak,
)
from blazingmq.dev.it.process.admin import AdminClient
from blazingmq.dev.it.process.client import Client

pytestmark = order(6)
@@ -36,7 +38,8 @@ def setup_cluster(self, cluster: Cluster):
# Returns 'True' if all of them succeed, and a false-y value otherwise.
def post_n_msgs(self, uri, n):
results = (
self.writer.post(uri, payload=["msg"], wait_ack=True) for _ in range(0, n)
self.writer.post(uri, payload=[f"msg{i}"], wait_ack=True)
for i in range(0, n)
)
return all(res == Client.e_SUCCESS for res in results)

@@ -296,7 +299,7 @@ def test_reconfigure_message_ttl(self, multi_node: Cluster):

@tweak.domain.max_delivery_attempts(0)
def test_reconfigure_max_delivery_attempts(self, multi_node: Cluster):
URI = f"bmq://{tc.DOMAIN_PRIORITY}/my-queue"
URI = f"bmq://{tc.DOMAIN_PRIORITY}/reconf-rda"
proxy = next(multi_node.proxy_cycle())

# Open the queue through the writer.
@@ -306,12 +309,12 @@ def do_test(expect_success):
# Write one message to 'URI'.
self.post_n_msgs(URI, 1)

# Open, read, and kill three consumers in sequence.
# Open, read, and kill five consumers in sequence.
for idx in range(0, 5):
client = proxy.create_client(f"reader-unstable-{idx}")
client.open(URI, flags=["read"], succeed=True)
client.check_exit_code = False
client.wait_push_event()
client.wait_push_event(timeout=5)
client.kill()
client.wait()

@@ -322,7 +325,7 @@ if expect_success:
if expect_success:
client.confirm(URI, "+1", succeed=True)
else:
assert not client.wait_push_event()
assert not client.wait_push_event(timeout=5)
client.stop_session(block=True)

# Expect that message will not expire after failed deliveries.
@@ -336,3 +339,111 @@

# Expect that message will expire after failed deliveries.
do_test(False)

@tweak.domain.max_delivery_attempts(0)
def test_reconfigure_max_delivery_attempts_on_existing_messages(
self, multi_node: Cluster
) -> None:
cluster: Cluster = multi_node

# Stage 1: data preparation
# In this stage, we open a producer to a queue and post 5 messages
# with serial payloads: ["msg0", "msg1", "msg2", "msg3", "msg4"].
# We consider the very first message "msg0" poisonous.
URI = f"bmq://{tc.DOMAIN_PRIORITY}/reconf-rda-on-existing-msgs"
proxy = next(multi_node.proxy_cycle())

self.writer.open(URI, flags=["write,ack"], succeed=True)

# Post a sequence of messages with serial payloads:
assert self.post_n_msgs(URI, 5)

poisoned_message: str = "msg0"

def try_consume(consumer: Client) -> Optional[int]:
num_confirmed = 0
while consumer.wait_push_event(timeout=1, quiet=True):
msgs = consumer.list(URI, block=True)

assert len(msgs) == 1

if msgs[0].payload == poisoned_message:
# In this test, we do not expect any messages to be confirmed before the poisoned one
assert num_confirmed == 0
return None

consumer.confirm(URI, "+1", succeed=True)
num_confirmed += 1
return num_confirmed

def run_consumers(max_attempts: int) -> int:
for idx in range(0, max_attempts):
consumer: Client = proxy.create_client(f"reader-unstable-{idx}")
consumer.open(
URI, flags=["read"], succeed=True, max_unconfirmed_messages=1
)

num_confirmed = try_consume(consumer)

# Each consumer either crashes on the very first poisoned message,
# or confirms all the remaining non-poisoned messages and exits.
if num_confirmed is None:
consumer.check_exit_code = False
consumer.kill()
consumer.wait()
continue
return num_confirmed

# We return early if any messages were confirmed.
return 0

# Stage 2: try to consume messages without poison pill detection enabled.
# Expect all attempts to process messages to fail, since all the
# consumers will crash on the poisoned message "msg0".
#
# The timeline:
# Before: queue is ["msg0", "msg1", "msg2", "msg3", "msg4"]
# consumer1: ["msg0"] -> crash
# ... ... ...
# consumer10: ["msg0"] -> crash
# After: queue is ["msg0", "msg1", "msg2", "msg3", "msg4"]
num_confirmed = run_consumers(max_attempts=10)
assert 0 == num_confirmed

# Stage 3: reconfigure maxDeliveryAttempts to enable poison pill detection.
# Use an admin session to validate that the setting change reached the broker.
host, port = cluster.admin_endpoint

admin = AdminClient()
admin.connect(host, port)

res = admin.send_admin(f"DOMAINS DOMAIN {tc.DOMAIN_PRIORITY} INFOS")
assert '"maxDeliveryAttempts" : 0' in res

cluster.config.domains[
tc.DOMAIN_PRIORITY
].definition.parameters.max_delivery_attempts = 5
cluster.reconfigure_domain(tc.DOMAIN_PRIORITY, succeed=True)

res = admin.send_admin(f"DOMAINS DOMAIN {tc.DOMAIN_PRIORITY} INFOS")
assert '"maxDeliveryAttempts" : 5' in res

admin.stop()

# Stage 4: try to consume messages with poison pill detection enabled.
# Expect the first 5 consumers to crash on the very first message "msg0";
# the 6th consumer will not receive "msg0" anymore, since it was removed
# from the queue as poisonous. As a result, the 6th consumer will receive
# the remaining messages one by one and confirm them.
#
# The timeline:
# Before: queue is ["msg0", "msg1", "msg2", "msg3", "msg4"]
# consumer1: ["msg0"] -> crash
# consumer2: ["msg0"] -> crash
# consumer3: ["msg0"] -> crash
# consumer4: ["msg0"] -> crash
# consumer5: ["msg0"] -> crash -> "msg0" removed as poisonous after 5 attempts
# consumer6: ["msg1", "msg2", "msg3", "msg4"] -> confirm
# After: queue is []
num_confirmed = run_consumers(max_attempts=6)
assert 4 == num_confirmed
15 changes: 13 additions & 2 deletions src/python/blazingmq/dev/it/cluster.py
@@ -7,7 +7,7 @@
import logging
import shutil
import signal
from typing import Any, Dict, List, Optional, Union
from typing import Dict, List, Optional, Tuple, Union
from pathlib import Path

import blazingmq.dev.it.process.proc
@@ -79,7 +79,7 @@ def __init__(
logging.getLogger("blazingmq.test"), extra=log_extra
)

self.last_known_leader = None
self.last_known_leader: Optional[Broker] = None
self._processes: Dict[str, Union[Broker, Client]] = {}
self._nodes: List[Broker] = []
self._virtual_nodes: List[Broker] = []
@@ -101,6 +101,17 @@ def name(self) -> str:

return self.config.name

@property
def admin_endpoint(self) -> Tuple[Optional[str], Optional[int]]:
"""
Return a (host, port) tuple for the admin endpoint of this cluster. If
the admin endpoint is not known yet, return (None, None).
"""
if not self.last_known_leader:
return None, None

return self.last_known_leader.config.host, int(self.last_known_leader.config.port)

def start(self, wait_leader=True, wait_ready=False):
"""
Start all the nodes and proxies in the cluster.
