From ddf6bfd1f49ca292b694735ce672e9c1d45223ef Mon Sep 17 00:00:00 2001 From: khetanshu Date: Sun, 6 Dec 2020 18:02:13 -0800 Subject: [PATCH] Fixed jms open connection issue (#73). Signed-off-by: Khetanshu Chauhan [khetanshu.chauhan@gmail.com](mailto:khetanshu.chauhan@gmail.com) --- .../eventstreams/connect/mqsource/MQSourceTask.java | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java index 071737c..c8177ef 100755 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java @@ -37,7 +37,6 @@ public class MQSourceTask extends SourceTask { private CountDownLatch batchCompleteSignal = null; // Used to signal completion of a batch private AtomicInteger pollCycle = new AtomicInteger(1); // Incremented each time poll() is called private int lastCommitPollCycle = 0; // The value of pollCycle the last time commit() was called - private AtomicBoolean receivingMessages = new AtomicBoolean(); // Whether currently receiving messages private AtomicBoolean stopNow = new AtomicBoolean(); // Whether stop has been requested private JMSReader reader; @@ -114,8 +113,6 @@ public MQSourceTask() { log.debug("Starting poll cycle {}", currentPollCycle); try { - receivingMessages.set(true); - if (!stopNow.get()) { log.info("Polling for records"); SourceRecord src; @@ -133,7 +130,6 @@ public MQSourceTask() { } } finally { - receivingMessages.set(false); } synchronized(this) { @@ -219,16 +215,7 @@ public void commit() throws InterruptedException { stopNow.set(true); - boolean willClose = false; - synchronized(this) { - if (receivingMessages.get()) { - log.debug("Will close connection"); - willClose = true; - } - } - - if (willClose) { // Close the connection to MQ to clean up if (reader != null) { reader.close();