Skip to content

Commit

Permalink
ARTEMIS-5037: option to limit mirror propagation
Browse files Browse the repository at this point in the history
Add a new option in the Mirror settings to prevent a broker from
propagating messages.

When working with a topology where 4 nodes are forming a square and
where each node in that square mirrors its two neighbors: a message
leaving a corner can reach the opposite corner of the square by two
different routes. This is causing the message ordering to get broken.

example:
1 <-> 2
^     ^
|     |
v     v
4 <-> 3

A message from 1 will reach 3 by 2 and 4. Message duplication checks
will prevent the message from being duplicated but won't help regarding
the order of the messages. This is because a either the route by 2 or 4
can be faster than the other, so whomever wins the race sets the message
first.

Fixing the example:
Using the new option to not forward messages coming from a link, we
break the possibilities to have two routes to reach the opposite corner.

The above example is updated as followed:
* 2 never forwards messages coming from 1
* 1 never forwards messages coming from 2
* 3 never forwards messages coming from 4
* 4 never forwards messages coming from 3

Now, when a messages leaves 1:
* it reaches 2 and stops there
* it reaches 4
* it reaches 3 through 4 and stops there

Now, when a messages leaves 2:
* it reaches 1 and stops there
* it reaches 3
* it reaches 4 through 3 and stops there

Now, when a messages leaves 3:
* it reaches 4 and stops there
* it reaches 2
* it reaches 1 through 2 and stops there

Now, when a messages leaves 4:
* it reaches 3 and stops there
* it reaches 1
* it reaches 2 through 1 and stops there

The new test AMQPSquareMirroringTest.java is testing this exact setup.
  • Loading branch information
lavocatt committed Sep 11, 2024
1 parent daba842 commit 68f3cfe
Show file tree
Hide file tree
Showing 6 changed files with 334 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.activemq.artemis.protocol.amqp.connect;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -74,6 +75,7 @@
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationSource;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerAggregation;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.ReferenceIDSupplier;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
Expand Down Expand Up @@ -430,24 +432,27 @@ private void doConnect() {
final Queue queue = server.locateQueue(getMirrorSNF(replica));

final boolean coreTunnelingEnabled = isCoreMessageTunnelingEnabled(replica);
final Symbol[] desiredCapabilities;

ArrayList<Symbol> capabilities = new ArrayList<Symbol>();
capabilities.add(AMQPMirrorControllerSource.MIRROR_CAPABILITY);
if (coreTunnelingEnabled) {
desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY,
AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT};
} else {
desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY};
capabilities.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT);
}
if (!replica.getCanForwardMessages()) {
capabilities.add(AMQPMirrorControllerSource.NO_FORWARD);
}

final Symbol[] desiredCapabilities = (Symbol[]) capabilities.toArray(new Symbol[]{});

final Symbol[] requiredOfferedCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY};

connectSender(queue,
queue.getName().toString(),
mirrorControllerSource::setLink,
(r) -> AMQPMirrorControllerSource.validateProtocolData(protonProtocolManager.getReferenceIDSupplier(), r, getMirrorSNF(replica)),
server.getNodeID().toString(),
desiredCapabilities,
null,
desiredCapabilities,
replica.getCanForwardMessages() ? null : new Symbol[] {AMQPMirrorControllerSource.NO_FORWARD},
requiredOfferedCapabilities);
} else if (connectionElement.getType() == AMQPBrokerConnectionAddressType.FEDERATION) {
// Starting the Federation triggers rebuild of federation links
Expand Down Expand Up @@ -1204,4 +1209,4 @@ private void doCloseConnector() {
});
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,11 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
// Capabilities
public static final Symbol MIRROR_CAPABILITY = Symbol.getSymbol("amq.mirror");
public static final Symbol QPID_DISPATCH_WAYPOINT_CAPABILITY = Symbol.valueOf("qd.waypoint");
public static final Symbol NO_FORWARD = Symbol.getSymbol("amq.no.forward");

public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = SimpleString.of(INTERNAL_ID.toString());
public static final SimpleString INTERNAL_BROKER_ID_EXTRA_PROPERTY = SimpleString.of(BROKER_ID.toString());
public static final SimpleString INTERNAL_NO_FORWARD = SimpleString.of(NO_FORWARD.toString());

private static final ThreadLocal<RoutingContext> mirrorControlRouting = ThreadLocal.withInitial(() -> new RoutingContextImpl(null));

