Skip to content

Commit

Permalink
refactor: delegate config validation to Kafka Connect (#130)
Browse files Browse the repository at this point in the history
The connector was doing a lot of it's own processing of config
options - type-checking, null-checking, casting, etc.

This makes the connector-specific aspects of the code harder to
follow, and it also makes it easier for us to miss validating some
required config options (for example, we weren't checking that
topic names were provided).

This commit moves all of this out of the connector, and makes it
the responsibility of the Kafka Connect framework. This removes a
lot of unnecessary checking and type checking/casting from the
connector code, as the source task and jms reader class can assume
that they will be given a validated config.

Note that I did have to leave the existing Map<String, String>
parameter in the record builder as that is a public interface that
users have subclassed. This should ideally be a Kafka Connect
object rather than a raw map, but in the interest of not breaking
existing subclasses, I've left it as a map. This does mean I'm
converting the properties map to a config object twice, but as
this is only at startup I think the performance cost will be
minimal.

Signed-off-by: Dale Lane <[email protected]>
  • Loading branch information
dalelane authored Feb 7, 2024
1 parent d93b499 commit 6d2b939
Show file tree
Hide file tree
Showing 9 changed files with 346 additions and 221 deletions.
29 changes: 7 additions & 22 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -187,28 +187,6 @@
</executions>
</plugin>

<!-- add the src/integration folder as a test folder, which lets us keep -->
<!-- tests that have a dependency on testcontainers separate from pure -->
<!-- unit tests with no external dependency -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.3.0</version>
<executions>
<execution>
<id>add-test-source</id>
<phase>process-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>src/integration/java</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>

<!-- generate test code coverage report -->
<plugin>
Expand Down Expand Up @@ -323,5 +301,12 @@
</executions>
</plugin>
</plugins>

<!-- some integration tests use the custom mqsc file in the resources directory -->
<testResources>
<testResource>
<directory>src/integration/resources</directory>
</testResource>
</testResources>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.junit.ClassRule;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.WaitingConsumer;
import org.testcontainers.utility.MountableFile;

import com.ibm.mq.jms.MQConnectionFactory;
import com.ibm.msg.client.jms.JmsConnectionFactory;
Expand All @@ -53,6 +54,7 @@ public class AbstractJMSContextIT {
.withEnv("LICENSE", "accept")
.withEnv("MQ_QMGR_NAME", QMGR_NAME)
.withEnv("MQ_ENABLE_EMBEDDED_WEB_SERVER", "false")
.withCopyFileToContainer(MountableFile.forClasspathResource("no-auth-qmgr.mqsc"), "/etc/mqm/99-no-auth-qmgr.mqsc")
.withExposedPorts(1414);

private JMSContext jmsContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ private Map<String, String> getConnectorProps() {
connectorProps.put("mq.password", APP_PASSWORD);
connectorProps.put("mq.message.body.jms", "false");
connectorProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
connectorProps.put("topic", "mytopic");
return connectorProps;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ private Map<String, String> createDefaultConnectorProperties() {
props.put("mq.channel.name", getChannelName());
props.put("mq.queue", MQ_QUEUE);
props.put("mq.user.authentication.mqcsp", "false");
props.put("topic", "mytopic");
return props;
}

Expand All @@ -82,6 +83,7 @@ public void verifyJmsTextMessages() throws Exception {
final List<SourceRecord> kafkaMessages = connectTask.poll();
assertEquals(2, kafkaMessages.size());
for (final SourceRecord kafkaMessage : kafkaMessages) {
assertEquals("mytopic", kafkaMessage.topic());
assertNull(kafkaMessage.key());
assertNull(kafkaMessage.valueSchema());

Expand Down Expand Up @@ -116,6 +118,7 @@ public void verifyJmsJsonMessages() throws Exception {
assertEquals(5, kafkaMessages.size());
for (int i = 0; i < 5; i++) {
final SourceRecord kafkaMessage = kafkaMessages.get(i);
assertEquals("mytopic", kafkaMessage.topic());
assertNull(kafkaMessage.key());
assertNull(kafkaMessage.valueSchema());

Expand Down Expand Up @@ -148,6 +151,7 @@ public void verifyJmsMessageHeaders() throws Exception {
final List<SourceRecord> kafkaMessages = connectTask.poll();
assertEquals(1, kafkaMessages.size());
final SourceRecord kafkaMessage = kafkaMessages.get(0);
assertEquals("mytopic", kafkaMessage.topic());
assertNull(kafkaMessage.key());
assertNull(kafkaMessage.valueSchema());

Expand Down
3 changes: 3 additions & 0 deletions src/integration/resources/no-auth-qmgr.mqsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER QMGR CHLAUTH(DISABLED)
ALTER QMGR CONNAUTH(' ')
REFRESH SECURITY TYPE(CONNAUTH)
139 changes: 52 additions & 87 deletions src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.SecureRandom;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.jms.JMSConsumer;
Expand All @@ -45,6 +44,8 @@
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
Expand All @@ -61,7 +62,7 @@ public class JMSReader {

// Configs
private String userName;
private String password;
private Password password;
private String topic;
private boolean messageBodyJms;

Expand Down Expand Up @@ -93,116 +94,80 @@ public JMSReader() {
*
* @throws ConnectException Operation failed and connector should stop.
*/
public void configure(final Map<String, String> props) {
public void configure(final AbstractConfig config) {
log.trace("[{}] Entry {}.configure, props={}", Thread.currentThread().getId(), this.getClass().getName(),
props);

final String queueManager = props.get(MQSourceConnector.CONFIG_NAME_MQ_QUEUE_MANAGER);
final String connectionMode = props.get(MQSourceConnector.CONFIG_NAME_MQ_CONNECTION_MODE);
final String connectionNameList = props.get(MQSourceConnector.CONFIG_NAME_MQ_CONNECTION_NAME_LIST);
final String channelName = props.get(MQSourceConnector.CONFIG_NAME_MQ_CHANNEL_NAME);
final String queueName = props.get(MQSourceConnector.CONFIG_NAME_MQ_QUEUE);
final String userName = props.get(MQSourceConnector.CONFIG_NAME_MQ_USER_NAME);
final String password = props.get(MQSourceConnector.CONFIG_NAME_MQ_PASSWORD);
final String ccdtUrl = props.get(MQSourceConnector.CONFIG_NAME_MQ_CCDT_URL);
final String builderClass = props.get(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER);
final String mbj = props.get(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS);
final String mdr = props.get(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_MQMD_READ);
final String sslCipherSuite = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_CIPHER_SUITE);
final String sslPeerName = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_PEER_NAME);
final String sslKeystoreLocation = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_KEYSTORE_LOCATION);
final String sslKeystorePassword = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_KEYSTORE_PASSWORD);
final String sslTruststoreLocation = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_LOCATION);
final String sslTruststorePassword = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_PASSWORD);
final String useMQCSP = props.get(MQSourceConnector.CONFIG_NAME_MQ_USER_AUTHENTICATION_MQCSP);
final String useIBMCipherMappings = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS);
final String topic = props.get(MQSourceConnector.CONFIG_NAME_TOPIC);

if (useIBMCipherMappings != null) {
System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings", useIBMCipherMappings);
}
config);

int transportType = WMQConstants.WMQ_CM_CLIENT;
if (connectionMode != null) {
if (connectionMode.equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT)) {
transportType = WMQConstants.WMQ_CM_CLIENT;
} else if (connectionMode.equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE_BINDINGS)) {
transportType = WMQConstants.WMQ_CM_BINDINGS;
} else {
log.error("Unsupported MQ connection mode {}", connectionMode);
throw new ConnectException("Unsupported MQ connection mode");
}
}
System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings",
config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS).toString());

