diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java index cf15a9c85..f13644b9b 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java @@ -303,7 +303,7 @@ public boolean shouldInsertRecord(SinkRecord kafkaSinkRecord, long currProcessed return true; } - if (kafkaSinkRecord.kafkaOffset() == currProcessedOffset) { + if (kafkaSinkRecord.kafkaOffset() > currProcessedOffset) { LOGGER.debug( "Insert record because kafkaOffset {} > currProcessedOffset {} for channel:{}", kafkaSinkRecord.kafkaOffset(),