Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: delegate config validation to Kafka Connect #130

Merged
merged 3 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 7 additions & 22 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -187,28 +187,6 @@
</executions>
</plugin>

<!-- add the src/integration folder as a test folder, which lets us keep -->
<!-- tests that have a dependency on testcontainers separate from pure -->
<!-- unit tests with no external dependency -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.3.0</version>
<executions>
<execution>
<id>add-test-source</id>
<phase>process-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>src/integration/java</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>

<!-- generate test code coverage report -->
<plugin>
Expand Down Expand Up @@ -323,5 +301,12 @@
</executions>
</plugin>
</plugins>

<!-- some integration tests use the custom mqsc file in the resources directory -->
<testResources>
<testResource>
<directory>src/integration/resources</directory>
</testResource>
</testResources>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.junit.ClassRule;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.WaitingConsumer;
import org.testcontainers.utility.MountableFile;

import com.ibm.mq.jms.MQConnectionFactory;
import com.ibm.msg.client.jms.JmsConnectionFactory;
Expand All @@ -53,6 +54,7 @@ public class AbstractJMSContextIT {
.withEnv("LICENSE", "accept")
.withEnv("MQ_QMGR_NAME", QMGR_NAME)
.withEnv("MQ_ENABLE_EMBEDDED_WEB_SERVER", "false")
.withCopyFileToContainer(MountableFile.forClasspathResource("no-auth-qmgr.mqsc"), "/etc/mqm/99-no-auth-qmgr.mqsc")
.withExposedPorts(1414);

private JMSContext jmsContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ private Map<String, String> getConnectorProps() {
connectorProps.put("mq.password", APP_PASSWORD);
connectorProps.put("mq.message.body.jms", "false");
connectorProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
connectorProps.put("topic", "mytopic");
return connectorProps;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ private Map<String, String> createDefaultConnectorProperties() {
props.put("mq.channel.name", getChannelName());
props.put("mq.queue", MQ_QUEUE);
props.put("mq.user.authentication.mqcsp", "false");
props.put("topic", "mytopic");
return props;
}

Expand All @@ -82,6 +83,7 @@ public void verifyJmsTextMessages() throws Exception {
final List<SourceRecord> kafkaMessages = connectTask.poll();
assertEquals(2, kafkaMessages.size());
for (final SourceRecord kafkaMessage : kafkaMessages) {
assertEquals("mytopic", kafkaMessage.topic());
assertNull(kafkaMessage.key());
assertNull(kafkaMessage.valueSchema());

Expand Down Expand Up @@ -116,6 +118,7 @@ public void verifyJmsJsonMessages() throws Exception {
assertEquals(5, kafkaMessages.size());
for (int i = 0; i < 5; i++) {
final SourceRecord kafkaMessage = kafkaMessages.get(i);
assertEquals("mytopic", kafkaMessage.topic());
assertNull(kafkaMessage.key());
assertNull(kafkaMessage.valueSchema());

Expand Down Expand Up @@ -148,6 +151,7 @@ public void verifyJmsMessageHeaders() throws Exception {
final List<SourceRecord> kafkaMessages = connectTask.poll();
assertEquals(1, kafkaMessages.size());
final SourceRecord kafkaMessage = kafkaMessages.get(0);
assertEquals("mytopic", kafkaMessage.topic());
assertNull(kafkaMessage.key());
assertNull(kafkaMessage.valueSchema());

Expand Down
3 changes: 3 additions & 0 deletions src/integration/resources/no-auth-qmgr.mqsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER QMGR CHLAUTH(DISABLED)
ALTER QMGR CONNAUTH(' ')
REFRESH SECURITY TYPE(CONNAUTH)
139 changes: 52 additions & 87 deletions src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.SecureRandom;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.jms.JMSConsumer;
Expand All @@ -45,6 +44,8 @@
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
Expand All @@ -61,7 +62,7 @@ public class JMSReader {

// Configs
private String userName;
private String password;
private Password password;
private String topic;
private boolean messageBodyJms;

Expand Down Expand Up @@ -93,116 +94,80 @@ public JMSReader() {
*
* @throws ConnectException Operation failed and connector should stop.
*/
public void configure(final Map<String, String> props) {
public void configure(final AbstractConfig config) {
log.trace("[{}] Entry {}.configure, props={}", Thread.currentThread().getId(), this.getClass().getName(),
props);

final String queueManager = props.get(MQSourceConnector.CONFIG_NAME_MQ_QUEUE_MANAGER);
final String connectionMode = props.get(MQSourceConnector.CONFIG_NAME_MQ_CONNECTION_MODE);
final String connectionNameList = props.get(MQSourceConnector.CONFIG_NAME_MQ_CONNECTION_NAME_LIST);
final String channelName = props.get(MQSourceConnector.CONFIG_NAME_MQ_CHANNEL_NAME);
final String queueName = props.get(MQSourceConnector.CONFIG_NAME_MQ_QUEUE);
final String userName = props.get(MQSourceConnector.CONFIG_NAME_MQ_USER_NAME);
final String password = props.get(MQSourceConnector.CONFIG_NAME_MQ_PASSWORD);
final String ccdtUrl = props.get(MQSourceConnector.CONFIG_NAME_MQ_CCDT_URL);
final String builderClass = props.get(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER);
final String mbj = props.get(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS);
final String mdr = props.get(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_MQMD_READ);
final String sslCipherSuite = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_CIPHER_SUITE);
final String sslPeerName = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_PEER_NAME);
final String sslKeystoreLocation = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_KEYSTORE_LOCATION);
final String sslKeystorePassword = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_KEYSTORE_PASSWORD);
final String sslTruststoreLocation = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_LOCATION);
final String sslTruststorePassword = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_PASSWORD);
final String useMQCSP = props.get(MQSourceConnector.CONFIG_NAME_MQ_USER_AUTHENTICATION_MQCSP);
final String useIBMCipherMappings = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS);
final String topic = props.get(MQSourceConnector.CONFIG_NAME_TOPIC);

if (useIBMCipherMappings != null) {
System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings", useIBMCipherMappings);
}
config);

int transportType = WMQConstants.WMQ_CM_CLIENT;
if (connectionMode != null) {
if (connectionMode.equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT)) {
transportType = WMQConstants.WMQ_CM_CLIENT;
} else if (connectionMode.equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE_BINDINGS)) {
transportType = WMQConstants.WMQ_CM_BINDINGS;
} else {
log.error("Unsupported MQ connection mode {}", connectionMode);
throw new ConnectException("Unsupported MQ connection mode");
}
}
System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings",
config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS).toString());

