From b25430853af3824ac32d18b2b9856cfff08c4dae Mon Sep 17 00:00:00 2001 From: revi cheng Date: Tue, 31 Oct 2023 17:11:27 -0700 Subject: [PATCH] current not current+1 --- .../connector/internal/streaming/TopicPartitionChannel.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java index b46f4f3ce7..215b321ea8 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java @@ -341,7 +341,7 @@ public void insertRecordToBuffer(SinkRecord kafkaSinkRecord) { // Accept the incoming record only if we don't have a valid offset token at server side, or the // incoming record offset is 1 + the processed offset if (currentProcessedOffset == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE - || kafkaSinkRecord.kafkaOffset() >= currentProcessedOffset + 1) { + || kafkaSinkRecord.kafkaOffset() >= currentProcessedOffset) { StreamingBuffer copiedStreamingBuffer = null; bufferLock.lock(); try { @@ -403,7 +403,7 @@ private boolean shouldIgnoreAddingRecordToBuffer( } // Don't ignore if we see the expected offset; otherwise log and skip - if ((kafkaSinkRecord.kafkaOffset() - currentProcessedOffset) == 1L) { + if (kafkaSinkRecord.kafkaOffset() == currentProcessedOffset) { LOGGER.debug( "Got the desired offset:{} from Kafka, we can add this offset to buffer for channel:{}", kafkaSinkRecord.kafkaOffset(), @@ -957,7 +957,7 @@ private void resetChannelMetadataAfterRecovery( // Need to update the in memory processed offset otherwise if same offset is send again, it // might get rejected. this.offsetPersistedInSnowflake.set(offsetRecoveredFromSnowflake); - this.processedOffset.set(offsetRecoveredFromSnowflake); + this.processedOffset.set(offsetToResetInKafka); // State that there was some exception and only clear that state when we have received offset // starting from offsetRecoveredFromSnowflake