From e796e00d29f944a01e324840d58eb341331a3b13 Mon Sep 17 00:00:00 2001
From: Andrew Schofield
Date: Wed, 23 Jan 2019 11:24:54 +0000
Subject: [PATCH] Version 1.0.2

Stopping connector interrupts MQ connection and discards in-flight batch
of messages to prevent duplication. Configuration of batch size, default
250. A few minor tidying changes.
---
 README.md                                    | 15 +--
 pom.xml                                      |  2 +-
 .../connect/mqsource/MQSourceConnector.java  | 13 ++-
 .../connect/mqsource/MQSourceTask.java       | 97 +++++++++----------
 .../mqsource/builders/BaseRecordBuilder.java |  4 +-
 .../builders/DefaultRecordBuilder.java       |  3 +-
 .../mqsource/builders/JsonRecordBuilder.java |  4 +-
 7 files changed, 69 insertions(+), 69 deletions(-)

diff --git a/README.md b/README.md
index 1927676..3747a30 100644
--- a/README.md
+++ b/README.md
@@ -155,14 +155,6 @@ You will need to put the public part of the client's certificate in the queue ma
 
 For troubleshooting, or to better understand the handshake performed by the IBM MQ Java client application in combination with your specific JSSE provider, you can enable debugging by setting `javax.net.debug=ssl` in the JVM environment.
 
-## Performance and syncpoint limit
-The connector uses a transacted JMS session to receive messages from MQ in syncpoint and periodically commits the in-flight transaction. This has the effect of batching messages together for improved efficiency. However, the frequency of committing transactions is controlled by the Kafka Connect framework rather than the connector. The connector is only able to receive up to the queue manager's maximum uncommitted message limit (typically 10000 messages) before committing.
-
-By default, Kafka Connect only commits every 60 seconds (10 seconds for the standalone worker), meaning that each task is limited to a rate of about 166 messages per second. You can increase the frequency of committing by using the `offset.flush.interval.ms` configuration in the worker configuration file. For example, if you set `offset.flush.interval.ms=5000`, the connector commits every 5 seconds increasing the maximum rate per task to about 2000 messages per second.
-
-If messages are being received faster than they can be committed, the connector prints a message `Uncommitted message limit reached` and sleeps for a short delay. You should use this as an indication to set the `offset.flush.interval.ms` to a lower value, or increase the number of tasks.
-
-
 ## Configuration
 The configuration options for the Kafka Connect source connector for IBM MQ are as follows:
 
@@ -181,10 +173,11 @@ The configuration options for the Kafka Connect source connector for IBM MQ are
 | mq.record.builder.key.header | The JMS message header to use as the Kafka record key | string | | JMSMessageID, JMSCorrelationID, JMSCorrelationIDAsBytes |
 | mq.ssl.cipher.suite | The name of the cipher suite for TLS (SSL) connection | string | | Blank or valid cipher suite |
 | mq.ssl.peer.name | The distinguished name pattern of the TLS (SSL) peer | string | | Blank or DN pattern |
+| mq.batch.size | The maximum number of messages in a batch (unit of work) | integer | 250 | 1 or greater |
 | topic | The name of the target Kafka topic | string | | Topic name |
 
 ### Using a CCDT file
-Some of the connection details for MQ can be provided in a [CCDT file](https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_9.1.0/com.ibm.mq.con.doc/q016730_.htm) by setting `mq.ccdt.url` in the Kafka Connect source connector configuration file. If using a CCDT file the `mq.connection.name.list` and `mq.channel.name` configuration options are not required.
+Some of the connection details for MQ can be provided in a [CCDT file](https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_9.1.0/com.ibm.mq.con.doc/q016730_.htm) by setting `mq.ccdt.url` in the MQ source connector configuration file. If using a CCDT file the `mq.connection.name.list` and `mq.channel.name` configuration options are not required.
 
 ### Externalizing secrets
 [KIP 297](https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations) introduced a mechanism to externalize secrets to be used as configuration for Kafka connectors.
@@ -215,7 +208,7 @@ mq.password=${file:mq-secret.properties:secret-key}
 
 ### Unable to connect to Kafka
 
-You may receive an `org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed` error when trying to run the MQ Source Connector using SSL to connect to your Kafka cluster. In the case that the error is caused by the following exception: `Caused by: java.security.cert.CertificateException: No subject alternative DNS name matching XXXXX found.`, Java may be replacing the IP address of your cluster with the corresponding hostname in your `/etc/hosts` file. For example, to push Docker images to a custom Docker repository, you may add an entry in this file which corresponds to the IP of your repository e.g. `123.456.78.90 mycluster.icp`. To fix this, you can comment out this line in your `/etc/hosts` file.
+You may receive an `org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed` error when trying to run the MQ source connector using SSL to connect to your Kafka cluster. In the case that the error is caused by the following exception: `Caused by: java.security.cert.CertificateException: No subject alternative DNS name matching XXXXX found.`, Java may be replacing the IP address of your cluster with the corresponding hostname in your `/etc/hosts` file. For example, to push Docker images to a custom Docker repository, you may add an entry in this file which corresponds to the IP of your repository e.g. `123.456.78.90 mycluster.icp`. To fix this, you can comment out this line in your `/etc/hosts` file.
 
 ## Support
 
@@ -227,7 +220,7 @@ For issues relating specifically to this connector, please use the [GitHub issue
 
 ## License
 
-Copyright 2017, 2018 IBM Corporation
+Copyright 2017, 2018, 2019 IBM Corporation
 
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
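The new `mq.batch.size` option documented above caps how many messages the source task reads under a single MQ unit of work before handing the batch to Kafka Connect. For illustration only, a connector configuration using the option might look like the following sketch; the connection values are placeholders rather than anything taken from this patch, and any option left out keeps its usual default:

```properties
# Illustrative MQ source connector configuration (values are examples only)
name=mq-source
connector.class=com.ibm.eventstreams.connect.mqsource.MQSourceConnector
tasks.max=1
mq.queue.manager=QM1
mq.connection.name.list=localhost(1414)
mq.channel.name=DEV.APP.SVRCONN
mq.queue=DEV.QUEUE.1
mq.record.builder=com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder
topic=mq-messages
# New in 1.0.2: maximum number of messages per batch (unit of work); default 250, minimum 1
mq.batch.size=250
```

A larger value means fewer units of work but a bigger in-flight batch to discard and roll back if the task stops mid-batch; the minimum of 1 effectively turns batching off.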
diff --git a/pom.xml b/pom.xml
index e53a34b..969e8fe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,7 +20,7 @@
   <groupId>com.ibm.eventstreams.connect</groupId>
   <artifactId>kafka-connect-mq-source</artifactId>
   <packaging>jar</packaging>
-  <version>1.0.2-beta</version>
+  <version>1.0.2</version>
   <name>kafka-connect-mq-source</name>
   <organization>
     <name>IBM Corporation</name>
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java
index bc7f80f..c868360 100644
--- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java
@@ -92,11 +92,17 @@ public class MQSourceConnector extends SourceConnector {
     public static final String CONFIG_DOCUMENTATION_MQ_SSL_PEER_NAME = "The distinguished name pattern of the TLS (SSL) peer.";
     public static final String CONFIG_DISPLAY_MQ_SSL_PEER_NAME = "SSL peer name";
 
+    public static final String CONFIG_NAME_MQ_BATCH_SIZE = "mq.batch.size";
+    public static final String CONFIG_DOCUMENTATION_MQ_BATCH_SIZE = "The maximum number of messages in a batch. A batch uses a single unit of work.";
+    public static final String CONFIG_DISPLAY_MQ_BATCH_SIZE = "Batch size";
+    public static final int CONFIG_VALUE_MQ_BATCH_SIZE_DEFAULT = 250;
+    public static final int CONFIG_VALUE_MQ_BATCH_SIZE_MINIMUM = 1;
+
     public static final String CONFIG_NAME_TOPIC = "topic";
     public static final String CONFIG_DOCUMENTATION_TOPIC = "The name of the target Kafka topic.";
     public static final String CONFIG_DISPLAY_TOPIC = "Target Kafka topic";
 
-    public static String VERSION = "1.0.2-beta";
+    public static String VERSION = "1.0.2";
 
     private Map<String, String> configProps;
 
@@ -229,6 +235,11 @@ public class MQSourceConnector extends SourceConnector {
                       CONFIG_DOCUMENTATION_MQ_SSL_PEER_NAME, CONFIG_GROUP_MQ, 13, Width.MEDIUM,
                       CONFIG_DISPLAY_MQ_SSL_PEER_NAME);
 
+        config.define(CONFIG_NAME_MQ_BATCH_SIZE, Type.INT, CONFIG_VALUE_MQ_BATCH_SIZE_DEFAULT,
+                      ConfigDef.Range.atLeast(CONFIG_VALUE_MQ_BATCH_SIZE_MINIMUM), Importance.LOW,
+                      CONFIG_DOCUMENTATION_MQ_BATCH_SIZE, CONFIG_GROUP_MQ, 14, Width.MEDIUM,
+                      CONFIG_DISPLAY_MQ_BATCH_SIZE);
+
         config.define(CONFIG_NAME_TOPIC, Type.STRING, null, Importance.HIGH,
                       CONFIG_DOCUMENTATION_TOPIC, null, 0, Width.MEDIUM,
                       CONFIG_DISPLAY_TOPIC);
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java
index 53c394a..b62a73a 100644
--- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java
@@ -32,10 +32,12 @@ public class MQSourceTask extends SourceTask {
     private static final Logger log = LoggerFactory.getLogger(MQSourceTask.class);
 
-    private static int BATCH_SIZE = 250;                    // The maximum number of records returned per call to poll()
+    // The maximum number of records returned per call to poll()
+    private int batchSize = MQSourceConnector.CONFIG_VALUE_MQ_BATCH_SIZE_DEFAULT;
     private CountDownLatch batchCompleteSignal = null;      // Used to signal completion of a batch
     private AtomicInteger pollCycle = new AtomicInteger(1); // Incremented each time poll() is called
     private int lastCommitPollCycle = 0;                    // The value of pollCycle the last time commit() was called
+    private AtomicBoolean receivingMessages = new AtomicBoolean(); // Whether currently receiving messages
     private AtomicBoolean stopNow = new AtomicBoolean();    // Whether stop has been requested
 
     private JMSReader reader;
 
@@ -69,6 +71,11 @@ public MQSourceTask() {
             log.debug("Task props entry {} : {}", entry.getKey(), value);
         }
 
+        String strBatchSize = props.get(MQSourceConnector.CONFIG_NAME_MQ_BATCH_SIZE);
+        if (strBatchSize != null) {
+            batchSize = Integer.parseInt(strBatchSize);
+        }
+
         // Construct a reader to interface with MQ
         reader = new JMSReader();
         reader.configure(props);
@@ -106,22 +113,40 @@ public MQSourceTask() {
         int currentPollCycle = pollCycle.incrementAndGet();
         log.debug("Starting poll cycle {}", currentPollCycle);
 
-        if (!stopNow.get()) {
-            log.info("Polling for records");
-            SourceRecord src;
-            do {
-                // For the first message in the batch, wait a while if no message
-                src = reader.receive(messageCount == 0);
-                if (src != null) {
-                    msgs.add(src);
-                    messageCount++;
-                }
-            } while ((src != null) && (messageCount < BATCH_SIZE) && !stopNow.get());
-        }
+        try {
+            receivingMessages.set(true);
+
+            if (!stopNow.get()) {
+                log.info("Polling for records");
+                SourceRecord src;
+                do {
+                    // For the first message in the batch, wait a while if no message
+                    src = reader.receive(messageCount == 0);
+                    if (src != null) {
+                        msgs.add(src);
+                        messageCount++;
+                    }
+                } while ((src != null) && (messageCount < batchSize) && !stopNow.get());
+            }
+            else {
+                log.info("Stopping polling for records");
+            }
+        }
+        finally {
+            receivingMessages.set(false);
+        }
 
         synchronized(this) {
             if (messageCount > 0) {
-                batchCompleteSignal = new CountDownLatch(messageCount);
+                if (!stopNow.get()) {
+                    batchCompleteSignal = new CountDownLatch(messageCount);
+                }
+                else {
+                    // Discard this batch - we've rolled back when the connection to MQ was closed in stop()
+                    log.debug("Discarding a batch of {} records as task is stopping", messageCount);
+                    msgs.clear();
+                    batchCompleteSignal = null;
+                }
             }
             else {
                 batchCompleteSignal = null;
@@ -157,7 +182,6 @@ public void commit() throws InterruptedException {
         // batch complete signal directly.
         int currentPollCycle = pollCycle.get();
         log.debug("Commit starting in poll cycle {}", currentPollCycle);
-        boolean willShutdown = false;
 
         if (lastCommitPollCycle == currentPollCycle) {
             synchronized (this) {
@@ -171,25 +195,10 @@ public void commit() throws InterruptedException {
                         batchCompleteSignal.countDown();
                     }
                 }
-                else if (stopNow.get()) {
-                    log.debug("Shutting down with empty batch after delay");
-                    willShutdown = true;
-                }
             }
         }
         else {
             lastCommitPollCycle = currentPollCycle;
-
-            synchronized (this) {
-                if ((batchCompleteSignal == null) && stopNow.get()) {
-                    log.debug("Shutting down with empty batch");
-                    willShutdown = true;
-                }
-            }
-        }
-
-        if (willShutdown) {
-            shutdown();
         }
 
         log.trace("[{}] Exit {}.commit", Thread.currentThread().getId(), this.getClass().getName());
@@ -210,16 +219,20 @@ else if (stopNow.get()) {
 
         stopNow.set(true);
 
-        boolean willShutdown = false;
+        boolean willClose = false;
 
         synchronized(this) {
-            if (batchCompleteSignal == null) {
-                willShutdown = true;
+            if (receivingMessages.get()) {
+                log.debug("Will close connection");
+                willClose = true;
             }
         }
 
-        if (willShutdown) {
-            shutdown();
+        if (willClose) {
+            // Close the connection to MQ to clean up
+            if (reader != null) {
+                reader.close();
+            }
         }
 
         log.trace("[{}] Exit {}.stop", Thread.currentThread().getId(), this.getClass().getName());
@@ -247,20 +260,4 @@ else if (stopNow.get()) {
 
         log.trace("[{}] Exit {}.commitRecord", Thread.currentThread().getId(), this.getClass().getName());
     }
-
-    /**
-     * <p>
-     * Shuts down the task, releasing any resource held by the task.
-     * </p>
-     */
-    private void shutdown() {
-        log.trace("[{}] Entry {}.shutdown", Thread.currentThread().getId(), this.getClass().getName());
-
-        // Close the connection to MQ to clean up
-        if (reader != null) {
-            reader.close();
-        }
-
-        log.trace("[{}] Exit {}.shutdown", Thread.currentThread().getId(), this.getClass().getName());
-    }
 }
\ No newline at end of file
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java
index 839b3cd..fbcc13f 100644
--- a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java
@@ -1,5 +1,5 @@
 /**
- * Copyright 2018 IBM Corporation
+ * Copyright 2018, 2019 IBM Corporation
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -115,6 +115,8 @@ SchemaAndValue getKey(JMSContext context, String topic, Message message) throws
             keySchema = Schema.OPTIONAL_BYTES_SCHEMA;
             key = message.getJMSCorrelationIDAsBytes();
             break;
+        default:
+            break;
         }
 
         return new SchemaAndValue(keySchema, key);
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/DefaultRecordBuilder.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/DefaultRecordBuilder.java
index b7d1847..142726c 100644
--- a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/DefaultRecordBuilder.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/DefaultRecordBuilder.java
@@ -1,5 +1,5 @@
 /**
- * Copyright 2017, 2018 IBM Corporation
+ * Copyright 2017, 2018, 2019 IBM Corporation
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -24,7 +24,6 @@
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java
index 593123b..0b1691d 100644
--- a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java
@@ -1,5 +1,5 @@
 /**
- * Copyright 2017, 2018 IBM Corporation
+ * Copyright 2017, 2018, 2019 IBM Corporation
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -24,11 +24,9 @@
 import javax.jms.Message;
 import javax.jms.TextMessage;
 
-import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.json.JsonConverter;
-import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
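Because `mq.batch.size` is registered with `ConfigDef.Range.atLeast(CONFIG_VALUE_MQ_BATCH_SIZE_MINIMUM)` and a default of 250, an out-of-range value is caught by Kafka Connect's normal configuration validation before a task ever starts. The following is a minimal sketch of exercising that validation through the public `Connector.config()` and `ConfigDef.validate()` APIs; it assumes the connector jar and the Kafka Connect API classes are on the classpath, and the connection property values are placeholders, not anything taken from this patch:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigValue;

import com.ibm.eventstreams.connect.mqsource.MQSourceConnector;

public class BatchSizeValidationSketch {
    public static void main(String[] args) {
        // Placeholder connection settings; real values depend on the queue manager.
        Map<String, String> props = new HashMap<>();
        props.put("mq.queue.manager", "QM1");
        props.put("mq.connection.name.list", "localhost(1414)");
        props.put("mq.channel.name", "DEV.APP.SVRCONN");
        props.put("mq.queue", "DEV.QUEUE.1");
        props.put("mq.record.builder",
                  "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
        props.put("topic", "mq-messages");
        props.put("mq.batch.size", "0"); // below the minimum of 1, so validation should flag it

        // ConfigDef.validate() applies the Range.atLeast(1) check added in this version
        // and records any problem in the per-option error messages.
        List<ConfigValue> results = new MQSourceConnector().config().validate(props);
        for (ConfigValue result : results) {
            if (!result.errorMessages().isEmpty()) {
                System.out.println(result.name() + " -> " + result.errorMessages());
            }
        }
    }
}
```

Omitting `mq.batch.size` entirely leaves the task using the default of 250, matching the behaviour of the previous hard-coded `BATCH_SIZE` constant.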