diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java
index 1bf1c8770..32744f507 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java
@@ -112,7 +112,7 @@ public class TopicPartitionChannel {
    *
    *   • If channel fails to fetch offsetToken from Snowflake, we reopen the channel and try to
    *     fetch offset from Snowflake again
    *   • If channel fails to ingest a buffer(Buffer containing rows/offsets), we reopen the
-   *     channel and try to fetch offset from Snowflake again
+   *     channel and try to fetch offset from Snowflake again. Schematization purposefully fails the first buffer insert in order to alter the table, and then expects Kafka to resend data
    *
   * In both cases above, we ask Kafka to send back offsets, strictly from offset number after
@@ -124,7 +124,7 @@ public class TopicPartitionChannel {
   *
   * This boolean is used to indicate that we reset offset in kafka and we will only buffer once
   * we see the offset which is one more than an offset present in Snowflake. */
-  private boolean isOffsetResetInKafka;
+  private boolean isOffsetResetInKafka = false; // TODO @rcheng: does this need to be atomic?
 
   private final SnowflakeStreamingIngestClient streamingIngestClient;
 
@@ -389,19 +389,21 @@ public void insertRecordToBuffer(SinkRecord kafkaSinkRecord) {
   *
   * @param kafkaSinkRecord Record to check for above condition only in case of failures
   *     (isOffsetResetInKafka = true)
+  * @param currentProcessedOffset The offset most recently processed (committed) for this channel
   * @return true if this record can be skipped to add into buffer, false otherwise.
   */
  private boolean shouldIgnoreAddingRecordToBuffer(
-      SinkRecord kafkaSinkRecord, long currentProcessedOffset) {
-    // Don't skip rows if there is no offset reset or there is no offset token information in the
+      SinkRecord kafkaSinkRecord, final long currentProcessedOffset) {
+    // Don't skip rows if there is no offset reset and there is no offset token information in the
     // channel
     if (!isOffsetResetInKafka
-        || currentProcessedOffset == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
+        && currentProcessedOffset == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
+      LOGGER.debug("No offset registered in Snowflake and offset is not being reset, we can add this offset to buffer for channel:{}", currentProcessedOffset);
       return false;
     }
 
     // Don't ignore if we see the expected offset; otherwise log and skip
     if ((kafkaSinkRecord.kafkaOffset() - currentProcessedOffset) == 1L) {
       LOGGER.debug(
           "Got the desired offset:{} from Kafka, we can add this offset to buffer for channel:{}",
           kafkaSinkRecord.kafkaOffset(),
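Two reviewer notes on the source change above, with hedged sketches; these are illustrations only, not part of the patch.

First, the TODO asks whether isOffsetResetInKafka needs to be atomic. If the recovery/fallback path and the buffering path can run on different threads (an assumption this diff does not establish), a plain boolean field gives no visibility guarantee; one option is AtomicBoolean:

    import java.util.concurrent.atomic.AtomicBoolean;

    // Hypothetical sketch, not connector code: AtomicBoolean makes writes from the
    // recovery path visible to the buffering path without extra synchronization.
    class OffsetResetFlag {
      private final AtomicBoolean offsetResetInKafka = new AtomicBoolean(false);

      // called after the channel is reopened and Kafka offsets are rewound
      void markReset() {
        offsetResetInKafka.set(true);
      }

      // called once the expected offset arrives again
      void clearReset() {
        offsetResetInKafka.set(false);
      }

      boolean isReset() {
        return offsetResetInKafka.get();
      }
    }

If only the single task thread ever touches the flag, the plain boolean in this diff is sufficient.

Second, the || -> && change inverts when the early "don't skip" return fires. A minimal, self-contained sketch of the resulting skip logic (simplified names; NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE is assumed to be a -1L sentinel, and clearing the flag on the desired offset is assumed to mirror the connector code just past the context shown above):

    // Hypothetical simplification of shouldIgnoreAddingRecordToBuffer, not the connector source.
    class SkipLogicSketch {
      static final long NO_OFFSET_TOKEN = -1L; // assumed sentinel for "no offset committed yet"
      boolean isOffsetResetInKafka;

      boolean shouldIgnore(long recordOffset, long currentProcessedOffset) {
        // buffer normally when there was no reset and Snowflake has no committed offset yet
        if (!isOffsetResetInKafka && currentProcessedOffset == NO_OFFSET_TOKEN) {
          return false;
        }
        // accept the record that directly follows the committed offset and stop skipping
        if (recordOffset - currentProcessedOffset == 1L) {
          isOffsetResetInKafka = false;
          return false;
        }
        // everything else is a replayed or out-of-order record: skip it
        return true;
      }
    }

For example, after a reset with a committed offset of 4, the resent records at offsets 0..4 are skipped and buffering resumes at offset 5.
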
diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java
index 601fb5922..319b42e60 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java
@@ -397,15 +397,23 @@ public void testFetchOffsetTokenWithRetry_RuntimeException() {
   /* Only SFExceptions goes into fallback -> reopens channel, fetch offsetToken and throws Appropriate exception */
   @Test
   public void testInsertRows_SuccessAfterReopenChannel() throws Exception {
+    final int noOfRecords = 5;
+    int expectedInsertRowsCount = 0;
+    int expectedOpenChannelCount = 0;
+    int expectedGetOffsetCount = 0;
+
+    // setup mocks to fail the first insert and return two null snowflake offsets (open channel and failed insert) before succeeding
     Mockito.when(
             mockStreamingChannel.insertRows(
                 ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class)))
-        .thenThrow(SF_EXCEPTION);
-
-    // get null from snowflake first time it is called and null for second time too since insert
-    // rows was failure
-    Mockito.when(mockStreamingChannel.getLatestCommittedOffsetToken()).thenReturn(null);
+        .thenThrow(SF_EXCEPTION)
+        .thenReturn(new InsertValidationResponse());
+    Mockito.when(mockStreamingChannel.getLatestCommittedOffsetToken())
+        .thenReturn(null)
+        .thenReturn(null)
+        .thenReturn(Long.toString(noOfRecords - 1));
 
+    // create tpchannel
     TopicPartitionChannel topicPartitionChannel =
         new TopicPartitionChannel(
             mockStreamingClient,
@@ -417,37 +425,48 @@ public void testInsertRows_SuccessAfterReopenChannel() throws Exception {
             mockKafkaRecordErrorReporter,
             mockSinkTaskContext,
             mockTelemetryService);
-    final int noOfRecords = 5;
-    // Since record 0 was not able to ingest, all records in this batch will not be added into the
-    // buffer.
+    expectedOpenChannelCount++;
+    expectedGetOffsetCount++;
+
+    // verify initial mock counts after tpchannel creation
+    Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(expectedInsertRowsCount))
+        .insertRows(ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class));
+    Mockito.verify(mockStreamingClient, Mockito.times(expectedOpenChannelCount))
+        .openChannel(ArgumentMatchers.any());
+    Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(expectedGetOffsetCount))
+        .getLatestCommittedOffsetToken();
+
+    // Test inserting record 0, which should fail to ingest, so the remaining records are ignored
     List<SinkRecord> records = TestUtils.createJsonStringSinkRecords(0, noOfRecords, TOPIC, PARTITION);
-
     records.forEach(topicPartitionChannel::insertRecordToBuffer);
+    expectedInsertRowsCount++;
+    expectedOpenChannelCount++;
+    expectedGetOffsetCount++;
 
-    Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(noOfRecords))
+    // verify mocks only tried ingesting once
+    Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(expectedInsertRowsCount))
         .insertRows(ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class));
-    Mockito.verify(mockStreamingClient, Mockito.times(noOfRecords + 1))
+    Mockito.verify(mockStreamingClient, Mockito.times(expectedOpenChannelCount))
         .openChannel(ArgumentMatchers.any());
-    Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(noOfRecords + 1))
+    Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(expectedGetOffsetCount))
         .getLatestCommittedOffsetToken();
 
-    // Now, it should be successful
-    Mockito.when(
-            mockStreamingChannel.insertRows(
-                ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class)))
-        .thenReturn(new InsertValidationResponse());
-
-    Mockito.when(mockStreamingChannel.getLatestCommittedOffsetToken())
-        .thenReturn(Long.toString(noOfRecords - 1));
 
     // Retry the insert again, now everything should be ingested and the offset token should be
     // noOfRecords-1
     records.forEach(topicPartitionChannel::insertRecordToBuffer);
-    Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(noOfRecords * 2))
-        .insertRows(ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class));
-    Assert.assertEquals(noOfRecords - 1, topicPartitionChannel.fetchOffsetTokenWithRetry());
+    expectedInsertRowsCount += noOfRecords;
+    expectedGetOffsetCount++;
+
+    // verify mocks ingested each record
+    Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(expectedInsertRowsCount))
+        .insertRows(ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class));
+    Mockito.verify(mockStreamingClient, Mockito.times(expectedOpenChannelCount))
+        .openChannel(ArgumentMatchers.any());
+    Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(expectedGetOffsetCount))
+        .getLatestCommittedOffsetToken();
   }
 
   @Test
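The reworked test leans on Mockito's consecutive stubbing: chained thenThrow/thenReturn answers are consumed one call at a time, and the last answer repeats for any further calls. A self-contained sketch of the pattern on a hypothetical List mock (not the connector's mocks):

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import java.util.List;

    public class ConsecutiveStubbingSketch {
      public static void main(String[] args) {
        @SuppressWarnings("unchecked")
        List<String> mockList = mock(List.class);

        // fail once, then succeed -- the same shape as the insertRows stubbing above
        when(mockList.get(0))
            .thenThrow(new RuntimeException("first call fails"))
            .thenReturn("recovered");

        try {
          mockList.get(0); // consumes the thenThrow answer
        } catch (RuntimeException expected) {
          System.out.println("first call threw: " + expected.getMessage());
        }
        System.out.println("second call: " + mockList.get(0)); // "recovered"
        System.out.println("third call: " + mockList.get(0));  // last stub repeats
      }
    }

This is why the three chained thenReturn values on getLatestCommittedOffsetToken can be declared up front, lining up with the expected fetches (channel creation, post-failure recovery, and after the successful re-ingest), with no re-stubbing needed halfway through the test.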