final int transportType =
config.getString(MQSourceConnector.CONFIG_NAME_MQ_CONNECTION_MODE)
.equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT) ?
WMQConstants.WMQ_CM_CLIENT :
WMQConstants.WMQ_CM_BINDINGS;

try {
mqConnFactory = new MQConnectionFactory();
mqConnFactory.setTransportType(transportType);
mqConnFactory.setQueueManager(queueManager);
mqConnFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, true);
if (useMQCSP != null) {
mqConnFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP,
Boolean.parseBoolean(useMQCSP));
}
mqConnFactory.setQueueManager(config.getString(MQSourceConnector.CONFIG_NAME_MQ_QUEUE_MANAGER));
mqConnFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP,
config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_USER_AUTHENTICATION_MQCSP));

if (transportType == WMQConstants.WMQ_CM_CLIENT) {
final String ccdtUrl = config.getString(MQSourceConnector.CONFIG_NAME_MQ_CCDT_URL);

if (ccdtUrl != null) {
final URL ccdtUrlObject;
try {
ccdtUrlObject = new URL(ccdtUrl);
} catch (final MalformedURLException e) {
log.error("MalformedURLException exception {}", e);
throw new ConnectException("CCDT file url invalid", e);
}
mqConnFactory.setCCDTURL(ccdtUrlObject);
mqConnFactory.setCCDTURL(new URL(ccdtUrl));
} else {
mqConnFactory.setConnectionNameList(connectionNameList);
mqConnFactory.setChannel(channelName);
mqConnFactory.setConnectionNameList(config.getString(MQSourceConnector.CONFIG_NAME_MQ_CONNECTION_NAME_LIST));
mqConnFactory.setChannel(config.getString(MQSourceConnector.CONFIG_NAME_MQ_CHANNEL_NAME));
}

if (sslCipherSuite != null) {
mqConnFactory.setSSLCipherSuite(sslCipherSuite);
if (sslPeerName != null) {
mqConnFactory.setSSLPeerName(sslPeerName);
}
}
mqConnFactory.setSSLCipherSuite(config.getString(MQSourceConnector.CONFIG_NAME_MQ_SSL_CIPHER_SUITE));
mqConnFactory.setSSLPeerName(config.getString(MQSourceConnector.CONFIG_NAME_MQ_SSL_PEER_NAME));


final String sslKeystoreLocation = config.getString(MQSourceConnector.CONFIG_NAME_MQ_SSL_KEYSTORE_LOCATION);
final Password sslKeystorePassword = config.getPassword(MQSourceConnector.CONFIG_NAME_MQ_SSL_KEYSTORE_PASSWORD);
final String sslTruststoreLocation = config.getString(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_LOCATION);
final Password sslTruststorePassword = config.getPassword(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_PASSWORD);
if (sslKeystoreLocation != null || sslTruststoreLocation != null) {
final SSLContext sslContext = buildSslContext(sslKeystoreLocation, sslKeystorePassword,
sslTruststoreLocation, sslTruststorePassword);
mqConnFactory.setSSLSocketFactory(sslContext.getSocketFactory());
}
}

