diff --git a/Dockerfile b/Dockerfile
index 3ab6cd1..956337d 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -10,7 +10,7 @@ COPY --chown=esuser:esgroup --from=builder /opt/kafka/libs/ /opt/kafka/libs/
COPY --chown=esuser:esgroup --from=builder /opt/kafka/config/connect-distributed.properties /opt/kafka/config/
COPY --chown=esuser:esgroup --from=builder /opt/kafka/config/connect-log4j.properties /opt/kafka/config/
RUN mkdir /opt/kafka/logs && chown esuser:esgroup /opt/kafka/logs
-COPY --chown=esuser:esgroup target/kafka-connect-mq-sink-1.4.0-jar-with-dependencies.jar /opt/kafka/libs/
+COPY --chown=esuser:esgroup target/kafka-connect-mq-sink-1.5.0-jar-with-dependencies.jar /opt/kafka/libs/
WORKDIR /opt/kafka
diff --git a/README.md b/README.md
index 0e62118..407c387 100644
--- a/README.md
+++ b/README.md
@@ -98,13 +98,13 @@ curl -X POST -H "Content-Type: application/json" http://localhost:8083/connector
This repository includes an example Dockerfile to run Kafka Connect in distributed mode. It also adds in the MQ sink connector as an available connector plugin. It uses the default `connect-distributed.properties` and `connect-log4j.properties` files.
1. `mvn clean package`
-1. `docker build -t kafkaconnect-with-mq-sink:1.4.0 .`
-1. `docker run -p 8083:8083 kafkaconnect-with-mq-sink:1.4.0`
+1. `docker build -t kafkaconnect-with-mq-sink:1.5.0 .`
+1. `docker run -p 8083:8083 kafkaconnect-with-mq-sink:1.5.0`
**NOTE:** To provide custom properties files create a folder called `config` containing the `connect-distributed.properties` and `connect-log4j.properties` files and use a Docker volume to make them available when running the container like this:
``` shell
-docker run -v $(pwd)/config:/opt/kafka/config -p 8083:8083 kafkaconnect-with-mq-sink:1.4.0
+docker run -v $(pwd)/config:/opt/kafka/config -p 8083:8083 kafkaconnect-with-mq-sink:1.5.0
```
To start the MQ connector, you can use `config/mq-sink.json` in this repository after replacing all placeholders and use a command like this:
@@ -312,6 +312,7 @@ The configuration options for the Kafka Connect sink connector for IBM MQ are as
| mq.message.builder.partition.property | The JMS message property to set from the Kafka partition | string | | Blank or valid JMS property name |
| mq.message.builder.offset.property | The JMS message property to set from the Kafka offset | string | | Blank or valid JMS property name |
| mq.reply.queue | The name of the reply-to queue | string | | MQ queue name or queue URI |
+| mq.retry.backoff.ms | Wait time, in milliseconds, before retrying after retriable exceptions | long | 60000 | [0,...,99999999900] |
### Using a CCDT file
diff --git a/pom.xml b/pom.xml
index ebd015e..d92d8bf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,7 +20,7 @@
com.ibm.eventstreams.connect
kafka-connect-mq-sink
jar
- 1.4.0
+ 1.5.0
kafka-connect-mq-sink
IBM Corporation
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsink/MQSinkConnector.java b/src/main/java/com/ibm/eventstreams/connect/mqsink/MQSinkConnector.java
index 611e9da..d5ca5ee 100644
--- a/src/main/java/com/ibm/eventstreams/connect/mqsink/MQSinkConnector.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsink/MQSinkConnector.java
@@ -147,7 +147,12 @@ public class MQSinkConnector extends SinkConnector {
public static final String CONFIG_DOCUMENTATION_KAFKA_HEADERS_COPY_TO_JMS_PROPERTIES = "Whether to copy Kafka headers to JMS message properties.";
public static final String CONFIG_DISPLAY_KAFKA_HEADERS_COPY_TO_JMS_PROPERTIES = "Copy Kafka headers to JMS message properties";
- public static String VERSION = "1.4.0";
+ public static final String CONFIG_NAME_MQ_RETRY_BACKOFF_MS = "mq.retry.backoff.ms";
+ public static final String CONFIG_DOCUMENTATION_MQ_RETRY_BACKOFF_MS = "Time to wait, in milliseconds, before retrying after retriable exceptions";
+ public static final String CONFIG_DISPLAY_MQ_RETRY_BACKOFF_MS = "Retry backoff (ms)";
+
+
+ public static String VERSION = "1.5.0";
private Map configProps;
@@ -336,6 +341,10 @@ public class MQSinkConnector extends SinkConnector {
CONFIG_DOCUMENTATION_KAFKA_HEADERS_COPY_TO_JMS_PROPERTIES, CONFIG_GROUP_MQ, 27, Width.SHORT,
CONFIG_DISPLAY_KAFKA_HEADERS_COPY_TO_JMS_PROPERTIES);
+ config.define(CONFIG_NAME_MQ_RETRY_BACKOFF_MS, Type.LONG, 60000, Range.between(0L, 99999999900L), Importance.LOW,
+ CONFIG_DOCUMENTATION_MQ_RETRY_BACKOFF_MS, CONFIG_GROUP_MQ, 28, Width.SHORT,
+ CONFIG_DISPLAY_MQ_RETRY_BACKOFF_MS);
+
return config;
}
}
\ No newline at end of file
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsink/MQSinkTask.java b/src/main/java/com/ibm/eventstreams/connect/mqsink/MQSinkTask.java
index 30fd655..2218798 100644
--- a/src/main/java/com/ibm/eventstreams/connect/mqsink/MQSinkTask.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsink/MQSinkTask.java
@@ -21,6 +21,7 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
@@ -32,6 +33,8 @@ public class MQSinkTask extends SinkTask {
private JMSWriter writer;
+ private long retryBackoffMs = 60000;
+
public MQSinkTask() {
}
@@ -61,6 +64,13 @@ public MQSinkTask() {
log.debug("Task props entry {} : {}", entry.getKey(), value);
}
+ // check if a custom retry time is provided
+ String retryBackoffMsStr = props.get(MQSinkConnector.CONFIG_NAME_MQ_RETRY_BACKOFF_MS);
+ if (retryBackoffMsStr != null) {
+ retryBackoffMs = Long.parseLong(retryBackoffMsStr);
+ }
+ log.debug("Setting retry backoff {}", retryBackoffMs);
+
// Construct a writer to interface with MQ
writer = new JMSWriter();
writer.configure(props);
@@ -85,12 +95,19 @@ public MQSinkTask() {
@Override public void put(Collection records) {
log.trace("[{}] Entry {}.put, records.size={}", Thread.currentThread().getId(), this.getClass().getName(), records.size());
- for (SinkRecord r: records) {
- log.debug("Putting record for topic {}, partition {} and offset {}", r.topic(), r.kafkaPartition(), r.kafkaOffset());
- writer.send(r);
+ try {
+ for (SinkRecord r: records) {
+ log.debug("Putting record for topic {}, partition {} and offset {}", r.topic(), r.kafkaPartition(), r.kafkaOffset());
+ writer.send(r);
+ }
+
+ writer.commit();
+ }
+ catch (RetriableException rte) {
+ context.timeout(retryBackoffMs);
+ throw rte;
}
- writer.commit();
log.trace("[{}] Exit {}.put", Thread.currentThread().getId(), this.getClass().getName());
}