final int transportType =
config.getString(MQSourceConnector.CONFIG_NAME_MQ_CONNECTION_MODE)
.equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT) ?
WMQConstants.WMQ_CM_CLIENT :
WMQConstants.WMQ_CM_BINDINGS;

try {
mqConnFactory = new MQConnectionFactory();
mqConnFactory.setTransportType(transportType);
mqConnFactory.setQueueManager(queueManager);
mqConnFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, true);
if (useMQCSP != null) {
mqConnFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP,
Boolean.parseBoolean(useMQCSP));
}
mqConnFactory.setQueueManager(config.getString(MQSourceConnector.CONFIG_NAME_MQ_QUEUE_MANAGER));
mqConnFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP,
config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_USER_AUTHENTICATION_MQCSP));

if (transportType == WMQConstants.WMQ_CM_CLIENT) {
final String ccdtUrl = config.getString(MQSourceConnector.CONFIG_NAME_MQ_CCDT_URL);

if (ccdtUrl != null) {
final URL ccdtUrlObject;
try {
ccdtUrlObject = new URL(ccdtUrl);
} catch (final MalformedURLException e) {
log.error("MalformedURLException exception {}", e);
throw new ConnectException("CCDT file url invalid", e);
}
mqConnFactory.setCCDTURL(ccdtUrlObject);
mqConnFactory.setCCDTURL(new URL(ccdtUrl));
} else {
mqConnFactory.setConnectionNameList(connectionNameList);
mqConnFactory.setChannel(channelName);
mqConnFactory.setConnectionNameList(config.getString(MQSourceConnector.CONFIG_NAME_MQ_CONNECTION_NAME_LIST));
mqConnFactory.setChannel(config.getString(MQSourceConnector.CONFIG_NAME_MQ_CHANNEL_NAME));
}

if (sslCipherSuite != null) {
mqConnFactory.setSSLCipherSuite(sslCipherSuite);
if (sslPeerName != null) {
mqConnFactory.setSSLPeerName(sslPeerName);
}
}
mqConnFactory.setSSLCipherSuite(config.getString(MQSourceConnector.CONFIG_NAME_MQ_SSL_CIPHER_SUITE));
mqConnFactory.setSSLPeerName(config.getString(MQSourceConnector.CONFIG_NAME_MQ_SSL_PEER_NAME));


final String sslKeystoreLocation = config.getString(MQSourceConnector.CONFIG_NAME_MQ_SSL_KEYSTORE_LOCATION);
final Password sslKeystorePassword = config.getPassword(MQSourceConnector.CONFIG_NAME_MQ_SSL_KEYSTORE_PASSWORD);
final String sslTruststoreLocation = config.getString(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_LOCATION);
final Password sslTruststorePassword = config.getPassword(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_PASSWORD);
if (sslKeystoreLocation != null || sslTruststoreLocation != null) {
final SSLContext sslContext = buildSslContext(sslKeystoreLocation, sslKeystorePassword,
sslTruststoreLocation, sslTruststorePassword);
mqConnFactory.setSSLSocketFactory(sslContext.getSocketFactory());
}
}

queue = new MQQueue(queueName);
queue = new MQQueue(config.getString(MQSourceConnector.CONFIG_NAME_MQ_QUEUE));

this.userName = userName;
this.password = password;
userName = config.getString(MQSourceConnector.CONFIG_NAME_MQ_USER_NAME);
password = config.getPassword(MQSourceConnector.CONFIG_NAME_MQ_PASSWORD);