queue = new MQQueue(queueName);
queue = new MQQueue(config.getString(MQSourceConnector.CONFIG_NAME_MQ_QUEUE));

this.userName = userName;
this.password = password;
userName = config.getString(MQSourceConnector.CONFIG_NAME_MQ_USER_NAME);
password = config.getPassword(MQSourceConnector.CONFIG_NAME_MQ_PASSWORD);

this.messageBodyJms = false;
queue.setMessageBodyStyle(WMQConstants.WMQ_MESSAGE_BODY_MQ);
if (mbj != null) {
if (Boolean.parseBoolean(mbj)) {
this.messageBodyJms = true;
queue.setMessageBodyStyle(WMQConstants.WMQ_MESSAGE_BODY_JMS);
}
}
messageBodyJms = config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS);
queue.setMessageBodyStyle(messageBodyJms ?
WMQConstants.WMQ_MESSAGE_BODY_JMS :
WMQConstants.WMQ_MESSAGE_BODY_MQ);

if (mdr != null) {
if (Boolean.parseBoolean(mdr)) {
queue.setBooleanProperty(WMQConstants.WMQ_MQMD_READ_ENABLED, true);
}
}
queue.setBooleanProperty(WMQConstants.WMQ_MQMD_READ_ENABLED,
config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_MQMD_READ));

