Skip to content

Commit

Permalink
Fixed jms open connection issue (#73). Signed-off-by: Khetanshu Chauh…
Browse files Browse the repository at this point in the history
  • Loading branch information
khetanshu authored and dalelane committed Jan 4, 2023
1 parent 8505dfe commit ddf6bfd
Showing 1 changed file with 0 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public class MQSourceTask extends SourceTask {
private CountDownLatch batchCompleteSignal = null; // Used to signal completion of a batch
private AtomicInteger pollCycle = new AtomicInteger(1); // Incremented each time poll() is called
private int lastCommitPollCycle = 0; // The value of pollCycle the last time commit() was called
private AtomicBoolean receivingMessages = new AtomicBoolean(); // Whether currently receiving messages
private AtomicBoolean stopNow = new AtomicBoolean(); // Whether stop has been requested

private JMSReader reader;
Expand Down Expand Up @@ -114,8 +113,6 @@ public MQSourceTask() {
log.debug("Starting poll cycle {}", currentPollCycle);

try {
receivingMessages.set(true);

if (!stopNow.get()) {
log.info("Polling for records");
SourceRecord src;
Expand All @@ -133,7 +130,6 @@ public MQSourceTask() {
}
}
finally {
receivingMessages.set(false);
}

synchronized(this) {
Expand Down Expand Up @@ -219,16 +215,7 @@ public void commit() throws InterruptedException {

stopNow.set(true);

boolean willClose = false;

synchronized(this) {
if (receivingMessages.get()) {
log.debug("Will close connection");
willClose = true;
}
}

if (willClose) {
// Close the connection to MQ to clean up
if (reader != null) {
reader.close();
Expand Down

0 comments on commit ddf6bfd

Please sign in to comment.