and not or
sfc-gh-rcheng committed Oct 18, 2023
1 parent 9de5460 commit 53db470
Showing 2 changed files with 57 additions and 28 deletions.
File 1 of 2: TopicPartitionChannel.java
@@ -112,7 +112,7 @@ public class TopicPartitionChannel {
  * <li>If channel fails to fetch offsetToken from Snowflake, we reopen the channel and try to
  *     fetch offset from Snowflake again
  * <li>If channel fails to ingest a buffer(Buffer containing rows/offsets), we reopen the
- *     channel and try to fetch offset from Snowflake again
+ *     channel and try to fetch offset from Snowflake again. Schematization purposefully fails the first buffer insert in order to alter the table, and then expects Kafka to resend data
  * </ol>
  *
  * <p>In both cases above, we ask Kafka to send back offsets, strictly from offset number after
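The recovery contract this javadoc describes is: reopen the channel, re-read the offset Snowflake last committed, then rewind the Kafka consumer so records arrive again strictly after that offset. A minimal sketch of that flow, assuming the committed-offset lookup is handed in as a supplier (in the connector that logic lives inside TopicPartitionChannel); SinkTaskContext.offset(...) is the standard Kafka Connect rewind API:

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.sink.SinkTaskContext;
    import java.util.function.LongSupplier;

    // fetchCommittedOffset stands in for "reopen the channel and read Snowflake's
    // last committed offset"; it is a placeholder, not the connector's real API.
    static long recoverFromIngestFailure(
        SinkTaskContext context, TopicPartition tp, LongSupplier fetchCommittedOffset) {
      long committedOffset = fetchCommittedOffset.getAsLong();
      // Rewind Kafka so it resends records strictly after what Snowflake has committed.
      long resendFrom = committedOffset + 1;
      context.offset(tp, resendFrom);
      return resendFrom;
    }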
@@ -124,7 +124,7 @@ public class TopicPartitionChannel {
  * <p>This boolean is used to indicate that we reset offset in kafka and we will only buffer once
  * we see the offset which is one more than an offset present in Snowflake.
  */
- private boolean isOffsetResetInKafka;
+ private boolean isOffsetResetInKafka = false; // TODO @rcheng question: atomic?

  private final SnowflakeStreamingIngestClient streamingIngestClient;
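On the TODO above: if this flag is ever written by the offset-reset path and read from a different thread, a plain boolean gives no visibility guarantee. A sketch of what the atomic variant could look like, assuming such cross-thread access exists; this is one possible answer to the question, not the connector's actual design:

    import java.util.concurrent.atomic.AtomicBoolean;

    class OffsetResetFlag {
      private final AtomicBoolean isOffsetResetInKafka = new AtomicBoolean(false);

      void markReset() {
        isOffsetResetInKafka.set(true); // write is immediately visible to other threads
      }

      boolean clearOnce() {
        // compareAndSet flips true -> false exactly once even under contention
        return isOffsetResetInKafka.compareAndSet(true, false);
      }
    }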

@@ -389,19 +389,21 @@ public void insertRecordToBuffer(SinkRecord kafkaSinkRecord) {
   *
   * @param kafkaSinkRecord Record to check for above condition only in case of failures
   *     (isOffsetResetInKafka = true)
+  * @param currentProcessedOffset The current processed offset
   * @return true if this record can be skipped to add into buffer, false otherwise.
   */
  private boolean shouldIgnoreAddingRecordToBuffer(
-     SinkRecord kafkaSinkRecord, long currentProcessedOffset) {
-   // Don't skip rows if there is no offset reset or there is no offset token information in the
+     SinkRecord kafkaSinkRecord, final long currentProcessedOffset) {
+   // Don't skip rows if there is no offset reset and there is no offset token information in the
    // channel
    if (!isOffsetResetInKafka
-       || currentProcessedOffset == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
+       && currentProcessedOffset == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
+     LOGGER.debug("No offset registered in Snowflake and offset is not being reset, we can add this offset to buffer for channel:{}", currentProcessedOffset);
      return false;
    }

+   // Don't ignore if we see the expected offset; otherwise log and skip
    if ((kafkaSinkRecord.kafkaOffset() - currentProcessedOffset) == 1L) {
      LOGGER.debug(
          "Got the desired offset:{} from Kafka, we can add this offset to buffer for channel:{}",
          kafkaSinkRecord.kafkaOffset(),
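The one-character fix that names this commit ("and not or") changes when the early exit fires. With ||, the guard returned false (buffer the record) whenever either condition held, so a channel that was mid-reset but had no offset token yet still buffered rows instead of waiting for the expected offset; with && it only short-circuits when there is nothing to wait for at all. A standalone sketch of the corrected guard, using plain parameters instead of the real fields; the -1 sentinel value is an assumption for illustration:

    class GuardSketch {
      static final long NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE = -1L; // assumed sentinel

      static boolean shouldIgnore(boolean offsetResetInKafka, long processedOffset, long recordOffset) {
        // Early exit only when BOTH hold: no reset pending AND no offset token known.
        if (!offsetResetInKafka && processedOffset == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
          return false; // nothing to wait for: buffer the record
        }
        // Otherwise skip records until the one directly after the processed offset arrives.
        return (recordOffset - processedOffset) != 1L;
      }
    }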
File 2 of 2: TopicPartitionChannel unit tests
@@ -397,15 +397,23 @@ public void testFetchOffsetTokenWithRetry_RuntimeException() {
  /* Only SFExceptions goes into fallback -> reopens channel, fetch offsetToken and throws Appropriate exception */
  @Test
  public void testInsertRows_SuccessAfterReopenChannel() throws Exception {
+   final int noOfRecords = 5;
+   int expectedInsertRowsCount = 0;
+   int expectedOpenChannelCount = 0;
+   int expectedGetOffsetCount = 0;
+
+   // setup mocks to fail first insert and return two null snowflake offsets (open channel and failed insert) before succeeding
    Mockito.when(
            mockStreamingChannel.insertRows(
                ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class)))
-       .thenThrow(SF_EXCEPTION);
-
-   // get null from snowflake first time it is called and null for second time too since insert
-   // rows was failure
-   Mockito.when(mockStreamingChannel.getLatestCommittedOffsetToken()).thenReturn(null);
+       .thenThrow(SF_EXCEPTION)
+       .thenReturn(new InsertValidationResponse());
+   Mockito.when(mockStreamingChannel.getLatestCommittedOffsetToken())
+       .thenReturn(null)
+       .thenReturn(null)
+       .thenReturn(Long.toString(noOfRecords - 1));

+   // create tpchannel
    TopicPartitionChannel topicPartitionChannel =
        new TopicPartitionChannel(
            mockStreamingClient,
@@ -417,37 +425,56 @@ public void testInsertRows_SuccessAfterReopenChannel() throws Exception {
            mockKafkaRecordErrorReporter,
            mockSinkTaskContext,
            mockTelemetryService);
-   final int noOfRecords = 5;
-   // Since record 0 was not able to ingest, all records in this batch will not be added into the
-   // buffer.
+   expectedOpenChannelCount++;
+   expectedGetOffsetCount++;
+
+   // verify initial mock counts after tpchannel creation
+   Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(expectedInsertRowsCount))
+       .insertRows(ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class));
+   Mockito.verify(mockStreamingClient, Mockito.times(expectedOpenChannelCount))
+       .openChannel(ArgumentMatchers.any());
+   Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(expectedGetOffsetCount))
+       .getLatestCommittedOffsetToken();
+
+   // Test inserting record 0, which should fail to ingest so the other records are ignored
    List<SinkRecord> records =
        TestUtils.createJsonStringSinkRecords(0, noOfRecords, TOPIC, PARTITION);

    records.forEach(topicPartitionChannel::insertRecordToBuffer);
+   expectedInsertRowsCount++;
+   expectedOpenChannelCount++;
+   expectedGetOffsetCount++;

-   Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(noOfRecords))
+   // verify mocks only tried ingesting once
+   Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(expectedInsertRowsCount))
        .insertRows(ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class));
-   Mockito.verify(mockStreamingClient, Mockito.times(noOfRecords + 1))
+   Mockito.verify(mockStreamingClient, Mockito.times(expectedOpenChannelCount))
        .openChannel(ArgumentMatchers.any());
-   Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(noOfRecords + 1))
+   Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(expectedGetOffsetCount))
        .getLatestCommittedOffsetToken();

-   // Now, it should be successful
-   Mockito.when(
-           mockStreamingChannel.insertRows(
-               ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class)))
-       .thenReturn(new InsertValidationResponse());
-
-   Mockito.when(mockStreamingChannel.getLatestCommittedOffsetToken())
-       .thenReturn(Long.toString(noOfRecords - 1));
-
+   // Retry the insert again, now everything should be ingested and the offset token should be
+   // noOfRecords-1
    records.forEach(topicPartitionChannel::insertRecordToBuffer);
-   Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(noOfRecords * 2))
-       .insertRows(ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class));
-
-   Assert.assertEquals(noOfRecords - 1, topicPartitionChannel.fetchOffsetTokenWithRetry());
+   expectedInsertRowsCount += noOfRecords;
+   expectedGetOffsetCount++;
+
+   // verify mocks ingested each record
+   Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(expectedInsertRowsCount))
+       .insertRows(ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class));
+   Mockito.verify(mockStreamingClient, Mockito.times(expectedOpenChannelCount))
+       .openChannel(ArgumentMatchers.any());
+   Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(expectedGetOffsetCount))
+       .getLatestCommittedOffsetToken();
  }
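The reworked setup leans on Mockito's consecutive stubbing: chained thenThrow/thenReturn answers are consumed in order across successive calls, which is what lets a single stub express "fail the first insertRows, succeed afterwards". A self-contained illustration of that behavior; the Service interface is made up for the example:

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    interface Service { String call(); }

    class ConsecutiveStubbingDemo {
      public static void main(String[] args) {
        Service svc = mock(Service.class);
        when(svc.call())
            .thenThrow(new RuntimeException("first call fails"))
            .thenReturn("second call succeeds");

        try {
          svc.call(); // first invocation throws
        } catch (RuntimeException expected) {
        }
        System.out.println(svc.call()); // prints "second call succeeds"
        System.out.println(svc.call()); // the last stubbed answer repeats thereafter
      }
    }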

@Test
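A note on the expectedInsertRowsCount / expectedOpenChannelCount / expectedGetOffsetCount bookkeeping introduced above: Mockito.verify(mock, times(n)) counts every invocation since the mock was created, not since the last verify, so a test that checks at several checkpoints must carry running totals. A minimal illustration, again with a made-up interface:

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.times;
    import static org.mockito.Mockito.verify;

    interface Pinger { void ping(); }

    class CumulativeVerifyDemo {
      public static void main(String[] args) {
        Pinger p = mock(Pinger.class);

        p.ping();
        verify(p, times(1)).ping(); // checkpoint 1: one call so far

        p.ping();
        p.ping();
        // times() counts ALL calls since mock creation, not since the last verify:
        verify(p, times(3)).ping();
      }
    }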