Expand Down Expand Up @@ -301,6 +303,10 @@ private boolean invalidTarget(MirrorController controller) {
return controller != null && sameNode(getRemoteMirrorId(), controller.getRemoteMirrorId());
}

private boolean isBlockedByNoForward(Message message) {
return Boolean.valueOf(true).equals(message.getBrokerProperty(INTERNAL_NO_FORWARD));
}

private boolean ignoreAddress(SimpleString address) {
if (address.startsWith(server.getConfiguration().getManagementAddress())) {
return true;
Expand Down Expand Up @@ -344,6 +350,11 @@ public void sendMessage(Transaction tx, Message message, RoutingContext context)
return;
}

if (isBlockedByNoForward(message)) {
logger.trace("sendMessage::server {} is discarding the message because its source is setting a noForward policy", server);
return;
}

logger.trace("sendMessage::{} send message {}", server, message);

try {
Expand Down Expand Up @@ -779,4 +790,4 @@ public boolean isAlreadyAcked(Queue queue) {



}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.pools.MpscPool;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
Expand All @@ -77,8 +78,10 @@
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_QUEUE;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.NO_FORWARD;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_NO_FORWARD;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
Expand All @@ -92,6 +95,8 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement

private static final ThreadLocal<MirrorController> CONTROLLER_THREAD_LOCAL = new ThreadLocal<>();

private boolean canForwardMessages = true;

public static void setControllerInUse(MirrorController controller) {
CONTROLLER_THREAD_LOCAL.set(controller);
}
Expand Down Expand Up @@ -248,6 +253,13 @@ public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI,
this.configuration = server.getConfiguration();
this.referenceNodeStore = sessionSPI.getProtocolManager().getReferenceIDSupplier();
mirrorContext = protonSession.getSessionSPI().getSessionContext();
if (receiver.getRemoteDesiredCapabilities() != null) {
for (Symbol capability : receiver.getRemoteDesiredCapabilities()) {
if (capability == NO_FORWARD) {
this.canForwardMessages = false;
}
}
}
}

@Override
Expand All @@ -265,6 +277,7 @@ protected void actualDelivery(Message message, Delivery delivery, DeliveryAnnota
OperationContext oldContext = recoverContext();
incrementSettle();


logger.trace("{}::actualDelivery call for {}", server, message);
setControllerInUse(this);

Expand Down Expand Up @@ -487,6 +500,7 @@ private void performAck(String nodeID,
* The caller of this method should give up any reference to messageCompletionAck when this method returns true.
* */
private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotations, ACKMessageOperation messageCompletionAck) throws Exception {

if (message.getMessageID() <= 0) {
message.setMessageID(server.getStorageManager().generateID());
}
Expand All @@ -512,6 +526,7 @@ private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotat

routingContext.setDuplicateDetection(false); // we do our own duplicate detection here


DuplicateIDCache duplicateIDCache;
if (lruDuplicateIDKey != null && lruDuplicateIDKey.equals(internalMirrorID)) {
duplicateIDCache = lruduplicateIDCache;
Expand All @@ -534,6 +549,9 @@ private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotat

message.setBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY, internalID);
message.setBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY, internalMirrorID);
if (!this.canForwardMessages) {
message.setBrokerProperty(INTERNAL_NO_FORWARD, true);
}

if (internalAddress != null) {
message.setAddress(internalAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class AMQPMirrorBrokerConnectionElement extends AMQPBrokerConnectionEleme

boolean queueCreation = true;

boolean canForwardMessages = true;

boolean queueRemoval = true;

boolean messageAcknowledgements = true;
Expand Down Expand Up @@ -75,6 +77,15 @@ public AMQPMirrorBrokerConnectionElement setQueueCreation(boolean queueCreation)
return this;
}

public boolean getCanForwardMessages() {
return canForwardMessages;
}

public AMQPMirrorBrokerConnectionElement setCanForwardMessages(boolean canForwardMessages) {
this.canForwardMessages = canForwardMessages;
return this;
}

public boolean isQueueRemoval() {
return queueRemoval;
}
Expand Down Expand Up @@ -188,4 +199,4 @@ public boolean equals(Object obj) {
queueRemoval == other.queueRemoval &&
sync == other.sync;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,4 +196,4 @@ public void testChained() throws Exception {
}

}
}
}
Loading

0 comments on commit 68f3cfe

Please sign in to comment.