diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskAuthIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskAuthIT.java index d785cfa..043849b 100644 --- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskAuthIT.java +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskAuthIT.java @@ -18,6 +18,7 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import java.util.Arrays; import java.util.HashMap; @@ -33,6 +34,8 @@ import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.output.WaitingConsumer; +import com.ibm.eventstreams.connect.mqsource.utils.MQQueueManagerAttrs; +import com.ibm.eventstreams.connect.mqsource.utils.SourceTaskStopper; import com.ibm.mq.MQException; import com.ibm.mq.MQMessage; import com.ibm.mq.MQQueue; @@ -45,21 +48,19 @@ public class MQSourceTaskAuthIT { private static final String QUEUE_NAME = "DEV.QUEUE.2"; private static final String CHANNEL_NAME = "DEV.APP.SVRCONN"; private static final String APP_PASSWORD = "MySuperSecretPassword"; + private static final String ADMIN_PASSWORD = "MyAdminPassword"; @ClassRule public static GenericContainer MQ_CONTAINER = new GenericContainer<>("icr.io/ibm-messaging/mq:latest") .withEnv("LICENSE", "accept") .withEnv("MQ_QMGR_NAME", QMGR_NAME) - .withEnv("MQ_ENABLE_EMBEDDED_WEB_SERVER", "false") .withEnv("MQ_APP_PASSWORD", APP_PASSWORD) - .withExposedPorts(1414); + .withEnv("MQ_ADMIN_PASSWORD", ADMIN_PASSWORD) + .withExposedPorts(1414, 9443); - @Test - public void testAuthenticatedQueueManager() throws Exception { - waitForQueueManagerStartup(); - + private Map getConnectorProps() { Map connectorProps = new HashMap<>(); connectorProps.put("mq.queue.manager", QMGR_NAME); connectorProps.put("mq.connection.mode", "client"); @@ -71,9 +72,15 @@ public void testAuthenticatedQueueManager() throws Exception { connectorProps.put("mq.password", APP_PASSWORD); connectorProps.put("mq.message.body.jms", "false"); connectorProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + return connectorProps; + } + + @Test + public void testAuthenticatedQueueManager() throws Exception { + waitForQueueManagerStartup(); MQSourceTask newConnectTask = new MQSourceTask(); - newConnectTask.start(connectorProps); + newConnectTask.start(getConnectorProps()); MQMessage message1 = new MQMessage(); message1.writeString("hello"); @@ -86,15 +93,53 @@ public void testAuthenticatedQueueManager() throws Exception { for (SourceRecord kafkaMessage : kafkaMessages) { assertNull(kafkaMessage.key()); assertEquals(Schema.OPTIONAL_BYTES_SCHEMA, kafkaMessage.valueSchema()); + + newConnectTask.commitRecord(kafkaMessage); } assertArrayEquals("hello".getBytes(), (byte[]) kafkaMessages.get(0).value()); assertArrayEquals("world".getBytes(), (byte[]) kafkaMessages.get(1).value()); - newConnectTask.stop(); + SourceTaskStopper stopper = new SourceTaskStopper(newConnectTask); + stopper.run(); + } + + + + @Test + public void verifyJmsConnClosed() throws Exception { + + int restApiPortNumber = MQ_CONTAINER.getMappedPort(9443); + + // count number of connections to the qmgr at the start + int numQmgrConnectionsBefore = MQQueueManagerAttrs.getNumConnections(QMGR_NAME, restApiPortNumber, ADMIN_PASSWORD); + + // start the source connector so that it connects to the qmgr + MQSourceTask connectTask = new MQSourceTask(); + connectTask.start(getConnectorProps()); + + // count number of connections to the qmgr now - it should have increased + int numQmgrConnectionsDuring = MQQueueManagerAttrs.getNumConnections(QMGR_NAME, restApiPortNumber, ADMIN_PASSWORD); + + // stop the source connector so it disconnects from the qmgr + connectTask.stop(); + + // count number of connections to the qmgr now - it should have decreased + int numQmgrConnectionsAfter = MQQueueManagerAttrs.getNumConnections(QMGR_NAME, restApiPortNumber, ADMIN_PASSWORD); + + // verify number of connections changed as expected + assertTrue("connections should have increased after starting the source task", + numQmgrConnectionsDuring > numQmgrConnectionsBefore); + assertTrue("connections should have decreased after calling stop()", + numQmgrConnectionsAfter < numQmgrConnectionsDuring); + + // cleanup + SourceTaskStopper stopper = new SourceTaskStopper(connectTask); + stopper.run(); } + private void waitForQueueManagerStartup() throws TimeoutException { WaitingConsumer logConsumer = new WaitingConsumer(); MQ_CONTAINER.followOutput(logConsumer); diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java index 1b5f772..45b4dbe 100644 --- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java @@ -31,10 +31,23 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.source.SourceRecord; +import org.junit.After; import org.junit.Test; +import com.ibm.eventstreams.connect.mqsource.utils.SourceTaskStopper; + + public class MQSourceTaskIT extends AbstractJMSContextIT { + private MQSourceTask connectTask = null; + + @After + public void cleanup() throws InterruptedException { + SourceTaskStopper stopper = new SourceTaskStopper(connectTask); + stopper.run(); + } + + private static final String MQ_QUEUE = "DEV.QUEUE.1"; private Map createDefaultConnectorProperties() { @@ -51,42 +64,42 @@ private Map createDefaultConnectorProperties() { @Test public void verifyJmsTextMessages() throws Exception { - MQSourceTask newConnectTask = new MQSourceTask(); + connectTask = new MQSourceTask(); Map connectorConfigProps = createDefaultConnectorProperties(); connectorConfigProps.put("mq.message.body.jms", "true"); connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); - newConnectTask.start(connectorConfigProps); + connectTask.start(connectorConfigProps); TextMessage message1 = getJmsContext().createTextMessage("hello"); TextMessage message2 = getJmsContext().createTextMessage("world"); putAllMessagesToQueue(MQ_QUEUE, Arrays.asList(message1, message2)); - List kafkaMessages = newConnectTask.poll(); + List kafkaMessages = connectTask.poll(); assertEquals(2, kafkaMessages.size()); for (SourceRecord kafkaMessage : kafkaMessages) { assertNull(kafkaMessage.key()); assertNull(kafkaMessage.valueSchema()); + + connectTask.commitRecord(kafkaMessage); } assertEquals("hello", kafkaMessages.get(0).value()); assertEquals("world", kafkaMessages.get(1).value()); - - newConnectTask.stop(); } @Test public void verifyJmsJsonMessages() throws Exception { - MQSourceTask newConnectTask = new MQSourceTask(); + connectTask = new MQSourceTask(); Map connectorConfigProps = createDefaultConnectorProperties(); connectorConfigProps.put("mq.message.body.jms", "true"); connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); - newConnectTask.start(connectorConfigProps); + connectTask.start(connectorConfigProps); List messages = new ArrayList<>(); for (int i = 0; i < 5; i++) { @@ -97,7 +110,7 @@ public void verifyJmsJsonMessages() throws Exception { } putAllMessagesToQueue(MQ_QUEUE, messages); - List kafkaMessages = newConnectTask.poll(); + List kafkaMessages = connectTask.poll(); assertEquals(5, kafkaMessages.size()); for (int i = 0; i < 5; i++) { SourceRecord kafkaMessage = kafkaMessages.get(i); @@ -106,23 +119,23 @@ public void verifyJmsJsonMessages() throws Exception { Map value = (Map) kafkaMessage.value(); assertEquals(Long.valueOf(i), value.get("i")); - } - newConnectTask.stop(); + connectTask.commitRecord(kafkaMessage); + } } @Test public void verifyJmsMessageHeaders() throws Exception { - MQSourceTask newConnectTask = new MQSourceTask(); + connectTask = new MQSourceTask(); Map connectorConfigProps = createDefaultConnectorProperties(); connectorConfigProps.put("mq.message.body.jms", "true"); connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); connectorConfigProps.put("mq.jms.properties.copy.to.kafka.headers", "true"); - newConnectTask.start(connectorConfigProps); + connectTask.start(connectorConfigProps); TextMessage message = getJmsContext().createTextMessage("helloworld"); message.setStringProperty("teststring", "myvalue"); @@ -131,7 +144,7 @@ public void verifyJmsMessageHeaders() throws Exception { putAllMessagesToQueue(MQ_QUEUE, Arrays.asList(message)); - List kafkaMessages = newConnectTask.poll(); + List kafkaMessages = connectTask.poll(); assertEquals(1, kafkaMessages.size()); SourceRecord kafkaMessage = kafkaMessages.get(0); assertNull(kafkaMessage.key()); @@ -143,21 +156,21 @@ public void verifyJmsMessageHeaders() throws Exception { assertEquals("11", kafkaMessage.headers().lastWithName("volume").value()); assertEquals("42.0", kafkaMessage.headers().lastWithName("decimalmeaning").value()); - newConnectTask.stop(); + connectTask.commitRecord(kafkaMessage); } @Test public void verifyMessageBatchIndividualCommits() throws Exception { - MQSourceTask newConnectTask = new MQSourceTask(); + connectTask = new MQSourceTask(); Map connectorConfigProps = createDefaultConnectorProperties(); connectorConfigProps.put("mq.message.body.jms", "true"); connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); connectorConfigProps.put("mq.batch.size", "10"); - newConnectTask.start(connectorConfigProps); + connectTask.start(connectorConfigProps); List messages = new ArrayList<>(); for (int i = 1; i <= 35; i++) { @@ -169,49 +182,47 @@ public void verifyMessageBatchIndividualCommits() throws Exception { List kafkaMessages; - kafkaMessages = newConnectTask.poll(); + kafkaMessages = connectTask.poll(); assertEquals(10, kafkaMessages.size()); for (SourceRecord kafkaMessage : kafkaMessages) { assertEquals("batch message " + (nextExpectedMessage++), kafkaMessage.value()); - newConnectTask.commitRecord(kafkaMessage); + connectTask.commitRecord(kafkaMessage); } - kafkaMessages = newConnectTask.poll(); + kafkaMessages = connectTask.poll(); assertEquals(10, kafkaMessages.size()); for (SourceRecord kafkaMessage : kafkaMessages) { assertEquals("batch message " + (nextExpectedMessage++), kafkaMessage.value()); - newConnectTask.commitRecord(kafkaMessage); + connectTask.commitRecord(kafkaMessage); } - kafkaMessages = newConnectTask.poll(); + kafkaMessages = connectTask.poll(); assertEquals(10, kafkaMessages.size()); for (SourceRecord kafkaMessage : kafkaMessages) { assertEquals("batch message " + (nextExpectedMessage++), kafkaMessage.value()); - newConnectTask.commitRecord(kafkaMessage); + connectTask.commitRecord(kafkaMessage); } - kafkaMessages = newConnectTask.poll(); + kafkaMessages = connectTask.poll(); assertEquals(5, kafkaMessages.size()); for (SourceRecord kafkaMessage : kafkaMessages) { assertEquals("batch message " + (nextExpectedMessage++), kafkaMessage.value()); - newConnectTask.commitRecord(kafkaMessage); + connectTask.commitRecord(kafkaMessage); } - - newConnectTask.stop(); } @Test public void verifyMessageBatchGroupCommits() throws Exception { - MQSourceTask newConnectTask = new MQSourceTask(); + connectTask = new MQSourceTask(); Map connectorConfigProps = createDefaultConnectorProperties(); connectorConfigProps.put("mq.message.body.jms", "true"); connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); connectorConfigProps.put("mq.batch.size", "10"); - newConnectTask.start(connectorConfigProps); + connectTask.start(connectorConfigProps); List messages = new ArrayList<>(); for (int i = 1; i <= 35; i++) { @@ -221,46 +232,48 @@ public void verifyMessageBatchGroupCommits() throws Exception { List kafkaMessages; - kafkaMessages = newConnectTask.poll(); + kafkaMessages = connectTask.poll(); assertEquals(10, kafkaMessages.size()); - newConnectTask.commit(); - newConnectTask.commit(); + for (SourceRecord m : kafkaMessages) { + connectTask.commitRecord(m); + } - kafkaMessages = newConnectTask.poll(); + kafkaMessages = connectTask.poll(); assertEquals(10, kafkaMessages.size()); - newConnectTask.commit(); - newConnectTask.commit(); + for (SourceRecord m : kafkaMessages) { + connectTask.commitRecord(m); + } - kafkaMessages = newConnectTask.poll(); + kafkaMessages = connectTask.poll(); assertEquals(10, kafkaMessages.size()); - newConnectTask.commit(); - newConnectTask.commit(); + for (SourceRecord m : kafkaMessages) { + connectTask.commitRecord(m); + } - kafkaMessages = newConnectTask.poll(); + kafkaMessages = connectTask.poll(); assertEquals(5, kafkaMessages.size()); - newConnectTask.commit(); - newConnectTask.commit(); - - newConnectTask.stop(); + for (SourceRecord m : kafkaMessages) { + connectTask.commitRecord(m); + } } @Test public void verifyMessageIdAsKey() throws Exception { - MQSourceTask newConnectTask = new MQSourceTask(); + connectTask = new MQSourceTask(); Map connectorConfigProps = createDefaultConnectorProperties(); connectorConfigProps.put("mq.message.body.jms", "true"); connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); connectorConfigProps.put("mq.record.builder.key.header", "JMSMessageID"); - newConnectTask.start(connectorConfigProps); + connectTask.start(connectorConfigProps); TextMessage message = getJmsContext().createTextMessage("testmessage"); putAllMessagesToQueue(MQ_QUEUE, Arrays.asList(message)); - List kafkaMessages = newConnectTask.poll(); + List kafkaMessages = connectTask.poll(); assertEquals(1, kafkaMessages.size()); SourceRecord kafkaMessage = kafkaMessages.get(0); @@ -270,21 +283,21 @@ public void verifyMessageIdAsKey() throws Exception { assertEquals("testmessage", kafkaMessage.value()); - newConnectTask.stop(); + connectTask.commitRecord(kafkaMessage); } @Test public void verifyCorrelationIdAsKey() throws Exception { - MQSourceTask newConnectTask = new MQSourceTask(); + connectTask = new MQSourceTask(); Map connectorConfigProps = createDefaultConnectorProperties(); connectorConfigProps.put("mq.message.body.jms", "true"); connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); connectorConfigProps.put("mq.record.builder.key.header", "JMSCorrelationID"); - newConnectTask.start(connectorConfigProps); + connectTask.start(connectorConfigProps); TextMessage message1 = getJmsContext().createTextMessage("first message"); message1.setJMSCorrelationID("verifycorrel"); @@ -292,40 +305,40 @@ public void verifyCorrelationIdAsKey() throws Exception { message2.setJMSCorrelationID("ID:5fb4a18030154fe4b09a1dfe8075bc101dfe8075bc104fe4"); putAllMessagesToQueue(MQ_QUEUE, Arrays.asList(message1, message2)); - List kafkaMessages = newConnectTask.poll(); + List kafkaMessages = connectTask.poll(); assertEquals(2, kafkaMessages.size()); SourceRecord kafkaMessage1 = kafkaMessages.get(0); assertEquals("verifycorrel", kafkaMessage1.key()); assertEquals(Schema.OPTIONAL_STRING_SCHEMA, kafkaMessage1.keySchema()); assertEquals("first message", kafkaMessage1.value()); + connectTask.commitRecord(kafkaMessage1); SourceRecord kafkaMessage2 = kafkaMessages.get(1); assertEquals("5fb4a18030154fe4b09a1dfe8075bc101dfe8075bc104fe4", kafkaMessage2.key()); assertEquals(Schema.OPTIONAL_STRING_SCHEMA, kafkaMessage2.keySchema()); assertEquals("second message", kafkaMessage2.value()); - - newConnectTask.stop(); + connectTask.commitRecord(kafkaMessage2); } @Test public void verifyCorrelationIdBytesAsKey() throws Exception { - MQSourceTask newConnectTask = new MQSourceTask(); + connectTask = new MQSourceTask(); Map connectorConfigProps = createDefaultConnectorProperties(); connectorConfigProps.put("mq.message.body.jms", "true"); connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); connectorConfigProps.put("mq.record.builder.key.header", "JMSCorrelationIDAsBytes"); - newConnectTask.start(connectorConfigProps); + connectTask.start(connectorConfigProps); TextMessage message = getJmsContext().createTextMessage("testmessagewithcorrelbytes"); message.setJMSCorrelationID("verifycorrelbytes"); putAllMessagesToQueue(MQ_QUEUE, Arrays.asList(message)); - List kafkaMessages = newConnectTask.poll(); + List kafkaMessages = connectTask.poll(); assertEquals(1, kafkaMessages.size()); SourceRecord kafkaMessage = kafkaMessages.get(0); @@ -334,26 +347,26 @@ public void verifyCorrelationIdBytesAsKey() throws Exception { assertEquals("testmessagewithcorrelbytes", kafkaMessage.value()); - newConnectTask.stop(); + connectTask.commitRecord(kafkaMessage); } @Test public void verifyDestinationAsKey() throws Exception { - MQSourceTask newConnectTask = new MQSourceTask(); + connectTask = new MQSourceTask(); Map connectorConfigProps = createDefaultConnectorProperties(); connectorConfigProps.put("mq.message.body.jms", "true"); connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); connectorConfigProps.put("mq.record.builder.key.header", "JMSDestination"); - newConnectTask.start(connectorConfigProps); + connectTask.start(connectorConfigProps); TextMessage message = getJmsContext().createTextMessage("testmessagewithdest"); putAllMessagesToQueue(MQ_QUEUE, Arrays.asList(message)); - List kafkaMessages = newConnectTask.poll(); + List kafkaMessages = connectTask.poll(); assertEquals(1, kafkaMessages.size()); SourceRecord kafkaMessage = kafkaMessages.get(0); @@ -362,6 +375,6 @@ public void verifyDestinationAsKey() throws Exception { assertEquals("testmessagewithdest", kafkaMessage.value()); - newConnectTask.stop(); + connectTask.commitRecord(kafkaMessage); } } diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/utils/JsonRestApi.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/utils/JsonRestApi.java new file mode 100644 index 0000000..6335e9a --- /dev/null +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/utils/JsonRestApi.java @@ -0,0 +1,102 @@ +/** + * Copyright 2022 IBM Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ibm.eventstreams.connect.mqsource.utils; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.Reader; +import java.net.URL; +import java.nio.charset.Charset; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; +import java.security.cert.X509Certificate; +import java.util.Base64; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSession; +import javax.net.ssl.SSLSocketFactory; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; + +import org.json.JSONException; +import org.json.JSONObject; + +public class JsonRestApi { + + public static JSONObject jsonPost(String url, String username, String password, String payload) throws IOException, KeyManagementException, NoSuchAlgorithmException, JSONException { + URL urlObj = new URL(url); + HttpsURLConnection urlConnection = (HttpsURLConnection) urlObj.openConnection(); + urlConnection.setHostnameVerifier(new IgnoreCertVerifier()); + urlConnection.setSSLSocketFactory(getTrustAllCertsFactory()); + urlConnection.setRequestProperty("Authorization", getAuthHeader(username, password)); + urlConnection.setRequestProperty("Content-Type", "application/json"); + urlConnection.setRequestProperty("ibm-mq-rest-csrf-token", "junit"); + urlConnection.setDoOutput(true); + + try(OutputStream os = urlConnection.getOutputStream()) { + byte[] input = payload.getBytes("utf-8"); + os.write(input, 0, input.length); + } + + try (InputStream input = urlConnection.getInputStream()){ + BufferedReader re = new BufferedReader(new InputStreamReader(input, Charset.forName("utf-8"))); + return new JSONObject(read(re)); + } + } + + + private static String read(Reader re) throws IOException { + StringBuilder str = new StringBuilder(); + int ch; + do { + ch = re.read(); + str.append((char) ch); + } while (ch != -1); + return str.toString(); + } + + + private static String getAuthHeader(String username, String password) { + String userpass = username + ":" + password; + String basicAuth = "Basic " + new String(Base64.getEncoder().encode(userpass.getBytes())); + return basicAuth; + } + + private static class IgnoreCertVerifier implements HostnameVerifier { + @Override + public boolean verify(String host, SSLSession session) { + return true; + } + } + + private static SSLSocketFactory getTrustAllCertsFactory() throws NoSuchAlgorithmException, KeyManagementException { + TrustManager[] trustAllCerts = new TrustManager[] { + new X509TrustManager() { + public java.security.cert.X509Certificate[] getAcceptedIssuers() { return null; } + public void checkClientTrusted(X509Certificate[] certs, String authType) { } + public void checkServerTrusted(X509Certificate[] certs, String authType) { } + } + }; + SSLContext sc = SSLContext.getInstance("SSL"); + sc.init(null, trustAllCerts, new java.security.SecureRandom()); + return sc.getSocketFactory(); + } +} diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/utils/MQQueueManagerAttrs.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/utils/MQQueueManagerAttrs.java new file mode 100644 index 0000000..fd56d9f --- /dev/null +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/utils/MQQueueManagerAttrs.java @@ -0,0 +1,52 @@ +/** + * Copyright 2022 IBM Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ibm.eventstreams.connect.mqsource.utils; + +import java.io.IOException; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; + +import org.json.JSONException; +import org.json.JSONObject; + +public class MQQueueManagerAttrs { + + private static final String REQ_GET_SVRCONNS = "{" + + " \"type\": \"runCommand\"," + + " \"parameters\": {" + + " \"command\": \"display conn(*) where (channel EQ 'DEV.APP.SVRCONN')\"" + + " }" + + "}"; + + + public static int getNumConnections(String qmgrname, int portnum, String password) throws KeyManagementException, NoSuchAlgorithmException, IOException, JSONException { + String url = "https://localhost:" + portnum + "/ibmmq/rest/v2/admin/action/qmgr/" + qmgrname + "/mqsc"; + JSONObject connectionInfo = JsonRestApi.jsonPost(url, "admin", password, REQ_GET_SVRCONNS); + + int completionCode = connectionInfo.getInt("overallCompletionCode"); + int reasonCode = connectionInfo.getInt("overallReasonCode"); + + if (completionCode == 2 && reasonCode == 3008) { + return 0; + } + else if (completionCode == 0 && reasonCode == 0) { + return connectionInfo.getJSONArray("commandResponse").length(); + } + else { + return -1; + } + } +} diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/utils/SourceTaskStopper.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/utils/SourceTaskStopper.java new file mode 100644 index 0000000..531599b --- /dev/null +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/utils/SourceTaskStopper.java @@ -0,0 +1,63 @@ +/** + * Copyright 2022 IBM Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ibm.eventstreams.connect.mqsource.utils; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.kafka.connect.source.SourceTask; + + +/** + * Stops an instance of the MQSourceTask in a way that will ensure + * it commits the work that it has completed so far. + * + * This is needed for tests so that subsequent tests don't disrupt + * each other. + */ +public class SourceTaskStopper { + + private static ExecutorService executor = Executors.newCachedThreadPool(); + + private SourceTask sourceTask; + + public SourceTaskStopper(SourceTask task) { + sourceTask = task; + } + + public void run() throws InterruptedException { + // start the poll in a background thread + executor.submit(new PollStarter()); + + // the pollstarter thread will block waiting for messages + // that don't ever come so wait briefly before stopping + // it from another thread + Thread.sleep(200); + sourceTask.stop(); + } + + + class PollStarter implements Runnable { + @Override + public void run() { + try { + sourceTask.poll(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } +} diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java index 071737c..c8177ef 100755 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java @@ -37,7 +37,6 @@ public class MQSourceTask extends SourceTask { private CountDownLatch batchCompleteSignal = null; // Used to signal completion of a batch private AtomicInteger pollCycle = new AtomicInteger(1); // Incremented each time poll() is called private int lastCommitPollCycle = 0; // The value of pollCycle the last time commit() was called - private AtomicBoolean receivingMessages = new AtomicBoolean(); // Whether currently receiving messages private AtomicBoolean stopNow = new AtomicBoolean(); // Whether stop has been requested private JMSReader reader; @@ -114,8 +113,6 @@ public MQSourceTask() { log.debug("Starting poll cycle {}", currentPollCycle); try { - receivingMessages.set(true); - if (!stopNow.get()) { log.info("Polling for records"); SourceRecord src; @@ -133,7 +130,6 @@ public MQSourceTask() { } } finally { - receivingMessages.set(false); } synchronized(this) { @@ -219,16 +215,7 @@ public void commit() throws InterruptedException { stopNow.set(true); - boolean willClose = false; - synchronized(this) { - if (receivingMessages.get()) { - log.debug("Will close connection"); - willClose = true; - } - } - - if (willClose) { // Close the connection to MQ to clean up if (reader != null) { reader.close();