this.messageBodyJms = false;
queue.setMessageBodyStyle(WMQConstants.WMQ_MESSAGE_BODY_MQ);
if (mbj != null) {
if (Boolean.parseBoolean(mbj)) {
this.messageBodyJms = true;
queue.setMessageBodyStyle(WMQConstants.WMQ_MESSAGE_BODY_JMS);
}
}
messageBodyJms = config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS);
queue.setMessageBodyStyle(messageBodyJms ?
WMQConstants.WMQ_MESSAGE_BODY_JMS :
WMQConstants.WMQ_MESSAGE_BODY_MQ);

if (mdr != null) {
if (Boolean.parseBoolean(mdr)) {
queue.setBooleanProperty(WMQConstants.WMQ_MQMD_READ_ENABLED, true);
}
}
queue.setBooleanProperty(WMQConstants.WMQ_MQMD_READ_ENABLED,
config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_MQMD_READ));

topic = config.getString(MQSourceConnector.CONFIG_NAME_TOPIC);

this.topic = topic;
} catch (JMSException | JMSRuntimeException jmse) {
log.error("JMS exception {}", jmse);
throw new ConnectException(jmse);
} catch (final MalformedURLException e) {
log.error("MalformedURLException exception {}", e);
throw new ConnectException("CCDT file url invalid", e);
}


final String builderClass = config.getString(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER);
try {
final Class<? extends RecordBuilder> c = Class.forName(builderClass).asSubclass(RecordBuilder.class);
builder = c.newInstance();
builder.configure(props);
builder.configure(config.originalsStrings());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason to pass the strings rather than the config itself?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I put that in the commit message:

Note that I did have to leave the existing Map<String, String> parameter in the record builder as that is a public interface that users have subclassed. This should ideally be a Kafka Connect object rather than a raw map, but in the interest of not breaking existing subclasses, I've left it as a map. This does mean I'm converting the properties map to a config object twice, but as this is only at startup I think the performance cost will be minimal.

It is a bit clunky and frustrating - in an ideal world we wouldn't be passing untyped strings around, but without a breaking change I felt like we were stuck with it for now.

} catch (ClassNotFoundException | ClassCastException | IllegalAccessException | InstantiationException
| NullPointerException exc) {
log.error("Could not instantiate message builder {}", builderClass);
Expand All @@ -220,7 +185,7 @@ public void connect() {

try {
if (userName != null) {
jmsCtxt = mqConnFactory.createContext(userName, password, JMSContext.SESSION_TRANSACTED);
jmsCtxt = mqConnFactory.createContext(userName, password.value(), JMSContext.SESSION_TRANSACTED);
} else {
jmsCtxt = mqConnFactory.createContext(JMSContext.SESSION_TRANSACTED);
}
Expand Down Expand Up @@ -373,7 +338,7 @@ private boolean connectInternal() {
log.trace("[{}] Entry {}.connectInternal", Thread.currentThread().getId(), this.getClass().getName());
try {
if (userName != null) {
jmsCtxt = mqConnFactory.createContext(userName, password, JMSContext.SESSION_TRANSACTED);
jmsCtxt = mqConnFactory.createContext(userName, password.value(), JMSContext.SESSION_TRANSACTED);
} else {
jmsCtxt = mqConnFactory.createContext(JMSContext.SESSION_TRANSACTED);
}
Expand Down Expand Up @@ -498,8 +463,8 @@ private ConnectException handleException(final Throwable exc) {
return new ConnectException(exc);
}

private SSLContext buildSslContext(final String sslKeystoreLocation, final String sslKeystorePassword,
final String sslTruststoreLocation, final String sslTruststorePassword) {
private SSLContext buildSslContext(final String sslKeystoreLocation, final Password sslKeystorePassword,
final String sslTruststoreLocation, final Password sslTruststorePassword) {
log.trace("[{}] Entry {}.buildSslContext", Thread.currentThread().getId(), this.getClass().getName());

try {
Expand All @@ -508,7 +473,7 @@ private SSLContext buildSslContext(final String sslKeystoreLocation, final Strin

if (sslKeystoreLocation != null) {
final KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(loadKeyStore(sslKeystoreLocation, sslKeystorePassword), sslKeystorePassword.toCharArray());
kmf.init(loadKeyStore(sslKeystoreLocation, sslKeystorePassword), sslKeystorePassword.value().toCharArray());
keyManagers = kmf.getKeyManagers();
}

Expand All @@ -530,12 +495,12 @@ private SSLContext buildSslContext(final String sslKeystoreLocation, final Strin
}
}

private KeyStore loadKeyStore(final String location, final String password) throws GeneralSecurityException {
private KeyStore loadKeyStore(final String location, final Password password) throws GeneralSecurityException {
log.trace("[{}] Entry {}.loadKeyStore", Thread.currentThread().getId(), this.getClass().getName());

try (final InputStream ksStr = new FileInputStream(location)) {
final KeyStore ks = KeyStore.getInstance("JKS");
ks.load(ksStr, password.toCharArray());
ks.load(ksStr, password.value().toCharArray());

log.trace("[{}] Exit {}.loadKeyStore, retval={}", Thread.currentThread().getId(),
this.getClass().getName(), ks);
Expand Down
Loading
Loading