From d61d16060696b575f37b112da49b1f1a0f803356 Mon Sep 17 00:00:00 2001 From: revi cheng Date: Wed, 18 Oct 2023 11:48:42 -0700 Subject: [PATCH] manual formatting --- .../streaming/TopicPartitionChannel.java | 11 ++++++++--- .../streaming/SnowflakeSinkServiceV2IT.java | 2 -- .../streaming/TopicPartitionChannelTest.java | 17 +++++++++-------- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java index 32744f507..4c38caae7 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java @@ -112,7 +112,9 @@ public class TopicPartitionChannel { *
  • If channel fails to fetch offsetToken from Snowflake, we reopen the channel and try to * fetch offset from Snowflake again *
  • If channel fails to ingest a buffer(Buffer containing rows/offsets), we reopen the - * channel and try to fetch offset from Snowflake again. Schematization purposefully fails the first buffer insert in order to alter the table, and then expects Kafka to resend data + * channel and try to fetch offset from Snowflake again. Schematization purposefully fails + * the first buffer insert in order to alter the table, and then expects Kafka to resend + * data * * *

    In both cases above, we ask Kafka to send back offsets, strictly from offset number after @@ -398,12 +400,15 @@ private boolean shouldIgnoreAddingRecordToBuffer( // channel if (!isOffsetResetInKafka && currentProcessedOffset == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) { - LOGGER.debug("No offset registered in Snowflake and offset is not being reset, we can add this offset to buffer for channel:{}", currentProcessedOffset); + LOGGER.debug( + "No offset registered in Snowflake and offset is not being reset, we can add this offset" + + " to buffer for channel:{}", + currentProcessedOffset); return false; } // Don't ignore if we see the expected offset; otherwise log and skip - if ((kafkaSinkRecord.kafkaOffset() - currentProcessedOffset ) == 1L) { + if ((kafkaSinkRecord.kafkaOffset() - currentProcessedOffset) == 1L) { LOGGER.debug( "Got the desired offset:{} from Kafka, we can add this offset to buffer for channel:{}", kafkaSinkRecord.kafkaOffset(), diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java index b97c58a99..64d70f49b 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java @@ -33,8 +33,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; - -import net.snowflake.ingest.internal.apache.commons.math3.analysis.function.Sin; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java index 319b42e60..d44d39e7f 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java @@ -402,7 +402,8 @@ public void testInsertRows_SuccessAfterReopenChannel() throws Exception { int expectedOpenChannelCount = 0; int expectedGetOffsetCount = 0; - // setup mocks to fail first insert and return two null snowflake offsets (open channel and failed insert) before succeeding + // setup mocks to fail first insert and return two null snowflake offsets (open channel and + // failed insert) before succeeding Mockito.when( mockStreamingChannel.insertRows( ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class))) @@ -453,13 +454,13 @@ public void testInsertRows_SuccessAfterReopenChannel() throws Exception { .getLatestCommittedOffsetToken(); // Now, it should be successful -// Mockito.when( -// mockStreamingChannel.insertRows( -// ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class))) -// .thenReturn(new InsertValidationResponse()); -// -// Mockito.when(mockStreamingChannel.getLatestCommittedOffsetToken()) -// .thenReturn(Long.toString(noOfRecords - 1)); + // Mockito.when( + // mockStreamingChannel.insertRows( + // ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class))) + // .thenReturn(new InsertValidationResponse()); + // + // Mockito.when(mockStreamingChannel.getLatestCommittedOffsetToken()) + // .thenReturn(Long.toString(noOfRecords - 1)); // Retry the insert again, now everything should be ingested and the offset token should be // noOfRecords-1