diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskAuthIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskAuthIT.java index 86f2e9a..b3109f9 100644 --- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskAuthIT.java +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskAuthIT.java @@ -72,6 +72,7 @@ private Map getConnectorProps() { connectorProps.put("mq.password", APP_PASSWORD); connectorProps.put("mq.message.body.jms", "false"); connectorProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + connectorProps.put("topic", "mytopic"); return connectorProps; } diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java index 782ea94..c9ca4ce 100644 --- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java @@ -61,6 +61,7 @@ private Map createDefaultConnectorProperties() { props.put("mq.channel.name", getChannelName()); props.put("mq.queue", MQ_QUEUE); props.put("mq.user.authentication.mqcsp", "false"); + props.put("topic", "mytopic"); return props; } @@ -82,6 +83,7 @@ public void verifyJmsTextMessages() throws Exception { final List kafkaMessages = connectTask.poll(); assertEquals(2, kafkaMessages.size()); for (final SourceRecord kafkaMessage : kafkaMessages) { + assertEquals("mytopic", kafkaMessage.topic()); assertNull(kafkaMessage.key()); assertNull(kafkaMessage.valueSchema()); @@ -116,6 +118,7 @@ public void verifyJmsJsonMessages() throws Exception { assertEquals(5, kafkaMessages.size()); for (int i = 0; i < 5; i++) { final SourceRecord kafkaMessage = kafkaMessages.get(i); + assertEquals("mytopic", kafkaMessage.topic()); assertNull(kafkaMessage.key()); assertNull(kafkaMessage.valueSchema()); @@ -148,6 +151,7 @@ public void verifyJmsMessageHeaders() throws Exception { final List kafkaMessages = connectTask.poll(); assertEquals(1, kafkaMessages.size()); final SourceRecord kafkaMessage = kafkaMessages.get(0); + assertEquals("mytopic", kafkaMessage.topic()); assertNull(kafkaMessage.key()); assertNull(kafkaMessage.valueSchema()); diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java index 8354f46..71b5db0 100755 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java @@ -31,7 +31,6 @@ import java.security.GeneralSecurityException; import java.security.KeyStore; import java.security.SecureRandom; -import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.JMSConsumer; @@ -45,6 +44,8 @@ import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.types.Password; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.source.SourceRecord; @@ -61,7 +62,7 @@ public class JMSReader { // Configs private String userName; - private String password; + private Password password; private String topic; private boolean messageBodyJms; @@ -93,79 +94,44 @@ public JMSReader() { * * @throws ConnectException Operation failed and connector should stop. */ - public void configure(final Map props) { + public void configure(final AbstractConfig config) { log.trace("[{}] Entry {}.configure, props={}", Thread.currentThread().getId(), this.getClass().getName(), - props); - - final String queueManager = props.get(MQSourceConnector.CONFIG_NAME_MQ_QUEUE_MANAGER); - final String connectionMode = props.get(MQSourceConnector.CONFIG_NAME_MQ_CONNECTION_MODE); - final String connectionNameList = props.get(MQSourceConnector.CONFIG_NAME_MQ_CONNECTION_NAME_LIST); - final String channelName = props.get(MQSourceConnector.CONFIG_NAME_MQ_CHANNEL_NAME); - final String queueName = props.get(MQSourceConnector.CONFIG_NAME_MQ_QUEUE); - final String userName = props.get(MQSourceConnector.CONFIG_NAME_MQ_USER_NAME); - final String password = props.get(MQSourceConnector.CONFIG_NAME_MQ_PASSWORD); - final String ccdtUrl = props.get(MQSourceConnector.CONFIG_NAME_MQ_CCDT_URL); - final String builderClass = props.get(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER); - final String mbj = props.get(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS); - final String mdr = props.get(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_MQMD_READ); - final String sslCipherSuite = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_CIPHER_SUITE); - final String sslPeerName = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_PEER_NAME); - final String sslKeystoreLocation = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_KEYSTORE_LOCATION); - final String sslKeystorePassword = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_KEYSTORE_PASSWORD); - final String sslTruststoreLocation = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_LOCATION); - final String sslTruststorePassword = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_PASSWORD); - final String useMQCSP = props.get(MQSourceConnector.CONFIG_NAME_MQ_USER_AUTHENTICATION_MQCSP); - final String useIBMCipherMappings = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS); - final String topic = props.get(MQSourceConnector.CONFIG_NAME_TOPIC); - - if (useIBMCipherMappings != null) { - System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings", useIBMCipherMappings); - } + config); - int transportType = WMQConstants.WMQ_CM_CLIENT; - if (connectionMode != null) { - if (connectionMode.equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT)) { - transportType = WMQConstants.WMQ_CM_CLIENT; - } else if (connectionMode.equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE_BINDINGS)) { - transportType = WMQConstants.WMQ_CM_BINDINGS; - } else { - log.error("Unsupported MQ connection mode {}", connectionMode); - throw new ConnectException("Unsupported MQ connection mode"); - } - } + System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings", + config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS).toString()); + + final int transportType = + config.getString(MQSourceConnector.CONFIG_NAME_MQ_CONNECTION_MODE) + .equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT) ? + WMQConstants.WMQ_CM_CLIENT : + WMQConstants.WMQ_CM_BINDINGS; try { mqConnFactory = new MQConnectionFactory(); mqConnFactory.setTransportType(transportType); - mqConnFactory.setQueueManager(queueManager); - mqConnFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, true); - if (useMQCSP != null) { - mqConnFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, - Boolean.parseBoolean(useMQCSP)); - } + mqConnFactory.setQueueManager(config.getString(MQSourceConnector.CONFIG_NAME_MQ_QUEUE_MANAGER)); + mqConnFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, + config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_USER_AUTHENTICATION_MQCSP)); if (transportType == WMQConstants.WMQ_CM_CLIENT) { + final String ccdtUrl = config.getString(MQSourceConnector.CONFIG_NAME_MQ_CCDT_URL); + if (ccdtUrl != null) { - final URL ccdtUrlObject; - try { - ccdtUrlObject = new URL(ccdtUrl); - } catch (final MalformedURLException e) { - log.error("MalformedURLException exception {}", e); - throw new ConnectException("CCDT file url invalid", e); - } - mqConnFactory.setCCDTURL(ccdtUrlObject); + mqConnFactory.setCCDTURL(new URL(ccdtUrl)); } else { - mqConnFactory.setConnectionNameList(connectionNameList); - mqConnFactory.setChannel(channelName); + mqConnFactory.setConnectionNameList(config.getString(MQSourceConnector.CONFIG_NAME_MQ_CONNECTION_NAME_LIST)); + mqConnFactory.setChannel(config.getString(MQSourceConnector.CONFIG_NAME_MQ_CHANNEL_NAME)); } - if (sslCipherSuite != null) { - mqConnFactory.setSSLCipherSuite(sslCipherSuite); - if (sslPeerName != null) { - mqConnFactory.setSSLPeerName(sslPeerName); - } - } + mqConnFactory.setSSLCipherSuite(config.getString(MQSourceConnector.CONFIG_NAME_MQ_SSL_CIPHER_SUITE)); + mqConnFactory.setSSLPeerName(config.getString(MQSourceConnector.CONFIG_NAME_MQ_SSL_PEER_NAME)); + + final String sslKeystoreLocation = config.getString(MQSourceConnector.CONFIG_NAME_MQ_SSL_KEYSTORE_LOCATION); + final Password sslKeystorePassword = config.getPassword(MQSourceConnector.CONFIG_NAME_MQ_SSL_KEYSTORE_PASSWORD); + final String sslTruststoreLocation = config.getString(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_LOCATION); + final Password sslTruststorePassword = config.getPassword(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_PASSWORD); if (sslKeystoreLocation != null || sslTruststoreLocation != null) { final SSLContext sslContext = buildSslContext(sslKeystoreLocation, sslKeystorePassword, sslTruststoreLocation, sslTruststorePassword); @@ -173,36 +139,35 @@ public void configure(final Map props) { } } - queue = new MQQueue(queueName); + queue = new MQQueue(config.getString(MQSourceConnector.CONFIG_NAME_MQ_QUEUE)); - this.userName = userName; - this.password = password; + userName = config.getString(MQSourceConnector.CONFIG_NAME_MQ_USER_NAME); + password = config.getPassword(MQSourceConnector.CONFIG_NAME_MQ_PASSWORD); - this.messageBodyJms = false; - queue.setMessageBodyStyle(WMQConstants.WMQ_MESSAGE_BODY_MQ); - if (mbj != null) { - if (Boolean.parseBoolean(mbj)) { - this.messageBodyJms = true; - queue.setMessageBodyStyle(WMQConstants.WMQ_MESSAGE_BODY_JMS); - } - } + messageBodyJms = config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS); + queue.setMessageBodyStyle(messageBodyJms ? + WMQConstants.WMQ_MESSAGE_BODY_JMS : + WMQConstants.WMQ_MESSAGE_BODY_MQ); - if (mdr != null) { - if (Boolean.parseBoolean(mdr)) { - queue.setBooleanProperty(WMQConstants.WMQ_MQMD_READ_ENABLED, true); - } - } + queue.setBooleanProperty(WMQConstants.WMQ_MQMD_READ_ENABLED, + config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_MQMD_READ)); + + topic = config.getString(MQSourceConnector.CONFIG_NAME_TOPIC); - this.topic = topic; } catch (JMSException | JMSRuntimeException jmse) { log.error("JMS exception {}", jmse); throw new ConnectException(jmse); + } catch (final MalformedURLException e) { + log.error("MalformedURLException exception {}", e); + throw new ConnectException("CCDT file url invalid", e); } + + final String builderClass = config.getString(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER); try { final Class c = Class.forName(builderClass).asSubclass(RecordBuilder.class); builder = c.newInstance(); - builder.configure(props); + builder.configure(config.originalsStrings()); } catch (ClassNotFoundException | ClassCastException | IllegalAccessException | InstantiationException | NullPointerException exc) { log.error("Could not instantiate message builder {}", builderClass); @@ -220,7 +185,7 @@ public void connect() { try { if (userName != null) { - jmsCtxt = mqConnFactory.createContext(userName, password, JMSContext.SESSION_TRANSACTED); + jmsCtxt = mqConnFactory.createContext(userName, password.value(), JMSContext.SESSION_TRANSACTED); } else { jmsCtxt = mqConnFactory.createContext(JMSContext.SESSION_TRANSACTED); } @@ -373,7 +338,7 @@ private boolean connectInternal() { log.trace("[{}] Entry {}.connectInternal", Thread.currentThread().getId(), this.getClass().getName()); try { if (userName != null) { - jmsCtxt = mqConnFactory.createContext(userName, password, JMSContext.SESSION_TRANSACTED); + jmsCtxt = mqConnFactory.createContext(userName, password.value(), JMSContext.SESSION_TRANSACTED); } else { jmsCtxt = mqConnFactory.createContext(JMSContext.SESSION_TRANSACTED); } @@ -498,8 +463,8 @@ private ConnectException handleException(final Throwable exc) { return new ConnectException(exc); } - private SSLContext buildSslContext(final String sslKeystoreLocation, final String sslKeystorePassword, - final String sslTruststoreLocation, final String sslTruststorePassword) { + private SSLContext buildSslContext(final String sslKeystoreLocation, final Password sslKeystorePassword, + final String sslTruststoreLocation, final Password sslTruststorePassword) { log.trace("[{}] Entry {}.buildSslContext", Thread.currentThread().getId(), this.getClass().getName()); try { @@ -508,7 +473,7 @@ private SSLContext buildSslContext(final String sslKeystoreLocation, final Strin if (sslKeystoreLocation != null) { final KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); - kmf.init(loadKeyStore(sslKeystoreLocation, sslKeystorePassword), sslKeystorePassword.toCharArray()); + kmf.init(loadKeyStore(sslKeystoreLocation, sslKeystorePassword), sslKeystorePassword.value().toCharArray()); keyManagers = kmf.getKeyManagers(); } @@ -530,12 +495,12 @@ private SSLContext buildSslContext(final String sslKeystoreLocation, final Strin } } - private KeyStore loadKeyStore(final String location, final String password) throws GeneralSecurityException { + private KeyStore loadKeyStore(final String location, final Password password) throws GeneralSecurityException { log.trace("[{}] Entry {}.loadKeyStore", Thread.currentThread().getId(), this.getClass().getName()); try (final InputStream ksStr = new FileInputStream(location)) { final KeyStore ks = KeyStore.getInstance("JKS"); - ks.load(ksStr, password.toCharArray()); + ks.load(ksStr, password.value().toCharArray()); log.trace("[{}] Exit {}.loadKeyStore, retval={}", Thread.currentThread().getId(), this.getClass().getName(), ks); diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java index 24df222..5a640ef 100644 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java @@ -15,6 +15,9 @@ */ package com.ibm.eventstreams.connect.mqsource; +import java.io.File; +import java.net.MalformedURLException; +import java.net.URL; import java.util.ArrayList; import java.util.List; import java.util.Locale; @@ -25,15 +28,17 @@ import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigDef.Width; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.source.SourceConnector; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MQSourceConnector extends SourceConnector { private static final Logger log = LoggerFactory.getLogger(MQSourceConnector.class); + public static final ConfigDef CONFIGDEF; + public static final String CONFIG_GROUP_MQ = "mq"; public static final String CONFIG_NAME_MQ_QUEUE_MANAGER = "mq.queue.manager"; @@ -218,104 +223,264 @@ public void stop() { */ @Override public ConfigDef config() { - final ConfigDef config = new ConfigDef(); - - config.define(CONFIG_NAME_MQ_QUEUE_MANAGER, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, Importance.HIGH, - CONFIG_DOCUMENTATION_MQ_QUEUE_MANAGER, CONFIG_GROUP_MQ, 1, Width.MEDIUM, - CONFIG_DISPLAY_MQ_QUEUE_MANAGER); - - config.define(CONFIG_NAME_MQ_CONNECTION_MODE, Type.STRING, CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT, - ConfigDef.ValidString.in(CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT, - CONFIG_VALUE_MQ_CONNECTION_MODE_BINDINGS), - Importance.MEDIUM, - CONFIG_DOCUMENTATION_MQ_CONNECTION_MODE, CONFIG_GROUP_MQ, 2, Width.SHORT, - CONFIG_DISPLAY_MQ_CONNECTION_MODE); - - config.define(CONFIG_NAME_MQ_CONNECTION_NAME_LIST, Type.STRING, null, Importance.MEDIUM, - CONFIG_DOCUMENTATION_MQ_CONNNECTION_NAME_LIST, CONFIG_GROUP_MQ, 3, Width.LONG, - CONFIG_DISPLAY_MQ_CONNECTION_NAME_LIST); - - config.define(CONFIG_NAME_MQ_CHANNEL_NAME, Type.STRING, null, Importance.MEDIUM, - CONFIG_DOCUMENTATION_MQ_CHANNEL_NAME, CONFIG_GROUP_MQ, 4, Width.MEDIUM, - CONFIG_DISPLAY_MQ_CHANNEL_NAME); - - config.define(CONFIG_NAME_MQ_CCDT_URL, Type.STRING, null, Importance.MEDIUM, - CONFIG_DOCUMENTATION_MQ_CCDT_URL, CONFIG_GROUP_MQ, 5, Width.MEDIUM, - CONFIG_DISPLAY_MQ_CCDT_URL); - - config.define(CONFIG_NAME_MQ_QUEUE, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, Importance.HIGH, - CONFIG_DOCUMENTATION_MQ_QUEUE, CONFIG_GROUP_MQ, 6, Width.LONG, - CONFIG_DISPLAY_MQ_QUEUE); - - config.define(CONFIG_NAME_MQ_USER_NAME, Type.STRING, null, Importance.MEDIUM, - CONFIG_DOCUMENTATION_MQ_USER_NAME, CONFIG_GROUP_MQ, 7, Width.MEDIUM, - CONFIG_DISPLAY_MQ_USER_NAME); - - config.define(CONFIG_NAME_MQ_PASSWORD, Type.PASSWORD, null, Importance.MEDIUM, - CONFIG_DOCUMENTATION_MQ_PASSWORD, CONFIG_GROUP_MQ, 8, Width.MEDIUM, - CONFIG_DISPLAY_MQ_PASSWORD); - - config.define(CONFIG_NAME_MQ_RECORD_BUILDER, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, Importance.HIGH, - CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER, CONFIG_GROUP_MQ, 9, Width.LONG, - CONFIG_DISPLAY_MQ_RECORD_BUILDER); - - config.define(CONFIG_NAME_MQ_MESSAGE_BODY_JMS, Type.BOOLEAN, Boolean.FALSE, Importance.MEDIUM, - CONFIG_DOCUMENTATION_MQ_MESSAGE_BODY_JMS, CONFIG_GROUP_MQ, 10, Width.SHORT, - CONFIG_DISPLAY_MQ_MESSAGE_BODY_JMS); - - config.define(CONFIG_NAME_MQ_RECORD_BUILDER_KEY_HEADER, Type.STRING, null, Importance.MEDIUM, - CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER_KEY_HEADER, CONFIG_GROUP_MQ, 11, Width.MEDIUM, - CONFIG_DISPLAY_MQ_RECORD_BUILDER_KEY_HEADER); - - config.define(CONFIG_NAME_MQ_SSL_CIPHER_SUITE, Type.STRING, null, Importance.MEDIUM, - CONFIG_DOCUMENTATION_MQ_SSL_CIPHER_SUITE, CONFIG_GROUP_MQ, 12, Width.MEDIUM, - CONFIG_DISPLAY_MQ_SSL_CIPHER_SUITE); - - config.define(CONFIG_NAME_MQ_SSL_PEER_NAME, Type.STRING, null, Importance.MEDIUM, - CONFIG_DOCUMENTATION_MQ_SSL_PEER_NAME, CONFIG_GROUP_MQ, 13, Width.MEDIUM, - CONFIG_DISPLAY_MQ_SSL_PEER_NAME); - - config.define(CONFIG_NAME_MQ_SSL_KEYSTORE_LOCATION, Type.STRING, null, Importance.MEDIUM, - CONFIG_DOCUMENTATION_MQ_SSL_KEYSTORE_LOCATION, CONFIG_GROUP_MQ, 14, Width.MEDIUM, - CONFIG_DISPLAY_MQ_SSL_KEYSTORE_LOCATION); - - config.define(CONFIG_NAME_MQ_SSL_KEYSTORE_PASSWORD, Type.PASSWORD, null, Importance.MEDIUM, - CONFIG_DOCUMENTATION_MQ_SSL_KEYSTORE_PASSWORD, CONFIG_GROUP_MQ, 15, Width.MEDIUM, - CONFIG_DISPLAY_MQ_SSL_KEYSTORE_PASSWORD); - - config.define(CONFIG_NAME_MQ_SSL_TRUSTSTORE_LOCATION, Type.STRING, null, Importance.MEDIUM, - CONFIG_DOCUMENTATION_MQ_SSL_TRUSTSTORE_LOCATION, CONFIG_GROUP_MQ, 16, Width.MEDIUM, - CONFIG_DISPLAY_MQ_SSL_TRUSTSTORE_LOCATION); - - config.define(CONFIG_NAME_MQ_SSL_TRUSTSTORE_PASSWORD, Type.PASSWORD, null, Importance.MEDIUM, - CONFIG_DOCUMENTATION_MQ_SSL_TRUSTSTORE_PASSWORD, CONFIG_GROUP_MQ, 17, Width.MEDIUM, - CONFIG_DISPLAY_MQ_SSL_TRUSTSTORE_PASSWORD); - - config.define(CONFIG_NAME_MQ_BATCH_SIZE, Type.INT, CONFIG_VALUE_MQ_BATCH_SIZE_DEFAULT, - ConfigDef.Range.atLeast(CONFIG_VALUE_MQ_BATCH_SIZE_MINIMUM), Importance.LOW, - CONFIG_DOCUMENTATION_MQ_BATCH_SIZE, CONFIG_GROUP_MQ, 18, Width.MEDIUM, - CONFIG_DISPLAY_MQ_BATCH_SIZE); + return CONFIGDEF; + } - config.define(CONFIG_NAME_MQ_MESSAGE_MQMD_READ, Type.BOOLEAN, Boolean.FALSE, Importance.LOW, - CONFIG_DOCUMENTATION_MQ_MESSAGE_MQMD_READ, CONFIG_GROUP_MQ, 19, Width.SHORT, - CONFIG_DISPLAY_MQ_MESSAGE_MQMD_READ); + /** Null validator - indicates that any value is acceptable for this config option. */ + private static final ConfigDef.Validator ANY = null; + + static { + CONFIGDEF = new ConfigDef(); + + CONFIGDEF.define(CONFIG_NAME_MQ_QUEUE_MANAGER, + Type.STRING, + // user must specify the queue manager name + ConfigDef.NO_DEFAULT_VALUE, new ConfigDef.NonEmptyStringWithoutControlChars(), + Importance.HIGH, + CONFIG_DOCUMENTATION_MQ_QUEUE_MANAGER, + CONFIG_GROUP_MQ, 1, Width.MEDIUM, + CONFIG_DISPLAY_MQ_QUEUE_MANAGER); + + CONFIGDEF.define(CONFIG_NAME_MQ_CONNECTION_MODE, + Type.STRING, + // required value - two valid options + CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT, + ConfigDef.ValidString.in(CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT, + CONFIG_VALUE_MQ_CONNECTION_MODE_BINDINGS), + Importance.MEDIUM, + CONFIG_DOCUMENTATION_MQ_CONNECTION_MODE, + CONFIG_GROUP_MQ, 2, Width.SHORT, + CONFIG_DISPLAY_MQ_CONNECTION_MODE); + + CONFIGDEF.define(CONFIG_NAME_MQ_CONNECTION_NAME_LIST, + Type.STRING, + // can be null, for example when using bindings mode or a CCDT + null, ANY, + Importance.MEDIUM, + CONFIG_DOCUMENTATION_MQ_CONNNECTION_NAME_LIST, + CONFIG_GROUP_MQ, 3, Width.LONG, + CONFIG_DISPLAY_MQ_CONNECTION_NAME_LIST); + + CONFIGDEF.define(CONFIG_NAME_MQ_CHANNEL_NAME, + Type.STRING, + // can be null, for example when using bindings mode + null, ANY, + Importance.MEDIUM, + CONFIG_DOCUMENTATION_MQ_CHANNEL_NAME, + CONFIG_GROUP_MQ, 4, Width.MEDIUM, + CONFIG_DISPLAY_MQ_CHANNEL_NAME); + + CONFIGDEF.define(CONFIG_NAME_MQ_CCDT_URL, + Type.STRING, + // can be null, for example when using bindings mode or a conname list + null, new ValidURL(), + Importance.MEDIUM, + CONFIG_DOCUMENTATION_MQ_CCDT_URL, + CONFIG_GROUP_MQ, 5, Width.MEDIUM, + CONFIG_DISPLAY_MQ_CCDT_URL); + + CONFIGDEF.define(CONFIG_NAME_MQ_QUEUE, + Type.STRING, + // user must specify the queue name + ConfigDef.NO_DEFAULT_VALUE, new ConfigDef.NonEmptyStringWithoutControlChars(), + Importance.HIGH, + CONFIG_DOCUMENTATION_MQ_QUEUE, + CONFIG_GROUP_MQ, 6, Width.LONG, + CONFIG_DISPLAY_MQ_QUEUE); + + CONFIGDEF.define(CONFIG_NAME_MQ_USER_NAME, + Type.STRING, + // can be null, when auth not required + null, ANY, + Importance.MEDIUM, + CONFIG_DOCUMENTATION_MQ_USER_NAME, + CONFIG_GROUP_MQ, 7, Width.MEDIUM, + CONFIG_DISPLAY_MQ_USER_NAME); + + CONFIGDEF.define(CONFIG_NAME_MQ_PASSWORD, + Type.PASSWORD, + // can be null, when auth not required + null, ANY, + Importance.MEDIUM, + CONFIG_DOCUMENTATION_MQ_PASSWORD, + CONFIG_GROUP_MQ, 8, Width.MEDIUM, + CONFIG_DISPLAY_MQ_PASSWORD); + + CONFIGDEF.define(CONFIG_NAME_MQ_RECORD_BUILDER, + Type.STRING, + // user must specify a record builder class + ConfigDef.NO_DEFAULT_VALUE, new ConfigDef.NonEmptyStringWithoutControlChars(), + Importance.HIGH, + CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER, + CONFIG_GROUP_MQ, 9, Width.LONG, + CONFIG_DISPLAY_MQ_RECORD_BUILDER); + + CONFIGDEF.define(CONFIG_NAME_MQ_MESSAGE_BODY_JMS, + Type.BOOLEAN, + // must be a non-null boolean - assume false if not provided + Boolean.FALSE, new ConfigDef.NonNullValidator(), + Importance.MEDIUM, + CONFIG_DOCUMENTATION_MQ_MESSAGE_BODY_JMS, + CONFIG_GROUP_MQ, 10, Width.SHORT, + CONFIG_DISPLAY_MQ_MESSAGE_BODY_JMS); + + CONFIGDEF.define(CONFIG_NAME_MQ_RECORD_BUILDER_KEY_HEADER, + Type.STRING, + // optional value - four valid values + null, ConfigDef.ValidString.in(null, + CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSMESSAGEID, + CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSCORRELATIONID, + CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSCORRELATIONIDASBYTES, + CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSDESTINATION), + Importance.MEDIUM, + CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER_KEY_HEADER, + CONFIG_GROUP_MQ, 11, Width.MEDIUM, + CONFIG_DISPLAY_MQ_RECORD_BUILDER_KEY_HEADER); + + CONFIGDEF.define(CONFIG_NAME_MQ_SSL_CIPHER_SUITE, + Type.STRING, + // optional - not needed if not using SSL - SSL cipher suites change + // too frequently so we won't maintain a valid list here + null, ANY, + Importance.MEDIUM, + CONFIG_DOCUMENTATION_MQ_SSL_CIPHER_SUITE, + CONFIG_GROUP_MQ, 12, Width.MEDIUM, + CONFIG_DISPLAY_MQ_SSL_CIPHER_SUITE); + + CONFIGDEF.define(CONFIG_NAME_MQ_SSL_PEER_NAME, + Type.STRING, + // optional - not needed if not using SSL + null, ANY, + Importance.MEDIUM, + CONFIG_DOCUMENTATION_MQ_SSL_PEER_NAME, + CONFIG_GROUP_MQ, 13, Width.MEDIUM, + CONFIG_DISPLAY_MQ_SSL_PEER_NAME); + + CONFIGDEF.define(CONFIG_NAME_MQ_SSL_KEYSTORE_LOCATION, + Type.STRING, + // optional - if provided should be the location of a readable file + null, new ReadableFile(), + Importance.MEDIUM, + CONFIG_DOCUMENTATION_MQ_SSL_KEYSTORE_LOCATION, + CONFIG_GROUP_MQ, 14, Width.MEDIUM, + CONFIG_DISPLAY_MQ_SSL_KEYSTORE_LOCATION); + + CONFIGDEF.define(CONFIG_NAME_MQ_SSL_KEYSTORE_PASSWORD, + Type.PASSWORD, + // optional - not needed if SSL keystore isn't provided + null, ANY, + Importance.MEDIUM, + CONFIG_DOCUMENTATION_MQ_SSL_KEYSTORE_PASSWORD, + CONFIG_GROUP_MQ, 15, Width.MEDIUM, + CONFIG_DISPLAY_MQ_SSL_KEYSTORE_PASSWORD); + + CONFIGDEF.define(CONFIG_NAME_MQ_SSL_TRUSTSTORE_LOCATION, + Type.STRING, + // optional - if provided should be the location of a readable file + null, new ReadableFile(), + Importance.MEDIUM, + CONFIG_DOCUMENTATION_MQ_SSL_TRUSTSTORE_LOCATION, + CONFIG_GROUP_MQ, 16, Width.MEDIUM, + CONFIG_DISPLAY_MQ_SSL_TRUSTSTORE_LOCATION); + + CONFIGDEF.define(CONFIG_NAME_MQ_SSL_TRUSTSTORE_PASSWORD, + Type.PASSWORD, + // optional - not needed if SSL truststore isn't provided + null, Importance.MEDIUM, + CONFIG_DOCUMENTATION_MQ_SSL_TRUSTSTORE_PASSWORD, + CONFIG_GROUP_MQ, 17, Width.MEDIUM, + CONFIG_DISPLAY_MQ_SSL_TRUSTSTORE_PASSWORD); + + CONFIGDEF.define(CONFIG_NAME_MQ_BATCH_SIZE, + Type.INT, + // must be an int greater than min + CONFIG_VALUE_MQ_BATCH_SIZE_DEFAULT, ConfigDef.Range.atLeast(CONFIG_VALUE_MQ_BATCH_SIZE_MINIMUM), + Importance.LOW, + CONFIG_DOCUMENTATION_MQ_BATCH_SIZE, + CONFIG_GROUP_MQ, 18, Width.MEDIUM, + CONFIG_DISPLAY_MQ_BATCH_SIZE); + + CONFIGDEF.define(CONFIG_NAME_MQ_MESSAGE_MQMD_READ, + Type.BOOLEAN, + // must be a non-null boolean - assume false if not provided + Boolean.FALSE, new ConfigDef.NonNullValidator(), + Importance.LOW, + CONFIG_DOCUMENTATION_MQ_MESSAGE_MQMD_READ, + CONFIG_GROUP_MQ, 19, Width.SHORT, + CONFIG_DISPLAY_MQ_MESSAGE_MQMD_READ); + + CONFIGDEF.define(CONFIG_NAME_MQ_USER_AUTHENTICATION_MQCSP, + Type.BOOLEAN, + // must be a non-null boolean - assume true if not provided + Boolean.TRUE, new ConfigDef.NonNullValidator(), + Importance.LOW, + CONFIG_DOCUMENTATION_MQ_USER_AUTHENTICATION_MQCSP, + CONFIG_GROUP_MQ, 20, Width.SHORT, + CONFIG_DISPLAY_MQ_USER_AUTHENTICATION_MQCSP); + + CONFIGDEF.define(CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, + Type.BOOLEAN, + // must be a non-null boolean - assume false if not provided + Boolean.FALSE, new ConfigDef.NonNullValidator(), + Importance.LOW, + CONFIG_DOCUMENTATION_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, + CONFIG_GROUP_MQ, 21, Width.MEDIUM, + CONFIG_DISPLAY_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER); + + CONFIGDEF.define(CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS, + Type.BOOLEAN, + // must be a non-null boolean - assume true if not provided + Boolean.TRUE, new ConfigDef.NonNullValidator(), + Importance.LOW, + CONFIG_DOCUMENTATION_MQ_SSL_USE_IBM_CIPHER_MAPPINGS, + CONFIG_GROUP_MQ, 22, Width.SHORT, + CONFIG_DISPLAY_MQ_SSL_USE_IBM_CIPHER_MAPPINGS); + + CONFIGDEF.define(CONFIG_NAME_TOPIC, + Type.STRING, + // user must specify the topic name + ConfigDef.NO_DEFAULT_VALUE, new ConfigDef.NonEmptyStringWithoutControlChars(), + Importance.HIGH, + CONFIG_DOCUMENTATION_TOPIC, + null, 0, Width.MEDIUM, + CONFIG_DISPLAY_TOPIC); + } - config.define(CONFIG_NAME_MQ_USER_AUTHENTICATION_MQCSP, Type.BOOLEAN, Boolean.TRUE, Importance.LOW, - CONFIG_DOCUMENTATION_MQ_USER_AUTHENTICATION_MQCSP, CONFIG_GROUP_MQ, 20, Width.SHORT, - CONFIG_DISPLAY_MQ_USER_AUTHENTICATION_MQCSP); - config.define(CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, Type.BOOLEAN, Boolean.FALSE, Importance.LOW, - CONFIG_DOCUMENTATION_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, CONFIG_GROUP_MQ, 21, Width.MEDIUM, - CONFIG_DISPLAY_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER); + private static class ReadableFile implements ConfigDef.Validator { + @Override + public void ensureValid(final String name, final Object value) { + final String strValue = (String) value; + if (value == null || strValue.isEmpty()) { + // only validate non-empty locations + return; + } - config.define(CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS, Type.BOOLEAN, null, Importance.LOW, - CONFIG_DOCUMENTATION_MQ_SSL_USE_IBM_CIPHER_MAPPINGS, CONFIG_GROUP_MQ, 22, Width.SHORT, - CONFIG_DISPLAY_MQ_SSL_USE_IBM_CIPHER_MAPPINGS); + try { + final File file = new File((String) value); + if (!file.isFile() || !file.canRead()) { + throw new ConfigException(name, value, "Value must be the location of a readable file"); + } + } catch (final Exception exc) { + throw new ConfigException(name, value, "Value must be a valid file location"); + } + } + } - config.define(CONFIG_NAME_TOPIC, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, Importance.HIGH, - CONFIG_DOCUMENTATION_TOPIC, null, 0, Width.MEDIUM, - CONFIG_DISPLAY_TOPIC); + private static class ValidURL implements ConfigDef.Validator { + @Override + public void ensureValid(final String name, final Object value) { + final String strValue = (String) value; + if (value == null || strValue.isEmpty()) { + // only validate non-empty locations + return; + } - return config; + try { + new URL(strValue); + } catch (final MalformedURLException exc) { + throw new ConfigException(name, value, "Value must be a valid URL"); + } + } } } \ No newline at end of file diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java index 04291f3..f635e2d 100755 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; @@ -71,14 +72,13 @@ public MQSourceTask() { log.debug("Task props entry {} : {}", entry.getKey(), value); } - final String strBatchSize = props.get(MQSourceConnector.CONFIG_NAME_MQ_BATCH_SIZE); - if (strBatchSize != null) { - batchSize = Integer.parseInt(strBatchSize); - } + final AbstractConfig config = new AbstractConfig(MQSourceConnector.CONFIGDEF, props); + + batchSize = config.getInt(MQSourceConnector.CONFIG_NAME_MQ_BATCH_SIZE); // Construct a reader to interface with MQ reader = new JMSReader(); - reader.configure(props); + reader.configure(config); // Make a connection as an initial test of the configuration reader.connect(); diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java index 8a3c40e..fd4c0b6 100755 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java @@ -17,6 +17,8 @@ import com.ibm.eventstreams.connect.mqsource.MQSourceConnector; import com.ibm.eventstreams.connect.mqsource.processor.JmsToKafkaHeaderConverter; + +import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.errors.ConnectException; @@ -29,7 +31,6 @@ import javax.jms.JMSException; import javax.jms.Message; import java.util.Map; -import java.util.Optional; /** * Builds Kafka Connect SourceRecords from messages. @@ -55,28 +56,27 @@ public enum KeyHeader { NONE, MESSAGE_ID, CORRELATION_ID, CORRELATION_ID_AS_BYTE log.trace("[{}] Entry {}.configure, props={}", Thread.currentThread().getId(), this.getClass().getName(), props); - final String kh = props.get(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER_KEY_HEADER); - if (kh != null) { - if (kh.equals(MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSMESSAGEID)) { + final AbstractConfig config = new AbstractConfig(MQSourceConnector.CONFIGDEF, props); + switch (String.valueOf(config.getString(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER_KEY_HEADER))) { + case MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSMESSAGEID: keyheader = KeyHeader.MESSAGE_ID; log.debug("Setting Kafka record key from JMSMessageID header field"); - } else if (kh.equals(MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSCORRELATIONID)) { + break; + case MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSCORRELATIONID: keyheader = KeyHeader.CORRELATION_ID; log.debug("Setting Kafka record key from JMSCorrelationID header field"); - } else if (kh.equals(MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSCORRELATIONIDASBYTES)) { + break; + case MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSCORRELATIONIDASBYTES: keyheader = KeyHeader.CORRELATION_ID_AS_BYTES; log.debug("Setting Kafka record key from JMSCorrelationIDAsBytes header field"); - } else if (kh.equals(MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSDESTINATION)) { + break; + case MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSDESTINATION: keyheader = KeyHeader.DESTINATION; log.debug("Setting Kafka record key from JMSDestination header field"); - } else { - log.error("Unsupported MQ record builder key header value {}", kh); - throw new ConnectException("Unsupported MQ record builder key header value"); - } + break; } - final String str = props.get(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER); - copyJmsPropertiesFlag = Boolean.parseBoolean(Optional.ofNullable(str).orElse("false")); + copyJmsPropertiesFlag = config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER); jmsToKafkaHeaderConverter = new JmsToKafkaHeaderConverter(); log.trace("[{}] Exit {}.configure", Thread.currentThread().getId(), this.getClass().getName());