topic = config.getString(MQSourceConnector.CONFIG_NAME_TOPIC);

this.topic = topic;
} catch (JMSException | JMSRuntimeException jmse) {
log.error("JMS exception {}", jmse);
throw new ConnectException(jmse);
} catch (final MalformedURLException e) {
log.error("MalformedURLException exception {}", e);
throw new ConnectException("CCDT file url invalid", e);
}


final String builderClass = config.getString(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER);
try {
final Class<? extends RecordBuilder> c = Class.forName(builderClass).asSubclass(RecordBuilder.class);
builder = c.newInstance();
builder.configure(props);
builder.configure(config.originalsStrings());
} catch (ClassNotFoundException | ClassCastException | IllegalAccessException | InstantiationException
| NullPointerException exc) {
log.error("Could not instantiate message builder {}", builderClass);
Expand All @@ -220,7 +185,7 @@ public void connect() {

try {
if (userName != null) {
jmsCtxt = mqConnFactory.createContext(userName, password, JMSContext.SESSION_TRANSACTED);
jmsCtxt = mqConnFactory.createContext(userName, password.value(), JMSContext.SESSION_TRANSACTED);
} else {
jmsCtxt = mqConnFactory.createContext(JMSContext.SESSION_TRANSACTED);
}
Expand Down Expand Up @@ -373,7 +338,7 @@ private boolean connectInternal() {
log.trace("[{}] Entry {}.connectInternal", Thread.currentThread().getId(), this.getClass().getName());
try {
if (userName != null) {
jmsCtxt = mqConnFactory.createContext(userName, password, JMSContext.SESSION_TRANSACTED);
jmsCtxt = mqConnFactory.createContext(userName, password.value(), JMSContext.SESSION_TRANSACTED);
} else {
jmsCtxt = mqConnFactory.createContext(JMSContext.SESSION_TRANSACTED);
}
Expand Down Expand Up @@ -498,8 +463,8 @@ private ConnectException handleException(final Throwable exc) {
return new ConnectException(exc);
}

private SSLContext buildSslContext(final String sslKeystoreLocation, final String sslKeystorePassword,
final String sslTruststoreLocation, final String sslTruststorePassword) {
private SSLContext buildSslContext(final String sslKeystoreLocation, final Password sslKeystorePassword,
final String sslTruststoreLocation, final Password sslTruststorePassword) {
log.trace("[{}] Entry {}.buildSslContext", Thread.currentThread().getId(), this.getClass().getName());

try {
Expand All @@ -508,7 +473,7 @@ private SSLContext buildSslContext(final String sslKeystoreLocation, final Strin

if (sslKeystoreLocation != null) {
final KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(loadKeyStore(sslKeystoreLocation, sslKeystorePassword), sslKeystorePassword.toCharArray());
kmf.init(loadKeyStore(sslKeystoreLocation, sslKeystorePassword), sslKeystorePassword.value().toCharArray());
keyManagers = kmf.getKeyManagers();
}

Expand All @@ -530,12 +495,12 @@ private SSLContext buildSslContext(final String sslKeystoreLocation, final Strin
}
}

private KeyStore loadKeyStore(final String location, final String password) throws GeneralSecurityException {
private KeyStore loadKeyStore(final String location, final Password password) throws GeneralSecurityException {
log.trace("[{}] Entry {}.loadKeyStore", Thread.currentThread().getId(), this.getClass().getName());

try (final InputStream ksStr = new FileInputStream(location)) {
final KeyStore ks = KeyStore.getInstance("JKS");
ks.load(ksStr, password.toCharArray());
ks.load(ksStr, password.value().toCharArray());

log.trace("[{}] Exit {}.loadKeyStore, retval={}", Thread.currentThread().getId(),
this.getClass().getName(), ks);
Expand Down
Loading

0 comments on commit 6d2b939

Please sign in to comment.