diff --git a/.checkstyle/checkstyle.xml b/.checkstyle/checkstyle.xml new file mode 100644 index 0000000..c2abc0a --- /dev/null +++ b/.checkstyle/checkstyle.xml @@ -0,0 +1,143 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/.checkstyle/intellij-checkstyle.xml b/.checkstyle/intellij-checkstyle.xml new file mode 100644 index 0000000..47e04ee --- /dev/null +++ b/.checkstyle/intellij-checkstyle.xml @@ -0,0 +1,139 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/.checkstyle/suppressions.xml b/.checkstyle/suppressions.xml new file mode 100644 index 0000000..1334171 --- /dev/null +++ b/.checkstyle/suppressions.xml @@ -0,0 +1,11 @@ + + + + + + + + + \ No newline at end of file diff --git a/.github/workflows/checkstyle-and-tests.yml b/.github/workflows/checkstyle-and-tests.yml new file mode 100644 index 0000000..595f637 --- /dev/null +++ b/.github/workflows/checkstyle-and-tests.yml @@ -0,0 +1,48 @@ +name: Checkstyle and Tests + +on: + pull_request: + branches: + - 'master' + types: [opened, synchronize, reopened] + +jobs: + test: + name: Checkstyle and Tests + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v3 + - name: Set up JDK 8 + uses: actions/setup-java@v3 + with: + java-version: 8 + distribution: 'temurin' + - name: Get java-version + run: | + BUILD_VERSION=$( mvn help:evaluate -Dexpression=project.version -q -DforceStdout ) + echo "VERSION=$BUILD_VERSION" >> $GITHUB_ENV + - name: Compile + run: mvn -X compile + - name: Checkstyle + run: mvn -X validate + - name: Test + run: mvn -X test + verify-commits: + name: Verify Commits + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v2 + with: + ref: ${{ github.event.pull_request.head.sha }} + fetch-depth: 0 + + - name: Verify Signed-off-by + run: | + for commit in $(git rev-list --no-merges HEAD^..HEAD); do + if ! git log -1 --format=%B "$commit" | grep -q "^Signed-off-by: "; then + echo "Commit $commit is missing Signed-off-by line." + exit 1 + fi + done \ No newline at end of file diff --git a/pom.xml b/pom.xml index d0196e3..958ff18 100644 --- a/pom.xml +++ b/pom.xml @@ -262,6 +262,31 @@ + + org.apache.maven.plugins + maven-checkstyle-plugin + 3.2.0 + + UTF-8 + true + true + warning + true + false + ${project.build.sourceDirectory} + ${project.basedir}/.checkstyle/checkstyle.xml + checkstyle.config.path=${project.basedir} + + + + validate + validate + + check + + + + diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java index b5ca57f..d81e1f6 100644 --- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java @@ -49,7 +49,7 @@ public class AbstractJMSContextIT { private static final String CHANNEL_NAME = "DEV.APP.SVRCONN"; @ClassRule - public static GenericContainer MQ_CONTAINER = new GenericContainer<>("icr.io/ibm-messaging/mq:latest") + public static GenericContainer mqContainer = new GenericContainer<>("icr.io/ibm-messaging/mq:latest") .withEnv("LICENSE", "accept") .withEnv("MQ_QMGR_NAME", QMGR_NAME) .withEnv("MQ_ENABLE_EMBEDDED_WEB_SERVER", "false") @@ -66,7 +66,7 @@ public JMSContext getJmsContext() throws Exception { if (jmsContext == null) { waitForQueueManagerStartup(); - MQConnectionFactory mqcf = new MQConnectionFactory(); + final MQConnectionFactory mqcf = new MQConnectionFactory(); mqcf.setTransportType(WMQConstants.WMQ_CM_CLIENT); mqcf.setChannel(CHANNEL_NAME); mqcf.setQueueManager(QMGR_NAME); @@ -78,51 +78,50 @@ public JMSContext getJmsContext() throws Exception { return jmsContext; } - /** - * Gets the host port that has been mapped to the default MQ 1414 port in the test container. + * Gets the host port that has been mapped to the default MQ 1414 port in the + * test container. */ public Integer getMQPort() { - return MQ_CONTAINER.getMappedPort(1414); + return mqContainer.getMappedPort(1414); } public String getQmgrName() { return QMGR_NAME; } + public String getChannelName() { return CHANNEL_NAME; } + public String getConnectionName() { return "localhost(" + getMQPort().toString() + ")"; } - /** - * Waits until we see a log line in the queue manager test container that indicates - * the queue manager is ready. + * Waits until we see a log line in the queue manager test container that + * indicates + * the queue manager is ready. */ private void waitForQueueManagerStartup() throws TimeoutException { - WaitingConsumer logConsumer = new WaitingConsumer(); - MQ_CONTAINER.followOutput(logConsumer); + final WaitingConsumer logConsumer = new WaitingConsumer(); + mqContainer.followOutput(logConsumer); logConsumer.waitUntil(logline -> logline.getUtf8String().contains("AMQ5975I")); } - - - /** * Puts all messages to the specified MQ queue. Used in tests to - * give the Connector something to get. + * give the Connector something to get. */ - public void putAllMessagesToQueue(String queueName, List messages) throws JMSException { + public void putAllMessagesToQueue(final String queueName, final List messages) throws JMSException { Connection connection = null; Session session = null; Destination destination = null; MessageProducer producer = null; - JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER); + final JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER); - JmsConnectionFactory cf = ff.createConnectionFactory(); + final JmsConnectionFactory cf = ff.createConnectionFactory(); cf.setStringProperty(WMQConstants.WMQ_HOST_NAME, "localhost"); cf.setIntProperty(WMQConstants.WMQ_PORT, getMQPort()); cf.setStringProperty(WMQConstants.WMQ_CHANNEL, getChannelName()); @@ -138,7 +137,7 @@ public void putAllMessagesToQueue(String queueName, List messages) thro connection.start(); - for (Message message : messages) { + for (final Message message : messages) { message.setJMSDestination(destination); producer.send(message); } @@ -146,20 +145,19 @@ public void putAllMessagesToQueue(String queueName, List messages) thro connection.close(); } - /** * Gets all messages from the specified MQ queue. Used in tests to - * verify what is left on the test queue + * verify what is left on the test queue */ - public List getAllMessagesFromQueue(String queueName) throws JMSException { + public List getAllMessagesFromQueue(final String queueName) throws JMSException { Connection connection = null; Session session = null; Destination destination = null; MessageConsumer consumer = null; - JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER); + final JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER); - JmsConnectionFactory cf = ff.createConnectionFactory(); + final JmsConnectionFactory cf = ff.createConnectionFactory(); cf.setStringProperty(WMQConstants.WMQ_HOST_NAME, "localhost"); cf.setIntProperty(WMQConstants.WMQ_PORT, getMQPort()); cf.setStringProperty(WMQConstants.WMQ_CHANNEL, getChannelName()); @@ -175,7 +173,7 @@ public List getAllMessagesFromQueue(String queueName) throws JMSExcepti connection.start(); - List messages = new ArrayList<>(); + final List messages = new ArrayList<>(); Message message; do { message = consumer.receiveNoWait(); diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskAuthIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskAuthIT.java index 043849b..74bcb7a 100644 --- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskAuthIT.java +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskAuthIT.java @@ -52,7 +52,7 @@ public class MQSourceTaskAuthIT { @ClassRule - public static GenericContainer MQ_CONTAINER = new GenericContainer<>("icr.io/ibm-messaging/mq:latest") + public static GenericContainer mqContainer = new GenericContainer<>("icr.io/ibm-messaging/mq:latest") .withEnv("LICENSE", "accept") .withEnv("MQ_QMGR_NAME", QMGR_NAME) .withEnv("MQ_APP_PASSWORD", APP_PASSWORD) @@ -61,10 +61,10 @@ public class MQSourceTaskAuthIT { private Map getConnectorProps() { - Map connectorProps = new HashMap<>(); + final Map connectorProps = new HashMap<>(); connectorProps.put("mq.queue.manager", QMGR_NAME); connectorProps.put("mq.connection.mode", "client"); - connectorProps.put("mq.connection.name.list", "localhost(" + MQ_CONTAINER.getMappedPort(1414).toString() + ")"); + connectorProps.put("mq.connection.name.list", "localhost(" + mqContainer.getMappedPort(1414).toString() + ")"); connectorProps.put("mq.channel.name", CHANNEL_NAME); connectorProps.put("mq.queue", QUEUE_NAME); connectorProps.put("mq.user.authentication.mqcsp", "true"); @@ -79,18 +79,18 @@ private Map getConnectorProps() { public void testAuthenticatedQueueManager() throws Exception { waitForQueueManagerStartup(); - MQSourceTask newConnectTask = new MQSourceTask(); + final MQSourceTask newConnectTask = new MQSourceTask(); newConnectTask.start(getConnectorProps()); - MQMessage message1 = new MQMessage(); + final MQMessage message1 = new MQMessage(); message1.writeString("hello"); - MQMessage message2 = new MQMessage(); + final MQMessage message2 = new MQMessage(); message2.writeString("world"); putAllMessagesToQueue(Arrays.asList(message1, message2)); - List kafkaMessages = newConnectTask.poll(); + final List kafkaMessages = newConnectTask.poll(); assertEquals(2, kafkaMessages.size()); - for (SourceRecord kafkaMessage : kafkaMessages) { + for (final SourceRecord kafkaMessage : kafkaMessages) { assertNull(kafkaMessage.key()); assertEquals(Schema.OPTIONAL_BYTES_SCHEMA, kafkaMessage.valueSchema()); @@ -100,66 +100,64 @@ public void testAuthenticatedQueueManager() throws Exception { assertArrayEquals("hello".getBytes(), (byte[]) kafkaMessages.get(0).value()); assertArrayEquals("world".getBytes(), (byte[]) kafkaMessages.get(1).value()); - SourceTaskStopper stopper = new SourceTaskStopper(newConnectTask); + final SourceTaskStopper stopper = new SourceTaskStopper(newConnectTask); stopper.run(); } - - @Test public void verifyJmsConnClosed() throws Exception { - int restApiPortNumber = MQ_CONTAINER.getMappedPort(9443); + final int restApiPortNumber = mqContainer.getMappedPort(9443); // count number of connections to the qmgr at the start - int numQmgrConnectionsBefore = MQQueueManagerAttrs.getNumConnections(QMGR_NAME, restApiPortNumber, ADMIN_PASSWORD); + final int numQmgrConnectionsBefore = MQQueueManagerAttrs.getNumConnections(QMGR_NAME, restApiPortNumber, + ADMIN_PASSWORD); // start the source connector so that it connects to the qmgr - MQSourceTask connectTask = new MQSourceTask(); + final MQSourceTask connectTask = new MQSourceTask(); connectTask.start(getConnectorProps()); // count number of connections to the qmgr now - it should have increased - int numQmgrConnectionsDuring = MQQueueManagerAttrs.getNumConnections(QMGR_NAME, restApiPortNumber, ADMIN_PASSWORD); + final int numQmgrConnectionsDuring = MQQueueManagerAttrs.getNumConnections(QMGR_NAME, restApiPortNumber, + ADMIN_PASSWORD); // stop the source connector so it disconnects from the qmgr connectTask.stop(); // count number of connections to the qmgr now - it should have decreased - int numQmgrConnectionsAfter = MQQueueManagerAttrs.getNumConnections(QMGR_NAME, restApiPortNumber, ADMIN_PASSWORD); + final int numQmgrConnectionsAfter = MQQueueManagerAttrs.getNumConnections(QMGR_NAME, restApiPortNumber, + ADMIN_PASSWORD); // verify number of connections changed as expected assertTrue("connections should have increased after starting the source task", - numQmgrConnectionsDuring > numQmgrConnectionsBefore); + numQmgrConnectionsDuring > numQmgrConnectionsBefore); assertTrue("connections should have decreased after calling stop()", - numQmgrConnectionsAfter < numQmgrConnectionsDuring); + numQmgrConnectionsAfter < numQmgrConnectionsDuring); // cleanup - SourceTaskStopper stopper = new SourceTaskStopper(connectTask); + final SourceTaskStopper stopper = new SourceTaskStopper(connectTask); stopper.run(); } - - private void waitForQueueManagerStartup() throws TimeoutException { - WaitingConsumer logConsumer = new WaitingConsumer(); - MQ_CONTAINER.followOutput(logConsumer); + final WaitingConsumer logConsumer = new WaitingConsumer(); + mqContainer.followOutput(logConsumer); logConsumer.waitUntil(logline -> logline.getUtf8String().contains("AMQ5975I")); } - - private void putAllMessagesToQueue(List messages) throws MQException { - Hashtable props = new Hashtable<>(); + private void putAllMessagesToQueue(final List messages) throws MQException { + final Hashtable props = new Hashtable<>(); props.put(MQConstants.HOST_NAME_PROPERTY, "localhost"); - props.put(MQConstants.PORT_PROPERTY, MQ_CONTAINER.getMappedPort(1414)); + props.put(MQConstants.PORT_PROPERTY, mqContainer.getMappedPort(1414)); props.put(MQConstants.CHANNEL_PROPERTY, CHANNEL_NAME); props.put(MQConstants.USER_ID_PROPERTY, "app"); props.put(MQConstants.PASSWORD_PROPERTY, APP_PASSWORD); - MQQueueManager qmgr = new MQQueueManager(QMGR_NAME, props); + final MQQueueManager qmgr = new MQQueueManager(QMGR_NAME, props); - MQQueue q = qmgr.accessQueue(QUEUE_NAME, MQConstants.MQOO_OUTPUT); + final MQQueue q = qmgr.accessQueue(QUEUE_NAME, MQConstants.MQOO_OUTPUT); - for (MQMessage message : messages) { + for (final MQMessage message : messages) { q.put(message); } } diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java index c28de90..66e4629 100644 --- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java @@ -30,7 +30,6 @@ import javax.jms.MapMessage; import javax.jms.Message; -import javax.jms.MessageFormatException; import javax.jms.TextMessage; import org.apache.kafka.connect.data.Schema; @@ -48,15 +47,14 @@ public class MQSourceTaskIT extends AbstractJMSContextIT { @After public void cleanup() throws InterruptedException { - SourceTaskStopper stopper = new SourceTaskStopper(connectTask); + final SourceTaskStopper stopper = new SourceTaskStopper(connectTask); stopper.run(); } - private static final String MQ_QUEUE = "DEV.QUEUE.1"; private Map createDefaultConnectorProperties() { - Map props = new HashMap<>(); + final Map props = new HashMap<>(); props.put("mq.queue.manager", getQmgrName()); props.put("mq.connection.mode", "client"); props.put("mq.connection.name.list", getConnectionName()); @@ -66,24 +64,24 @@ private Map createDefaultConnectorProperties() { return props; } - @Test public void verifyJmsTextMessages() throws Exception { connectTask = new MQSourceTask(); - Map connectorConfigProps = createDefaultConnectorProperties(); + final Map connectorConfigProps = createDefaultConnectorProperties(); connectorConfigProps.put("mq.message.body.jms", "true"); - connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + connectorConfigProps.put("mq.record.builder", + "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); connectTask.start(connectorConfigProps); - TextMessage message1 = getJmsContext().createTextMessage("hello"); - TextMessage message2 = getJmsContext().createTextMessage("world"); + final TextMessage message1 = getJmsContext().createTextMessage("hello"); + final TextMessage message2 = getJmsContext().createTextMessage("world"); putAllMessagesToQueue(MQ_QUEUE, Arrays.asList(message1, message2)); - List kafkaMessages = connectTask.poll(); + final List kafkaMessages = connectTask.poll(); assertEquals(2, kafkaMessages.size()); - for (SourceRecord kafkaMessage : kafkaMessages) { + for (final SourceRecord kafkaMessage : kafkaMessages) { assertNull(kafkaMessage.key()); assertNull(kafkaMessage.valueSchema()); @@ -94,64 +92,62 @@ public void verifyJmsTextMessages() throws Exception { assertEquals("world", kafkaMessages.get(1).value()); } - - @Test public void verifyJmsJsonMessages() throws Exception { connectTask = new MQSourceTask(); - Map connectorConfigProps = createDefaultConnectorProperties(); + final Map connectorConfigProps = createDefaultConnectorProperties(); connectorConfigProps.put("mq.message.body.jms", "true"); - connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + connectorConfigProps.put("mq.record.builder", + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); connectTask.start(connectorConfigProps); - List messages = new ArrayList<>(); + final List messages = new ArrayList<>(); for (int i = 0; i < 5; i++) { messages.add(getJmsContext().createTextMessage( - "{ " + - "\"i\" : " + i + - "}")); + "{ " + + "\"i\" : " + i + + "}")); } putAllMessagesToQueue(MQ_QUEUE, messages); - List kafkaMessages = connectTask.poll(); + final List kafkaMessages = connectTask.poll(); assertEquals(5, kafkaMessages.size()); for (int i = 0; i < 5; i++) { - SourceRecord kafkaMessage = kafkaMessages.get(i); + final SourceRecord kafkaMessage = kafkaMessages.get(i); assertNull(kafkaMessage.key()); assertNull(kafkaMessage.valueSchema()); - Map value = (Map) kafkaMessage.value(); + final Map value = (Map) kafkaMessage.value(); assertEquals(Long.valueOf(i), value.get("i")); connectTask.commitRecord(kafkaMessage); } } - - @Test public void verifyJmsMessageHeaders() throws Exception { connectTask = new MQSourceTask(); - Map connectorConfigProps = createDefaultConnectorProperties(); + final Map connectorConfigProps = createDefaultConnectorProperties(); connectorConfigProps.put("mq.message.body.jms", "true"); - connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + connectorConfigProps.put("mq.record.builder", + "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); connectorConfigProps.put("mq.jms.properties.copy.to.kafka.headers", "true"); connectTask.start(connectorConfigProps); - TextMessage message = getJmsContext().createTextMessage("helloworld"); + final TextMessage message = getJmsContext().createTextMessage("helloworld"); message.setStringProperty("teststring", "myvalue"); message.setIntProperty("volume", 11); message.setDoubleProperty("decimalmeaning", 42.0); putAllMessagesToQueue(MQ_QUEUE, Arrays.asList(message)); - List kafkaMessages = connectTask.poll(); + final List kafkaMessages = connectTask.poll(); assertEquals(1, kafkaMessages.size()); - SourceRecord kafkaMessage = kafkaMessages.get(0); + final SourceRecord kafkaMessage = kafkaMessages.get(0); assertNull(kafkaMessage.key()); assertNull(kafkaMessage.valueSchema()); @@ -164,20 +160,19 @@ public void verifyJmsMessageHeaders() throws Exception { connectTask.commitRecord(kafkaMessage); } - - @Test public void verifyMessageBatchIndividualCommits() throws Exception { connectTask = new MQSourceTask(); - Map connectorConfigProps = createDefaultConnectorProperties(); + final Map connectorConfigProps = createDefaultConnectorProperties(); connectorConfigProps.put("mq.message.body.jms", "true"); - connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + connectorConfigProps.put("mq.record.builder", + "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); connectorConfigProps.put("mq.batch.size", "10"); connectTask.start(connectorConfigProps); - List messages = new ArrayList<>(); + final List messages = new ArrayList<>(); for (int i = 1; i <= 35; i++) { messages.add(getJmsContext().createTextMessage("batch message " + i)); } @@ -189,47 +184,46 @@ public void verifyMessageBatchIndividualCommits() throws Exception { kafkaMessages = connectTask.poll(); assertEquals(10, kafkaMessages.size()); - for (SourceRecord kafkaMessage : kafkaMessages) { + for (final SourceRecord kafkaMessage : kafkaMessages) { assertEquals("batch message " + (nextExpectedMessage++), kafkaMessage.value()); connectTask.commitRecord(kafkaMessage); } kafkaMessages = connectTask.poll(); assertEquals(10, kafkaMessages.size()); - for (SourceRecord kafkaMessage : kafkaMessages) { + for (final SourceRecord kafkaMessage : kafkaMessages) { assertEquals("batch message " + (nextExpectedMessage++), kafkaMessage.value()); connectTask.commitRecord(kafkaMessage); } kafkaMessages = connectTask.poll(); assertEquals(10, kafkaMessages.size()); - for (SourceRecord kafkaMessage : kafkaMessages) { + for (final SourceRecord kafkaMessage : kafkaMessages) { assertEquals("batch message " + (nextExpectedMessage++), kafkaMessage.value()); connectTask.commitRecord(kafkaMessage); } kafkaMessages = connectTask.poll(); assertEquals(5, kafkaMessages.size()); - for (SourceRecord kafkaMessage : kafkaMessages) { + for (final SourceRecord kafkaMessage : kafkaMessages) { assertEquals("batch message " + (nextExpectedMessage++), kafkaMessage.value()); connectTask.commitRecord(kafkaMessage); } } - - @Test public void verifyMessageBatchGroupCommits() throws Exception { connectTask = new MQSourceTask(); - Map connectorConfigProps = createDefaultConnectorProperties(); + final Map connectorConfigProps = createDefaultConnectorProperties(); connectorConfigProps.put("mq.message.body.jms", "true"); - connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + connectorConfigProps.put("mq.record.builder", + "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); connectorConfigProps.put("mq.batch.size", "10"); connectTask.start(connectorConfigProps); - List messages = new ArrayList<>(); + final List messages = new ArrayList<>(); for (int i = 1; i <= 35; i++) { messages.add(getJmsContext().createTextMessage("message " + i)); } @@ -239,38 +233,37 @@ public void verifyMessageBatchGroupCommits() throws Exception { kafkaMessages = connectTask.poll(); assertEquals(10, kafkaMessages.size()); - for (SourceRecord m : kafkaMessages) { + for (final SourceRecord m : kafkaMessages) { connectTask.commitRecord(m); } kafkaMessages = connectTask.poll(); assertEquals(10, kafkaMessages.size()); - for (SourceRecord m : kafkaMessages) { + for (final SourceRecord m : kafkaMessages) { connectTask.commitRecord(m); } kafkaMessages = connectTask.poll(); assertEquals(10, kafkaMessages.size()); - for (SourceRecord m : kafkaMessages) { + for (final SourceRecord m : kafkaMessages) { connectTask.commitRecord(m); } kafkaMessages = connectTask.poll(); assertEquals(5, kafkaMessages.size()); - for (SourceRecord m : kafkaMessages) { + for (final SourceRecord m : kafkaMessages) { connectTask.commitRecord(m); } } - - @Test public void verifyMessageBatchRollback() throws Exception { - MQSourceTask newConnectTask = new MQSourceTask(); + final MQSourceTask newConnectTask = new MQSourceTask(); - Map connectorConfigProps = createDefaultConnectorProperties(); + final Map connectorConfigProps = createDefaultConnectorProperties(); connectorConfigProps.put("mq.message.body.jms", "true"); - connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + connectorConfigProps.put("mq.record.builder", + "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); connectorConfigProps.put("mq.batch.size", "10"); newConnectTask.start(connectorConfigProps); @@ -278,14 +271,14 @@ public void verifyMessageBatchRollback() throws Exception { // Test overview: // // messages 01-15 - valid messages - // message 16 - a message that the builder can't process + // message 16 - a message that the builder can't process // messages 17-30 - valid messages - List messages = new ArrayList<>(); + final List messages = new ArrayList<>(); for (int i = 1; i <= 15; i++) { messages.add(getJmsContext().createTextMessage("message " + i)); } - MapMessage invalidMessage = getJmsContext().createMapMessage(); + final MapMessage invalidMessage = getJmsContext().createMapMessage(); invalidMessage.setString("test", "builder cannot convert this"); messages.add(invalidMessage); for (int i = 17; i <= 30; i++) { @@ -293,7 +286,7 @@ public void verifyMessageBatchRollback() throws Exception { } putAllMessagesToQueue(MQ_QUEUE, messages); - List kafkaMessages; + final List kafkaMessages; // first batch should successfully retrieve messages 01-10 kafkaMessages = newConnectTask.poll(); @@ -302,37 +295,36 @@ public void verifyMessageBatchRollback() throws Exception { newConnectTask.commit(); // second batch (11-20) should fail because of message 16 - ConnectException exc = assertThrows(ConnectException.class, () -> { + final ConnectException exc = assertThrows(ConnectException.class, () -> { newConnectTask.poll(); }); assertTrue(exc.getMessage().equals("Unsupported JMS message type")); // there should be 20 messages left on the MQ queue (messages 11-30) newConnectTask.stop(); - List remainingMQMessages = getAllMessagesFromQueue(MQ_QUEUE); + final List remainingMQMessages = getAllMessagesFromQueue(MQ_QUEUE); assertEquals(20, remainingMQMessages.size()); } - - @Test public void verifyMessageIdAsKey() throws Exception { connectTask = new MQSourceTask(); - Map connectorConfigProps = createDefaultConnectorProperties(); + final Map connectorConfigProps = createDefaultConnectorProperties(); connectorConfigProps.put("mq.message.body.jms", "true"); - connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + connectorConfigProps.put("mq.record.builder", + "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); connectorConfigProps.put("mq.record.builder.key.header", "JMSMessageID"); connectTask.start(connectorConfigProps); - TextMessage message = getJmsContext().createTextMessage("testmessage"); + final TextMessage message = getJmsContext().createTextMessage("testmessage"); putAllMessagesToQueue(MQ_QUEUE, Arrays.asList(message)); - List kafkaMessages = connectTask.poll(); + final List kafkaMessages = connectTask.poll(); assertEquals(1, kafkaMessages.size()); - SourceRecord kafkaMessage = kafkaMessages.get(0); + final SourceRecord kafkaMessage = kafkaMessages.get(0); assertEquals(message.getJMSMessageID().substring("ID:".length()), kafkaMessage.key()); assertNotNull(message.getJMSMessageID()); assertEquals(Schema.OPTIONAL_STRING_SCHEMA, kafkaMessage.keySchema()); @@ -342,63 +334,61 @@ public void verifyMessageIdAsKey() throws Exception { connectTask.commitRecord(kafkaMessage); } - - @Test public void verifyCorrelationIdAsKey() throws Exception { connectTask = new MQSourceTask(); - Map connectorConfigProps = createDefaultConnectorProperties(); + final Map connectorConfigProps = createDefaultConnectorProperties(); connectorConfigProps.put("mq.message.body.jms", "true"); - connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + connectorConfigProps.put("mq.record.builder", + "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); connectorConfigProps.put("mq.record.builder.key.header", "JMSCorrelationID"); connectTask.start(connectorConfigProps); - TextMessage message1 = getJmsContext().createTextMessage("first message"); + final TextMessage message1 = getJmsContext().createTextMessage("first message"); message1.setJMSCorrelationID("verifycorrel"); - TextMessage message2 = getJmsContext().createTextMessage("second message"); + final TextMessage message2 = getJmsContext().createTextMessage("second message"); message2.setJMSCorrelationID("ID:5fb4a18030154fe4b09a1dfe8075bc101dfe8075bc104fe4"); putAllMessagesToQueue(MQ_QUEUE, Arrays.asList(message1, message2)); - List kafkaMessages = connectTask.poll(); + final List kafkaMessages = connectTask.poll(); assertEquals(2, kafkaMessages.size()); - SourceRecord kafkaMessage1 = kafkaMessages.get(0); + final SourceRecord kafkaMessage1 = kafkaMessages.get(0); assertEquals("verifycorrel", kafkaMessage1.key()); assertEquals(Schema.OPTIONAL_STRING_SCHEMA, kafkaMessage1.keySchema()); assertEquals("first message", kafkaMessage1.value()); connectTask.commitRecord(kafkaMessage1); - SourceRecord kafkaMessage2 = kafkaMessages.get(1); + final SourceRecord kafkaMessage2 = kafkaMessages.get(1); assertEquals("5fb4a18030154fe4b09a1dfe8075bc101dfe8075bc104fe4", kafkaMessage2.key()); assertEquals(Schema.OPTIONAL_STRING_SCHEMA, kafkaMessage2.keySchema()); assertEquals("second message", kafkaMessage2.value()); connectTask.commitRecord(kafkaMessage2); } - - @Test public void verifyCorrelationIdBytesAsKey() throws Exception { connectTask = new MQSourceTask(); - Map connectorConfigProps = createDefaultConnectorProperties(); + final Map connectorConfigProps = createDefaultConnectorProperties(); connectorConfigProps.put("mq.message.body.jms", "true"); - connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + connectorConfigProps.put("mq.record.builder", + "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); connectorConfigProps.put("mq.record.builder.key.header", "JMSCorrelationIDAsBytes"); connectTask.start(connectorConfigProps); - TextMessage message = getJmsContext().createTextMessage("testmessagewithcorrelbytes"); + final TextMessage message = getJmsContext().createTextMessage("testmessagewithcorrelbytes"); message.setJMSCorrelationID("verifycorrelbytes"); putAllMessagesToQueue(MQ_QUEUE, Arrays.asList(message)); - List kafkaMessages = connectTask.poll(); + final List kafkaMessages = connectTask.poll(); assertEquals(1, kafkaMessages.size()); - SourceRecord kafkaMessage = kafkaMessages.get(0); - assertArrayEquals("verifycorrelbytes".getBytes(), (byte[])kafkaMessage.key()); + final SourceRecord kafkaMessage = kafkaMessages.get(0); + assertArrayEquals("verifycorrelbytes".getBytes(), (byte[]) kafkaMessage.key()); assertEquals(Schema.OPTIONAL_BYTES_SCHEMA, kafkaMessage.keySchema()); assertEquals("testmessagewithcorrelbytes", kafkaMessage.value()); @@ -406,26 +396,25 @@ public void verifyCorrelationIdBytesAsKey() throws Exception { connectTask.commitRecord(kafkaMessage); } - - @Test public void verifyDestinationAsKey() throws Exception { connectTask = new MQSourceTask(); - Map connectorConfigProps = createDefaultConnectorProperties(); + final Map connectorConfigProps = createDefaultConnectorProperties(); connectorConfigProps.put("mq.message.body.jms", "true"); - connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + connectorConfigProps.put("mq.record.builder", + "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); connectorConfigProps.put("mq.record.builder.key.header", "JMSDestination"); connectTask.start(connectorConfigProps); - TextMessage message = getJmsContext().createTextMessage("testmessagewithdest"); + final TextMessage message = getJmsContext().createTextMessage("testmessagewithdest"); putAllMessagesToQueue(MQ_QUEUE, Arrays.asList(message)); - List kafkaMessages = connectTask.poll(); + final List kafkaMessages = connectTask.poll(); assertEquals(1, kafkaMessages.size()); - SourceRecord kafkaMessage = kafkaMessages.get(0); + final SourceRecord kafkaMessage = kafkaMessages.get(0); assertEquals("queue:///" + MQ_QUEUE, kafkaMessage.key()); assertEquals(Schema.OPTIONAL_STRING_SCHEMA, kafkaMessage.keySchema()); diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/DefaultRecordBuilderIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/DefaultRecordBuilderIT.java index 7a187fa..8062ba8 100644 --- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/DefaultRecordBuilderIT.java +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/DefaultRecordBuilderIT.java @@ -40,55 +40,53 @@ public class DefaultRecordBuilderIT extends AbstractJMSContextIT { @Test public void buildFromJmsMapMessage() throws Exception { - final String MESSAGE_CONTENTS = "This is the message contents"; - final boolean IS_JMS = true; + final String messageContent = "This is the message contents"; + final boolean isJMS = true; // create MQ message - MapMessage message = getJmsContext().createMapMessage(); - message.setString("example", MESSAGE_CONTENTS); + final MapMessage message = getJmsContext().createMapMessage(); + message.setString("example", messageContent); // use the builder to convert it to a Kafka record - DefaultRecordBuilder builder = new DefaultRecordBuilder(); - ConnectException exc = assertThrows(ConnectException .class, () -> { - builder.toSourceRecord(getJmsContext(), TOPIC, IS_JMS, message); + final DefaultRecordBuilder builder = new DefaultRecordBuilder(); + final ConnectException exc = assertThrows(ConnectException.class, () -> { + builder.toSourceRecord(getJmsContext(), TOPIC, isJMS, message); }); // verify the exception assertEquals("Unsupported JMS message type", exc.getMessage()); } - @Test public void buildFromJmsTextMessage() throws Exception { - final String MESSAGE_CONTENTS = "This is the JMS message contents"; - final boolean IS_JMS = true; + final String messageContents = "This is the JMS message contents"; + final boolean isJMS = true; // create MQ message - TextMessage message = getJmsContext().createTextMessage(MESSAGE_CONTENTS); + final TextMessage message = getJmsContext().createTextMessage(messageContents); // use the builder to convert it to a Kafka record - DefaultRecordBuilder builder = new DefaultRecordBuilder(); - SourceRecord record = builder.toSourceRecord(getJmsContext(), TOPIC, IS_JMS, message); + final DefaultRecordBuilder builder = new DefaultRecordBuilder(); + final SourceRecord record = builder.toSourceRecord(getJmsContext(), TOPIC, isJMS, message); // verify the Kafka record assertNull(record.key()); - assertEquals(MESSAGE_CONTENTS, record.value()); + assertEquals(messageContents, record.value()); assertNull(record.valueSchema()); } - @Test public void buildFromTextMessage() throws Exception { - final String MESSAGE_CONTENTS = "This is the message contents"; - final boolean IS_JMS = false; + final String messageContents = "This is the message contents"; + final boolean isJMS = false; // create MQ message - TextMessage message = getJmsContext().createTextMessage(MESSAGE_CONTENTS); + final TextMessage message = getJmsContext().createTextMessage(messageContents); // use the builder to convert it to a Kafka record - DefaultRecordBuilder builder = new DefaultRecordBuilder(); - MessageFormatException exc = assertThrows(MessageFormatException.class, () -> { - builder.toSourceRecord(getJmsContext(), TOPIC, IS_JMS, message); + final DefaultRecordBuilder builder = new DefaultRecordBuilder(); + final MessageFormatException exc = assertThrows(MessageFormatException.class, () -> { + builder.toSourceRecord(getJmsContext(), TOPIC, isJMS, message); }); // verify the exception @@ -96,47 +94,45 @@ public void buildFromTextMessage() throws Exception { assertTrue(exc.getMessage().contains("The message of type jms_text can not have its body assigned to")); } - @Test public void buildFromJmsBytesMessage() throws Exception { - final String MESSAGE_ORIGIN = "This is the data used for message contents"; - final byte[] MESSAGE_CONTENTS = MESSAGE_ORIGIN.getBytes(); - final boolean IS_JMS = true; + final String messageOrigin = "This is the data used for message contents"; + final byte[] messageContents = messageOrigin.getBytes(); + final boolean isJMS = true; // create MQ message - BytesMessage message = getJmsContext().createBytesMessage(); - message.writeBytes(MESSAGE_CONTENTS); + final BytesMessage message = getJmsContext().createBytesMessage(); + message.writeBytes(messageContents); message.reset(); // use the builder to convert it to a Kafka record - DefaultRecordBuilder builder = new DefaultRecordBuilder(); - SourceRecord record = builder.toSourceRecord(getJmsContext(), TOPIC, IS_JMS, message); + final DefaultRecordBuilder builder = new DefaultRecordBuilder(); + final SourceRecord record = builder.toSourceRecord(getJmsContext(), TOPIC, isJMS, message); // verify the Kafka record assertNull(record.key()); - assertArrayEquals(MESSAGE_CONTENTS, (byte[])record.value()); + assertArrayEquals(messageContents, (byte[]) record.value()); assertNull(record.valueSchema()); } - @Test public void buildFromBytesMessage() throws Exception { - final String MESSAGE_ORIGIN = "This is the data used for message contents"; - final byte[] MESSAGE_CONTENTS = MESSAGE_ORIGIN.getBytes(); - final boolean IS_JMS = false; + final String messageOrigin = "This is the data used for message contents"; + final byte[] messageContents = messageOrigin.getBytes(); + final boolean isJMS = false; // create MQ message - BytesMessage message = getJmsContext().createBytesMessage(); - message.writeBytes(MESSAGE_CONTENTS); + final BytesMessage message = getJmsContext().createBytesMessage(); + message.writeBytes(messageContents); message.reset(); // use the builder to convert it to a Kafka record - DefaultRecordBuilder builder = new DefaultRecordBuilder(); - SourceRecord record = builder.toSourceRecord(getJmsContext(), TOPIC, IS_JMS, message); + final DefaultRecordBuilder builder = new DefaultRecordBuilder(); + final SourceRecord record = builder.toSourceRecord(getJmsContext(), TOPIC, isJMS, message); // verify the Kafka record assertNull(record.key()); - assertArrayEquals(MESSAGE_CONTENTS, (byte[])record.value()); + assertArrayEquals(messageContents, (byte[]) record.value()); assertEquals(Schema.OPTIONAL_BYTES_SCHEMA, record.valueSchema()); } } diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java index ee12e3f..df87893 100644 --- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java @@ -35,10 +35,10 @@ public class JsonRecordBuilderIT extends AbstractJMSContextIT { - private final String TOPIC = "MY.TOPIC"; - private final boolean IS_JMS = true; + private final String topic = "MY.TOPIC"; + private final boolean isJMS = true; - private final String MESSAGE_CONTENTS = + private final String messageContents = "{ " + "\"hello\" : \"world\", " + "\"test\" : 123, " + @@ -46,24 +46,22 @@ public class JsonRecordBuilderIT extends AbstractJMSContextIT { "}"; @SuppressWarnings("unchecked") - private void verifyJsonMap(Map value) { + private void verifyJsonMap(final Map value) { assertEquals(3, value.keySet().size()); assertEquals("world", value.get("hello")); assertEquals(123L, value.get("test")); - String[] expected = { "one", "two", "three" }; + final String[] expected = {"one", "two", "three"}; assertArrayEquals(expected, ((List) value.get("list")).toArray()); } - - @Test public void buildFromJmsTextMessage() throws Exception { // create MQ message - TextMessage message = getJmsContext().createTextMessage(MESSAGE_CONTENTS); + final TextMessage message = getJmsContext().createTextMessage(messageContents); // use the builder to convert it to a Kafka record - JsonRecordBuilder builder = new JsonRecordBuilder(); - SourceRecord record = builder.toSourceRecord(getJmsContext(), TOPIC, IS_JMS, message); + final JsonRecordBuilder builder = new JsonRecordBuilder(); + final SourceRecord record = builder.toSourceRecord(getJmsContext(), topic, isJMS, message); // verify the Kafka record assertNull(record.key()); @@ -71,17 +69,16 @@ public void buildFromJmsTextMessage() throws Exception { verifyJsonMap((Map) record.value()); } - @Test public void buildFromJmsBytesMessage() throws Exception { // create MQ message - BytesMessage message = getJmsContext().createBytesMessage(); - message.writeBytes(MESSAGE_CONTENTS.getBytes()); + final BytesMessage message = getJmsContext().createBytesMessage(); + message.writeBytes(messageContents.getBytes()); message.reset(); // use the builder to convert it to a Kafka record - JsonRecordBuilder builder = new JsonRecordBuilder(); - SourceRecord record = builder.toSourceRecord(getJmsContext(), TOPIC, IS_JMS, message); + final JsonRecordBuilder builder = new JsonRecordBuilder(); + final SourceRecord record = builder.toSourceRecord(getJmsContext(), topic, isJMS, message); // verify the Kafka record assertNull(record.key()); @@ -89,19 +86,18 @@ public void buildFromJmsBytesMessage() throws Exception { verifyJsonMap((Map) record.value()); } - @Test public void buildFromJmsMapMessage() throws Exception { - final String MESSAGE_CONTENTS = "This is the message contents"; + final String messageContents = "This is the message contents"; // create MQ message - MapMessage message = getJmsContext().createMapMessage(); - message.setString("example", MESSAGE_CONTENTS); + final MapMessage message = getJmsContext().createMapMessage(); + message.setString("example", messageContents); // use the builder to convert it to a Kafka record - JsonRecordBuilder builder = new JsonRecordBuilder(); - ConnectException exc = assertThrows(ConnectException .class, () -> { - builder.toSourceRecord(getJmsContext(), TOPIC, IS_JMS, message); + final JsonRecordBuilder builder = new JsonRecordBuilder(); + final ConnectException exc = assertThrows(ConnectException.class, () -> { + builder.toSourceRecord(getJmsContext(), topic, isJMS, message); }); // verify the exception diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/utils/JsonRestApi.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/utils/JsonRestApi.java index 6335e9a..86246dc 100644 --- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/utils/JsonRestApi.java +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/utils/JsonRestApi.java @@ -40,31 +40,31 @@ import org.json.JSONObject; public class JsonRestApi { - - public static JSONObject jsonPost(String url, String username, String password, String payload) throws IOException, KeyManagementException, NoSuchAlgorithmException, JSONException { - URL urlObj = new URL(url); - HttpsURLConnection urlConnection = (HttpsURLConnection) urlObj.openConnection(); + + public static JSONObject jsonPost(final String url, final String username, final String password, + final String payload) throws IOException, KeyManagementException, NoSuchAlgorithmException, JSONException { + final URL urlObj = new URL(url); + final HttpsURLConnection urlConnection = (HttpsURLConnection) urlObj.openConnection(); urlConnection.setHostnameVerifier(new IgnoreCertVerifier()); urlConnection.setSSLSocketFactory(getTrustAllCertsFactory()); urlConnection.setRequestProperty("Authorization", getAuthHeader(username, password)); urlConnection.setRequestProperty("Content-Type", "application/json"); urlConnection.setRequestProperty("ibm-mq-rest-csrf-token", "junit"); urlConnection.setDoOutput(true); - - try(OutputStream os = urlConnection.getOutputStream()) { - byte[] input = payload.getBytes("utf-8"); - os.write(input, 0, input.length); + + try (OutputStream os = urlConnection.getOutputStream()) { + final byte[] input = payload.getBytes("utf-8"); + os.write(input, 0, input.length); } - - try (InputStream input = urlConnection.getInputStream()){ - BufferedReader re = new BufferedReader(new InputStreamReader(input, Charset.forName("utf-8"))); + + try (InputStream input = urlConnection.getInputStream()) { + final BufferedReader re = new BufferedReader(new InputStreamReader(input, Charset.forName("utf-8"))); return new JSONObject(read(re)); } } - - private static String read(Reader re) throws IOException { - StringBuilder str = new StringBuilder(); + private static String read(final Reader re) throws IOException { + final StringBuilder str = new StringBuilder(); int ch; do { ch = re.read(); @@ -72,30 +72,35 @@ private static String read(Reader re) throws IOException { } while (ch != -1); return str.toString(); } - - - private static String getAuthHeader(String username, String password) { - String userpass = username + ":" + password; - String basicAuth = "Basic " + new String(Base64.getEncoder().encode(userpass.getBytes())); + + private static String getAuthHeader(final String username, final String password) { + final String userpass = username + ":" + password; + final String basicAuth = "Basic " + new String(Base64.getEncoder().encode(userpass.getBytes())); return basicAuth; } - + private static class IgnoreCertVerifier implements HostnameVerifier { @Override - public boolean verify(String host, SSLSession session) { + public boolean verify(final String host, final SSLSession session) { return true; } } - + private static SSLSocketFactory getTrustAllCertsFactory() throws NoSuchAlgorithmException, KeyManagementException { - TrustManager[] trustAllCerts = new TrustManager[] { + final TrustManager[] trustAllCerts = new TrustManager[] { new X509TrustManager() { - public java.security.cert.X509Certificate[] getAcceptedIssuers() { return null; } - public void checkClientTrusted(X509Certificate[] certs, String authType) { } - public void checkServerTrusted(X509Certificate[] certs, String authType) { } - } + public java.security.cert.X509Certificate[] getAcceptedIssuers() { + return null; + } + + public void checkClientTrusted(final X509Certificate[] certs, final String authType) { + } + + public void checkServerTrusted(final X509Certificate[] certs, final String authType) { + } + } }; - SSLContext sc = SSLContext.getInstance("SSL"); + final SSLContext sc = SSLContext.getInstance("SSL"); sc.init(null, trustAllCerts, new java.security.SecureRandom()); return sc.getSocketFactory(); } diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/utils/MQQueueManagerAttrs.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/utils/MQQueueManagerAttrs.java index fd56d9f..b358b85 100644 --- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/utils/MQQueueManagerAttrs.java +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/utils/MQQueueManagerAttrs.java @@ -30,23 +30,21 @@ public class MQQueueManagerAttrs { + " \"command\": \"display conn(*) where (channel EQ 'DEV.APP.SVRCONN')\"" + " }" + "}"; - - - public static int getNumConnections(String qmgrname, int portnum, String password) throws KeyManagementException, NoSuchAlgorithmException, IOException, JSONException { - String url = "https://localhost:" + portnum + "/ibmmq/rest/v2/admin/action/qmgr/" + qmgrname + "/mqsc"; - JSONObject connectionInfo = JsonRestApi.jsonPost(url, "admin", password, REQ_GET_SVRCONNS); - int completionCode = connectionInfo.getInt("overallCompletionCode"); - int reasonCode = connectionInfo.getInt("overallReasonCode"); - + public static int getNumConnections(final String qmgrname, final int portnum, final String password) + throws KeyManagementException, NoSuchAlgorithmException, IOException, JSONException { + final String url = "https://localhost:" + portnum + "/ibmmq/rest/v2/admin/action/qmgr/" + qmgrname + "/mqsc"; + final JSONObject connectionInfo = JsonRestApi.jsonPost(url, "admin", password, REQ_GET_SVRCONNS); + + final int completionCode = connectionInfo.getInt("overallCompletionCode"); + final int reasonCode = connectionInfo.getInt("overallReasonCode"); + if (completionCode == 2 && reasonCode == 3008) { return 0; - } - else if (completionCode == 0 && reasonCode == 0) { + } else if (completionCode == 0 && reasonCode == 0) { return connectionInfo.getJSONArray("commandResponse").length(); - } - else { + } else { return -1; } - } + } } diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/utils/SourceTaskStopper.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/utils/SourceTaskStopper.java index 531599b..5e6ed75 100644 --- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/utils/SourceTaskStopper.java +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/utils/SourceTaskStopper.java @@ -23,39 +23,38 @@ /** * Stops an instance of the MQSourceTask in a way that will ensure - * it commits the work that it has completed so far. - * + * it commits the work that it has completed so far. + * * This is needed for tests so that subsequent tests don't disrupt * each other. */ public class SourceTaskStopper { - + private static ExecutorService executor = Executors.newCachedThreadPool(); private SourceTask sourceTask; - - public SourceTaskStopper(SourceTask task) { - sourceTask = task; + + public SourceTaskStopper(final SourceTask task) { + sourceTask = task; } - + public void run() throws InterruptedException { // start the poll in a background thread executor.submit(new PollStarter()); - - // the pollstarter thread will block waiting for messages - // that don't ever come so wait briefly before stopping - // it from another thread + + // the pollstarter thread will block waiting for messages + // that don't ever come so wait briefly before stopping + // it from another thread Thread.sleep(200); - sourceTask.stop(); + sourceTask.stop(); } - - class PollStarter implements Runnable { + class PollStarter implements Runnable { @Override public void run() { try { sourceTask.poll(); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { e.printStackTrace(); } } diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java index a5bb0b3..19c8679 100755 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java @@ -19,7 +19,8 @@ import com.ibm.mq.MQException; import com.ibm.mq.constants.MQConstants; -import com.ibm.mq.jms.*; +import com.ibm.mq.jms.MQConnectionFactory; +import com.ibm.mq.jms.MQQueue; import com.ibm.msg.client.wmq.WMQConstants; import java.io.FileInputStream; @@ -76,44 +77,46 @@ public class JMSReader { private boolean inflight = false; // Whether messages in-flight in current transaction private boolean inperil = false; // Whether current transaction must be forced to roll back private AtomicBoolean closeNow = new AtomicBoolean(); // Whether close has been requested - private long reconnectDelayMillis = RECONNECT_DELAY_MILLIS_MIN; // Delay between repeated reconnect attempts + private long reconnectDelayMillis = reconnectDelayMillisMin; // Delay between repeated reconnect attempts - private static long RECEIVE_TIMEOUT = 30000l; - private static long RECONNECT_DELAY_MILLIS_MIN = 64l; - private static long RECONNECT_DELAY_MILLIS_MAX = 8192l; + private static long receiveTimeout = 30000L; + private static long reconnectDelayMillisMin = 64L; + private static long reconnectDelayMillisMax = 8192L; - public JMSReader() {} + public JMSReader() { + } /** * Configure this class. * * @param props initial configuration * - * @throws ConnectException Operation failed and connector should stop. + * @throws ConnectException Operation failed and connector should stop. */ - public void configure(Map props) { - log.trace("[{}] Entry {}.configure, props={}", Thread.currentThread().getId(), this.getClass().getName(), props); - - String queueManager = props.get(MQSourceConnector.CONFIG_NAME_MQ_QUEUE_MANAGER); - String connectionMode = props.get(MQSourceConnector.CONFIG_NAME_MQ_CONNECTION_MODE); - String connectionNameList = props.get(MQSourceConnector.CONFIG_NAME_MQ_CONNECTION_NAME_LIST); - String channelName = props.get(MQSourceConnector.CONFIG_NAME_MQ_CHANNEL_NAME); - String queueName = props.get(MQSourceConnector.CONFIG_NAME_MQ_QUEUE); - String userName = props.get(MQSourceConnector.CONFIG_NAME_MQ_USER_NAME); - String password = props.get(MQSourceConnector.CONFIG_NAME_MQ_PASSWORD); - String ccdtUrl = props.get(MQSourceConnector.CONFIG_NAME_MQ_CCDT_URL); - String builderClass = props.get(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER); - String mbj = props.get(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS); - String mdr = props.get(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_MQMD_READ); - String sslCipherSuite = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_CIPHER_SUITE); - String sslPeerName = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_PEER_NAME); - String sslKeystoreLocation = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_KEYSTORE_LOCATION); - String sslKeystorePassword = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_KEYSTORE_PASSWORD); - String sslTruststoreLocation = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_LOCATION); - String sslTruststorePassword = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_PASSWORD); - String useMQCSP = props.get(MQSourceConnector.CONFIG_NAME_MQ_USER_AUTHENTICATION_MQCSP); - String useIBMCipherMappings = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS); - String topic = props.get(MQSourceConnector.CONFIG_NAME_TOPIC); + public void configure(final Map props) { + log.trace("[{}] Entry {}.configure, props={}", Thread.currentThread().getId(), this.getClass().getName(), + props); + + final String queueManager = props.get(MQSourceConnector.CONFIG_NAME_MQ_QUEUE_MANAGER); + final String connectionMode = props.get(MQSourceConnector.CONFIG_NAME_MQ_CONNECTION_MODE); + final String connectionNameList = props.get(MQSourceConnector.CONFIG_NAME_MQ_CONNECTION_NAME_LIST); + final String channelName = props.get(MQSourceConnector.CONFIG_NAME_MQ_CHANNEL_NAME); + final String queueName = props.get(MQSourceConnector.CONFIG_NAME_MQ_QUEUE); + final String userName = props.get(MQSourceConnector.CONFIG_NAME_MQ_USER_NAME); + final String password = props.get(MQSourceConnector.CONFIG_NAME_MQ_PASSWORD); + final String ccdtUrl = props.get(MQSourceConnector.CONFIG_NAME_MQ_CCDT_URL); + final String builderClass = props.get(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER); + final String mbj = props.get(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS); + final String mdr = props.get(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_MQMD_READ); + final String sslCipherSuite = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_CIPHER_SUITE); + final String sslPeerName = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_PEER_NAME); + final String sslKeystoreLocation = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_KEYSTORE_LOCATION); + final String sslKeystorePassword = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_KEYSTORE_PASSWORD); + final String sslTruststoreLocation = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_LOCATION); + final String sslTruststorePassword = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_PASSWORD); + final String useMQCSP = props.get(MQSourceConnector.CONFIG_NAME_MQ_USER_AUTHENTICATION_MQCSP); + final String useIBMCipherMappings = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS); + final String topic = props.get(MQSourceConnector.CONFIG_NAME_TOPIC); if (useIBMCipherMappings != null) { System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings", useIBMCipherMappings); @@ -123,11 +126,9 @@ public void configure(Map props) { if (connectionMode != null) { if (connectionMode.equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT)) { transportType = WMQConstants.WMQ_CM_CLIENT; - } - else if (connectionMode.equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE_BINDINGS)) { + } else if (connectionMode.equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE_BINDINGS)) { transportType = WMQConstants.WMQ_CM_BINDINGS; - } - else { + } else { log.error("Unsupported MQ connection mode {}", connectionMode); throw new ConnectException("Unsupported MQ connection mode"); } @@ -139,36 +140,35 @@ else if (connectionMode.equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE mqConnFactory.setQueueManager(queueManager); mqConnFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, true); if (useMQCSP != null) { - mqConnFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, Boolean.parseBoolean(useMQCSP)); + mqConnFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, + Boolean.parseBoolean(useMQCSP)); } if (transportType == WMQConstants.WMQ_CM_CLIENT) { if (ccdtUrl != null) { - URL ccdtUrlObject; + final URL ccdtUrlObject; try { ccdtUrlObject = new URL(ccdtUrl); - } - catch (MalformedURLException e) { + } catch (final MalformedURLException e) { log.error("MalformedURLException exception {}", e); throw new ConnectException("CCDT file url invalid", e); } mqConnFactory.setCCDTURL(ccdtUrlObject); - } - else { + } else { mqConnFactory.setConnectionNameList(connectionNameList); mqConnFactory.setChannel(channelName); } if (sslCipherSuite != null) { mqConnFactory.setSSLCipherSuite(sslCipherSuite); - if (sslPeerName != null) - { + if (sslPeerName != null) { mqConnFactory.setSSLPeerName(sslPeerName); } } if (sslKeystoreLocation != null || sslTruststoreLocation != null) { - final SSLContext sslContext = buildSslContext(sslKeystoreLocation, sslKeystorePassword, sslTruststoreLocation, sslTruststorePassword); + final SSLContext sslContext = buildSslContext(sslKeystoreLocation, sslKeystorePassword, + sslTruststoreLocation, sslTruststorePassword); mqConnFactory.setSSLSocketFactory(sslContext.getSocketFactory()); } } @@ -194,18 +194,17 @@ else if (connectionMode.equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE } this.topic = topic; - } - catch (JMSException | JMSRuntimeException jmse) { + } catch (JMSException | JMSRuntimeException jmse) { log.error("JMS exception {}", jmse); throw new ConnectException(jmse); } try { - Class c = Class.forName(builderClass).asSubclass(RecordBuilder.class); + final Class c = Class.forName(builderClass).asSubclass(RecordBuilder.class); builder = c.newInstance(); builder.configure(props); - } - catch (ClassNotFoundException | ClassCastException | IllegalAccessException | InstantiationException | NullPointerException exc) { + } catch (ClassNotFoundException | ClassCastException | IllegalAccessException | InstantiationException + | NullPointerException exc) { log.error("Could not instantiate message builder {}", builderClass); throw new ConnectException("Could not instantiate message builder", exc); } @@ -222,8 +221,7 @@ public void connect() { try { if (userName != null) { jmsCtxt = mqConnFactory.createContext(userName, password, JMSContext.SESSION_TRANSACTED); - } - else { + } else { jmsCtxt = mqConnFactory.createContext(JMSContext.SESSION_TRANSACTED); } @@ -231,8 +229,7 @@ public void connect() { connected = true; log.info("Connection to MQ established"); - } - catch (JMSRuntimeException jmse) { + } catch (final JMSRuntimeException jmse) { log.info("Connection to MQ could not be established"); log.error("JMS exception {}", jmse); handleException(jmse); @@ -242,13 +239,14 @@ public void connect() { } /** - * Receives a message from MQ. Adds the message to the current transaction. Reconnects to MQ if required. + * Receives a message from MQ. Adds the message to the current transaction. + * Reconnects to MQ if required. * - * @param wait Whether to wait indefinitely for a message + * @param wait Whether to wait indefinitely for a message * * @return The SourceRecord representing the message */ - public SourceRecord receive(boolean wait) { + public SourceRecord receive(final boolean wait) { log.trace("[{}] Entry {}.receive", Thread.currentThread().getId(), this.getClass().getName()); if (!connectInternal()) { @@ -260,37 +258,36 @@ public SourceRecord receive(boolean wait) { SourceRecord sr = null; try { if (wait) { - while ((m == null) && !closeNow.get()) - { - log.debug("Waiting {} ms for message", RECEIVE_TIMEOUT); - m = jmsCons.receive(RECEIVE_TIMEOUT); + while (m == null && !closeNow.get()) { + log.debug("Waiting {} ms for message", receiveTimeout); + m = jmsCons.receive(receiveTimeout); } if (m == null) { log.debug("No message received"); } - } - else { + } else { m = jmsCons.receiveNoWait(); } if (m != null) { inflight = true; - // We've received a message in a transacted session so we must only permit the transaction - // to commit once we've passed it on to Kafka. Temporarily mark the transaction as "in-peril" - // so that any exception thrown will result in the transaction rolling back instead of committing. + // We've received a message in a transacted session so we must only permit the + // transaction + // to commit once we've passed it on to Kafka. Temporarily mark the transaction + // as "in-peril" + // so that any exception thrown will result in the transaction rolling back + // instead of committing. inperil = true; sr = builder.toSourceRecord(jmsCtxt, topic, messageBodyJms, m); inperil = false; } - } - catch (JMSException | JMSRuntimeException exc) { + } catch (JMSException | JMSRuntimeException exc) { log.error("JMS exception {}", exc); handleException(exc); - } - catch (ConnectException exc) { + } catch (final ConnectException exc) { log.error("Connect exception {}", exc); attemptRollback(); throw exc; @@ -300,26 +297,26 @@ public SourceRecord receive(boolean wait) { return sr; } - /** - * Returns messages got from the MQ queue. Called if the builder has failed to transform the - * messages and return them to Connect for producing to Kafka. + * Returns messages got from the MQ queue. Called if the builder has failed to + * transform the + * messages and return them to Connect for producing to Kafka. */ private void attemptRollback() { log.trace("[{}] Entry {}.attemptRollback", Thread.currentThread().getId(), this.getClass().getName()); try { jmsCtxt.rollback(); - } - catch (JMSRuntimeException jmsExc) { + } catch (final JMSRuntimeException jmsExc) { log.error("rollback failed {}", jmsExc); } log.trace("[{}] Exit {}.attemptRollback", Thread.currentThread().getId(), this.getClass().getName()); } - /** - * Commits the current transaction. If the current transaction contains a message that could not - * be processed, the transaction is "in peril" and is rolled back instead to avoid data loss. + * Commits the current transaction. If the current transaction contains a + * message that could not + * be processed, the transaction is "in peril" and is rolled back instead to + * avoid data loss. */ public void commit() { log.trace("[{}] Entry {}.commit", Thread.currentThread().getId(), this.getClass().getName()); @@ -336,13 +333,11 @@ public void commit() { inperil = false; log.debug("Rolling back in-flight transaction"); jmsCtxt.rollback(); - } - else { + } else { jmsCtxt.commit(); } } - } - catch (JMSRuntimeException jmse) { + } catch (final JMSRuntimeException jmse) { log.error("JMS exception {}", jmse); handleException(jmse); } @@ -381,38 +376,35 @@ private boolean connectInternal() { try { if (userName != null) { jmsCtxt = mqConnFactory.createContext(userName, password, JMSContext.SESSION_TRANSACTED); - } - else { + } else { jmsCtxt = mqConnFactory.createContext(JMSContext.SESSION_TRANSACTED); } jmsCons = jmsCtxt.createConsumer(queue); - reconnectDelayMillis = RECONNECT_DELAY_MILLIS_MIN; + reconnectDelayMillis = reconnectDelayMillisMin; connected = true; log.info("Connection to MQ established"); - } - catch (JMSRuntimeException jmse) { + } catch (final JMSRuntimeException jmse) { // Delay slightly so that repeated reconnect loops don't run too fast try { Thread.sleep(reconnectDelayMillis); - } - catch (InterruptedException ie) { - ; + } catch (final InterruptedException ie) { } - if (reconnectDelayMillis < RECONNECT_DELAY_MILLIS_MAX) - { + if (reconnectDelayMillis < reconnectDelayMillisMax) { reconnectDelayMillis = reconnectDelayMillis * 2; } log.error("JMS exception {}", jmse); handleException(jmse); - log.trace("[{}] Exit {}.connectInternal, retval=false", Thread.currentThread().getId(), this.getClass().getName()); + log.trace("[{}] Exit {}.connectInternal, retval=false", Thread.currentThread().getId(), + this.getClass().getName()); return false; } - log.trace("[{}] Exit {}.connectInternal, retval=true", Thread.currentThread().getId(), this.getClass().getName()); + log.trace("[{}] Exit {}.connectInternal, retval=true", Thread.currentThread().getId(), + this.getClass().getName()); return true; } @@ -430,12 +422,8 @@ private void closeInternal() { if (jmsCtxt != null) { jmsCtxt.close(); } - } - catch (JMSRuntimeException jmse) { - ; - } - finally - { + } catch (final JMSRuntimeException jmse) { + } finally { jmsCtxt = null; log.debug("Connection to MQ closed"); } @@ -444,10 +432,11 @@ private void closeInternal() { } /** - * Handles exceptions from MQ. Some JMS exceptions are treated as retriable meaning that the + * Handles exceptions from MQ. Some JMS exceptions are treated as retriable + * meaning that the * connector can keep running and just trying again is likely to fix things. */ - private ConnectException handleException(Throwable exc) { + private ConnectException handleException(final Throwable exc) { boolean isRetriable = false; boolean mustClose = true; int reason = -1; @@ -456,23 +445,22 @@ private ConnectException handleException(Throwable exc) { Throwable t = exc.getCause(); while (t != null) { if (t instanceof MQException) { - MQException mqe = (MQException)t; + final MQException mqe = (MQException) t; log.error("MQ error: CompCode {}, Reason {} {}", mqe.getCompCode(), mqe.getReason(), - MQConstants.lookupReasonCode(mqe.getReason())); + MQConstants.lookupReasonCode(mqe.getReason())); reason = mqe.getReason(); break; - } - else if (t instanceof JMSException) { - JMSException jmse = (JMSException)t; + } else if (t instanceof JMSException) { + final JMSException jmse = (JMSException) t; log.error("JMS exception: error code {}", jmse.getErrorCode()); } t = t.getCause(); } - switch (reason) - { - // These reason codes indicate that the connection needs to be closed, but just retrying later + switch (reason) { + // These reason codes indicate that the connection needs to be closed, but just + // retrying later // will probably recover case MQConstants.MQRC_BACKED_OUT: case MQConstants.MQRC_CHANNEL_NOT_AVAILABLE: @@ -486,8 +474,10 @@ else if (t instanceof JMSException) { isRetriable = true; break; - // These reason codes indicate that the connection is still OK, but just retrying later - // will probably recover - possibly with administrative action on the queue manager + // These reason codes indicate that the connection is still OK, but just + // retrying later + // will probably recover - possibly with administrative action on the queue + // manager case MQConstants.MQRC_GET_INHIBITED: isRetriable = true; mustClose = false; @@ -497,10 +487,8 @@ else if (t instanceof JMSException) { if (mustClose) { // Delay so that repeated reconnect loops don't run too fast try { - Thread.sleep(RECONNECT_DELAY_MILLIS_MAX); - } - catch (InterruptedException ie) { - ; + Thread.sleep(reconnectDelayMillisMax); + } catch (final InterruptedException ie) { } closeInternal(); } @@ -512,7 +500,8 @@ else if (t instanceof JMSException) { return new ConnectException(exc); } - private SSLContext buildSslContext(String sslKeystoreLocation, String sslKeystorePassword, String sslTruststoreLocation, String sslTruststorePassword) { + private SSLContext buildSslContext(final String sslKeystoreLocation, final String sslKeystorePassword, + final String sslTruststoreLocation, final String sslTruststorePassword) { log.trace("[{}] Entry {}.buildSslContext", Thread.currentThread().getId(), this.getClass().getName()); try { @@ -520,13 +509,14 @@ private SSLContext buildSslContext(String sslKeystoreLocation, String sslKeystor TrustManager[] trustManagers = null; if (sslKeystoreLocation != null) { - KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + final KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); kmf.init(loadKeyStore(sslKeystoreLocation, sslKeystorePassword), sslKeystorePassword.toCharArray()); keyManagers = kmf.getKeyManagers(); } if (sslTruststoreLocation != null) { - TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + final TrustManagerFactory tmf = TrustManagerFactory + .getInstance(TrustManagerFactory.getDefaultAlgorithm()); tmf.init(loadKeyStore(sslTruststoreLocation, sslTruststorePassword)); trustManagers = tmf.getTrustManagers(); } @@ -534,23 +524,25 @@ private SSLContext buildSslContext(String sslKeystoreLocation, String sslKeystor final SSLContext sslContext = SSLContext.getInstance("TLS"); sslContext.init(keyManagers, trustManagers, new SecureRandom()); - log.trace("[{}] Exit {}.buildSslContext, retval={}", Thread.currentThread().getId(), this.getClass().getName(), sslContext); + log.trace("[{}] Exit {}.buildSslContext, retval={}", Thread.currentThread().getId(), + this.getClass().getName(), sslContext); return sslContext; - } catch (GeneralSecurityException e) { + } catch (final GeneralSecurityException e) { throw new ConnectException("Error creating SSLContext", e); } } - private KeyStore loadKeyStore(String location, String password) throws GeneralSecurityException { + private KeyStore loadKeyStore(final String location, final String password) throws GeneralSecurityException { log.trace("[{}] Entry {}.loadKeyStore", Thread.currentThread().getId(), this.getClass().getName()); try (final InputStream ksStr = new FileInputStream(location)) { final KeyStore ks = KeyStore.getInstance("JKS"); ks.load(ksStr, password.toCharArray()); - log.trace("[{}] Exit {}.loadKeyStore, retval={}", Thread.currentThread().getId(), this.getClass().getName(), ks); + log.trace("[{}] Exit {}.loadKeyStore, retval={}", Thread.currentThread().getId(), + this.getClass().getName(), ks); return ks; - } catch (IOException e) { + } catch (final IOException e) { throw new ConnectException("Error reading keystore " + location, e); } } diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java index 191e7c4..a6509d9 100644 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java @@ -17,6 +17,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Map.Entry; @@ -131,11 +132,11 @@ public class MQSourceConnector extends SourceConnector { public static final String CONFIG_DOCUMENTATION_MQ_USER_AUTHENTICATION_MQCSP = "Whether to use MQ connection security parameters (MQCSP)."; public static final String CONFIG_DISPLAY_MQ_USER_AUTHENTICATION_MQCSP = "User authentication using MQCSP"; - public static final String CONFIG_NAME_TOPIC = "topic"; + public static final String CONFIG_NAME_TOPIC = "topic"; public static final String CONFIG_DOCUMENTATION_TOPIC = "The name of the target Kafka topic."; public static final String CONFIG_DISPLAY_TOPIC = "Target Kafka topic"; - public static String VERSION = "1.3.2"; + public static String version = "1.3.2"; private Map configProps; @@ -145,7 +146,7 @@ public class MQSourceConnector extends SourceConnector { * @return the version, formatted as a String */ @Override public String version() { - return VERSION; + return version; } /** @@ -154,13 +155,13 @@ public class MQSourceConnector extends SourceConnector { * * @param props configuration settings */ - @Override public void start(Map props) { + @Override public void start(final Map props) { log.trace("[{}] Entry {}.start, props={}", Thread.currentThread().getId(), this.getClass().getName(), props); configProps = props; - for (final Entry entry: props.entrySet()) { - String value; - if (entry.getKey().toLowerCase().contains("password")) { + for (final Entry entry : props.entrySet()) { + final String value; + if (entry.getKey().toLowerCase(Locale.ENGLISH).contains("password")) { value = "[hidden]"; } else { value = entry.getValue(); @@ -174,7 +175,8 @@ public class MQSourceConnector extends SourceConnector { /** * Returns the Task implementation for this Connector. */ - @Override public Class taskClass() { + @Override + public Class taskClass() { return MQSourceTask.class; } @@ -185,33 +187,38 @@ public class MQSourceConnector extends SourceConnector { * @param maxTasks maximum number of configurations to generate * @return configurations for Tasks */ - @Override public List> taskConfigs(int maxTasks) { - log.trace("[{}] Entry {}.taskConfigs, maxTasks={}", Thread.currentThread().getId(), this.getClass().getName(), maxTasks); + @Override + public List> taskConfigs(final int maxTasks) { + log.trace("[{}] Entry {}.taskConfigs, maxTasks={}", Thread.currentThread().getId(), this.getClass().getName(), + maxTasks); - List> taskConfigs = new ArrayList<>(); - for (int i = 0; i < maxTasks; i++) - { + final List> taskConfigs = new ArrayList<>(); + for (int i = 0; i < maxTasks; i++) { taskConfigs.add(configProps); } - log.trace("[{}] Exit {}.taskConfigs, retval={}", Thread.currentThread().getId(), this.getClass().getName(), taskConfigs); + log.trace("[{}] Exit {}.taskConfigs, retval={}", Thread.currentThread().getId(), this.getClass().getName(), + taskConfigs); return taskConfigs; } /** * Stop this connector. */ - @Override public void stop() { + @Override + public void stop() { log.trace("[{}] Entry {}.stop", Thread.currentThread().getId(), this.getClass().getName()); log.trace("[{}] Exit {}.stop", Thread.currentThread().getId(), this.getClass().getName()); } /** * Define the configuration for the connector. + * * @return The ConfigDef for this connector. */ - @Override public ConfigDef config() { - ConfigDef config = new ConfigDef(); + @Override + public ConfigDef config() { + final ConfigDef config = new ConfigDef(); config.define(CONFIG_NAME_MQ_QUEUE_MANAGER, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, Importance.HIGH, CONFIG_DOCUMENTATION_MQ_QUEUE_MANAGER, CONFIG_GROUP_MQ, 1, Width.MEDIUM, diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java index c8177ef..e0ec7ab 100755 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java @@ -17,6 +17,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.CountDownLatch; @@ -50,19 +51,19 @@ public MQSourceTask() { * @return the version, formatted as a String */ @Override public String version() { - return MQSourceConnector.VERSION; + return MQSourceConnector.version; } /** * Start the Task. This should handle any configuration parsing and one-time setup of the task. * @param props initial configuration */ - @Override public void start(Map props) { + @Override public void start(final Map props) { log.trace("[{}] Entry {}.start, props={}", Thread.currentThread().getId(), this.getClass().getName(), props); - for (final Entry entry: props.entrySet()) { - String value; - if (entry.getKey().toLowerCase().contains("password")) { + for (final Entry entry : props.entrySet()) { + final String value; + if (entry.getKey().toLowerCase(Locale.ENGLISH).contains("password")) { value = "[hidden]"; } else { value = entry.getValue(); @@ -70,7 +71,7 @@ public MQSourceTask() { log.debug("Task props entry {} : {}", entry.getKey(), value); } - String strBatchSize = props.get(MQSourceConnector.CONFIG_NAME_MQ_BATCH_SIZE); + final String strBatchSize = props.get(MQSourceConnector.CONFIG_NAME_MQ_BATCH_SIZE); if (strBatchSize != null) { batchSize = Integer.parseInt(strBatchSize); } @@ -86,18 +87,21 @@ public MQSourceTask() { } /** - * Poll this SourceTask for new records. This method should block if no data is currently + * Poll this SourceTask for new records. This method should block if no data is + * currently * available. * * @return a list of source records */ - @Override public List poll() throws InterruptedException { + @Override + public List poll() throws InterruptedException { log.trace("[{}] Entry {}.poll", Thread.currentThread().getId(), this.getClass().getName()); final List msgs = new ArrayList<>(); int messageCount = 0; - // Resolve any in-flight transaction, committing unless there has been an error between + // Resolve any in-flight transaction, committing unless there has been an error + // between // receiving the message from MQ and converting it if (batchCompleteSignal != null) { log.debug("Awaiting batch completion signal"); @@ -107,9 +111,10 @@ public MQSourceTask() { reader.commit(); } - // Increment the counter for the number of times poll is called so we can ensure we don't get stuck waiting for + // Increment the counter for the number of times poll is called so we can ensure + // we don't get stuck waiting for // commitRecord callbacks to trigger the batch complete signal - int currentPollCycle = pollCycle.incrementAndGet(); + final int currentPollCycle = pollCycle.incrementAndGet(); log.debug("Starting poll cycle {}", currentPollCycle); try { @@ -123,64 +128,70 @@ public MQSourceTask() { msgs.add(src); messageCount++; } - } while ((src != null) && (messageCount < batchSize) && !stopNow.get()); - } - else { + } while (src != null && messageCount < batchSize && !stopNow.get()); + } else { log.info("Stopping polling for records"); } - } - finally { + } finally { } - synchronized(this) { + synchronized (this) { if (messageCount > 0) { if (!stopNow.get()) { batchCompleteSignal = new CountDownLatch(messageCount); - } - else { - // Discard this batch - we've rolled back when the connection to MQ was closed in stop() + } else { + // Discard this batch - we've rolled back when the connection to MQ was closed + // in stop() log.debug("Discarding a batch of {} records as task is stopping", messageCount); msgs.clear(); batchCompleteSignal = null; } - } - else { + } else { batchCompleteSignal = null; } } log.debug("Poll returning {} records", messageCount); - log.trace("[{}] Exit {}.poll, retval={}", Thread.currentThread().getId(), this.getClass().getName(), messageCount); + log.trace("[{}] Exit {}.poll, retval={}", Thread.currentThread().getId(), this.getClass().getName(), + messageCount); return msgs; } - /** + /** *

- * Commit the offsets, up to the offsets that have been returned by {@link #poll()}. This + * Commit the offsets, up to the offsets that have been returned by + * {@link #poll()}. This * method should block until the commit is complete. *

*

- * SourceTasks are not required to implement this functionality; Kafka Connect will record offsets - * automatically. This hook is provided for systems that also need to store offsets internally + * SourceTasks are not required to implement this functionality; Kafka Connect + * will record offsets + * automatically. This hook is provided for systems that also need to store + * offsets internally * in their own system. *

*/ public void commit() throws InterruptedException { log.trace("[{}] Entry {}.commit", Thread.currentThread().getId(), this.getClass().getName()); - // This callback is simply used to ensure that the mechanism to use commitRecord callbacks - // to check that all messages in a batch are complete is not getting stuck. If this callback - // is being called, it means that Kafka Connect believes that all outstanding messages have - // been completed. That should mean that commitRecord has been called for all of them too. - // However, if too few calls to commitRecord are received, the connector could wait indefinitely. - // If this commit callback is called twice without the poll cycle increasing, trigger the + // This callback is simply used to ensure that the mechanism to use commitRecord + // callbacks + // to check that all messages in a batch are complete is not getting stuck. If + // this callback + // is being called, it means that Kafka Connect believes that all outstanding + // messages have + // been completed. That should mean that commitRecord has been called for all of + // them too. + // However, if too few calls to commitRecord are received, the connector could + // wait indefinitely. + // If this commit callback is called twice without the poll cycle increasing, + // trigger the // batch complete signal directly. - int currentPollCycle = pollCycle.get(); + final int currentPollCycle = pollCycle.get(); log.debug("Commit starting in poll cycle {}", currentPollCycle); - if (lastCommitPollCycle == currentPollCycle) - { + if (lastCommitPollCycle == currentPollCycle) { synchronized (this) { if (batchCompleteSignal != null) { log.debug("Bumping batch complete signal by {}", batchCompleteSignal.getCount()); @@ -192,8 +203,7 @@ public void commit() throws InterruptedException { } } } - } - else { + } else { lastCommitPollCycle = currentPollCycle; } @@ -201,21 +211,28 @@ public void commit() throws InterruptedException { } /** - * Signal this SourceTask to stop. In SourceTasks, this method only needs to signal to the task that it should stop - * trying to poll for new data and interrupt any outstanding poll() requests. It is not required that the task has - * fully stopped. Note that this method necessarily may be invoked from a different thread than {@link #poll()} and + * Signal this SourceTask to stop. In SourceTasks, this method only needs to + * signal to the task that it should stop + * trying to poll for new data and interrupt any outstanding poll() requests. It + * is not required that the task has + * fully stopped. Note that this method necessarily may be invoked from a + * different thread than {@link #poll()} and * {@link #commit()}. * - * For example, if a task uses a {@link java.nio.channels.Selector} to receive data over the network, this method - * could set a flag that will force {@link #poll()} to exit immediately and invoke - * {@link java.nio.channels.Selector#wakeup() wakeup()} to interrupt any ongoing requests. + * For example, if a task uses a {@link java.nio.channels.Selector} to receive + * data over the network, this method + * could set a flag that will force {@link #poll()} to exit immediately and + * invoke + * {@link java.nio.channels.Selector#wakeup() wakeup()} to interrupt any ongoing + * requests. */ - @Override public void stop() { + @Override + public void stop() { log.trace("[{}] Entry {}.stop", Thread.currentThread().getId(), this.getClass().getName()); stopNow.set(true); - synchronized(this) { + synchronized (this) { // Close the connection to MQ to clean up if (reader != null) { reader.close(); @@ -227,18 +244,23 @@ public void commit() throws InterruptedException { /** *

- * Commit an individual {@link SourceRecord} when the callback from the producer client is received, or if a record is filtered by a transformation. + * Commit an individual {@link SourceRecord} when the callback from the producer + * client is received, or if a record is filtered by a transformation. *

*

- * SourceTasks are not required to implement this functionality; Kafka Connect will record offsets - * automatically. This hook is provided for systems that also need to store offsets internally + * SourceTasks are not required to implement this functionality; Kafka Connect + * will record offsets + * automatically. This hook is provided for systems that also need to store + * offsets internally * in their own system. *

* - * @param record {@link SourceRecord} that was successfully sent via the producer. + * @param record {@link SourceRecord} that was successfully sent via the + * producer. * @throws InterruptedException */ - @Override public void commitRecord(SourceRecord record) throws InterruptedException { + @Override + public void commitRecord(final SourceRecord record) throws InterruptedException { log.trace("[{}] Entry {}.commitRecord, record={}", Thread.currentThread().getId(), this.getClass().getName(), record); synchronized (this) { diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java index 399ddda..8a3c40e 100755 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java @@ -37,12 +37,12 @@ public abstract class BaseRecordBuilder implements RecordBuilder { private static final Logger log = LoggerFactory.getLogger(BaseRecordBuilder.class); - public enum KeyHeader {NONE, MESSAGE_ID, CORRELATION_ID, CORRELATION_ID_AS_BYTES, DESTINATION}; + public enum KeyHeader { NONE, MESSAGE_ID, CORRELATION_ID, CORRELATION_ID_AS_BYTES, DESTINATION }; protected KeyHeader keyheader = KeyHeader.NONE; - private boolean copyJmsPropertiesFlag = Boolean.FALSE; - private JmsToKafkaHeaderConverter jmsToKafkaHeaderConverter; + private boolean copyJmsPropertiesFlag = Boolean.FALSE; + private JmsToKafkaHeaderConverter jmsToKafkaHeaderConverter; /** * Configure this class. @@ -51,55 +51,53 @@ public enum KeyHeader {NONE, MESSAGE_ID, CORRELATION_ID, CORRELATION_ID_AS_BYTES * * @throws ConnectException Operation failed and connector should stop. */ - @Override public void configure(Map props) { - log.trace("[{}] Entry {}.configure, props={}", Thread.currentThread().getId(), this.getClass().getName(), props); + @Override public void configure(final Map props) { + log.trace("[{}] Entry {}.configure, props={}", Thread.currentThread().getId(), this.getClass().getName(), + props); - String kh = props.get(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER_KEY_HEADER); + final String kh = props.get(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER_KEY_HEADER); if (kh != null) { if (kh.equals(MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSMESSAGEID)) { keyheader = KeyHeader.MESSAGE_ID; log.debug("Setting Kafka record key from JMSMessageID header field"); - } - else if (kh.equals(MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSCORRELATIONID)) { + } else if (kh.equals(MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSCORRELATIONID)) { keyheader = KeyHeader.CORRELATION_ID; log.debug("Setting Kafka record key from JMSCorrelationID header field"); - } - else if (kh.equals(MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSCORRELATIONIDASBYTES)) { + } else if (kh.equals(MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSCORRELATIONIDASBYTES)) { keyheader = KeyHeader.CORRELATION_ID_AS_BYTES; log.debug("Setting Kafka record key from JMSCorrelationIDAsBytes header field"); - } - else if (kh.equals(MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSDESTINATION)) { + } else if (kh.equals(MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSDESTINATION)) { keyheader = KeyHeader.DESTINATION; log.debug("Setting Kafka record key from JMSDestination header field"); - } - else { + } else { log.error("Unsupported MQ record builder key header value {}", kh); throw new ConnectException("Unsupported MQ record builder key header value"); } } - String str = props.get(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER); + final String str = props.get(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER); copyJmsPropertiesFlag = Boolean.parseBoolean(Optional.ofNullable(str).orElse("false")); jmsToKafkaHeaderConverter = new JmsToKafkaHeaderConverter(); - log.trace("[{}] Exit {}.configure", Thread.currentThread().getId(), this.getClass().getName()); - } + log.trace("[{}] Exit {}.configure", Thread.currentThread().getId(), this.getClass().getName()); + } /** * Gets the key to use for the Kafka Connect SourceRecord. * - * @param context the JMS context to use for building messages - * @param topic the Kafka topic - * @param message the message + * @param context the JMS context to use for building messages + * @param topic the Kafka topic + * @param message the message * * @return the Kafka Connect SourceRecord's key * - * @throws JMSException Message could not be converted + * @throws JMSException Message could not be converted */ - public SchemaAndValue getKey(JMSContext context, String topic, Message message) throws JMSException { + public SchemaAndValue getKey(final JMSContext context, final String topic, final Message message) + throws JMSException { Schema keySchema = null; Object key = null; - String keystr; + final String keystr; switch (keyheader) { case MESSAGE_ID: @@ -107,8 +105,7 @@ public SchemaAndValue getKey(JMSContext context, String topic, Message message) keystr = message.getJMSMessageID(); if (keystr.startsWith("ID:", 0)) { key = keystr.substring(3); - } - else { + } else { key = keystr; } break; @@ -117,8 +114,7 @@ public SchemaAndValue getKey(JMSContext context, String topic, Message message) keystr = message.getJMSCorrelationID(); if (keystr.startsWith("ID:", 0)) { key = keystr.substring(3); - } - else { + } else { key = keystr; } break; @@ -140,36 +136,41 @@ public SchemaAndValue getKey(JMSContext context, String topic, Message message) /** * Gets the value to use for the Kafka Connect SourceRecord. * - * @param context the JMS context to use for building messages - * @param topic the Kafka topic - * @param messageBodyJms whether to interpret MQ messages as JMS messages - * @param message the message + * @param context the JMS context to use for building messages + * @param topic the Kafka topic + * @param messageBodyJms whether to interpret MQ messages as JMS messages + * @param message the message * * @return the Kafka Connect SourceRecord's value * - * @throws JMSException Message could not be converted + * @throws JMSException Message could not be converted */ - public abstract SchemaAndValue getValue(JMSContext context, String topic, boolean messageBodyJms, Message message) throws JMSException; + public abstract SchemaAndValue getValue(JMSContext context, String topic, boolean messageBodyJms, Message message) + throws JMSException; - /** + /** * Convert a message into a Kafka Connect SourceRecord. * - * @param context the JMS context to use for building messages - * @param topic the Kafka topic - * @param messageBodyJms whether to interpret MQ messages as JMS messages - * @param message the message + * @param context the JMS context to use for building messages + * @param topic the Kafka topic + * @param messageBodyJms whether to interpret MQ messages as JMS messages + * @param message the message * * @return the Kafka Connect SourceRecord * - * @throws JMSException Message could not be converted + * @throws JMSException Message could not be converted */ - @Override public SourceRecord toSourceRecord(JMSContext context, String topic, boolean messageBodyJms, Message message) throws JMSException { - SchemaAndValue key = this.getKey(context, topic, message); - SchemaAndValue value = this.getValue(context, topic, messageBodyJms, message); - - if (copyJmsPropertiesFlag && messageBodyJms) - return new SourceRecord(null, null, topic, (Integer) null, key.schema(), key.value(), value.schema(), value.value(), message.getJMSTimestamp(), jmsToKafkaHeaderConverter.convertJmsPropertiesToKafkaHeaders(message)); + @Override + public SourceRecord toSourceRecord(final JMSContext context, final String topic, final boolean messageBodyJms, + final Message message) throws JMSException { + final SchemaAndValue key = this.getKey(context, topic, message); + final SchemaAndValue value = this.getValue(context, topic, messageBodyJms, message); + + if (copyJmsPropertiesFlag && messageBodyJms) + return new SourceRecord(null, null, topic, (Integer) null, key.schema(), key.value(), value.schema(), + value.value(), message.getJMSTimestamp(), + jmsToKafkaHeaderConverter.convertJmsPropertiesToKafkaHeaders(message)); else - return new SourceRecord(null, null, topic, key.schema(), key.value(), value.schema(), value.value()); - } + return new SourceRecord(null, null, topic, key.schema(), key.value(), value.schema(), value.value()); + } } \ No newline at end of file diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/DefaultRecordBuilder.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/DefaultRecordBuilder.java index 8d67a2b..acad1c5 100755 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/DefaultRecordBuilder.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/DefaultRecordBuilder.java @@ -55,28 +55,28 @@ public DefaultRecordBuilder() { * * @throws JMSException Message could not be converted */ - @Override public SchemaAndValue getValue(JMSContext context, String topic, boolean messageBodyJms, Message message) throws JMSException { + @Override public SchemaAndValue getValue(final JMSContext context, final String topic, final boolean messageBodyJms, + final Message message) throws JMSException { Schema valueSchema = null; Object value = null; - // Interpreting the body as a JMS message type, we can accept BytesMessage and TextMessage only. + // Interpreting the body as a JMS message type, we can accept BytesMessage and + // TextMessage only. // We do not know the schema so do not specify one. if (messageBodyJms) { if (message instanceof BytesMessage) { log.debug("Bytes message with no schema"); value = message.getBody(byte[].class); - } - else if (message instanceof TextMessage) { + } else if (message instanceof TextMessage) { log.debug("Text message with no schema"); value = message.getBody(String.class); - } - else { + } else { log.error("Unsupported JMS message type {}", message.getClass()); throw new ConnectException("Unsupported JMS message type"); } - } - else { - // Not interpreting the body as a JMS message type, all messages come through as BytesMessage. + } else { + // Not interpreting the body as a JMS message type, all messages come through as + // BytesMessage. // In this case, we specify the value schema as OPTIONAL_BYTES. log.debug("Bytes message with OPTIONAL_BYTES schema"); valueSchema = Schema.OPTIONAL_BYTES_SCHEMA; diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java index 94f3927..ba3e389 100755 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java @@ -15,7 +15,7 @@ */ package com.ibm.eventstreams.connect.mqsource.builders; -import static java.nio.charset.StandardCharsets.*; +import static java.nio.charset.StandardCharsets.UTF_8; import java.util.HashMap; import javax.jms.BytesMessage; @@ -45,7 +45,7 @@ public JsonRecordBuilder() { converter = new JsonConverter(); // We just want the payload, not the schema in the output message - HashMap m = new HashMap<>(); + final HashMap m = new HashMap<>(); m.put("schemas.enable", "false"); // Convert the value, not the key (isKey == false) @@ -55,26 +55,26 @@ public JsonRecordBuilder() { /** * Gets the value schema to use for the Kafka Connect SourceRecord. * - * @param context the JMS context to use for building messages - * @param topic the Kafka topic - * @param messageBodyJms whether to interpret MQ messages as JMS messages - * @param message the message + * @param context the JMS context to use for building messages + * @param topic the Kafka topic + * @param messageBodyJms whether to interpret MQ messages as JMS messages + * @param message the message * * @return the Kafka Connect SourceRecord's value * - * @throws JMSException Message could not be converted + * @throws JMSException Message could not be converted */ - @Override public SchemaAndValue getValue(JMSContext context, String topic, boolean messageBodyJms, Message message) throws JMSException { - byte[] payload; + @Override + public SchemaAndValue getValue(final JMSContext context, final String topic, final boolean messageBodyJms, + final Message message) throws JMSException { + final byte[] payload; if (message instanceof BytesMessage) { payload = message.getBody(byte[].class); - } - else if (message instanceof TextMessage) { - String s = message.getBody(String.class); + } else if (message instanceof TextMessage) { + final String s = message.getBody(String.class); payload = s.getBytes(UTF_8); - } - else { + } else { log.error("Unsupported JMS message type {}", message.getClass()); throw new ConnectException("Unsupported JMS message type"); } diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/processor/JmsToKafkaHeaderConverter.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/processor/JmsToKafkaHeaderConverter.java index f7f9a19..52357ee 100644 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/processor/JmsToKafkaHeaderConverter.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/processor/JmsToKafkaHeaderConverter.java @@ -31,33 +31,33 @@ public class JmsToKafkaHeaderConverter { private static final Logger log = LoggerFactory.getLogger(JmsToKafkaHeaderConverter.class); - /** - * Copies the JMS properties to Kafka headers. - * - * @param message JMS message. - * - * @return Kafka connect headers. - */ - public ConnectHeaders convertJmsPropertiesToKafkaHeaders(Message message) { - ConnectHeaders connectHeaders = new ConnectHeaders(); + /** + * Copies the JMS properties to Kafka headers. + * + * @param message JMS message. + * + * @return Kafka connect headers. + */ + public ConnectHeaders convertJmsPropertiesToKafkaHeaders(final Message message) { + final ConnectHeaders connectHeaders = new ConnectHeaders(); try { @SuppressWarnings("unchecked") - Enumeration propertyNames = (Enumeration)message.getPropertyNames(); - List jmsPropertyKeys = Collections.list(propertyNames); + final Enumeration propertyNames = (Enumeration) message.getPropertyNames(); + final List jmsPropertyKeys = Collections.list(propertyNames); jmsPropertyKeys.forEach(key -> { try { connectHeaders.addString(key.toString(), message.getObjectProperty(key.toString()).toString()); - } - catch (JMSException e) { - // Not failing the message processing if JMS properties cannot be read for some reason. + } catch (final JMSException e) { + // Not failing the message processing if JMS properties cannot be read for some + // reason. log.warn("JMS exception {}", e); } }); - } - catch (JMSException e) { - // Not failing the message processing if JMS properties cannot be read for some reason. + } catch (final JMSException e) { + // Not failing the message processing if JMS properties cannot be read for some + // reason. log.warn("JMS exception {}", e); } diff --git a/src/test/java/com/ibm/eventstreams/connect/mqsource/JmsToKafkaHeaderConverterTest.java b/src/test/java/com/ibm/eventstreams/connect/mqsource/JmsToKafkaHeaderConverterTest.java index ed41bd1..0598a9b 100644 --- a/src/test/java/com/ibm/eventstreams/connect/mqsource/JmsToKafkaHeaderConverterTest.java +++ b/src/test/java/com/ibm/eventstreams/connect/mqsource/JmsToKafkaHeaderConverterTest.java @@ -44,17 +44,18 @@ public class JmsToKafkaHeaderConverterTest { @Test public void convertJmsPropertiesToKafkaHeaders() throws JMSException { - List keys = Arrays.asList("facilityCountryCode", "facilityNum"); + final List keys = Arrays.asList("facilityCountryCode", "facilityNum"); - Enumeration keyEnumeration = Collections.enumeration(keys); + final Enumeration keyEnumeration = Collections.enumeration(keys); - //Arrange + // Arrange when(message.getPropertyNames()).thenReturn(keyEnumeration); when(message.getObjectProperty("facilityCountryCode")).thenReturn("US"); when(message.getObjectProperty("facilityNum")).thenReturn("12345"); - //Act - ConnectHeaders actualConnectHeaders = jmsToKafkaHeaderConverter.convertJmsPropertiesToKafkaHeaders(message); + // Act + final ConnectHeaders actualConnectHeaders = jmsToKafkaHeaderConverter + .convertJmsPropertiesToKafkaHeaders(message); //Verify diff --git a/src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnectorTest.java b/src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnectorTest.java index 3dd6b34..9ea8431 100644 --- a/src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnectorTest.java +++ b/src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnectorTest.java @@ -25,14 +25,14 @@ public class MQSourceConnectorTest { @Test public void testVersion() { - String version = new MQSourceConnector().version(); - String expectedVersion = System.getProperty("connectorVersion"); + final String version = new MQSourceConnector().version(); + final String expectedVersion = System.getProperty("connectorVersion"); assertEquals("Expected connector version to match version of built jar file.", expectedVersion, version); } @Test public void testConnectorType() { - Connector connector = new MQSourceConnector(); + final Connector connector = new MQSourceConnector(); assertTrue(SourceConnector.class.isAssignableFrom(connector.getClass())); } } \ No newline at end of file