Skip to content

Commit

Permalink
ARTEMIS-5232 Running all possible permutations with a single server e…
Browse files Browse the repository at this point in the history
…xecution on TransferTest
  • Loading branch information
clebertsuconic committed Jan 13, 2025
1 parent e448f69 commit 1db50e3
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,10 @@ public Object execute(ActionContext context) throws Exception {

context.out.println("Connection brokerURL = " + sourceURL);

return doTransfer(context);
}

private int doTransfer(ActionContext context) throws Exception {
ConnectionFactory sourceConnectionFactory = createConnectionFactory("source", sourceProtocol, sourceURL, sourceUser, sourcePassword, sourceClientID);
Connection sourceConnection = sourceConnectionFactory.createConnection();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,66 +30,30 @@
import javax.jms.TextMessage;
import javax.jms.Topic;
import java.io.File;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;

import org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.Run;
import org.apache.activemq.artemis.cli.commands.messages.Transfer;
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.util.ServerUtil;
import org.apache.activemq.artemis.cli.commands.helper.HelperCreate;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ExtendWith(ParameterizedTestExtension.class)
public class TransferTest extends SmokeTestBase {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

public static final String SERVER_NAME_0 = "transfer1";
public static final String SERVER_NAME_1 = "transfer2";
private static final int NUMBER_OF_MESSAGES = 200;
private static final int NUMBER_OF_MESSAGES = 10;
private static final int PARTIAL_MESSAGES = 10;
String sourceTransferProtocol = "amqp";
String targetTransferProtocol = "amqp";
String senderProtocol = "amqp";
String consumerProtocol = "amqp";

/*
<execution>
<phase>test-compile</phase>
<id>create-transfer-1</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<!-- this makes it easier in certain envs -->
<allowAnonymous>true</allowAnonymous>
<user>admin</user>
<password>admin</password>
<noWeb>true</noWeb>
<instance>${basedir}/target/transfer1</instance>
</configuration>
</execution>
<execution>
<phase>test-compile</phase>
<id>create-transfer-2</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<!-- this makes it easier in certain envs -->
<allowAnonymous>true</allowAnonymous>
<user>admin</user>
<password>admin</password>
<noWeb>true</noWeb>
<portOffset>100</portOffset>
<instance>${basedir}/target/transfer2</instance>
</configuration>
</execution>
*/


@BeforeAll
Expand All @@ -104,30 +68,26 @@ public static void createServers() throws Exception {
{
HelperCreate cliCreateServer = helperCreate();
cliCreateServer.setUser("admin").setPassword("admin").setAllowAnonymous(true).setNoWeb(true).setArtemisInstance(server0Location);
cliCreateServer.addArgs("--disable-persistence");
cliCreateServer.createServer();
}

{
HelperCreate cliCreateServer = helperCreate();
cliCreateServer.setUser("admin").setPassword("admin").setAllowAnonymous(true).setNoWeb(true).setArtemisInstance(server1Location).setPortOffset(100);
cliCreateServer.addArgs("--disable-persistence");
cliCreateServer.createServer();
}
}


public TransferTest(String sender, String consumer, String source, String target) {
this.senderProtocol = sender;
this.consumerProtocol = consumer;
this.sourceTransferProtocol = source;
this.targetTransferProtocol = target;
public TransferTest() {
}

@Parameters(name = "sender={0}, consumer={1}, sourceOnTransfer={2}, targetOnTransfer={3}")
public static Collection<Object[]> getParams() {
public static Collection<String[]> mixProtocolOptions() {

String[] protocols = new String[]{"core", "amqp"};

ArrayList<Object[]> parameters = new ArrayList<>();
ArrayList<String[]> parameters = new ArrayList<>();

for (int i = 0; i < protocols.length; i++) {
for (int j = 0; j < protocols.length; j++) {
Expand All @@ -142,11 +102,11 @@ public static Collection<Object[]> getParams() {
return parameters;
}

private ConnectionFactory createConsumerCF() {
private ConnectionFactory createConsumerCF(String consumerProtocol) {
return CFUtil.createConnectionFactory(consumerProtocol, "tcp://localhost:61716");
}

private ConnectionFactory createSenderCF() {
private ConnectionFactory createSenderCF(String senderProtocol) {
return CFUtil.createConnectionFactory(senderProtocol, "tcp://localhost:61616");
}

Expand All @@ -159,30 +119,66 @@ public void before() throws Exception {
startServer(SERVER_NAME_1, 100, 30000);
}

@TestTemplate
public void testTransferSimpleQueueCopy() throws Exception {
internalTransferSimpleQueue(false);
@Test
public void testTryAllPermutations() throws Exception {

Collection<String[]> options = mixProtocolOptions();
int iteration = 0;
for (String[] option : options) {
logger.info("{} {} {} {} - iteration = {}", option[0], option[1], option[2], option[3], iteration);
internalTransferSimpleQueue("queue_a" + iteration, false, option[0], option[1], option[2], option[3]);
internalTransferSimpleQueue("queue_b" + iteration, true, option[0], option[1], option[2], option[3]);
testDurableSharedSubscrition("topic_c" + iteration, "queue_c" + iteration, option[0], option[1], option[2], option[3]);
testSharedSubscription("topic_d" + iteration, "queue_d" + iteration, option[0], option[1], option[2], option[3]);
testDurableConsumer("topic_e" + iteration, "queue_e" + iteration, option[0], option[1], option[2], option[3]);
iteration++;
}
}

@TestTemplate
public void testTransferSimpleQueue() throws Exception {
internalTransferSimpleQueue(true);
}
private static void callTransferQueue(String targetURL, String sourceQueue, String sourceTopic, String sharedDurableSubscription, String sharedSubscription, String durableConsumer, String clientID, String targetQueue, String sourceProtocol, String transferProtocol, boolean copy) throws Exception {

public String getQueueName() {
return getName();
}
Run.setEmbedded(true); // Telling the CLI to not use System.exit

Transfer transfer = new Transfer();
File artemisInstance = getFileServerLocation(SERVER_NAME_0);
File etc = new File(artemisInstance, "etc");
transfer.setHomeValues(HelperCreate.getHome(ARTEMIS_HOME_PROPERTY), getFileServerLocation(SERVER_NAME_0), etc);
transfer.setTargetURL(targetURL);
if (sourceQueue != null) {
transfer.setSourceQueue(sourceQueue);
}
if (sourceTopic != null) {
transfer.setSourceTopic(sourceTopic);
}
if (sharedDurableSubscription != null) {
transfer.setSharedDurableSubscription(sharedDurableSubscription);
}
if (sharedSubscription != null) {
transfer.setSharedSubscription(sharedSubscription);
}
if (durableConsumer != null) {
transfer.setDurableConsumer(durableConsumer);
}
if (clientID != null) {
transfer.setSourceClientID(clientID);
}
transfer.setReceiveTimeout(100);
transfer.setTargetQueue(targetQueue);
transfer.setSourceProtocol(sourceProtocol);
transfer.setTargetProtocol(transferProtocol);
transfer.setCopy(copy);
transfer.execute(new ActionContext());

public String getTopicName() {
return "Topic" + getName();
}

private void internalTransferSimpleQueue(boolean copy) throws Exception {
ConnectionFactory factory = createSenderCF();
private void internalTransferSimpleQueue(String queueName, boolean copy,
String senderProtocol, String consumerProtocol,
String sourceTransferProtocol, String targetTransferProtocol) throws Exception {
ConnectionFactory factory = createSenderCF(senderProtocol);
Connection connection = factory.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);

Queue queue = session.createQueue(getQueueName());
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);

Expand All @@ -191,29 +187,14 @@ private void internalTransferSimpleQueue(boolean copy) throws Exception {
}

session.commit();
callTransferQueue("tcp://localhost:61716", queueName, null, null, null, null, null, queueName, sourceTransferProtocol, targetTransferProtocol, copy);

String[] argsArray = new String[]{"transfer", "--target-url", "tcp://localhost:61716", "--source-queue", getQueueName(), "--target-queue", getQueueName(), "--source-protocol", sourceTransferProtocol, "--target-protocol", targetTransferProtocol, "--receive-timeout", "0"};

if (copy) {
ArrayList<String> copyArgs = new ArrayList<>();
for (String a : argsArray) {
copyArgs.add(a);
}
if (copy) {
copyArgs.add("--copy");
}
argsArray = copyArgs.toArray(new String[copyArgs.size()]);
}

Process transferProcess = ServerUtil.execute(getServerLocation(SERVER_NAME_0), "transfer", argsArray);
transferProcess.waitFor();

ConnectionFactory factoryTarget = createConsumerCF();
ConnectionFactory factoryTarget = createConsumerCF(consumerProtocol);
Connection connectionTarget = factoryTarget.createConnection();
connectionTarget.start();
Session sessionTarget = connectionTarget.createSession(true, Session.SESSION_TRANSACTED);

Queue queueTarget = sessionTarget.createQueue(getQueueName());
Queue queueTarget = sessionTarget.createQueue(queueName);
MessageConsumer consumer = sessionTarget.createConsumer(queueTarget);

for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
Expand Down Expand Up @@ -243,13 +224,12 @@ private void internalTransferSimpleQueue(boolean copy) throws Exception {
connectionTarget.close();
}

@TestTemplate
public void testDurableSharedSubscrition() throws Exception {
ConnectionFactory factory = createSenderCF();
public void testDurableSharedSubscrition(String topicName, String queueName, String senderProtocol, String consumerProtocol, String sourceTransferProtocol, String targetTransferProtocol) throws Exception {
ConnectionFactory factory = createSenderCF(senderProtocol);
Connection connection = factory.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);

Topic topic = session.createTopic(getTopicName());
Topic topic = session.createTopic(topicName);
MessageConsumer subscription = session.createSharedDurableConsumer(topic, "testSubs");
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
Expand All @@ -261,15 +241,14 @@ public void testDurableSharedSubscrition() throws Exception {
session.commit();
subscription.close();

Process transferProcess = ServerUtil.execute(getServerLocation(SERVER_NAME_0), "transfer", "transfer", "--target-url", "tcp://localhost:61716", "--source-topic", getTopicName(), "--shared-durable-subscription", "testSubs", "--target-queue", getQueueName(), "--source-protocol", sourceTransferProtocol, "--target-protocol", targetTransferProtocol, "--receive-timeout", "1000", "--verbose");
transferProcess.waitFor();
callTransferQueue("tcp://localhost:61716", null, topicName, "testSubs", null, null, null, queueName, sourceTransferProtocol, targetTransferProtocol, false);

ConnectionFactory factoryTarget = createConsumerCF();
ConnectionFactory factoryTarget = createConsumerCF(consumerProtocol);
Connection connectionTarget = factoryTarget.createConnection();
connectionTarget.start();
Session sessionTarget = connectionTarget.createSession(true, Session.SESSION_TRANSACTED);

Queue queueTarget = sessionTarget.createQueue(getQueueName());
Queue queueTarget = sessionTarget.createQueue(queueName);
MessageConsumer consumer = sessionTarget.createConsumer(queueTarget);

for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
Expand All @@ -281,17 +260,17 @@ public void testDurableSharedSubscrition() throws Exception {

assertNull(consumer.receiveNoWait());

session.unsubscribe("testSubs");
connection.close();
connectionTarget.close();
}

@TestTemplate
public void testSharedSubscrition() throws Exception {
ConnectionFactory factory = createSenderCF();
public void testSharedSubscription(String topicName, String queueName, String senderProtocol, String consumerProtocol, String sourceTransferProtocol, String targetTransferProtocol) throws Exception {
ConnectionFactory factory = createSenderCF(senderProtocol);
Connection connection = factory.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);

Topic topic = session.createTopic(getTopicName());
Topic topic = session.createTopic(topicName);
MessageConsumer subscription = session.createSharedConsumer(topic, "testSubs");
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
Expand All @@ -305,21 +284,20 @@ public void testSharedSubscrition() throws Exception {

session.commit();

Process transferProcess = ServerUtil.execute(getServerLocation(SERVER_NAME_0), "transfer", "transfer", "--target-url", "tcp://localhost:61716", "--source-topic", getTopicName(), "--shared-subscription", "testSubs", "--target-queue", getQueueName(), "--source-protocol", sourceTransferProtocol, "--target-protocol", targetTransferProtocol, "--receive-timeout", "0", "--verbose");
transferProcess.waitFor();
callTransferQueue("tcp://localhost:61716", null, topicName, null, "testSubs", null, null, queueName, sourceTransferProtocol, targetTransferProtocol, false);

// this test is a bit tricky as the subscription would be removed when the consumer is gone...
// I'm adding a test for completion only
// and the subscription has to be closed only after the transfer,
// which will not receive all the messages as some messages will be in delivering mode
subscription.close();

ConnectionFactory factoryTarget = createConsumerCF();
ConnectionFactory factoryTarget = createConsumerCF(consumerProtocol);
Connection connectionTarget = factoryTarget.createConnection();
connectionTarget.start();
Session sessionTarget = connectionTarget.createSession(true, Session.SESSION_TRANSACTED);

Queue queueTarget = sessionTarget.createQueue(getQueueName());
Queue queueTarget = sessionTarget.createQueue(queueName);
MessageConsumer consumer = sessionTarget.createConsumer(queueTarget);

// we are keeping a non durable subscription so the temporary queue still up
Expand All @@ -335,14 +313,13 @@ public void testSharedSubscrition() throws Exception {
connectionTarget.close();
}

@TestTemplate
public void testDurableConsumer() throws Exception {
ConnectionFactory factory = createSenderCF();
public void testDurableConsumer(String topicName, String queueName, String senderProtocol, String consumerProtocol, String sourceTransferProtocol, String targetTransferProtocol) throws Exception {
ConnectionFactory factory = createSenderCF(senderProtocol);
Connection connection = factory.createConnection();
connection.setClientID("test");
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);

Topic topic = session.createTopic(getTopicName());
Topic topic = session.createTopic(topicName);
MessageConsumer subscription = session.createDurableConsumer(topic, "testSubs");
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
Expand All @@ -355,15 +332,14 @@ public void testDurableConsumer() throws Exception {
connection.close();
subscription.close();

Process transferProcess = ServerUtil.execute(getServerLocation(SERVER_NAME_0), "transfer", "transfer", "--target-url", "tcp://localhost:61716", "--source-topic", getTopicName(), "--source-client-id", "test", "--durable-consumer", "testSubs", "--target-queue", getQueueName(), "--source-protocol", sourceTransferProtocol, "--target-protocol", targetTransferProtocol, "--receive-timeout", "1000", "--verbose", "--silent");
transferProcess.waitFor();
callTransferQueue("tcp://localhost:61716", null, topicName, null, null, "testSubs", "test", queueName, sourceTransferProtocol, targetTransferProtocol, false);

ConnectionFactory factoryTarget = createConsumerCF();
ConnectionFactory factoryTarget = createConsumerCF(consumerProtocol);
Connection connectionTarget = factoryTarget.createConnection();
connectionTarget.start();
Session sessionTarget = connectionTarget.createSession(true, Session.SESSION_TRANSACTED);

Queue queueTarget = sessionTarget.createQueue(getQueueName());
Queue queueTarget = sessionTarget.createQueue(queueName);
MessageConsumer consumer = sessionTarget.createConsumer(queueTarget);

// we are keeping a non durable subscription so the temporary queue still up
Expand Down

0 comments on commit 1db50e3

Please sign in to comment.