diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..c9a123e
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,6 @@
+language: java
+
+services:
+  - docker
+
+script: mvn clean verify
diff --git a/pom.xml b/pom.xml
index c3a3925..189d50c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,7 +47,7 @@
 			<artifactId>connect-api</artifactId>
 			<version>2.6.0</version>
 			<scope>provided</scope>
-		</dependency>
+		</dependency>
 		<dependency>
 			<groupId>org.apache.kafka</groupId>
 			<artifactId>connect-json</artifactId>
@@ -84,6 +84,15 @@
 			<version>1.7.25</version>
 			<scope>test</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>org.testcontainers</groupId>
+			<artifactId>testcontainers</artifactId>
+			<version>1.17.3</version>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 
 	<build>
@@ -97,18 +106,44 @@
 					-Xlint:unchecked
 				</configuration>
 			</plugin>
+
 			<plugin>
 				<artifactId>maven-surefire-plugin</artifactId>
-				<version>3.0.0-M1</version>
+				<version>3.0.0-M7</version>
 				<configuration>
+					<argLine>${surefire.jacoco.args}</argLine>
 					${project.version}
 				</configuration>
 			</plugin>
+
+			<plugin>
+				<artifactId>maven-failsafe-plugin</artifactId>
+				<version>3.0.0-M7</version>
+				<configuration>
+					<argLine>${failsafe.jacoco.args}</argLine>
+					${project.version}
+				</configuration>
+				<executions>
+					<execution>
+						<id>integration-tests</id>
+						<goals>
+							<goal>integration-test</goal>
+							<goal>verify</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+
 			<plugin>
 				<artifactId>maven-assembly-plugin</artifactId>
-				<version>3.1.1</version>
+				<version>3.4.1</version>
 				<executions>
 					<execution>
 						<phase>package</phase>
@@ -123,6 +158,110 @@
+
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>build-helper-maven-plugin</artifactId>
+				<version>3.3.0</version>
+				<executions>
+					<execution>
+						<id>add-test-source</id>
+						<phase>process-test-sources</phase>
+						<goals>
+							<goal>add-test-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/integration/java</source>
+							</sources>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<plugin>
+				<groupId>org.jacoco</groupId>
+				<artifactId>jacoco-maven-plugin</artifactId>
+				<version>0.8.8</version>
+				<executions>
+					<execution>
+						<id>before-unit-test-execution</id>
+						<goals>
+							<goal>prepare-agent</goal>
+						</goals>
+						<configuration>
+							<destFile>${project.build.directory}/jacoco-output/jacoco-unit-tests.exec</destFile>
+							<propertyName>surefire.jacoco.args</propertyName>
+						</configuration>
+					</execution>
+					<execution>
+						<id>after-unit-test-execution</id>
+						<phase>test</phase>
+						<goals>
+							<goal>report</goal>
+						</goals>
+						<configuration>
+							<dataFile>${project.build.directory}/jacoco-output/jacoco-unit-tests.exec</dataFile>
+							<outputDirectory>${project.reporting.outputDirectory}/jacoco-unit-test-coverage-report</outputDirectory>
+						</configuration>
+					</execution>
+					<execution>
+						<id>before-integration-test-execution</id>
+						<phase>pre-integration-test</phase>
+						<goals>
+							<goal>prepare-agent</goal>
+						</goals>
+						<configuration>
+							<destFile>${project.build.directory}/jacoco-output/jacoco-integration-tests.exec</destFile>
+							<propertyName>failsafe.jacoco.args</propertyName>
+						</configuration>
+					</execution>
+					<execution>
+						<id>after-integration-test-execution</id>
+						<phase>post-integration-test</phase>
+						<goals>
+							<goal>report</goal>
+						</goals>
+						<configuration>
+							<dataFile>${project.build.directory}/jacoco-output/jacoco-integration-tests.exec</dataFile>
+							<outputDirectory>${project.reporting.outputDirectory}/jacoco-integration-test-coverage-report</outputDirectory>
+						</configuration>
+					</execution>
+					<execution>
+						<id>merge-unit-and-integration</id>
+						<phase>post-integration-test</phase>
+						<goals>
+							<goal>merge</goal>
+						</goals>
+						<configuration>
+							<fileSets>
+								<fileSet>
+									<directory>${project.build.directory}/jacoco-output/</directory>
+									<includes>
+										<include>*.exec</include>
+									</includes>
+								</fileSet>
+							</fileSets>
+							<destFile>${project.build.directory}/jacoco-output/merged.exec</destFile>
+						</configuration>
+					</execution>
+					<execution>
+						<id>create-merged-report</id>
+						<phase>post-integration-test</phase>
+						<goals>
+							<goal>report</goal>
+						</goals>
+						<configuration>
+							<dataFile>${project.build.directory}/jacoco-output/merged.exec</dataFile>
+							<outputDirectory>${project.reporting.outputDirectory}/jacoco-merged-test-coverage-report</outputDirectory>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
 		</plugins>
 	</build>
diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java
new file mode 100644
index 0000000..36c04e3
--- /dev/null
+++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java
@@ -0,0 +1,146 @@
+/**
+ * Copyright 2022 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.eventstreams.connect.mqsource;
+
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSContext;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.junit.ClassRule;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.WaitingConsumer;
+
+import com.ibm.mq.jms.MQConnectionFactory;
+import com.ibm.msg.client.jms.JmsConnectionFactory;
+import com.ibm.msg.client.jms.JmsFactoryFactory;
+import com.ibm.msg.client.wmq.WMQConstants;
+
+/**
+ * Helper class for integration tests that have a dependency on JMSContext.
+ *
+ * It starts a queue manager in a test container, and uses it to create
+ * a JMSContext instance, that can be used in tests.
+ */
+public class AbstractJMSContextIT {
+
+    private static final String QMGR_NAME = "MYQMGR";
+    private static final String CHANNEL_NAME = "DEV.APP.SVRCONN";
+
+    @ClassRule
+    public static GenericContainer<?> MQ_CONTAINER = new GenericContainer<>("icr.io/ibm-messaging/mq:latest")
+        .withEnv("LICENSE", "accept")
+        .withEnv("MQ_QMGR_NAME", QMGR_NAME)
+        .withEnv("MQ_ENABLE_EMBEDDED_WEB_SERVER", "false")
+        .withExposedPorts(1414);
+
+    private JMSContext jmsContext;
+
+
+    /**
+     * Returns a JMS context pointing at a developer queue manager running in a
+     * test container.
+     */
+    public JMSContext getJmsContext() throws Exception {
+        if (jmsContext == null) {
+            waitForQueueManagerStartup();
+
+            MQConnectionFactory mqcf = new MQConnectionFactory();
+            mqcf.setTransportType(WMQConstants.WMQ_CM_CLIENT);
+            mqcf.setChannel(CHANNEL_NAME);
+            mqcf.setQueueManager(QMGR_NAME);
+            mqcf.setConnectionNameList(getConnectionName());
+
+            jmsContext = mqcf.createContext();
+        }
+
+        return jmsContext;
+    }
+
+
+    /**
+     * Gets the host port that has been mapped to the default MQ 1414 port in the test container.
+     */
+    public Integer getMQPort() {
+        return MQ_CONTAINER.getMappedPort(1414);
+    }
+
+    public String getQmgrName() {
+        return QMGR_NAME;
+    }
+    public String getChannelName() {
+        return CHANNEL_NAME;
+    }
+    public String getConnectionName() {
+        return "localhost(" + getMQPort().toString() + ")";
+    }
+
+
+    /**
+     * Waits until we see a log line in the queue manager test container that indicates
+     * the queue manager is ready.
+     */
+    private void waitForQueueManagerStartup() throws TimeoutException {
+        WaitingConsumer logConsumer = new WaitingConsumer();
+        MQ_CONTAINER.followOutput(logConsumer);
+        logConsumer.waitUntil(logline -> logline.getUtf8String().contains("AMQ5975I"));
+    }
+
+
+
+
+    /**
+     * Puts all messages to the specified MQ queue. Used in tests to
+     * give the Connector something to get.
+     */
+    public void putAllMessagesToQueue(String queueName, List<Message> messages) throws JMSException {
+        Connection connection = null;
+        Session session = null;
+        Destination destination = null;
+        MessageProducer producer = null;
+
+        JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
+
+        JmsConnectionFactory cf = ff.createConnectionFactory();
+        cf.setStringProperty(WMQConstants.WMQ_HOST_NAME, "localhost");
+        cf.setIntProperty(WMQConstants.WMQ_PORT, getMQPort());
+        cf.setStringProperty(WMQConstants.WMQ_CHANNEL, getChannelName());
+        cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
+        cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, getQmgrName());
+        cf.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, false);
+
+        connection = cf.createConnection();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        destination = session.createQueue(queueName);
+        producer = session.createProducer(destination);
+
+        connection.start();
+
+        for (Message message : messages) {
+            message.setJMSDestination(destination);
+            producer.send(message);
+        }
+
+        connection.close();
+    }
+}
diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskAuthIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskAuthIT.java
new file mode 100644
index 0000000..d785cfa
--- /dev/null
+++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskAuthIT.java
@@ -0,0 +1,121 @@
+/**
+ * Copyright 2022 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.eventstreams.connect.mqsource;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.WaitingConsumer;
+
+import com.ibm.mq.MQException;
+import com.ibm.mq.MQMessage;
+import com.ibm.mq.MQQueue;
+import com.ibm.mq.MQQueueManager;
+import com.ibm.mq.constants.MQConstants;
+
+public class MQSourceTaskAuthIT {
+
+    private static final String QMGR_NAME = "MYAUTHQMGR";
+    private static final String QUEUE_NAME = "DEV.QUEUE.2";
+    private static final String CHANNEL_NAME = "DEV.APP.SVRCONN";
+    private static final String APP_PASSWORD = "MySuperSecretPassword";
+
+
+    @ClassRule
+    public static GenericContainer<?> MQ_CONTAINER = new GenericContainer<>("icr.io/ibm-messaging/mq:latest")
+        .withEnv("LICENSE", "accept")
+        .withEnv("MQ_QMGR_NAME", QMGR_NAME)
+        .withEnv("MQ_ENABLE_EMBEDDED_WEB_SERVER", "false")
+        .withEnv("MQ_APP_PASSWORD", APP_PASSWORD)
+        .withExposedPorts(1414);
+
+
+    @Test
+    public void testAuthenticatedQueueManager() throws Exception {
+        waitForQueueManagerStartup();
+
+        Map<String, String> connectorProps = new HashMap<>();
+        connectorProps.put("mq.queue.manager", QMGR_NAME);
+        connectorProps.put("mq.connection.mode", "client");
+        connectorProps.put("mq.connection.name.list", "localhost(" + MQ_CONTAINER.getMappedPort(1414).toString() + ")");
+        connectorProps.put("mq.channel.name", CHANNEL_NAME);
+        connectorProps.put("mq.queue", QUEUE_NAME);
+        connectorProps.put("mq.user.authentication.mqcsp", "true");
+        connectorProps.put("mq.user.name", "app");
+        connectorProps.put("mq.password", APP_PASSWORD);
+        connectorProps.put("mq.message.body.jms", "false");
+        connectorProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
+
+        MQSourceTask newConnectTask = new MQSourceTask();
+        newConnectTask.start(connectorProps);
+
+        MQMessage message1 = new MQMessage();
+        message1.writeString("hello");
+        MQMessage message2 = new MQMessage();
+        message2.writeString("world");
+        putAllMessagesToQueue(Arrays.asList(message1, message2));
+
+        List<SourceRecord> kafkaMessages = newConnectTask.poll();
+        assertEquals(2, kafkaMessages.size());
+        for (SourceRecord kafkaMessage : kafkaMessages) {
+            assertNull(kafkaMessage.key());
+            assertEquals(Schema.OPTIONAL_BYTES_SCHEMA, kafkaMessage.valueSchema());
+        }
+
+        assertArrayEquals("hello".getBytes(), (byte[]) kafkaMessages.get(0).value());
+        assertArrayEquals("world".getBytes(), (byte[]) kafkaMessages.get(1).value());
+
+        newConnectTask.stop();
+    }
+
+
+    private void waitForQueueManagerStartup() throws TimeoutException {
+        WaitingConsumer logConsumer = new WaitingConsumer();
+        MQ_CONTAINER.followOutput(logConsumer);
+        logConsumer.waitUntil(logline -> logline.getUtf8String().contains("AMQ5975I"));
+    }
+
+
+    private void putAllMessagesToQueue(List<MQMessage> messages) throws MQException {
+        Hashtable<Object, Object> props = new Hashtable<>();
+        props.put(MQConstants.HOST_NAME_PROPERTY, "localhost");
+        props.put(MQConstants.PORT_PROPERTY, MQ_CONTAINER.getMappedPort(1414));
+        props.put(MQConstants.CHANNEL_PROPERTY, CHANNEL_NAME);
+        props.put(MQConstants.USER_ID_PROPERTY, "app");
+        props.put(MQConstants.PASSWORD_PROPERTY, APP_PASSWORD);
+
+        MQQueueManager qmgr = new MQQueueManager(QMGR_NAME, props);
+
+        MQQueue q = qmgr.accessQueue(QUEUE_NAME, MQConstants.MQOO_OUTPUT);
+
+        for (MQMessage message : messages) {
+            q.put(message);
+        }
+    }
+}
diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java
new file mode 100644
index 0000000..1b5f772
--- /dev/null
+++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java
@@ -0,0 +1,367 @@
+/**
+ * Copyright 2022 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.eventstreams.connect.mqsource;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.Message;
+import javax.jms.TextMessage;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.Test;
+
+public class MQSourceTaskIT extends AbstractJMSContextIT {
+
+    private static final String MQ_QUEUE = "DEV.QUEUE.1";
+
+    private Map<String, String> createDefaultConnectorProperties() {
+        Map<String, String> props = new HashMap<>();
+        props.put("mq.queue.manager", getQmgrName());
+        props.put("mq.connection.mode", "client");
+        props.put("mq.connection.name.list", getConnectionName());
+        props.put("mq.channel.name", getChannelName());
+        props.put("mq.queue", MQ_QUEUE);
+        props.put("mq.user.authentication.mqcsp", "false");
+        return props;
+    }
+
+
+    @Test
+    public void verifyJmsTextMessages() throws Exception {
+        MQSourceTask newConnectTask = new MQSourceTask();
+
+        Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
+        connectorConfigProps.put("mq.message.body.jms", "true");
+        connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
+
+        newConnectTask.start(connectorConfigProps);
+
+        TextMessage message1 = getJmsContext().createTextMessage("hello");
+        TextMessage message2 = getJmsContext().createTextMessage("world");
+        putAllMessagesToQueue(MQ_QUEUE, Arrays.asList(message1, message2));
+
+        List<SourceRecord> kafkaMessages = newConnectTask.poll();
+        assertEquals(2, kafkaMessages.size());
+        for (SourceRecord kafkaMessage : kafkaMessages) {
+            assertNull(kafkaMessage.key());
+            assertNull(kafkaMessage.valueSchema());
+        }
+
+        assertEquals("hello", kafkaMessages.get(0).value());
+        assertEquals("world", kafkaMessages.get(1).value());
+
+        newConnectTask.stop();
+    }
+
+
+
+    @Test
+    public void verifyJmsJsonMessages() throws Exception {
+        MQSourceTask newConnectTask = new MQSourceTask();
+
+        Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
+        connectorConfigProps.put("mq.message.body.jms", "true");
+        connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
+
+        newConnectTask.start(connectorConfigProps);
+
+        List<Message> messages = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            messages.add(getJmsContext().createTextMessage(
+                "{ " +
+                    "\"i\" : " + i +
+                "}"));
+        }
+        putAllMessagesToQueue(MQ_QUEUE, messages);
+
+        List<SourceRecord> kafkaMessages = newConnectTask.poll();
+        assertEquals(5, kafkaMessages.size());
+        for (int i = 0; i < 5; i++) {
+            SourceRecord kafkaMessage = kafkaMessages.get(i);
+            assertNull(kafkaMessage.key());
+            assertNull(kafkaMessage.valueSchema());
+
+            Map<?, ?> value = (Map<?, ?>) kafkaMessage.value();
+            assertEquals(Long.valueOf(i), value.get("i"));
+        }
+
+        newConnectTask.stop();
+    }
+
+
+
+    @Test
+    public void verifyJmsMessageHeaders() throws Exception {
+        MQSourceTask newConnectTask = new MQSourceTask();
+
+        Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
+        connectorConfigProps.put("mq.message.body.jms", "true");
+        connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
+        connectorConfigProps.put("mq.jms.properties.copy.to.kafka.headers", "true");
+
+        newConnectTask.start(connectorConfigProps);
+
+        TextMessage message = getJmsContext().createTextMessage("helloworld");
+        message.setStringProperty("teststring", "myvalue");
+        message.setIntProperty("volume", 11);
+        message.setDoubleProperty("decimalmeaning", 42.0);
+
+        putAllMessagesToQueue(MQ_QUEUE, Arrays.asList(message));
+
+        List<SourceRecord> kafkaMessages = newConnectTask.poll();
+        assertEquals(1, kafkaMessages.size());
+        SourceRecord kafkaMessage = kafkaMessages.get(0);
+        assertNull(kafkaMessage.key());
+        assertNull(kafkaMessage.valueSchema());
+
+        assertEquals("helloworld", kafkaMessage.value());
+
+        assertEquals("myvalue", kafkaMessage.headers().lastWithName("teststring").value());
+        assertEquals("11", kafkaMessage.headers().lastWithName("volume").value());
+        assertEquals("42.0", kafkaMessage.headers().lastWithName("decimalmeaning").value());
+
+        newConnectTask.stop();
+    }
+
+
+
+    @Test
+    public void verifyMessageBatchIndividualCommits() throws Exception {
+        MQSourceTask newConnectTask = new MQSourceTask();
+
+        Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
+        connectorConfigProps.put("mq.message.body.jms", "true");
+        connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
+        connectorConfigProps.put("mq.batch.size", "10");
+
+        newConnectTask.start(connectorConfigProps);
+
+        List<Message> messages = new ArrayList<>();
+        for (int i = 1; i <= 35; i++) {
+            messages.add(getJmsContext().createTextMessage("batch message " + i));
+        }
+        putAllMessagesToQueue(MQ_QUEUE, messages);
+
+        int nextExpectedMessage = 1;
+
+        List<SourceRecord> kafkaMessages;
+
+        kafkaMessages = newConnectTask.poll();
+        assertEquals(10, kafkaMessages.size());
+        for (SourceRecord kafkaMessage : kafkaMessages) {
+            assertEquals("batch message " + (nextExpectedMessage++), kafkaMessage.value());
+            newConnectTask.commitRecord(kafkaMessage);
+        }
+
+        kafkaMessages = newConnectTask.poll();
+        assertEquals(10, kafkaMessages.size());
+        for (SourceRecord kafkaMessage : kafkaMessages) {
+            assertEquals("batch message " + (nextExpectedMessage++), kafkaMessage.value());
+            newConnectTask.commitRecord(kafkaMessage);
+        }
+
+        kafkaMessages = newConnectTask.poll();
+        assertEquals(10, kafkaMessages.size());
+        for (SourceRecord kafkaMessage : kafkaMessages) {
+            assertEquals("batch message " + (nextExpectedMessage++), kafkaMessage.value());
+            newConnectTask.commitRecord(kafkaMessage);
+        }
+
+        kafkaMessages = newConnectTask.poll();
+        assertEquals(5, kafkaMessages.size());
+        for (SourceRecord kafkaMessage : kafkaMessages) {
+            assertEquals("batch message " + (nextExpectedMessage++), kafkaMessage.value());
+            newConnectTask.commitRecord(kafkaMessage);
+        }
+
+        newConnectTask.stop();
+    }
+
+
+
+    @Test
+    public void verifyMessageBatchGroupCommits() throws Exception {
+        MQSourceTask newConnectTask = new MQSourceTask();
+
+        Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
+        connectorConfigProps.put("mq.message.body.jms", "true");
+        connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
+        connectorConfigProps.put("mq.batch.size", "10");
+
+        newConnectTask.start(connectorConfigProps);
+
+        List<Message> messages = new ArrayList<>();
+        for (int i = 1; i <= 35; i++) {
+            messages.add(getJmsContext().createTextMessage("message " + i));
+        }
+        putAllMessagesToQueue(MQ_QUEUE, messages);
+
+        List<SourceRecord> kafkaMessages;
+
+        kafkaMessages = newConnectTask.poll();
+        assertEquals(10, kafkaMessages.size());
+        newConnectTask.commit();
+        newConnectTask.commit();
+
+        kafkaMessages = newConnectTask.poll();
+        assertEquals(10, kafkaMessages.size());
+        newConnectTask.commit();
+        newConnectTask.commit();
+
+        kafkaMessages = newConnectTask.poll();
+        assertEquals(10, kafkaMessages.size());
+        newConnectTask.commit();
+        newConnectTask.commit();
+
+        kafkaMessages = newConnectTask.poll();
+        assertEquals(5, kafkaMessages.size());
+        newConnectTask.commit();
+        newConnectTask.commit();
+
+        newConnectTask.stop();
+    }
+
+
+
+    @Test
+    public void verifyMessageIdAsKey() throws Exception {
+        MQSourceTask newConnectTask = new MQSourceTask();
+
+        Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
+        connectorConfigProps.put("mq.message.body.jms", "true");
+        connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
+        connectorConfigProps.put("mq.record.builder.key.header", "JMSMessageID");
+
+        newConnectTask.start(connectorConfigProps);
+
+        TextMessage message = getJmsContext().createTextMessage("testmessage");
+        putAllMessagesToQueue(MQ_QUEUE, Arrays.asList(message));
+
+        List<SourceRecord> kafkaMessages = newConnectTask.poll();
+        assertEquals(1, kafkaMessages.size());
+
+        SourceRecord kafkaMessage = kafkaMessages.get(0);
+        assertEquals(message.getJMSMessageID().substring("ID:".length()), kafkaMessage.key());
+        assertNotNull(message.getJMSMessageID());
+        assertEquals(Schema.OPTIONAL_STRING_SCHEMA, kafkaMessage.keySchema());
+
+        assertEquals("testmessage", kafkaMessage.value());
+
+        newConnectTask.stop();
+    }
+
+
+
+    @Test
+    public void verifyCorrelationIdAsKey() throws Exception {
+        MQSourceTask newConnectTask = new MQSourceTask();
+
+        Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
+        connectorConfigProps.put("mq.message.body.jms", "true");
+        connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
+        connectorConfigProps.put("mq.record.builder.key.header", "JMSCorrelationID");
+
+        newConnectTask.start(connectorConfigProps);
+
+        TextMessage message1 = getJmsContext().createTextMessage("first message");
+        message1.setJMSCorrelationID("verifycorrel");
+        TextMessage message2 = getJmsContext().createTextMessage("second message");
+        message2.setJMSCorrelationID("ID:5fb4a18030154fe4b09a1dfe8075bc101dfe8075bc104fe4");
+        putAllMessagesToQueue(MQ_QUEUE, Arrays.asList(message1, message2));
+
+        List<SourceRecord> kafkaMessages = newConnectTask.poll();
+        assertEquals(2, kafkaMessages.size());
+
+        SourceRecord kafkaMessage1 = kafkaMessages.get(0);
+        assertEquals("verifycorrel", kafkaMessage1.key());
+        assertEquals(Schema.OPTIONAL_STRING_SCHEMA, kafkaMessage1.keySchema());
+        assertEquals("first message", kafkaMessage1.value());
+
+        SourceRecord kafkaMessage2 = kafkaMessages.get(1);
+        assertEquals("5fb4a18030154fe4b09a1dfe8075bc101dfe8075bc104fe4", kafkaMessage2.key());
+        assertEquals(Schema.OPTIONAL_STRING_SCHEMA, kafkaMessage2.keySchema());
+        assertEquals("second message", kafkaMessage2.value());
+
+        newConnectTask.stop();
+    }
+
+
+
+    @Test
+    public void verifyCorrelationIdBytesAsKey() throws Exception {
+        MQSourceTask newConnectTask = new MQSourceTask();
+
+        Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
+        connectorConfigProps.put("mq.message.body.jms", "true");
+        connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
+        connectorConfigProps.put("mq.record.builder.key.header", "JMSCorrelationIDAsBytes");
+
+        newConnectTask.start(connectorConfigProps);
+
+        TextMessage message = getJmsContext().createTextMessage("testmessagewithcorrelbytes");
+        message.setJMSCorrelationID("verifycorrelbytes");
+        putAllMessagesToQueue(MQ_QUEUE, Arrays.asList(message));
+
+        List<SourceRecord> kafkaMessages = newConnectTask.poll();
+        assertEquals(1, kafkaMessages.size());
+
+        SourceRecord kafkaMessage = kafkaMessages.get(0);
+        assertArrayEquals("verifycorrelbytes".getBytes(), (byte[]) kafkaMessage.key());
+        assertEquals(Schema.OPTIONAL_BYTES_SCHEMA, kafkaMessage.keySchema());
+
+        assertEquals("testmessagewithcorrelbytes", kafkaMessage.value());
+
+        newConnectTask.stop();
+    }
+
+
+
+    @Test
+    public void verifyDestinationAsKey() throws Exception {
+        MQSourceTask newConnectTask = new MQSourceTask();
+
+        Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
+        connectorConfigProps.put("mq.message.body.jms", "true");
+        connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
+        connectorConfigProps.put("mq.record.builder.key.header", "JMSDestination");
+
+        newConnectTask.start(connectorConfigProps);
+
+        TextMessage message = getJmsContext().createTextMessage("testmessagewithdest");
+        putAllMessagesToQueue(MQ_QUEUE, Arrays.asList(message));
+
+        List<SourceRecord> kafkaMessages = newConnectTask.poll();
+        assertEquals(1, kafkaMessages.size());
+
+        SourceRecord kafkaMessage = kafkaMessages.get(0);
+        assertEquals("queue:///" + MQ_QUEUE, kafkaMessage.key());
+        assertEquals(Schema.OPTIONAL_STRING_SCHEMA, kafkaMessage.keySchema());
+
+        assertEquals("testmessagewithdest", kafkaMessage.value());
+
+        newConnectTask.stop();
+    }
+}
diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/DefaultRecordBuilderIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/DefaultRecordBuilderIT.java
new file mode 100644
index 0000000..7a187fa
--- /dev/null
+++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/DefaultRecordBuilderIT.java
@@ -0,0 +1,142 @@
+/**
+ * Copyright 2022 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.eventstreams.connect.mqsource.builders;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import javax.jms.BytesMessage;
+import javax.jms.MapMessage;
+import javax.jms.MessageFormatException;
+import javax.jms.TextMessage;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.Test;
+
+import com.ibm.eventstreams.connect.mqsource.AbstractJMSContextIT;
+
+public class DefaultRecordBuilderIT extends AbstractJMSContextIT {
+
+    private static final String TOPIC = "MY.TOPIC";
+
+
+    @Test
+    public void buildFromJmsMapMessage() throws Exception {
+        final String MESSAGE_CONTENTS = "This is the message contents";
+        final boolean IS_JMS = true;
+
+        // create MQ message
+        MapMessage message = getJmsContext().createMapMessage();
+        message.setString("example", MESSAGE_CONTENTS);
+
+        // use the builder to convert it to a Kafka record
+        DefaultRecordBuilder builder = new DefaultRecordBuilder();
+        ConnectException exc = assertThrows(ConnectException.class, () -> {
+            builder.toSourceRecord(getJmsContext(), TOPIC, IS_JMS, message);
+        });
+
+        // verify the exception
+        assertEquals("Unsupported JMS message type", exc.getMessage());
+    }
+
+
+    @Test
+    public void buildFromJmsTextMessage() throws Exception {
+        final String MESSAGE_CONTENTS = "This is the JMS message contents";
+        final boolean IS_JMS = true;
+
+        // create MQ message
+        TextMessage message = getJmsContext().createTextMessage(MESSAGE_CONTENTS);
+
+        // use the builder to convert it to a Kafka record
+        DefaultRecordBuilder builder = new DefaultRecordBuilder();
+        SourceRecord record = builder.toSourceRecord(getJmsContext(), TOPIC, IS_JMS, message);
+
+        // verify the Kafka record
+        assertNull(record.key());
+        assertEquals(MESSAGE_CONTENTS, record.value());
+        assertNull(record.valueSchema());
+    }
+
+
+    @Test
+    public void buildFromTextMessage() throws Exception {
+        final String MESSAGE_CONTENTS = "This is the message contents";
+        final boolean IS_JMS = false;
+
+        // create MQ message
+        TextMessage message = getJmsContext().createTextMessage(MESSAGE_CONTENTS);
+
+        // use the builder to convert it to a Kafka record
+        DefaultRecordBuilder builder = new DefaultRecordBuilder();
+        MessageFormatException exc = assertThrows(MessageFormatException.class, () -> {
+            builder.toSourceRecord(getJmsContext(), TOPIC, IS_JMS, message);
+        });
+
+        // verify the exception
+        assertEquals("JMSCC5002", exc.getErrorCode());
+        assertTrue(exc.getMessage().contains("The message of type jms_text can not have its body assigned to"));
+    }
+
+
+    @Test
+    public void buildFromJmsBytesMessage() throws Exception {
+        final String MESSAGE_ORIGIN = "This is the data used for message contents";
+        final byte[] MESSAGE_CONTENTS = MESSAGE_ORIGIN.getBytes();
+        final boolean IS_JMS = true;
+
+        // create MQ message
+        BytesMessage message = getJmsContext().createBytesMessage();
+        message.writeBytes(MESSAGE_CONTENTS);
+        message.reset();
+
+        // use the builder to convert it to a Kafka record
+        DefaultRecordBuilder builder = new DefaultRecordBuilder();
+        SourceRecord record = builder.toSourceRecord(getJmsContext(), TOPIC, IS_JMS, message);
+
+        // verify the Kafka record
+        assertNull(record.key());
+        assertArrayEquals(MESSAGE_CONTENTS, (byte[]) record.value());
+        assertNull(record.valueSchema());
+    }
+
+
+    @Test
+    public void buildFromBytesMessage() throws Exception {
+        final String MESSAGE_ORIGIN = "This is the data used for message contents";
+        final byte[] MESSAGE_CONTENTS = MESSAGE_ORIGIN.getBytes();
+        final boolean IS_JMS = false;
+
+        // create MQ message
+        BytesMessage message = getJmsContext().createBytesMessage();
+        message.writeBytes(MESSAGE_CONTENTS);
+        message.reset();
+
+        // use the builder to convert it to a Kafka record
+        DefaultRecordBuilder builder = new DefaultRecordBuilder();
+        SourceRecord record = builder.toSourceRecord(getJmsContext(), TOPIC, IS_JMS, message);
+
+        // verify the Kafka record
+        assertNull(record.key());
+        assertArrayEquals(MESSAGE_CONTENTS, (byte[]) record.value());
+        assertEquals(Schema.OPTIONAL_BYTES_SCHEMA, record.valueSchema());
+    }
+}
diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java
new file mode 100644
index 0000000..ee12e3f
--- /dev/null
+++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java
@@ -0,0 +1,110 @@
+/**
+ * Copyright 2022 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.eventstreams.connect.mqsource.builders;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.BytesMessage;
+import javax.jms.MapMessage;
+import javax.jms.TextMessage;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.Test;
+
+import com.ibm.eventstreams.connect.mqsource.AbstractJMSContextIT;
+
+public class JsonRecordBuilderIT extends AbstractJMSContextIT {
+
+    private final String TOPIC = "MY.TOPIC";
+    private final boolean IS_JMS = true;
+
+    private final String MESSAGE_CONTENTS =
+        "{ " +
+            "\"hello\" : \"world\", " +
+            "\"test\" : 123, " +
+            "\"list\" : [ \"one\", \"two\", \"three\" ] " +
+        "}";
+
+    @SuppressWarnings("unchecked")
+    private void verifyJsonMap(Map<String, Object> value) {
+        assertEquals(3, value.keySet().size());
+        assertEquals("world", value.get("hello"));
+        assertEquals(123L, value.get("test"));
+        String[] expected = { "one", "two", "three" };
+        assertArrayEquals(expected, ((List<String>) value.get("list")).toArray());
+    }
+
+
+
+    @Test
+    public void buildFromJmsTextMessage() throws Exception {
+        // create MQ message
+        TextMessage message = getJmsContext().createTextMessage(MESSAGE_CONTENTS);
+
+        // use the builder to convert it to a Kafka record
+        JsonRecordBuilder builder = new JsonRecordBuilder();
+        SourceRecord record = builder.toSourceRecord(getJmsContext(), TOPIC, IS_JMS, message);
+
+        // verify the Kafka record
+        assertNull(record.key());
+        assertNull(record.valueSchema());
+        verifyJsonMap((Map<String, Object>) record.value());
+    }
+
+
+    @Test
+    public void buildFromJmsBytesMessage() throws Exception {
+        // create MQ message
+        BytesMessage message = getJmsContext().createBytesMessage();
+        message.writeBytes(MESSAGE_CONTENTS.getBytes());
+        message.reset();
+
+        // use the builder to convert it to a Kafka record
+        JsonRecordBuilder builder = new JsonRecordBuilder();
+        SourceRecord record = builder.toSourceRecord(getJmsContext(), TOPIC, IS_JMS, message);
+
+        // verify the Kafka record
+        assertNull(record.key());
+        assertNull(record.valueSchema());
+        verifyJsonMap((Map<String, Object>) record.value());
+    }
+
+
+    @Test
+    public void buildFromJmsMapMessage() throws Exception {
+        final String MESSAGE_CONTENTS = "This is the message contents";
+
+        // create MQ message
+        MapMessage message = getJmsContext().createMapMessage();
+        message.setString("example", MESSAGE_CONTENTS);
+
+        // use the builder to convert it to a Kafka record
+        JsonRecordBuilder builder = new JsonRecordBuilder();
+        ConnectException exc = assertThrows(ConnectException.class, () -> {
+            builder.toSourceRecord(getJmsContext(), TOPIC, IS_JMS, message);
+        });
+
+        // verify the exception
+        assertEquals("Unsupported JMS message type", exc.getMessage());
+    }
+}