diff --git a/.github/ISSUE_TEMPLATE/BUG-REPORT.yml b/.github/ISSUE_TEMPLATE/BUG-REPORT.yml
index e835b69..df09ebe 100644
--- a/.github/ISSUE_TEMPLATE/BUG-REPORT.yml
+++ b/.github/ISSUE_TEMPLATE/BUG-REPORT.yml
@@ -57,8 +57,8 @@ body:
label: Version
description: What version of our software are you running?
options:
- - 2.1.0 (Default)
- - 1.3.5
+ - 2.1.1 (Default)
+ - 1.3.5
- older (<1.3.5)
validations:
required: true
diff --git a/README.md b/README.md
index 542e7e0..9745ecd 100644
--- a/README.md
+++ b/README.md
@@ -304,6 +304,7 @@ The configuration options for the Kafka Connect source connector for IBM MQ are
| mq.batch.size | The maximum number of messages in a batch (unit of work) | integer | 250 | 1 or greater |
| mq.message.mqmd.read | Whether to enable reading of all MQMD fields | boolean | false | |
| mq.max.poll.blocked.time.ms | How long the connector will wait for the previous batch of messages to be delivered to Kafka before starting a new poll | integer | 2000 | It is important that this is less than the time defined for `task.shutdown.graceful.timeout.ms` as that is how long connect will wait for the task to perform lifecycle operations. |
+| mq.client.reconnect.options | Options governing MQ reconnection. | string | ASDEF | ASDEF, ANY, QMGR, DISABLED |
### Using a CCDT file
diff --git a/pom.xml b/pom.xml
index a293abf..33f97ed 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,7 +20,7 @@
com.ibm.eventstreams.connect
kafka-connect-mq-source
jar
- 2.1.0
+ 2.1.1
kafka-connect-mq-source
IBM Corporation
diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/JMSWorkerIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/JMSWorkerIT.java
index d0a222d..6676bfe 100644
--- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/JMSWorkerIT.java
+++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/JMSWorkerIT.java
@@ -106,5 +106,4 @@ public void testQueueHoldsMoreThanOneMessage_twoMessageOnQueue() throws Exceptio
public void testQueueHoldsMoreThanOneMessage_queueNotFound() {
assertThrows(Exception.class, ()->jmsWorker.queueHoldsMoreThanOneMessage("QUEUE_DOES_NOT_EXIST"));
}
-
}
\ No newline at end of file
diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java
index dba1908..8e505c0 100644
--- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java
+++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java
@@ -577,4 +577,81 @@ public void testRemoveDeliveredMessagesFromSourceQueueDoesNotThrowException() th
assertThatNoException()
.isThrownBy(() -> connectTask.removeDeliveredMessagesFromSourceQueue(Arrays.asList(msgIds)));
}
+
+
+ @Test
+ public void testConfigureClientReconnectOptions() throws Exception {
+ // setup test condition: put messages on source queue, poll once to read them
+ connectTask = getSourceTaskWithEmptyKafkaOffset();
+
+ final Map connectorConfigProps = createExactlyOnceConnectorProperties();
+ connectorConfigProps.put("mq.message.body.jms", "true");
+ connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
+ connectorConfigProps.put("mq.client.reconnect.options", "QMGR");
+
+ JMSWorker shared = new JMSWorker();
+ shared.configure(getPropertiesConfig(connectorConfigProps));
+ JMSWorker dedicated = new JMSWorker();
+ dedicated.configure(getPropertiesConfig(connectorConfigProps));
+ SequenceStateClient sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated);
+
+ connectTask.start(connectorConfigProps, shared, dedicated, sequenceStateClient);
+
+ final List messages = createAListOfMessages(getJmsContext(), 2, "message ");
+ putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);
+
+ connectTask.poll();
+
+ List stateMsgs1 = browseAllMessagesFromQueue(DEFAULT_STATE_QUEUE);
+ assertThat(stateMsgs1.size()).isEqualTo(1);
+ shared.attemptRollback();
+ assertThat(stateMsgs1.size()).isEqualTo(1); //state message is still there even though source message were rolled back
+
+ }
+
+ @Test
+ public void verifyEmptyMessage() throws Exception {
+ connectTask = new MQSourceTask();
+
+ final Map connectorConfigProps = createDefaultConnectorProperties();
+ connectorConfigProps.put("mq.message.body.jms", "true");
+ connectorConfigProps.put("mq.record.builder",
+ "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
+
+ connectTask.start(connectorConfigProps);
+
+ Message emptyMessage = getJmsContext().createMessage();
+ putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(emptyMessage));
+
+ final List kafkaMessages = connectTask.poll();
+ assertEquals(1, kafkaMessages.size());
+
+ final SourceRecord kafkaMessage = kafkaMessages.get(0);
+ assertNull(kafkaMessage.value());
+
+ connectTask.commitRecord(kafkaMessage);
+ }
+
+ @Test
+ public void verifyEmptyTextMessage() throws Exception {
+ connectTask = new MQSourceTask();
+
+ final Map connectorConfigProps = createDefaultConnectorProperties();
+ connectorConfigProps.put("mq.message.body.jms", "true");
+ connectorConfigProps.put("mq.record.builder",
+ "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
+
+ connectTask.start(connectorConfigProps);
+
+ TextMessage emptyMessage = getJmsContext().createTextMessage();
+ putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(emptyMessage));
+
+ final List kafkaMessages = connectTask.poll();
+ assertEquals(1, kafkaMessages.size());
+
+ final SourceRecord kafkaMessage = kafkaMessages.get(0);
+ assertNull(kafkaMessage.value());
+
+ connectTask.commitRecord(kafkaMessage);
+ }
}
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSWorker.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSWorker.java
index b4f00a5..8e2d4ac 100755
--- a/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSWorker.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSWorker.java
@@ -43,6 +43,7 @@
import java.net.URL;
import java.util.Enumeration;
import java.util.HashMap;
+import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -126,6 +127,7 @@ public void configure(final AbstractConfig config) {
mqConnFactory.setSSLSocketFactory(sslContext.getSocketFactory());
}
}
+ configureClientReconnectOptions(config, mqConnFactory);
userName = config.getString(MQSourceConnector.CONFIG_NAME_MQ_USER_NAME);
password = config.getPassword(MQSourceConnector.CONFIG_NAME_MQ_PASSWORD);
@@ -144,6 +146,31 @@ public void configure(final AbstractConfig config) {
log.trace("[{}] Exit {}.configure", Thread.currentThread().getId(), this.getClass().getName());
}
+ // Configure client reconnect option based on the config
+ private static void configureClientReconnectOptions(final AbstractConfig config,
+ final MQConnectionFactory mqConnFactory) throws JMSException {
+ String clientReconnectOptions = config.getString(MQSourceConnector.CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS);
+
+ clientReconnectOptions = clientReconnectOptions.toUpperCase(Locale.ENGLISH);
+
+ switch (clientReconnectOptions) {
+ case MQSourceConnector.CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ANY:
+ mqConnFactory.setClientReconnectOptions(WMQConstants.WMQ_CLIENT_RECONNECT);
+ break;
+
+ case MQSourceConnector.CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_QMGR:
+ mqConnFactory.setClientReconnectOptions(WMQConstants.WMQ_CLIENT_RECONNECT_Q_MGR);
+ break;
+
+ case MQSourceConnector.CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED:
+ mqConnFactory.setClientReconnectOptions(WMQConstants.WMQ_CLIENT_RECONNECT_DISABLED);
+ break;
+
+ default:
+ mqConnFactory.setClientReconnectOptions(WMQConstants.WMQ_CLIENT_RECONNECT_AS_DEF);
+ break;
+ }
+ }
/**
* Used for tests.
@@ -152,7 +179,7 @@ protected void setRecordBuilder(final RecordBuilder recordBuilder) {
this.recordBuilder = recordBuilder;
}
- protected JMSContext getContext() { // used to enable testing
+ protected JMSContext getContext() { // used to enable testing
if (jmsCtxt == null) maybeReconnect();
return jmsCtxt;
}
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java
index e399c1d..453056f 100644
--- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java
@@ -23,12 +23,15 @@
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
@@ -152,7 +155,27 @@ public class MQSourceConnector extends SourceConnector {
+ "previous batch of messages to be delivered to Kafka before starting a new poll.";
public static final String CONFIG_DISPLAY_MAX_POLL_BLOCKED_TIME_MS = "Max poll blocked time ms";
- public static String version = "2.1.0";
+ public static final String CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS = "mq.client.reconnect.options";
+ public static final String CONFIG_DOCUMENTATION_MQ_CLIENT_RECONNECT_OPTIONS = "Options governing MQ reconnection.";
+ public static final String CONFIG_DISPLAY_MQ_CLIENT_RECONNECT_OPTIONS = "MQ client reconnect options";
+ public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_QMGR = "QMGR";
+ public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ANY = "ANY";
+ public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED = "DISABLED";
+ public static final String CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF = "ASDEF";
+
+ // Define valid reconnect options
+ public static final String[] CONFIG_VALUE_MQ_VALID_RECONNECT_OPTIONS = {
+ CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF,
+ CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF.toLowerCase(Locale.ENGLISH),
+ CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ANY,
+ CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ANY.toLowerCase(Locale.ENGLISH),
+ CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_QMGR,
+ CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_QMGR.toLowerCase(Locale.ENGLISH),
+ CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED,
+ CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED.toLowerCase(Locale.ENGLISH)
+ };
+
+ public static String version = "2.1.1";
private Map configProps;
@@ -237,6 +260,37 @@ public ConfigDef config() {
return CONFIGDEF;
}
+ @Override
+ public Config validate(final Map connectorConfigs) {
+ final Config config = super.validate(connectorConfigs);
+
+ MQSourceConnector.validateMQClientReconnectOptions(config);
+ return config;
+ }
+
+ private static void validateMQClientReconnectOptions(final Config config) {
+ // Collect all configuration values
+ final Map configValues = config.configValues().stream()
+ .collect(Collectors.toMap(ConfigValue::name, v -> v));
+
+ final ConfigValue clientReconnectOptionsConfigValue = configValues
+ .get(MQSourceConnector.CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS);
+ final ConfigValue exactlyOnceStateQueueConfigValue = configValues
+ .get(MQSourceConnector.CONFIG_NAME_MQ_EXACTLY_ONCE_STATE_QUEUE);
+
+ // Check if the exactly once state queue is configured
+ if (exactlyOnceStateQueueConfigValue.value() == null) {
+ return;
+ }
+
+ // Validate the client reconnect options
+ final Boolean isClientReconnectOptionQMGR = CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_QMGR.equals(clientReconnectOptionsConfigValue.value());
+ if (!isClientReconnectOptionQMGR) {
+ clientReconnectOptionsConfigValue.addErrorMessage(
+ "When running the MQ source connector with exactly once mode, the client reconnect option 'QMGR' should be provided. For example: `mq.client.reconnect.options: QMGR`");
+ }
+ }
+
/** Null validator - indicates that any value is acceptable for this config option. */
private static final ConfigDef.Validator ANY = null;
@@ -468,6 +522,16 @@ null, new ReadableFile(),
null, 24, Width.MEDIUM,
CONFIG_DISPLAY_MAX_POLL_BLOCKED_TIME_MS);
+ CONFIGDEF.define(CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS,
+ Type.STRING,
+ CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF,
+ ConfigDef.ValidString.in(CONFIG_VALUE_MQ_VALID_RECONNECT_OPTIONS),
+ Importance.MEDIUM,
+ CONFIG_DOCUMENTATION_MQ_CLIENT_RECONNECT_OPTIONS,
+ CONFIG_GROUP_MQ, 25,
+ Width.SHORT,
+ CONFIG_DISPLAY_MQ_CLIENT_RECONNECT_OPTIONS);
+
CONFIGDEF.define(CONFIG_NAME_TOPIC,
Type.STRING,
// user must specify the topic name
diff --git a/src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnectorTest.java b/src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnectorTest.java
index 5ded06f..44d8146 100644
--- a/src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnectorTest.java
+++ b/src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnectorTest.java
@@ -15,6 +15,8 @@
*/
package com.ibm.eventstreams.connect.mqsource;
+import org.apache.kafka.common.config.Config;
+import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
@@ -28,7 +30,11 @@
import static org.junit.Assert.assertTrue;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
public class MQSourceConnectorTest {
@Test
@@ -82,5 +88,62 @@ public void testConnectorConfigSupportsExactlyOnce() {
assertFalse(MQSourceConnector.configSupportsExactlyOnce(Collections.singletonMap("mq.exactly.once.state.queue", "")));
assertFalse(MQSourceConnector.configSupportsExactlyOnce(Collections.singletonMap("mq.exactly.once.state.queue", null)));
}
-
-}
\ No newline at end of file
+
+ @Test
+ public void testValidateMQClientReconnectOptions() {
+ final Map configProps = new HashMap();
+ configProps.put("mq.exactly.once.state.queue", "DEV.QUEUE.2");
+ configProps.put("tasks.max", "1");
+
+ final Config config = new MQSourceConnector().validate(configProps);
+
+ assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
+ assertTrue(config.configValues().stream()
+ .filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS))
+ .flatMap(cv -> cv.errorMessages().stream())
+ .anyMatch(msg -> msg.contains("When running the MQ source connector with exactly once mode, the client reconnect option 'QMGR' should be provided.")));
+ }
+
+ @Test
+ public void testValidateMQClientReconnectOptionsWithoutExactlyOnce() {
+ final Map configProps = new HashMap();
+ final Config config = new MQSourceConnector().validate(configProps);
+
+ assertFalse(config.configValues().stream()
+ .filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS))
+ .flatMap(cv -> cv.errorMessages().stream())
+ .anyMatch(msg -> msg.contains("When running the MQ source connector with exactly once mode, the client reconnect option 'QMGR' should be provided.")));
+ }
+
+ @Test
+ public void testValidateMQClientReconnectOptionsWithQMGROption() {
+ final Map configProps = new HashMap();
+ configProps.put("mq.exactly.once.state.queue", "DEV.QUEUE.2");
+ configProps.put("mq.client.reconnect.options", "QMGR");
+ configProps.put("tasks.max", "1");
+
+ final Config config = new MQSourceConnector().validate(configProps);
+
+ assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
+ assertFalse(config.configValues().stream()
+ .filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS))
+ .flatMap(cv -> cv.errorMessages().stream())
+ .anyMatch(msg -> msg.contains("When running the MQ source connector with exactly once mode, the client reconnect option 'QMGR' should be provided.")));
+ }
+
+ @Test
+ public void testValidateMQClientReconnectOptionsWithANYOption() {
+ final Map configProps = new HashMap();
+ configProps.put("mq.exactly.once.state.queue", "DEV.QUEUE.2");
+ configProps.put("mq.client.reconnect.options", "ANY");
+ configProps.put("tasks.max", "1");
+
+ final Config config = new MQSourceConnector().validate(configProps);
+
+ assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
+ assertTrue(config.configValues().stream()
+ .filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_NAME_MQ_CLIENT_RECONNECT_OPTIONS))
+ .flatMap(cv -> cv.errorMessages().stream())
+ .anyMatch(msg -> msg.contains("When running the MQ source connector with exactly once mode, the client reconnect option 'QMGR' should be provided.")));
+ }
+}