From 8cd02e87c8e7706f6192b3636d20845e9c879f6c Mon Sep 17 00:00:00 2001 From: revi cheng Date: Wed, 1 Nov 2023 14:46:02 -0700 Subject: [PATCH] revert tpchannel --- .../connector/internal/streaming/TopicPartitionChannel.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java index 0908ac9b5..48e2e0ff9 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java @@ -403,7 +403,7 @@ private boolean shouldIgnoreAddingRecordToBuffer( } // Don't ignore if we see the expected offset; otherwise log and skip - if (kafkaSinkRecord.kafkaOffset() == (currentProcessedOffset - 1)) { + if ((kafkaSinkRecord.kafkaOffset() - currentProcessedOffset) == 1L) { LOGGER.debug( "Got the desired offset:{} from Kafka, we can add this offset to buffer for channel:{}", kafkaSinkRecord.kafkaOffset(),