From db8e36b78a2efef6e238c58ed3778e469e75da4f Mon Sep 17 00:00:00 2001 From: Dale Lane Date: Tue, 6 Feb 2024 22:36:15 +0000 Subject: [PATCH] refactor: delegate config validation to Kafka Connect The connector was doing a lot of it's own processing of config options - type-checking, null-checking, casting, etc. This makes the connector-specific aspects of the code harder to follow, and it also makes it easier for us to miss validating some required config options (for example, we weren't checking that topic names were provided). This commit moves all of this out of the connector, and makes it the responsibility of the Kafka Connect framework. This removes a lot of unnecessary checking and type checking/casting from the connector code, as the source task and jms reader class can assume that they will be given a validated config. Note that I did have to leave the existing Map parameter in the record builder as that is a public interface that users have subclassed. This should ideally be a Kafka Connect object rather than a raw map, but in the interest of not breaking existing subclasses, I've left it as a map. This does mean I'm converting the properties map to a config object twice, but as this is only at startup I think the performance cost will be minimal. Signed-off-by: Dale Lane --- .../connect/mqsource/MQSourceTaskAuthIT.java | 1 + .../connect/mqsource/MQSourceTaskIT.java | 4 + .../connect/mqsource/JMSReader.java | 139 +++---- .../connect/mqsource/MQSourceConnector.java | 353 +++++++++++++----- .../connect/mqsource/MQSourceTask.java | 10 +- .../mqsource/builders/BaseRecordBuilder.java | 26 +- 6 files changed, 334 insertions(+), 199 deletions(-) diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskAuthIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskAuthIT.java index 86f2e9a..b3109f9 100644 --- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskAuthIT.java +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskAuthIT.java @@ -72,6 +72,7 @@ private Map getConnectorProps() { connectorProps.put("mq.password", APP_PASSWORD); connectorProps.put("mq.message.body.jms", "false"); connectorProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + connectorProps.put("topic", "mytopic"); return connectorProps; } diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java index 782ea94..c9ca4ce 100644 --- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java @@ -61,6 +61,7 @@ private Map createDefaultConnectorProperties() { props.put("mq.channel.name", getChannelName()); props.put("mq.queue", MQ_QUEUE); props.put("mq.user.authentication.mqcsp", "false"); + props.put("topic", "mytopic"); return props; } @@ -82,6 +83,7 @@ public void verifyJmsTextMessages() throws Exception { final List kafkaMessages = connectTask.poll(); assertEquals(2, kafkaMessages.size()); for (final SourceRecord kafkaMessage : kafkaMessages) { + assertEquals("mytopic", kafkaMessage.topic()); assertNull(kafkaMessage.key()); assertNull(kafkaMessage.valueSchema()); @@ -116,6 +118,7 @@ public void verifyJmsJsonMessages() throws Exception { assertEquals(5, kafkaMessages.size()); for (int i = 0; i < 5; i++) { final SourceRecord kafkaMessage = kafkaMessages.get(i); + assertEquals("mytopic", kafkaMessage.topic()); assertNull(kafkaMessage.key()); assertNull(kafkaMessage.valueSchema()); @@ -148,6 +151,7 @@ public void verifyJmsMessageHeaders() throws Exception { final List kafkaMessages = connectTask.poll(); assertEquals(1, kafkaMessages.size()); final SourceRecord kafkaMessage = kafkaMessages.get(0); + assertEquals("mytopic", kafkaMessage.topic()); assertNull(kafkaMessage.key()); assertNull(kafkaMessage.valueSchema()); diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java index 8354f46..71b5db0 100755 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java @@ -31,7 +31,6 @@ import java.security.GeneralSecurityException; import java.security.KeyStore; import java.security.SecureRandom; -import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.JMSConsumer; @@ -45,6 +44,8 @@ import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.types.Password; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.source.SourceRecord; @@ -61,7 +62,7 @@ public class JMSReader { // Configs private String userName; - private String password; + private Password password; private String topic; private boolean messageBodyJms; @@ -93,79 +94,44 @@ public JMSReader() { * * @throws ConnectException Operation failed and connector should stop. */ - public void configure(final Map props) { + public void configure(final AbstractConfig config) { log.trace("[{}] Entry {}.configure, props={}", Thread.currentThread().getId(), this.getClass().getName(), - props); - - final String queueManager = props.get(MQSourceConnector.CONFIG_NAME_MQ_QUEUE_MANAGER); - final String connectionMode = props.get(MQSourceConnector.CONFIG_NAME_MQ_CONNECTION_MODE); - final String connectionNameList = props.get(MQSourceConnector.CONFIG_NAME_MQ_CONNECTION_NAME_LIST); - final String channelName = props.get(MQSourceConnector.CONFIG_NAME_MQ_CHANNEL_NAME); - final String queueName = props.get(MQSourceConnector.CONFIG_NAME_MQ_QUEUE); - final String userName = props.get(MQSourceConnector.CONFIG_NAME_MQ_USER_NAME); - final String password = props.get(MQSourceConnector.CONFIG_NAME_MQ_PASSWORD); - final String ccdtUrl = props.get(MQSourceConnector.CONFIG_NAME_MQ_CCDT_URL); - final String builderClass = props.get(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER); - final String mbj = props.get(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS); - final String mdr = props.get(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_MQMD_READ); - final String sslCipherSuite = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_CIPHER_SUITE); - final String sslPeerName = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_PEER_NAME); - final String sslKeystoreLocation = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_KEYSTORE_LOCATION); - final String sslKeystorePassword = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_KEYSTORE_PASSWORD); - final String sslTruststoreLocation = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_LOCATION); - final String sslTruststorePassword = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_PASSWORD); - final String useMQCSP = props.get(MQSourceConnector.CONFIG_NAME_MQ_USER_AUTHENTICATION_MQCSP); - final String useIBMCipherMappings = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS); - final String topic = props.get(MQSourceConnector.CONFIG_NAME_TOPIC); - - if (useIBMCipherMappings != null) { - System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings", useIBMCipherMappings); - } + config); - int transportType = WMQConstants.WMQ_CM_CLIENT; - if (connectionMode != null) { - if (connectionMode.equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT)) { - transportType = WMQConstants.WMQ_CM_CLIENT; - } else if (connectionMode.equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE_BINDINGS)) { - transportType = WMQConstants.WMQ_CM_BINDINGS; - } else { - log.error("Unsupported MQ connection mode {}", connectionMode); - throw new ConnectException("Unsupported MQ connection mode"); - } - } + System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings", + config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS).toString()); + + final int transportType = + config.getString(MQSourceConnector.CONFIG_NAME_MQ_CONNECTION_MODE) + .equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT) ? + WMQConstants.WMQ_CM_CLIENT : + WMQConstants.WMQ_CM_BINDINGS; try { mqConnFactory = new MQConnectionFactory(); mqConnFactory.setTransportType(transportType); - mqConnFactory.setQueueManager(queueManager); - mqConnFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, true); - if (useMQCSP != null) { - mqConnFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, - Boolean.parseBoolean(useMQCSP)); - } + mqConnFactory.setQueueManager(config.getString(MQSourceConnector.CONFIG_NAME_MQ_QUEUE_MANAGER)); + mqConnFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, + config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_USER_AUTHENTICATION_MQCSP)); if (transportType == WMQConstants.WMQ_CM_CLIENT) { + final String ccdtUrl = config.getString(MQSourceConnector.CONFIG_NAME_MQ_CCDT_URL); + if (ccdtUrl != null) { - final URL ccdtUrlObject; - try { - ccdtUrlObject = new URL(ccdtUrl); - } catch (final MalformedURLException e) { - log.error("MalformedURLException exception {}", e); - throw new ConnectException("CCDT file url invalid", e); - } - mqConnFactory.setCCDTURL(ccdtUrlObject); + mqConnFactory.setCCDTURL(new URL(ccdtUrl)); } else { - mqConnFactory.setConnectionNameList(connectionNameList); - mqConnFactory.setChannel(channelName); + mqConnFactory.setConnectionNameList(config.getString(MQSourceConnector.CONFIG_NAME_MQ_CONNECTION_NAME_LIST)); + mqConnFactory.setChannel(config.getString(MQSourceConnector.CONFIG_NAME_MQ_CHANNEL_NAME)); } - if (sslCipherSuite != null) { - mqConnFactory.setSSLCipherSuite(sslCipherSuite); - if (sslPeerName != null) { - mqConnFactory.setSSLPeerName(sslPeerName); - } - } + mqConnFactory.setSSLCipherSuite(config.getString(MQSourceConnector.CONFIG_NAME_MQ_SSL_CIPHER_SUITE)); + mqConnFactory.setSSLPeerName(config.getString(MQSourceConnector.CONFIG_NAME_MQ_SSL_PEER_NAME)); + + final String sslKeystoreLocation = config.getString(MQSourceConnector.CONFIG_NAME_MQ_SSL_KEYSTORE_LOCATION); + final Password sslKeystorePassword = config.getPassword(MQSourceConnector.CONFIG_NAME_MQ_SSL_KEYSTORE_PASSWORD); + final String sslTruststoreLocation = config.getString(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_LOCATION); + final Password sslTruststorePassword = config.getPassword(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_PASSWORD); if (sslKeystoreLocation != null || sslTruststoreLocation != null) { final SSLContext sslContext = buildSslContext(sslKeystoreLocation, sslKeystorePassword, sslTruststoreLocation, sslTruststorePassword); @@ -173,36 +139,35 @@ public void configure(final Map props) { } } - queue = new MQQueue(queueName); + queue = new MQQueue(config.getString(MQSourceConnector.CONFIG_NAME_MQ_QUEUE)); - this.userName = userName; - this.password = password; + userName = config.getString(MQSourceConnector.CONFIG_NAME_MQ_USER_NAME); + password = config.getPassword(MQSourceConnector.CONFIG_NAME_MQ_PASSWORD); - this.messageBodyJms = false; - queue.setMessageBodyStyle(WMQConstants.WMQ_MESSAGE_BODY_MQ); - if (mbj != null) { - if (Boolean.parseBoolean(mbj)) { - this.messageBodyJms = true; - queue.setMessageBodyStyle(WMQConstants.WMQ_MESSAGE_BODY_JMS); - } - } + messageBodyJms = config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS); + queue.setMessageBodyStyle(messageBodyJms ? + WMQConstants.WMQ_MESSAGE_BODY_JMS : + WMQConstants.WMQ_MESSAGE_BODY_MQ); - if (mdr != null) { - if (Boolean.parseBoolean(mdr)) { - queue.setBooleanProperty(WMQConstants.WMQ_MQMD_READ_ENABLED, true); - } - } + queue.setBooleanProperty(WMQConstants.WMQ_MQMD_READ_ENABLED, + config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_MQMD_READ)); + + topic = config.getString(MQSourceConnector.CONFIG_NAME_TOPIC); - this.topic = topic; } catch (JMSException | JMSRuntimeException jmse) { log.error("JMS exception {}", jmse); throw new ConnectException(jmse); + } catch (final MalformedURLException e) { + log.error("MalformedURLException exception {}", e); + throw new ConnectException("CCDT file url invalid", e); } + + final String builderClass = config.getString(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER); try { final Class c = Class.forName(builderClass).asSubclass(RecordBuilder.class); builder = c.newInstance(); - builder.configure(props); + builder.configure(config.originalsStrings()); } catch (ClassNotFoundException | ClassCastException | IllegalAccessException | InstantiationException | NullPointerException exc) { log.error("Could not instantiate message builder {}", builderClass); @@ -220,7 +185,7 @@ public void connect() { try { if (userName != null) { - jmsCtxt = mqConnFactory.createContext(userName, password, JMSContext.SESSION_TRANSACTED); + jmsCtxt = mqConnFactory.createContext(userName, password.value(), JMSContext.SESSION_TRANSACTED); } else { jmsCtxt = mqConnFactory.createContext(JMSContext.SESSION_TRANSACTED); } @@ -373,7 +338,7 @@ private boolean connectInternal() { log.trace("[{}] Entry {}.connectInternal", Thread.currentThread().getId(), this.getClass().getName()); try { if (userName != null) { - jmsCtxt = mqConnFactory.createContext(userName, password, JMSContext.SESSION_TRANSACTED); + jmsCtxt = mqConnFactory.createContext(userName, password.value(), JMSContext.SESSION_TRANSACTED); } else { jmsCtxt = mqConnFactory.createContext(JMSContext.SESSION_TRANSACTED); } @@ -498,8 +463,8 @@ private ConnectException handleException(final Throwable exc) { return new ConnectException(exc); } - private SSLContext buildSslContext(final String sslKeystoreLocation, final String sslKeystorePassword, - final String sslTruststoreLocation, final String sslTruststorePassword) { + private SSLContext buildSslContext(final String sslKeystoreLocation, final Password sslKeystorePassword, + final String sslTruststoreLocation, final Password sslTruststorePassword) { log.trace("[{}] Entry {}.buildSslContext", Thread.currentThread().getId(), this.getClass().getName()); try { @@ -508,7 +473,7 @@ private SSLContext buildSslContext(final String sslKeystoreLocation, final Strin if (sslKeystoreLocation != null) { final KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); - kmf.init(loadKeyStore(sslKeystoreLocation, sslKeystorePassword), sslKeystorePassword.toCharArray()); + kmf.init(loadKeyStore(sslKeystoreLocation, sslKeystorePassword), sslKeystorePassword.value().toCharArray()); keyManagers = kmf.getKeyManagers(); } @@ -530,12 +495,12 @@ private SSLContext buildSslContext(final String sslKeystoreLocation, final Strin } } - private KeyStore loadKeyStore(final String location, final String password) throws GeneralSecurityException { + private KeyStore loadKeyStore(final String location, final Password password) throws GeneralSecurityException { log.trace("[{}] Entry {}.loadKeyStore", Thread.currentThread().getId(), this.getClass().getName()); try (final InputStream ksStr = new FileInputStream(location)) { final KeyStore ks = KeyStore.getInstance("JKS"); - ks.load(ksStr, password.toCharArray()); + ks.load(ksStr, password.value().toCharArray()); log.trace("[{}] Exit {}.loadKeyStore, retval={}", Thread.currentThread().getId(), this.getClass().getName(), ks); diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java index 24df222..5a640ef 100644 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java @@ -15,6 +15,9 @@ */ package com.ibm.eventstreams.connect.mqsource; +import java.io.File; +import java.net.MalformedURLException; +import java.net.URL; import java.util.ArrayList; import java.util.List; import java.util.Locale; @@ -25,15 +28,17 @@ import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigDef.Width; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.source.SourceConnector; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MQSourceConnector extends SourceConnector { private static final Logger log = LoggerFactory.getLogger(MQSourceConnector.class); + public static final ConfigDef CONFIGDEF; + public static final String CONFIG_GROUP_MQ = "mq"; public static final String CONFIG_NAME_MQ_QUEUE_MANAGER = "mq.queue.manager"; @@ -218,104 +223,264 @@ public void stop() { */ @Override public ConfigDef config() { - final ConfigDef config = new ConfigDef(); - - config.define(CONFIG_NAME_MQ_QUEUE_MANAGER, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, Importance.HIGH, - CONFIG_DOCUMENTATION_MQ_QUEUE_MANAGER, CONFIG_GROUP_MQ, 1, Width.MEDIUM, - CONFIG_DISPLAY_MQ_QUEUE_MANAGER); - - config.define(CONFIG_NAME_MQ_CONNECTION_MODE, Type.STRING, CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT, - ConfigDef.ValidString.in(CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT, - CONFIG_VALUE_MQ_CONNECTION_MODE_BINDINGS), - Importance.MEDIUM, - CONFIG_DOCUMENTATION_MQ_CONNECTION_MODE, CONFIG_GROUP_MQ, 2, Width.SHORT, - CONFIG_DISPLAY_MQ_CONNECTION_MODE); - - config.define(CONFIG_NAME_MQ_CONNECTION_NAME_LIST, Type.STRING, null, Importance.MEDIUM, - CONFIG_DOCUMENTATION_MQ_CONNNECTION_NAME_LIST, CONFIG_GROUP_MQ, 3, Width.LONG, - CONFIG_DISPLAY_MQ_CONNECTION_NAME_LIST); - - config.define(CONFIG_NAME_MQ_CHANNEL_NAME, Type.STRING, null, Importance.MEDIUM, - CONFIG_DOCUMENTATION_MQ_CHANNEL_NAME, CONFIG_GROUP_MQ, 4, Width.MEDIUM, - CONFIG_DISPLAY_MQ_CHANNEL_NAME); - - config.define(CONFIG_NAME_MQ_CCDT_URL, Type.STRING, null, Importance.MEDIUM, - CONFIG_DOCUMENTATION_MQ_CCDT_URL, CONFIG_GROUP_MQ, 5, Width.MEDIUM, - CONFIG_DISPLAY_MQ_CCDT_URL); - - config.define(CONFIG_NAME_MQ_QUEUE, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, Importance.HIGH, - CONFIG_DOCUMENTATION_MQ_QUEUE, CONFIG_GROUP_MQ, 6, Width.LONG, - CONFIG_DISPLAY_MQ_QUEUE); - - config.define(CONFIG_NAME_MQ_USER_NAME, Type.STRING, null, Importance.MEDIUM, - CONFIG_DOCUMENTATION_MQ_USER_NAME, CONFIG_GROUP_MQ, 7, Width.MEDIUM, - CONFIG_DISPLAY_MQ_USER_NAME); - - config.define(CONFIG_NAME_MQ_PASSWORD, Type.PASSWORD, null, Importance.MEDIUM, - CONFIG_DOCUMENTATION_MQ_PASSWORD, CONFIG_GROUP_MQ, 8, Width.MEDIUM, - CONFIG_DISPLAY_MQ_PASSWORD); - - config.define(CONFIG_NAME_MQ_RECORD_BUILDER, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, Importance.HIGH, - CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER, CONFIG_GROUP_MQ, 9, Width.LONG, - CONFIG_DISPLAY_MQ_RECORD_BUILDER); - - config.define(CONFIG_NAME_MQ_MESSAGE_BODY_JMS, Type.BOOLEAN, Boolean.FALSE, Importance.MEDIUM, - CONFIG_DOCUMENTATION_MQ_MESSAGE_BODY_JMS, CONFIG_GROUP_MQ, 10, Width.SHORT, - CONFIG_DISPLAY_MQ_MESSAGE_BODY_JMS); - - config.define(CONFIG_NAME_MQ_RECORD_BUILDER_KEY_HEADER, Type.STRING, null, Importance.MEDIUM, - CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER_KEY_HEADER, CONFIG_GROUP_MQ, 11, Width.MEDIUM, - CONFIG_DISPLAY_MQ_RECORD_BUILDER_KEY_HEADER); - - config.define(CONFIG_NAME_MQ_SSL_CIPHER_SUITE, Type.STRING, null, Importance.MEDIUM, - CONFIG_DOCUMENTATION_MQ_SSL_CIPHER_SUITE, CONFIG_GROUP_MQ, 12, Width.MEDIUM, - CONFIG_DISPLAY_MQ_SSL_CIPHER_SUITE); - - config.define(CONFIG_NAME_MQ_SSL_PEER_NAME, Type.STRING, null, Importance.MEDIUM, - CONFIG_DOCUMENTATION_MQ_SSL_PEER_NAME, CONFIG_GROUP_MQ, 13, Width.MEDIUM, - CONFIG_DISPLAY_MQ_SSL_PEER_NAME); - - config.define(CONFIG_NAME_MQ_SSL_KEYSTORE_LOCATION, Type.STRING, null, Importance.MEDIUM, - CONFIG_DOCUMENTATION_MQ_SSL_KEYSTORE_LOCATION, CONFIG_GROUP_MQ, 14, Width.MEDIUM, - CONFIG_DISPLAY_MQ_SSL_KEYSTORE_LOCATION); - - config.define(CONFIG_NAME_MQ_SSL_KEYSTORE_PASSWORD, Type.PASSWORD, null, Importance.MEDIUM, - CONFIG_DOCUMENTATION_MQ_SSL_KEYSTORE_PASSWORD, CONFIG_GROUP_MQ, 15, Width.MEDIUM, - CONFIG_DISPLAY_MQ_SSL_KEYSTORE_PASSWORD); - - config.define(CONFIG_NAME_MQ_SSL_TRUSTSTORE_LOCATION, Type.STRING, null, Importance.MEDIUM, - CONFIG_DOCUMENTATION_MQ_SSL_TRUSTSTORE_LOCATION, CONFIG_GROUP_MQ, 16, Width.MEDIUM, - CONFIG_DISPLAY_MQ_SSL_TRUSTSTORE_LOCATION); - - config.define(CONFIG_NAME_MQ_SSL_TRUSTSTORE_PASSWORD, Type.PASSWORD, null, Importance.MEDIUM, - CONFIG_DOCUMENTATION_MQ_SSL_TRUSTSTORE_PASSWORD, CONFIG_GROUP_MQ, 17, Width.MEDIUM, - CONFIG_DISPLAY_MQ_SSL_TRUSTSTORE_PASSWORD); - - config.define(CONFIG_NAME_MQ_BATCH_SIZE, Type.INT, CONFIG_VALUE_MQ_BATCH_SIZE_DEFAULT, - ConfigDef.Range.atLeast(CONFIG_VALUE_MQ_BATCH_SIZE_MINIMUM), Importance.LOW, - CONFIG_DOCUMENTATION_MQ_BATCH_SIZE, CONFIG_GROUP_MQ, 18, Width.MEDIUM, - CONFIG_DISPLAY_MQ_BATCH_SIZE); + return CONFIGDEF; + } - config.define(CONFIG_NAME_MQ_MESSAGE_MQMD_READ, Type.BOOLEAN, Boolean.FALSE, Importance.LOW, - CONFIG_DOCUMENTATION_MQ_MESSAGE_MQMD_READ, CONFIG_GROUP_MQ, 19, Width.SHORT, - CONFIG_DISPLAY_MQ_MESSAGE_MQMD_READ); + /** Null validator - indicates that any value is acceptable for this config option. */ + private static final ConfigDef.Validator ANY = null; + + static { + CONFIGDEF = new ConfigDef(); + + CONFIGDEF.define(CONFIG_NAME_MQ_QUEUE_MANAGER, + Type.STRING, + // user must specify the queue manager name + ConfigDef.NO_DEFAULT_VALUE, new ConfigDef.NonEmptyStringWithoutControlChars(), + Importance.HIGH, + CONFIG_DOCUMENTATION_MQ_QUEUE_MANAGER, + CONFIG_GROUP_MQ, 1, Width.MEDIUM, + CONFIG_DISPLAY_MQ_QUEUE_MANAGER); + + CONFIGDEF.define(CONFIG_NAME_MQ_CONNECTION_MODE, + Type.STRING, + // required value - two valid options + CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT, + ConfigDef.ValidString.in(CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT, + CONFIG_VALUE_MQ_CONNECTION_MODE_BINDINGS), + Importance.MEDIUM, + CONFIG_DOCUMENTATION_MQ_CONNECTION_MODE, + CONFIG_GROUP_MQ, 2, Width.SHORT, + CONFIG_DISPLAY_MQ_CONNECTION_MODE); + + CONFIGDEF.define(CONFIG_NAME_MQ_CONNECTION_NAME_LIST, + Type.STRING, + // can be null, for example when using bindings mode or a CCDT + null, ANY, + Importance.MEDIUM, + CONFIG_DOCUMENTATION_MQ_CONNNECTION_NAME_LIST, + CONFIG_GROUP_MQ, 3, Width.LONG, + CONFIG_DISPLAY_MQ_CONNECTION_NAME_LIST); + + CONFIGDEF.define(CONFIG_NAME_MQ_CHANNEL_NAME, + Type.STRING, + // can be null, for example when using bindings mode + null, ANY, + Importance.MEDIUM, + CONFIG_DOCUMENTATION_MQ_CHANNEL_NAME, + CONFIG_GROUP_MQ, 4, Width.MEDIUM, + CONFIG_DISPLAY_MQ_CHANNEL_NAME); + + CONFIGDEF.define(CONFIG_NAME_MQ_CCDT_URL, + Type.STRING, + // can be null, for example when using bindings mode or a conname list + null, new ValidURL(), + Importance.MEDIUM, + CONFIG_DOCUMENTATION_MQ_CCDT_URL, + CONFIG_GROUP_MQ, 5, Width.MEDIUM, + CONFIG_DISPLAY_MQ_CCDT_URL); + + CONFIGDEF.define(CONFIG_NAME_MQ_QUEUE, + Type.STRING, + // user must specify the queue name + ConfigDef.NO_DEFAULT_VALUE, new ConfigDef.NonEmptyStringWithoutControlChars(), + Importance.HIGH, + CONFIG_DOCUMENTATION_MQ_QUEUE, + CONFIG_GROUP_MQ, 6, Width.LONG, + CONFIG_DISPLAY_MQ_QUEUE); + + CONFIGDEF.define(CONFIG_NAME_MQ_USER_NAME, + Type.STRING, + // can be null, when auth not required + null, ANY, + Importance.MEDIUM, + CONFIG_DOCUMENTATION_MQ_USER_NAME, + CONFIG_GROUP_MQ, 7, Width.MEDIUM, + CONFIG_DISPLAY_MQ_USER_NAME); + + CONFIGDEF.define(CONFIG_NAME_MQ_PASSWORD, + Type.PASSWORD, + // can be null, when auth not required + null, ANY, + Importance.MEDIUM, + CONFIG_DOCUMENTATION_MQ_PASSWORD, + CONFIG_GROUP_MQ, 8, Width.MEDIUM, + CONFIG_DISPLAY_MQ_PASSWORD); + + CONFIGDEF.define(CONFIG_NAME_MQ_RECORD_BUILDER, + Type.STRING, + // user must specify a record builder class + ConfigDef.NO_DEFAULT_VALUE, new ConfigDef.NonEmptyStringWithoutControlChars(), + Importance.HIGH, + CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER, + CONFIG_GROUP_MQ, 9, Width.LONG, + CONFIG_DISPLAY_MQ_RECORD_BUILDER); + + CONFIGDEF.define(CONFIG_NAME_MQ_MESSAGE_BODY_JMS, + Type.BOOLEAN, + // must be a non-null boolean - assume false if not provided + Boolean.FALSE, new ConfigDef.NonNullValidator(), + Importance.MEDIUM, + CONFIG_DOCUMENTATION_MQ_MESSAGE_BODY_JMS, + CONFIG_GROUP_MQ, 10, Width.SHORT, + CONFIG_DISPLAY_MQ_MESSAGE_BODY_JMS); + + CONFIGDEF.define(CONFIG_NAME_MQ_RECORD_BUILDER_KEY_HEADER, + Type.STRING, + // optional value - four valid values + null, ConfigDef.ValidString.in(null, + CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSMESSAGEID, + CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSCORRELATIONID, + CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSCORRELATIONIDASBYTES, + CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSDESTINATION), + Importance.MEDIUM, + CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER_KEY_HEADER, + CONFIG_GROUP_MQ, 11, Width.MEDIUM, + CONFIG_DISPLAY_MQ_RECORD_BUILDER_KEY_HEADER); + + CONFIGDEF.define(CONFIG_NAME_MQ_SSL_CIPHER_SUITE, + Type.STRING, + // optional - not needed if not using SSL - SSL cipher suites change + // too frequently so we won't maintain a valid list here + null, ANY, + Importance.MEDIUM, + CONFIG_DOCUMENTATION_MQ_SSL_CIPHER_SUITE, + CONFIG_GROUP_MQ, 12, Width.MEDIUM, + CONFIG_DISPLAY_MQ_SSL_CIPHER_SUITE); + + CONFIGDEF.define(CONFIG_NAME_MQ_SSL_PEER_NAME, + Type.STRING, + // optional - not needed if not using SSL + null, ANY, + Importance.MEDIUM, + CONFIG_DOCUMENTATION_MQ_SSL_PEER_NAME, + CONFIG_GROUP_MQ, 13, Width.MEDIUM, + CONFIG_DISPLAY_MQ_SSL_PEER_NAME); + + CONFIGDEF.define(CONFIG_NAME_MQ_SSL_KEYSTORE_LOCATION, + Type.STRING, + // optional - if provided should be the location of a readable file + null, new ReadableFile(), + Importance.MEDIUM, + CONFIG_DOCUMENTATION_MQ_SSL_KEYSTORE_LOCATION, + CONFIG_GROUP_MQ, 14, Width.MEDIUM, + CONFIG_DISPLAY_MQ_SSL_KEYSTORE_LOCATION); + + CONFIGDEF.define(CONFIG_NAME_MQ_SSL_KEYSTORE_PASSWORD, + Type.PASSWORD, + // optional - not needed if SSL keystore isn't provided + null, ANY, + Importance.MEDIUM, + CONFIG_DOCUMENTATION_MQ_SSL_KEYSTORE_PASSWORD, + CONFIG_GROUP_MQ, 15, Width.MEDIUM, + CONFIG_DISPLAY_MQ_SSL_KEYSTORE_PASSWORD); + + CONFIGDEF.define(CONFIG_NAME_MQ_SSL_TRUSTSTORE_LOCATION, + Type.STRING, + // optional - if provided should be the location of a readable file + null, new ReadableFile(), + Importance.MEDIUM, + CONFIG_DOCUMENTATION_MQ_SSL_TRUSTSTORE_LOCATION, + CONFIG_GROUP_MQ, 16, Width.MEDIUM, + CONFIG_DISPLAY_MQ_SSL_TRUSTSTORE_LOCATION); + + CONFIGDEF.define(CONFIG_NAME_MQ_SSL_TRUSTSTORE_PASSWORD, + Type.PASSWORD, + // optional - not needed if SSL truststore isn't provided + null, Importance.MEDIUM, + CONFIG_DOCUMENTATION_MQ_SSL_TRUSTSTORE_PASSWORD, + CONFIG_GROUP_MQ, 17, Width.MEDIUM, + CONFIG_DISPLAY_MQ_SSL_TRUSTSTORE_PASSWORD); + + CONFIGDEF.define(CONFIG_NAME_MQ_BATCH_SIZE, + Type.INT, + // must be an int greater than min + CONFIG_VALUE_MQ_BATCH_SIZE_DEFAULT, ConfigDef.Range.atLeast(CONFIG_VALUE_MQ_BATCH_SIZE_MINIMUM), + Importance.LOW, + CONFIG_DOCUMENTATION_MQ_BATCH_SIZE, + CONFIG_GROUP_MQ, 18, Width.MEDIUM, + CONFIG_DISPLAY_MQ_BATCH_SIZE); + + CONFIGDEF.define(CONFIG_NAME_MQ_MESSAGE_MQMD_READ, + Type.BOOLEAN, + // must be a non-null boolean - assume false if not provided + Boolean.FALSE, new ConfigDef.NonNullValidator(), + Importance.LOW, + CONFIG_DOCUMENTATION_MQ_MESSAGE_MQMD_READ, + CONFIG_GROUP_MQ, 19, Width.SHORT, + CONFIG_DISPLAY_MQ_MESSAGE_MQMD_READ); + + CONFIGDEF.define(CONFIG_NAME_MQ_USER_AUTHENTICATION_MQCSP, + Type.BOOLEAN, + // must be a non-null boolean - assume true if not provided + Boolean.TRUE, new ConfigDef.NonNullValidator(), + Importance.LOW, + CONFIG_DOCUMENTATION_MQ_USER_AUTHENTICATION_MQCSP, + CONFIG_GROUP_MQ, 20, Width.SHORT, + CONFIG_DISPLAY_MQ_USER_AUTHENTICATION_MQCSP); + + CONFIGDEF.define(CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, + Type.BOOLEAN, + // must be a non-null boolean - assume false if not provided + Boolean.FALSE, new ConfigDef.NonNullValidator(), + Importance.LOW, + CONFIG_DOCUMENTATION_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, + CONFIG_GROUP_MQ, 21, Width.MEDIUM, + CONFIG_DISPLAY_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER); + + CONFIGDEF.define(CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS, + Type.BOOLEAN, + // must be a non-null boolean - assume true if not provided + Boolean.TRUE, new ConfigDef.NonNullValidator(), + Importance.LOW, + CONFIG_DOCUMENTATION_MQ_SSL_USE_IBM_CIPHER_MAPPINGS, + CONFIG_GROUP_MQ, 22, Width.SHORT, + CONFIG_DISPLAY_MQ_SSL_USE_IBM_CIPHER_MAPPINGS); + + CONFIGDEF.define(CONFIG_NAME_TOPIC, + Type.STRING, + // user must specify the topic name + ConfigDef.NO_DEFAULT_VALUE, new ConfigDef.NonEmptyStringWithoutControlChars(), + Importance.HIGH, + CONFIG_DOCUMENTATION_TOPIC, + null, 0, Width.MEDIUM, + CONFIG_DISPLAY_TOPIC); + } - config.define(CONFIG_NAME_MQ_USER_AUTHENTICATION_MQCSP, Type.BOOLEAN, Boolean.TRUE, Importance.LOW, - CONFIG_DOCUMENTATION_MQ_USER_AUTHENTICATION_MQCSP, CONFIG_GROUP_MQ, 20, Width.SHORT, - CONFIG_DISPLAY_MQ_USER_AUTHENTICATION_MQCSP); - config.define(CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, Type.BOOLEAN, Boolean.FALSE, Importance.LOW, - CONFIG_DOCUMENTATION_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, CONFIG_GROUP_MQ, 21, Width.MEDIUM, - CONFIG_DISPLAY_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER); + private static class ReadableFile implements ConfigDef.Validator { + @Override + public void ensureValid(final String name, final Object value) { + final String strValue = (String) value; + if (value == null || strValue.isEmpty()) { + // only validate non-empty locations + return; + } - config.define(CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS, Type.BOOLEAN, null, Importance.LOW, - CONFIG_DOCUMENTATION_MQ_SSL_USE_IBM_CIPHER_MAPPINGS, CONFIG_GROUP_MQ, 22, Width.SHORT, - CONFIG_DISPLAY_MQ_SSL_USE_IBM_CIPHER_MAPPINGS); + try { + final File file = new File((String) value); + if (!file.isFile() || !file.canRead()) { + throw new ConfigException(name, value, "Value must be the location of a readable file"); + } + } catch (final Exception exc) { + throw new ConfigException(name, value, "Value must be a valid file location"); + } + } + } - config.define(CONFIG_NAME_TOPIC, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, Importance.HIGH, - CONFIG_DOCUMENTATION_TOPIC, null, 0, Width.MEDIUM, - CONFIG_DISPLAY_TOPIC); + private static class ValidURL implements ConfigDef.Validator { + @Override + public void ensureValid(final String name, final Object value) { + final String strValue = (String) value; + if (value == null || strValue.isEmpty()) { + // only validate non-empty locations + return; + } - return config; + try { + new URL(strValue); + } catch (final MalformedURLException exc) { + throw new ConfigException(name, value, "Value must be a valid URL"); + } + } } } \ No newline at end of file diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java index 04291f3..f635e2d 100755 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; @@ -71,14 +72,13 @@ public MQSourceTask() { log.debug("Task props entry {} : {}", entry.getKey(), value); } - final String strBatchSize = props.get(MQSourceConnector.CONFIG_NAME_MQ_BATCH_SIZE); - if (strBatchSize != null) { - batchSize = Integer.parseInt(strBatchSize); - } + final AbstractConfig config = new AbstractConfig(MQSourceConnector.CONFIGDEF, props); + + batchSize = config.getInt(MQSourceConnector.CONFIG_NAME_MQ_BATCH_SIZE); // Construct a reader to interface with MQ reader = new JMSReader(); - reader.configure(props); + reader.configure(config); // Make a connection as an initial test of the configuration reader.connect(); diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java index 8a3c40e..fd4c0b6 100755 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java @@ -17,6 +17,8 @@ import com.ibm.eventstreams.connect.mqsource.MQSourceConnector; import com.ibm.eventstreams.connect.mqsource.processor.JmsToKafkaHeaderConverter; + +import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.errors.ConnectException; @@ -29,7 +31,6 @@ import javax.jms.JMSException; import javax.jms.Message; import java.util.Map; -import java.util.Optional; /** * Builds Kafka Connect SourceRecords from messages. @@ -55,28 +56,27 @@ public enum KeyHeader { NONE, MESSAGE_ID, CORRELATION_ID, CORRELATION_ID_AS_BYTE log.trace("[{}] Entry {}.configure, props={}", Thread.currentThread().getId(), this.getClass().getName(), props); - final String kh = props.get(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER_KEY_HEADER); - if (kh != null) { - if (kh.equals(MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSMESSAGEID)) { + final AbstractConfig config = new AbstractConfig(MQSourceConnector.CONFIGDEF, props); + switch (String.valueOf(config.getString(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER_KEY_HEADER))) { + case MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSMESSAGEID: keyheader = KeyHeader.MESSAGE_ID; log.debug("Setting Kafka record key from JMSMessageID header field"); - } else if (kh.equals(MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSCORRELATIONID)) { + break; + case MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSCORRELATIONID: keyheader = KeyHeader.CORRELATION_ID; log.debug("Setting Kafka record key from JMSCorrelationID header field"); - } else if (kh.equals(MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSCORRELATIONIDASBYTES)) { + break; + case MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSCORRELATIONIDASBYTES: keyheader = KeyHeader.CORRELATION_ID_AS_BYTES; log.debug("Setting Kafka record key from JMSCorrelationIDAsBytes header field"); - } else if (kh.equals(MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSDESTINATION)) { + break; + case MQSourceConnector.CONFIG_VALUE_MQ_RECORD_BUILDER_KEY_HEADER_JMSDESTINATION: keyheader = KeyHeader.DESTINATION; log.debug("Setting Kafka record key from JMSDestination header field"); - } else { - log.error("Unsupported MQ record builder key header value {}", kh); - throw new ConnectException("Unsupported MQ record builder key header value"); - } + break; } - final String str = props.get(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER); - copyJmsPropertiesFlag = Boolean.parseBoolean(Optional.ofNullable(str).orElse("false")); + copyJmsPropertiesFlag = config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER); jmsToKafkaHeaderConverter = new JmsToKafkaHeaderConverter(); log.trace("[{}] Exit {}.configure", Thread.currentThread().getId(), this.getClass().getName());