Skip to content

Commit

Permalink
Merge pull request #102 from dalelane/fixed_issue_73_jms_open_conn-tests
Browse files Browse the repository at this point in the history
fix: close JMS reader when connect task is stopped
  • Loading branch information
gmcrobert authored Jan 5, 2023
2 parents 8505dfe + 5b03c3c commit f6ca557
Show file tree
Hide file tree
Showing 6 changed files with 342 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -33,6 +34,8 @@
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.WaitingConsumer;

import com.ibm.eventstreams.connect.mqsource.utils.MQQueueManagerAttrs;
import com.ibm.eventstreams.connect.mqsource.utils.SourceTaskStopper;
import com.ibm.mq.MQException;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQQueue;
Expand All @@ -45,21 +48,19 @@ public class MQSourceTaskAuthIT {
private static final String QUEUE_NAME = "DEV.QUEUE.2";
private static final String CHANNEL_NAME = "DEV.APP.SVRCONN";
private static final String APP_PASSWORD = "MySuperSecretPassword";
private static final String ADMIN_PASSWORD = "MyAdminPassword";


@ClassRule
public static GenericContainer<?> MQ_CONTAINER = new GenericContainer<>("icr.io/ibm-messaging/mq:latest")
.withEnv("LICENSE", "accept")
.withEnv("MQ_QMGR_NAME", QMGR_NAME)
.withEnv("MQ_ENABLE_EMBEDDED_WEB_SERVER", "false")
.withEnv("MQ_APP_PASSWORD", APP_PASSWORD)
.withExposedPorts(1414);
.withEnv("MQ_ADMIN_PASSWORD", ADMIN_PASSWORD)
.withExposedPorts(1414, 9443);


@Test
public void testAuthenticatedQueueManager() throws Exception {
waitForQueueManagerStartup();

private Map<String, String> getConnectorProps() {
Map<String, String> connectorProps = new HashMap<>();
connectorProps.put("mq.queue.manager", QMGR_NAME);
connectorProps.put("mq.connection.mode", "client");
Expand All @@ -71,9 +72,15 @@ public void testAuthenticatedQueueManager() throws Exception {
connectorProps.put("mq.password", APP_PASSWORD);
connectorProps.put("mq.message.body.jms", "false");
connectorProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
return connectorProps;
}

@Test
public void testAuthenticatedQueueManager() throws Exception {
waitForQueueManagerStartup();

MQSourceTask newConnectTask = new MQSourceTask();
newConnectTask.start(connectorProps);
newConnectTask.start(getConnectorProps());

MQMessage message1 = new MQMessage();
message1.writeString("hello");
Expand All @@ -86,15 +93,53 @@ public void testAuthenticatedQueueManager() throws Exception {
for (SourceRecord kafkaMessage : kafkaMessages) {
assertNull(kafkaMessage.key());
assertEquals(Schema.OPTIONAL_BYTES_SCHEMA, kafkaMessage.valueSchema());

newConnectTask.commitRecord(kafkaMessage);
}

assertArrayEquals("hello".getBytes(), (byte[]) kafkaMessages.get(0).value());
assertArrayEquals("world".getBytes(), (byte[]) kafkaMessages.get(1).value());

newConnectTask.stop();
SourceTaskStopper stopper = new SourceTaskStopper(newConnectTask);
stopper.run();
}



@Test
public void verifyJmsConnClosed() throws Exception {

int restApiPortNumber = MQ_CONTAINER.getMappedPort(9443);

// count number of connections to the qmgr at the start
int numQmgrConnectionsBefore = MQQueueManagerAttrs.getNumConnections(QMGR_NAME, restApiPortNumber, ADMIN_PASSWORD);

// start the source connector so that it connects to the qmgr
MQSourceTask connectTask = new MQSourceTask();
connectTask.start(getConnectorProps());

// count number of connections to the qmgr now - it should have increased
int numQmgrConnectionsDuring = MQQueueManagerAttrs.getNumConnections(QMGR_NAME, restApiPortNumber, ADMIN_PASSWORD);

// stop the source connector so it disconnects from the qmgr
connectTask.stop();

// count number of connections to the qmgr now - it should have decreased
int numQmgrConnectionsAfter = MQQueueManagerAttrs.getNumConnections(QMGR_NAME, restApiPortNumber, ADMIN_PASSWORD);

// verify number of connections changed as expected
assertTrue("connections should have increased after starting the source task",
numQmgrConnectionsDuring > numQmgrConnectionsBefore);
assertTrue("connections should have decreased after calling stop()",
numQmgrConnectionsAfter < numQmgrConnectionsDuring);

// cleanup
SourceTaskStopper stopper = new SourceTaskStopper(connectTask);
stopper.run();
}



private void waitForQueueManagerStartup() throws TimeoutException {
WaitingConsumer logConsumer = new WaitingConsumer();
MQ_CONTAINER.followOutput(logConsumer);
Expand Down
Loading

0 comments on commit f6ca557

Please sign in to comment.