Skip to content

Commit

Permalink
Merge pull request #6 from ibm-messaging/master
Browse files Browse the repository at this point in the history
Update to master
  • Loading branch information
AndrewJSchofield authored May 22, 2020
2 parents 4750d61 + 6cc710c commit 0722e1c
Showing 1 changed file with 3 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017, 2018, 2019 IBM Corporation
* Copyright 2017, 2020 IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -83,14 +83,14 @@ public MQSinkTask() {
* @param records the set of records to send
*/
@Override public void put(Collection<SinkRecord> records) {
log.trace("[{}] Entry {}.put", Thread.currentThread().getId(), this.getClass().getName());
log.trace("[{}] Entry {}.put, records.size={}", Thread.currentThread().getId(), this.getClass().getName(), records.size());

for (SinkRecord r: records) {
log.debug("Putting record for topic {}, partition {} and offset {}", r.topic(), r.kafkaPartition(), r.kafkaOffset());
writer.send(r);
}

context.requestCommit();
writer.commit();
log.trace("[{}] Exit {}.put", Thread.currentThread().getId(), this.getClass().getName());
}

Expand All @@ -110,7 +110,6 @@ public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
log.debug("Flushing up to topic {}, partition {} and offset {}", tp.topic(), tp.partition(), om.offset());
}

writer.commit();
log.trace("[{}] Exit {}.flush", Thread.currentThread().getId(), this.getClass().getName());
}

Expand Down

0 comments on commit 0722e1c

Please sign in to comment.