diff --git a/pom.xml b/pom.xml
index e6759b9..5a9ae3f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -187,28 +187,6 @@
-
-
-
-
- org.codehaus.mojo
- build-helper-maven-plugin
- 3.3.0
-
-
- add-test-source
- process-test-sources
-
- add-test-source
-
-
-
-
-
-
-
-
-
@@ -323,5 +301,12 @@
+
+
+
+
+ src/integration/resources
+
+
diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java
index d56af8f..06ccb5e 100644
--- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java
+++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java
@@ -31,6 +31,7 @@
import org.junit.ClassRule;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.WaitingConsumer;
+import org.testcontainers.utility.MountableFile;
import com.ibm.mq.jms.MQConnectionFactory;
import com.ibm.msg.client.jms.JmsConnectionFactory;
@@ -53,6 +54,7 @@ public class AbstractJMSContextIT {
.withEnv("LICENSE", "accept")
.withEnv("MQ_QMGR_NAME", QMGR_NAME)
.withEnv("MQ_ENABLE_EMBEDDED_WEB_SERVER", "false")
+ .withCopyFileToContainer(MountableFile.forClasspathResource("no-auth-qmgr.mqsc"), "/etc/mqm/99-no-auth-qmgr.mqsc")
.withExposedPorts(1414);
private JMSContext jmsContext;
diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskAuthIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskAuthIT.java
index 86f2e9a..b3109f9 100644
--- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskAuthIT.java
+++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskAuthIT.java
@@ -72,6 +72,7 @@ private Map getConnectorProps() {
connectorProps.put("mq.password", APP_PASSWORD);
connectorProps.put("mq.message.body.jms", "false");
connectorProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
+ connectorProps.put("topic", "mytopic");
return connectorProps;
}
diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java
index 782ea94..c9ca4ce 100644
--- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java
+++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java
@@ -61,6 +61,7 @@ private Map createDefaultConnectorProperties() {
props.put("mq.channel.name", getChannelName());
props.put("mq.queue", MQ_QUEUE);
props.put("mq.user.authentication.mqcsp", "false");
+ props.put("topic", "mytopic");
return props;
}
@@ -82,6 +83,7 @@ public void verifyJmsTextMessages() throws Exception {
final List kafkaMessages = connectTask.poll();
assertEquals(2, kafkaMessages.size());
for (final SourceRecord kafkaMessage : kafkaMessages) {
+ assertEquals("mytopic", kafkaMessage.topic());
assertNull(kafkaMessage.key());
assertNull(kafkaMessage.valueSchema());
@@ -116,6 +118,7 @@ public void verifyJmsJsonMessages() throws Exception {
assertEquals(5, kafkaMessages.size());
for (int i = 0; i < 5; i++) {
final SourceRecord kafkaMessage = kafkaMessages.get(i);
+ assertEquals("mytopic", kafkaMessage.topic());
assertNull(kafkaMessage.key());
assertNull(kafkaMessage.valueSchema());
@@ -148,6 +151,7 @@ public void verifyJmsMessageHeaders() throws Exception {
final List kafkaMessages = connectTask.poll();
assertEquals(1, kafkaMessages.size());
final SourceRecord kafkaMessage = kafkaMessages.get(0);
+ assertEquals("mytopic", kafkaMessage.topic());
assertNull(kafkaMessage.key());
assertNull(kafkaMessage.valueSchema());
diff --git a/src/integration/resources/no-auth-qmgr.mqsc b/src/integration/resources/no-auth-qmgr.mqsc
new file mode 100644
index 0000000..a05d40e
--- /dev/null
+++ b/src/integration/resources/no-auth-qmgr.mqsc
@@ -0,0 +1,3 @@
+ALTER QMGR CHLAUTH(DISABLED)
+ALTER QMGR CONNAUTH(' ')
+REFRESH SECURITY TYPE(CONNAUTH)
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java
index 8354f46..71b5db0 100755
--- a/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java
@@ -31,7 +31,6 @@
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.SecureRandom;
-import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSConsumer;
@@ -45,6 +44,8 @@
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
@@ -61,7 +62,7 @@ public class JMSReader {
// Configs
private String userName;
- private String password;
+ private Password password;
private String topic;
private boolean messageBodyJms;
@@ -93,79 +94,44 @@ public JMSReader() {
*
* @throws ConnectException Operation failed and connector should stop.
*/
- public void configure(final Map props) {
+ public void configure(final AbstractConfig config) {
log.trace("[{}] Entry {}.configure, props={}", Thread.currentThread().getId(), this.getClass().getName(),
- props);
-
- final String queueManager = props.get(MQSourceConnector.CONFIG_NAME_MQ_QUEUE_MANAGER);
- final String connectionMode = props.get(MQSourceConnector.CONFIG_NAME_MQ_CONNECTION_MODE);
- final String connectionNameList = props.get(MQSourceConnector.CONFIG_NAME_MQ_CONNECTION_NAME_LIST);
- final String channelName = props.get(MQSourceConnector.CONFIG_NAME_MQ_CHANNEL_NAME);
- final String queueName = props.get(MQSourceConnector.CONFIG_NAME_MQ_QUEUE);
- final String userName = props.get(MQSourceConnector.CONFIG_NAME_MQ_USER_NAME);
- final String password = props.get(MQSourceConnector.CONFIG_NAME_MQ_PASSWORD);
- final String ccdtUrl = props.get(MQSourceConnector.CONFIG_NAME_MQ_CCDT_URL);
- final String builderClass = props.get(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER);
- final String mbj = props.get(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS);
- final String mdr = props.get(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_MQMD_READ);
- final String sslCipherSuite = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_CIPHER_SUITE);
- final String sslPeerName = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_PEER_NAME);
- final String sslKeystoreLocation = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_KEYSTORE_LOCATION);
- final String sslKeystorePassword = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_KEYSTORE_PASSWORD);
- final String sslTruststoreLocation = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_LOCATION);
- final String sslTruststorePassword = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_PASSWORD);
- final String useMQCSP = props.get(MQSourceConnector.CONFIG_NAME_MQ_USER_AUTHENTICATION_MQCSP);
- final String useIBMCipherMappings = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS);
- final String topic = props.get(MQSourceConnector.CONFIG_NAME_TOPIC);
-
- if (useIBMCipherMappings != null) {
- System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings", useIBMCipherMappings);
- }
+ config);
- int transportType = WMQConstants.WMQ_CM_CLIENT;
- if (connectionMode != null) {
- if (connectionMode.equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT)) {
- transportType = WMQConstants.WMQ_CM_CLIENT;
- } else if (connectionMode.equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE_BINDINGS)) {
- transportType = WMQConstants.WMQ_CM_BINDINGS;
- } else {
- log.error("Unsupported MQ connection mode {}", connectionMode);
- throw new ConnectException("Unsupported MQ connection mode");
- }
- }
+ System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings",
+ config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS).toString());
+
+ final int transportType =
+ config.getString(MQSourceConnector.CONFIG_NAME_MQ_CONNECTION_MODE)
+ .equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT) ?
+ WMQConstants.WMQ_CM_CLIENT :
+ WMQConstants.WMQ_CM_BINDINGS;
try {
mqConnFactory = new MQConnectionFactory();
mqConnFactory.setTransportType(transportType);
- mqConnFactory.setQueueManager(queueManager);
- mqConnFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, true);
- if (useMQCSP != null) {
- mqConnFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP,
- Boolean.parseBoolean(useMQCSP));
- }
+ mqConnFactory.setQueueManager(config.getString(MQSourceConnector.CONFIG_NAME_MQ_QUEUE_MANAGER));
+ mqConnFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP,
+ config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_USER_AUTHENTICATION_MQCSP));
if (transportType == WMQConstants.WMQ_CM_CLIENT) {
+ final String ccdtUrl = config.getString(MQSourceConnector.CONFIG_NAME_MQ_CCDT_URL);
+
if (ccdtUrl != null) {
- final URL ccdtUrlObject;
- try {
- ccdtUrlObject = new URL(ccdtUrl);
- } catch (final MalformedURLException e) {
- log.error("MalformedURLException exception {}", e);
- throw new ConnectException("CCDT file url invalid", e);
- }
- mqConnFactory.setCCDTURL(ccdtUrlObject);
+ mqConnFactory.setCCDTURL(new URL(ccdtUrl));
} else {
- mqConnFactory.setConnectionNameList(connectionNameList);
- mqConnFactory.setChannel(channelName);
+ mqConnFactory.setConnectionNameList(config.getString(MQSourceConnector.CONFIG_NAME_MQ_CONNECTION_NAME_LIST));
+ mqConnFactory.setChannel(config.getString(MQSourceConnector.CONFIG_NAME_MQ_CHANNEL_NAME));
}
- if (sslCipherSuite != null) {
- mqConnFactory.setSSLCipherSuite(sslCipherSuite);
- if (sslPeerName != null) {
- mqConnFactory.setSSLPeerName(sslPeerName);
- }
- }
+ mqConnFactory.setSSLCipherSuite(config.getString(MQSourceConnector.CONFIG_NAME_MQ_SSL_CIPHER_SUITE));
+ mqConnFactory.setSSLPeerName(config.getString(MQSourceConnector.CONFIG_NAME_MQ_SSL_PEER_NAME));
+
+ final String sslKeystoreLocation = config.getString(MQSourceConnector.CONFIG_NAME_MQ_SSL_KEYSTORE_LOCATION);
+ final Password sslKeystorePassword = config.getPassword(MQSourceConnector.CONFIG_NAME_MQ_SSL_KEYSTORE_PASSWORD);
+ final String sslTruststoreLocation = config.getString(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_LOCATION);
+ final Password sslTruststorePassword = config.getPassword(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_PASSWORD);
if (sslKeystoreLocation != null || sslTruststoreLocation != null) {
final SSLContext sslContext = buildSslContext(sslKeystoreLocation, sslKeystorePassword,
sslTruststoreLocation, sslTruststorePassword);
@@ -173,36 +139,35 @@ public void configure(final Map props) {
}
}
- queue = new MQQueue(queueName);
+ queue = new MQQueue(config.getString(MQSourceConnector.CONFIG_NAME_MQ_QUEUE));
- this.userName = userName;
- this.password = password;
+ userName = config.getString(MQSourceConnector.CONFIG_NAME_MQ_USER_NAME);
+ password = config.getPassword(MQSourceConnector.CONFIG_NAME_MQ_PASSWORD);
- this.messageBodyJms = false;
- queue.setMessageBodyStyle(WMQConstants.WMQ_MESSAGE_BODY_MQ);
- if (mbj != null) {
- if (Boolean.parseBoolean(mbj)) {
- this.messageBodyJms = true;
- queue.setMessageBodyStyle(WMQConstants.WMQ_MESSAGE_BODY_JMS);
- }
- }
+ messageBodyJms = config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS);
+ queue.setMessageBodyStyle(messageBodyJms ?
+ WMQConstants.WMQ_MESSAGE_BODY_JMS :
+ WMQConstants.WMQ_MESSAGE_BODY_MQ);
- if (mdr != null) {
- if (Boolean.parseBoolean(mdr)) {
- queue.setBooleanProperty(WMQConstants.WMQ_MQMD_READ_ENABLED, true);
- }
- }
+ queue.setBooleanProperty(WMQConstants.WMQ_MQMD_READ_ENABLED,
+ config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_MQMD_READ));
+
+ topic = config.getString(MQSourceConnector.CONFIG_NAME_TOPIC);
- this.topic = topic;
} catch (JMSException | JMSRuntimeException jmse) {
log.error("JMS exception {}", jmse);
throw new ConnectException(jmse);
+ } catch (final MalformedURLException e) {
+ log.error("MalformedURLException exception {}", e);
+ throw new ConnectException("CCDT file url invalid", e);
}
+
+ final String builderClass = config.getString(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER);
try {
final Class extends RecordBuilder> c = Class.forName(builderClass).asSubclass(RecordBuilder.class);
builder = c.newInstance();
- builder.configure(props);
+ builder.configure(config.originalsStrings());
} catch (ClassNotFoundException | ClassCastException | IllegalAccessException | InstantiationException
| NullPointerException exc) {
log.error("Could not instantiate message builder {}", builderClass);
@@ -220,7 +185,7 @@ public void connect() {
try {
if (userName != null) {
- jmsCtxt = mqConnFactory.createContext(userName, password, JMSContext.SESSION_TRANSACTED);
+ jmsCtxt = mqConnFactory.createContext(userName, password.value(), JMSContext.SESSION_TRANSACTED);
} else {
jmsCtxt = mqConnFactory.createContext(JMSContext.SESSION_TRANSACTED);
}
@@ -373,7 +338,7 @@ private boolean connectInternal() {
log.trace("[{}] Entry {}.connectInternal", Thread.currentThread().getId(), this.getClass().getName());
try {
if (userName != null) {
- jmsCtxt = mqConnFactory.createContext(userName, password, JMSContext.SESSION_TRANSACTED);
+ jmsCtxt = mqConnFactory.createContext(userName, password.value(), JMSContext.SESSION_TRANSACTED);
} else {
jmsCtxt = mqConnFactory.createContext(JMSContext.SESSION_TRANSACTED);
}
@@ -498,8 +463,8 @@ private ConnectException handleException(final Throwable exc) {
return new ConnectException(exc);
}
- private SSLContext buildSslContext(final String sslKeystoreLocation, final String sslKeystorePassword,
- final String sslTruststoreLocation, final String sslTruststorePassword) {
+ private SSLContext buildSslContext(final String sslKeystoreLocation, final Password sslKeystorePassword,
+ final String sslTruststoreLocation, final Password sslTruststorePassword) {
log.trace("[{}] Entry {}.buildSslContext", Thread.currentThread().getId(), this.getClass().getName());
try {
@@ -508,7 +473,7 @@ private SSLContext buildSslContext(final String sslKeystoreLocation, final Strin
if (sslKeystoreLocation != null) {
final KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
- kmf.init(loadKeyStore(sslKeystoreLocation, sslKeystorePassword), sslKeystorePassword.toCharArray());
+ kmf.init(loadKeyStore(sslKeystoreLocation, sslKeystorePassword), sslKeystorePassword.value().toCharArray());
keyManagers = kmf.getKeyManagers();
}
@@ -530,12 +495,12 @@ private SSLContext buildSslContext(final String sslKeystoreLocation, final Strin
}
}
- private KeyStore loadKeyStore(final String location, final String password) throws GeneralSecurityException {
+ private KeyStore loadKeyStore(final String location, final Password password) throws GeneralSecurityException {
log.trace("[{}] Entry {}.loadKeyStore", Thread.currentThread().getId(), this.getClass().getName());
try (final InputStream ksStr = new FileInputStream(location)) {
final KeyStore ks = KeyStore.getInstance("JKS");
- ks.load(ksStr, password.toCharArray());
+ ks.load(ksStr, password.value().toCharArray());
log.trace("[{}] Exit {}.loadKeyStore, retval={}", Thread.currentThread().getId(),
this.getClass().getName(), ks);
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java
index 24df222..5a640ef 100644
--- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java
@@ -15,6 +15,9 @@
*/
package com.ibm.eventstreams.connect.mqsource;
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
@@ -25,15 +28,17 @@
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MQSourceConnector extends SourceConnector {
private static final Logger log = LoggerFactory.getLogger(MQSourceConnector.class);
+ public static final ConfigDef CONFIGDEF;
+
public static final String CONFIG_GROUP_MQ = "mq";
public static final String CONFIG_NAME_MQ_QUEUE_MANAGER = "mq.queue.manager";
@@ -218,104 +223,264 @@ public void stop() {
*/
@Override
public ConfigDef config() {
- final ConfigDef config = new ConfigDef();
-
- config.define(CONFIG_NAME_MQ_QUEUE_MANAGER, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, Importance.HIGH,
- CONFIG_DOCUMENTATION_MQ_QUEUE_MANAGER, CONFIG_GROUP_MQ, 1, Width.MEDIUM,
- CONFIG_DISPLAY_MQ_QUEUE_MANAGER);
-
- config.define(CONFIG_NAME_MQ_CONNECTION_MODE, Type.STRING, CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT,
- ConfigDef.ValidString.in(CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT,
- CONFIG_VALUE_MQ_CONNECTION_MODE_BINDINGS),
- Importance.MEDIUM,
- CONFIG_DOCUMENTATION_MQ_CONNECTION_MODE, CONFIG_GROUP_MQ, 2, Width.SHORT,
- CONFIG_DISPLAY_MQ_CONNECTION_MODE);
-
- config.define(CONFIG_NAME_MQ_CONNECTION_NAME_LIST, Type.STRING, null, Importance.MEDIUM,
- CONFIG_DOCUMENTATION_MQ_CONNNECTION_NAME_LIST, CONFIG_GROUP_MQ, 3, Width.LONG,
- CONFIG_DISPLAY_MQ_CONNECTION_NAME_LIST);
-
- config.define(CONFIG_NAME_MQ_CHANNEL_NAME, Type.STRING, null, Importance.MEDIUM,
- CONFIG_DOCUMENTATION_MQ_CHANNEL_NAME, CONFIG_GROUP_MQ, 4, Width.MEDIUM,
- CONFIG_DISPLAY_MQ_CHANNEL_NAME);
-
- config.define(CONFIG_NAME_MQ_CCDT_URL, Type.STRING, null, Importance.MEDIUM,
- CONFIG_DOCUMENTATION_MQ_CCDT_URL, CONFIG_GROUP_MQ, 5, Width.MEDIUM,
- CONFIG_DISPLAY_MQ_CCDT_URL);
-
- config.define(CONFIG_NAME_MQ_QUEUE, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, Importance.HIGH,
- CONFIG_DOCUMENTATION_MQ_QUEUE, CONFIG_GROUP_MQ, 6, Width.LONG,
- CONFIG_DISPLAY_MQ_QUEUE);
-
- config.define(CONFIG_NAME_MQ_USER_NAME, Type.STRING, null, Importance.MEDIUM,
- CONFIG_DOCUMENTATION_MQ_USER_NAME, CONFIG_GROUP_MQ, 7, Width.MEDIUM,
- CONFIG_DISPLAY_MQ_USER_NAME);
-
- config.define(CONFIG_NAME_MQ_PASSWORD, Type.PASSWORD, null, Importance.MEDIUM,
- CONFIG_DOCUMENTATION_MQ_PASSWORD, CONFIG_GROUP_MQ, 8, Width.MEDIUM,
- CONFIG_DISPLAY_MQ_PASSWORD);
-
- config.define(CONFIG_NAME_MQ_RECORD_BUILDER, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, Importance.HIGH,
- CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER, CONFIG_GROUP_MQ, 9, Width.LONG,
- CONFIG_DISPLAY_MQ_RECORD_BUILDER);
-
- config.define(CONFIG_NAME_MQ_MESSAGE_BODY_JMS, Type.BOOLEAN, Boolean.FALSE, Importance.MEDIUM,
- CONFIG_DOCUMENTATION_MQ_MESSAGE_BODY_JMS, CONFIG_GROUP_MQ, 10, Width.SHORT,
- CONFIG_DISPLAY_MQ_MESSAGE_BODY_JMS);
-
- config.define(CONFIG_NAME_MQ_RECORD_BUILDER_KEY_HEADER, Type.STRING, null, Importance.MEDIUM,
- CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER_KEY_HEADER, CONFIG_GROUP_MQ, 11, Width.MEDIUM,
- CONFIG_DISPLAY_MQ_RECORD_BUILDER_KEY_HEADER);
-
- config.define(CONFIG_NAME_MQ_SSL_CIPHER_SUITE, Type.STRING, null, Importance.MEDIUM,
- CONFIG_DOCUMENTATION_MQ_SSL_CIPHER_SUITE, CONFIG_GROUP_MQ, 12, Width.MEDIUM,
- CONFIG_DISPLAY_MQ_SSL_CIPHER_SUITE);
-
- config.define(CONFIG_NAME_MQ_SSL_PEER_NAME, Type.STRING, null, Importance.MEDIUM,
- CONFIG_DOCUMENTATION_MQ_SSL_PEER_NAME, CONFIG_GROUP_MQ, 13, Width.MEDIUM,
- CONFIG_DISPLAY_MQ_SSL_PEER_NAME);
-
- config.define(CONFIG_NAME_MQ_SSL_KEYSTORE_LOCATION, Type.STRING, null, Importance.MEDIUM,
- CONFIG_DOCUMENTATION_MQ_SSL_KEYSTORE_LOCATION, CONFIG_GROUP_MQ, 14, Width.MEDIUM,
- CONFIG_DISPLAY_MQ_SSL_KEYSTORE_LOCATION);
-
- config.define(CONFIG_NAME_MQ_SSL_KEYSTORE_PASSWORD, Type.PASSWORD, null, Importance.MEDIUM,
- CONFIG_DOCUMENTATION_MQ_SSL_KEYSTORE_PASSWORD, CONFIG_GROUP_MQ, 15, Width.MEDIUM,
- CONFIG_DISPLAY_MQ_SSL_KEYSTORE_PASSWORD);
-
- config.define(CONFIG_NAME_MQ_SSL_TRUSTSTORE_LOCATION, Type.STRING, null, Importance.MEDIUM,
- CONFIG_DOCUMENTATION_MQ_SSL_TRUSTSTORE_LOCATION, CONFIG_GROUP_MQ, 16, Width.MEDIUM,
- CONFIG_DISPLAY_MQ_SSL_TRUSTSTORE_LOCATION);
-
- config.define(CONFIG_NAME_MQ_SSL_TRUSTSTORE_PASSWORD, Type.PASSWORD, null, Importance.MEDIUM,
- CONFIG_DOCUMENTATION_MQ_SSL_TRUSTSTORE_PASSWORD, CONFIG_GROUP_MQ, 17, Width.MEDIUM,
- CONFIG_DISPLAY_MQ_SSL_TRUSTSTORE_PASSWORD);
-
- config.define(CONFIG_NAME_MQ_BATCH_SIZE, Type.INT, CONFIG_VALUE_MQ_BATCH_SIZE_DEFAULT,
- ConfigDef.Range.atLeast(CONFIG_VALUE_MQ_BATCH_SIZE_MINIMUM), Importance.LOW,
- CONFIG_DOCUMENTATION_MQ_BATCH_SIZE, CONFIG_GROUP_MQ, 18, Width.MEDIUM,
- CONFIG_DISPLAY_MQ_BATCH_SIZE);
+ return CONFIGDEF;
+ }
- config.define(CONFIG_NAME_MQ_MESSAGE_MQMD_READ, Type.BOOLEAN, Boolean.FALSE, Importance.LOW,
- CONFIG_DOCUMENTATION_MQ_MESSAGE_MQMD_READ, CONFIG_GROUP_MQ, 19, Width.SHORT,
- CONFIG_DISPLAY_MQ_MESSAGE_MQMD_READ);
+ /** Null validator - indicates that any value is acceptable for this config option. */
+ private static final ConfigDef.Validator ANY = null;
+
+ static {
+ CONFIGDEF = new ConfigDef();
+
+ CONFIGDEF.define(CONFIG_NAME_MQ_QUEUE_MANAGER,
+ Type.STRING,
+ // user must specify the queue manager name
+ ConfigDef.NO_DEFAULT_VALUE, new ConfigDef.NonEmptyStringWithoutControlChars(),
+ Importance.HIGH,
+ CONFIG_DOCUMENTATION_MQ_QUEUE_MANAGER,
+ CONFIG_GROUP_MQ, 1, Width.MEDIUM,
+ CONFIG_DISPLAY_MQ_QUEUE_MANAGER);
+
+ CONFIGDEF.define(CONFIG_NAME_MQ_CONNECTION_MODE,
+ Type.STRING,
+ // required value - two valid options
+ CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT,
+ ConfigDef.ValidString.in(CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT,
+ CONFIG_VALUE_MQ_CONNECTION_MODE_BINDINGS),
+ Importance.MEDIUM,
+ CONFIG_DOCUMENTATION_MQ_CONNECTION_MODE,
+ CONFIG_GROUP_MQ, 2, Width.SHORT,
+ CONFIG_DISPLAY_MQ_CONNECTION_MODE);
+
+ CONFIGDEF.define(CONFIG_NAME_MQ_CONNECTION_NAME_LIST,
+ Type.STRING,
+ // can be null, for example when using bindings mode or a CCDT
+ null, ANY,
+ Importance.MEDIUM,
+ CONFIG_DOCUMENTATION_MQ_CONNNECTION_NAME_LIST,
+ CONFIG_GROUP_MQ, 3, Width.LONG,
+ CONFIG_DISPLAY_MQ_CONNECTION_NAME_LIST);
+
+ CONFIGDEF.define(CONFIG_NAME_MQ_CHANNEL_NAME,
+ Type.STRING,
+ // can be null, for example when using bindings mode
+ null, ANY,
+ Importance.MEDIUM,
+ CONFIG_DOCUMENTATION_MQ_CHANNEL_NAME,
+ CONFIG_GROUP_MQ, 4, Width.MEDIUM,
+ CONFIG_DISPLAY_MQ_CHANNEL_NAME);
+
+ CONFIGDEF.define(CONFIG_NAME_MQ_CCDT_URL,
+ Type.STRING,
+ // can be null, for example when using bindings mode or a conname list
+ null, new ValidURL(),
+ Importance.MEDIUM,
+ CONFIG_DOCUMENTATION_MQ_CCDT_URL,
+ CONFIG_GROUP_MQ, 5, Width.MEDIUM,
+ CONFIG_DISPLAY_MQ_CCDT_URL);
+
+ CONFIGDEF.define(CONFIG_NAME_MQ_QUEUE,
+ Type.STRING,
+ // user must specify the queue name
+ ConfigDef.NO_DEFAULT_VALUE, new ConfigDef.NonEmptyStringWithoutControlChars(),
+ Importance.HIGH,
+ CONFIG_DOCUMENTATION_MQ_QUEUE,
+ CONFIG_GROUP_MQ, 6, Width.LONG,
+ CONFIG_DISPLAY_MQ_QUEUE);
+
+ CONFIGDEF.define(CONFIG_NAME_MQ_USER_NAME,
+ Type.STRING,
+ // can be null, when auth not required
+ null, ANY,
+ Importance.MEDIUM,
+ CONFIG_DOCUMENTATION_MQ_USER_NAME,
+ CONFIG_GROUP_MQ, 7, Width.MEDIUM,
+ CONFIG_DISPLAY_MQ_USER_NAME);
+
+ CONFIGDEF.define(CONFIG_NAME_MQ_PASSWORD,
+ Type.PASSWORD,
+ // can be null, when auth not required
+ null, ANY,
+ Importance.MEDIUM,
+ CONFIG_DOCUMENTATION_MQ_PASSWORD,
+ CONFIG_GROUP_MQ, 8, Width.MEDIUM,
+ CONFIG_DISPLAY_MQ_PASSWORD);
+
+ CONFIGDEF.define(CONFIG_NAME_MQ_RECORD_BUILDER,
+ Type.STRING,
+ // user must specify a record builder class
+ ConfigDef.NO_DEFAULT_VALUE, new ConfigDef.NonEmptyStringWithoutControlChars(),
+ Importance.HIGH,
+ CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER,
+ CONFIG_GROUP_MQ, 9, Width.LONG,
+ CONFIG_DISPLAY_MQ_RECORD_BUILDER);
+
+ CONFIGDEF.define(CONFIG_NAME_MQ_MESSAGE_BODY_JMS,
+ Type.BOOLEAN,
+ // must be a non-null boolean - assume false if not provided
+ Boolean.FALSE, new ConfigDef.NonNullValidator(),
+ Importance.MEDIUM,
+ CONFIG_DOCUMENTATION_MQ_MESSAGE_BODY_JMS,
+ CONFIG_GROUP_MQ, 10, Width.SHORT,
+ CONFIG_DISPLAY_MQ_MESSAGE_BODY_JMS);
+
+ CONFIGDEF.define(CONFIG_NAME_MQ_RECORD_BUILDER_KEY_HEADER,
+ Type.STRING,
+ // optional value - four valid values
+ null, ConfigDef.ValidString.in(null,
+ CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSMESSAGEID,
+ CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSCORRELATIONID,
+ CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSCORRELATIONIDASBYTES,
+ CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSDESTINATION),
+ Importance.MEDIUM,
+ CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER_KEY_HEADER,
+ CONFIG_GROUP_MQ, 11, Width.MEDIUM,
+ CONFIG_DISPLAY_MQ_RECORD_BUILDER_KEY_HEADER);
+
+ CONFIGDEF.define(CONFIG_NAME_MQ_SSL_CIPHER_SUITE,
+ Type.STRING,
+ // optional - not needed if not using SSL - SSL cipher suites change
+ // too frequently so we won't maintain a valid list here
+ null, ANY,
+ Importance.MEDIUM,
+ CONFIG_DOCUMENTATION_MQ_SSL_CIPHER_SUITE,
+ CONFIG_GROUP_MQ, 12, Width.MEDIUM,
+ CONFIG_DISPLAY_MQ_SSL_CIPHER_SUITE);
+
+ CONFIGDEF.define(CONFIG_NAME_MQ_SSL_PEER_NAME,
+ Type.STRING,
+ // optional - not needed if not using SSL
+ null, ANY,
+ Importance.MEDIUM,
+ CONFIG_DOCUMENTATION_MQ_SSL_PEER_NAME,
+ CONFIG_GROUP_MQ, 13, Width.MEDIUM,
+ CONFIG_DISPLAY_MQ_SSL_PEER_NAME);
+
+ CONFIGDEF.define(CONFIG_NAME_MQ_SSL_KEYSTORE_LOCATION,
+ Type.STRING,
+ // optional - if provided should be the location of a readable file
+ null, new ReadableFile(),
+ Importance.MEDIUM,
+ CONFIG_DOCUMENTATION_MQ_SSL_KEYSTORE_LOCATION,
+ CONFIG_GROUP_MQ, 14, Width.MEDIUM,
+ CONFIG_DISPLAY_MQ_SSL_KEYSTORE_LOCATION);
+
+ CONFIGDEF.define(CONFIG_NAME_MQ_SSL_KEYSTORE_PASSWORD,
+ Type.PASSWORD,
+ // optional - not needed if SSL keystore isn't provided
+ null, ANY,
+ Importance.MEDIUM,
+ CONFIG_DOCUMENTATION_MQ_SSL_KEYSTORE_PASSWORD,
+ CONFIG_GROUP_MQ, 15, Width.MEDIUM,
+ CONFIG_DISPLAY_MQ_SSL_KEYSTORE_PASSWORD);
+
+ CONFIGDEF.define(CONFIG_NAME_MQ_SSL_TRUSTSTORE_LOCATION,
+ Type.STRING,
+ // optional - if provided should be the location of a readable file
+ null, new ReadableFile(),
+ Importance.MEDIUM,
+ CONFIG_DOCUMENTATION_MQ_SSL_TRUSTSTORE_LOCATION,
+ CONFIG_GROUP_MQ, 16, Width.MEDIUM,
+ CONFIG_DISPLAY_MQ_SSL_TRUSTSTORE_LOCATION);
+
+ CONFIGDEF.define(CONFIG_NAME_MQ_SSL_TRUSTSTORE_PASSWORD,
+ Type.PASSWORD,
+ // optional - not needed if SSL truststore isn't provided
+ null, Importance.MEDIUM,
+ CONFIG_DOCUMENTATION_MQ_SSL_TRUSTSTORE_PASSWORD,
+ CONFIG_GROUP_MQ, 17, Width.MEDIUM,
+ CONFIG_DISPLAY_MQ_SSL_TRUSTSTORE_PASSWORD);
+
+ CONFIGDEF.define(CONFIG_NAME_MQ_BATCH_SIZE,
+ Type.INT,
+ // must be an int greater than min
+ CONFIG_VALUE_MQ_BATCH_SIZE_DEFAULT, ConfigDef.Range.atLeast(CONFIG_VALUE_MQ_BATCH_SIZE_MINIMUM),
+ Importance.LOW,
+ CONFIG_DOCUMENTATION_MQ_BATCH_SIZE,
+ CONFIG_GROUP_MQ, 18, Width.MEDIUM,
+ CONFIG_DISPLAY_MQ_BATCH_SIZE);
+
+ CONFIGDEF.define(CONFIG_NAME_MQ_MESSAGE_MQMD_READ,
+ Type.BOOLEAN,
+ // must be a non-null boolean - assume false if not provided
+ Boolean.FALSE, new ConfigDef.NonNullValidator(),
+ Importance.LOW,
+ CONFIG_DOCUMENTATION_MQ_MESSAGE_MQMD_READ,
+ CONFIG_GROUP_MQ, 19, Width.SHORT,
+ CONFIG_DISPLAY_MQ_MESSAGE_MQMD_READ);
+
+ CONFIGDEF.define(CONFIG_NAME_MQ_USER_AUTHENTICATION_MQCSP,
+ Type.BOOLEAN,
+ // must be a non-null boolean - assume true if not provided
+ Boolean.TRUE, new ConfigDef.NonNullValidator(),
+ Importance.LOW,
+ CONFIG_DOCUMENTATION_MQ_USER_AUTHENTICATION_MQCSP,
+ CONFIG_GROUP_MQ, 20, Width.SHORT,
+ CONFIG_DISPLAY_MQ_USER_AUTHENTICATION_MQCSP);
+
+ CONFIGDEF.define(CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER,
+ Type.BOOLEAN,
+ // must be a non-null boolean - assume false if not provided
+ Boolean.FALSE, new ConfigDef.NonNullValidator(),
+ Importance.LOW,
+ CONFIG_DOCUMENTATION_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER,
+ CONFIG_GROUP_MQ, 21, Width.MEDIUM,
+ CONFIG_DISPLAY_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER);
+
+ CONFIGDEF.define(CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS,
+ Type.BOOLEAN,
+ // must be a non-null boolean - assume true if not provided
+ Boolean.TRUE, new ConfigDef.NonNullValidator(),
+ Importance.LOW,
+ CONFIG_DOCUMENTATION_MQ_SSL_USE_IBM_CIPHER_MAPPINGS,
+ CONFIG_GROUP_MQ, 22, Width.SHORT,
+ CONFIG_DISPLAY_MQ_SSL_USE_IBM_CIPHER_MAPPINGS);
+
+ CONFIGDEF.define(CONFIG_NAME_TOPIC,
+ Type.STRING,
+ // user must specify the topic name
+ ConfigDef.NO_DEFAULT_VALUE, new ConfigDef.NonEmptyStringWithoutControlChars(),
+ Importance.HIGH,
+ CONFIG_DOCUMENTATION_TOPIC,
+ null, 0, Width.MEDIUM,
+ CONFIG_DISPLAY_TOPIC);
+ }
- config.define(CONFIG_NAME_MQ_USER_AUTHENTICATION_MQCSP, Type.BOOLEAN, Boolean.TRUE, Importance.LOW,
- CONFIG_DOCUMENTATION_MQ_USER_AUTHENTICATION_MQCSP, CONFIG_GROUP_MQ, 20, Width.SHORT,
- CONFIG_DISPLAY_MQ_USER_AUTHENTICATION_MQCSP);
- config.define(CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, Type.BOOLEAN, Boolean.FALSE, Importance.LOW,
- CONFIG_DOCUMENTATION_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, CONFIG_GROUP_MQ, 21, Width.MEDIUM,
- CONFIG_DISPLAY_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER);
+ private static class ReadableFile implements ConfigDef.Validator {
+ @Override
+ public void ensureValid(final String name, final Object value) {
+ final String strValue = (String) value;
+ if (value == null || strValue.isEmpty()) {
+ // only validate non-empty locations
+ return;
+ }
- config.define(CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS, Type.BOOLEAN, null, Importance.LOW,
- CONFIG_DOCUMENTATION_MQ_SSL_USE_IBM_CIPHER_MAPPINGS, CONFIG_GROUP_MQ, 22, Width.SHORT,
- CONFIG_DISPLAY_MQ_SSL_USE_IBM_CIPHER_MAPPINGS);
+ try {
+ final File file = new File((String) value);
+ if (!file.isFile() || !file.canRead()) {
+ throw new ConfigException(name, value, "Value must be the location of a readable file");
+ }
+ } catch (final Exception exc) {
+ throw new ConfigException(name, value, "Value must be a valid file location");
+ }
+ }
+ }
- config.define(CONFIG_NAME_TOPIC, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, Importance.HIGH,
- CONFIG_DOCUMENTATION_TOPIC, null, 0, Width.MEDIUM,
- CONFIG_DISPLAY_TOPIC);
+ private static class ValidURL implements ConfigDef.Validator {
+ @Override
+ public void ensureValid(final String name, final Object value) {
+ final String strValue = (String) value;
+ if (value == null || strValue.isEmpty()) {
+ // only validate non-empty locations
+ return;
+ }
- return config;
+ try {
+ new URL(strValue);
+ } catch (final MalformedURLException exc) {
+ throw new ConfigException(name, value, "Value must be a valid URL");
+ }
+ }
}
}
\ No newline at end of file
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java
index 04291f3..f635e2d 100755
--- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java
@@ -24,6 +24,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
@@ -71,14 +72,13 @@ public MQSourceTask() {
log.debug("Task props entry {} : {}", entry.getKey(), value);
}
- final String strBatchSize = props.get(MQSourceConnector.CONFIG_NAME_MQ_BATCH_SIZE);
- if (strBatchSize != null) {
- batchSize = Integer.parseInt(strBatchSize);
- }
+ final AbstractConfig config = new AbstractConfig(MQSourceConnector.CONFIGDEF, props);
+
+ batchSize = config.getInt(MQSourceConnector.CONFIG_NAME_MQ_BATCH_SIZE);
// Construct a reader to interface with MQ
reader = new JMSReader();
- reader.configure(props);
+ reader.configure(config);
// Make a connection as an initial test of the configuration
reader.connect();
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java
index 8a3c40e..fd4c0b6 100755
--- a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java
@@ -17,6 +17,8 @@
import com.ibm.eventstreams.connect.mqsource.MQSourceConnector;
import com.ibm.eventstreams.connect.mqsource.processor.JmsToKafkaHeaderConverter;
+
+import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
@@ -29,7 +31,6 @@
import javax.jms.JMSException;
import javax.jms.Message;
import java.util.Map;
-import java.util.Optional;
/**
* Builds Kafka Connect SourceRecords from messages.
@@ -55,28 +56,27 @@ public enum KeyHeader { NONE, MESSAGE_ID, CORRELATION_ID, CORRELATION_ID_AS_BYTE
log.trace("[{}] Entry {}.configure, props={}", Thread.currentThread().getId(), this.getClass().getName(),
props);
- final String kh = props.get(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER_KEY_HEADER);
- if (kh != null) {
- if (kh.equals(MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSMESSAGEID)) {
+ final AbstractConfig config = new AbstractConfig(MQSourceConnector.CONFIGDEF, props);
+ switch (String.valueOf(config.getString(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER_KEY_HEADER))) {
+ case MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSMESSAGEID:
keyheader = KeyHeader.MESSAGE_ID;
log.debug("Setting Kafka record key from JMSMessageID header field");
- } else if (kh.equals(MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSCORRELATIONID)) {
+ break;
+ case MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSCORRELATIONID:
keyheader = KeyHeader.CORRELATION_ID;
log.debug("Setting Kafka record key from JMSCorrelationID header field");
- } else if (kh.equals(MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSCORRELATIONIDASBYTES)) {
+ break;
+ case MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSCORRELATIONIDASBYTES:
keyheader = KeyHeader.CORRELATION_ID_AS_BYTES;
log.debug("Setting Kafka record key from JMSCorrelationIDAsBytes header field");
- } else if (kh.equals(MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSDESTINATION)) {
+ break;
+ case MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSDESTINATION:
keyheader = KeyHeader.DESTINATION;
log.debug("Setting Kafka record key from JMSDestination header field");
- } else {
- log.error("Unsupported MQ record builder key header value {}", kh);
- throw new ConnectException("Unsupported MQ record builder key header value");
- }
+ break;
}
- final String str = props.get(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER);
- copyJmsPropertiesFlag = Boolean.parseBoolean(Optional.ofNullable(str).orElse("false"));
+ copyJmsPropertiesFlag = config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER);
jmsToKafkaHeaderConverter = new JmsToKafkaHeaderConverter();
log.trace("[{}] Exit {}.configure", Thread.currentThread().getId(), this.getClass().getName());