diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java index 0908ac9b5..48e2e0ff9 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java @@ -403,7 +403,7 @@ private boolean shouldIgnoreAddingRecordToBuffer( } // Don't ignore if we see the expected offset; otherwise log and skip - if (kafkaSinkRecord.kafkaOffset() == (currentProcessedOffset - 1)) { + if ((kafkaSinkRecord.kafkaOffset() - currentProcessedOffset) == 1L) { LOGGER.debug( "Got the desired offset:{} from Kafka, we can add this offset to buffer for channel:{}", kafkaSinkRecord.kafkaOffset(),