Skip to content

Commit

Permalink
SNOW-1514185 Assign new channel when no offset is present in Snowflake (
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-lshcharbaty authored Aug 20, 2024
1 parent 3dabfc0 commit b68d226
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,7 @@ private void resetChannelMetadataAfterRecovery(
? latestConsumerOffset.get()
: offsetRecoveredFromSnowflake + 1L;
if (offsetToResetInKafka == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
this.channel = newChannel;
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,7 @@ private void resetChannelMetadataAfterRecovery(
? currentConsumerGroupOffset.get()
: offsetRecoveredFromSnowflake + 1L;
if (offsetToResetInKafka == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
this.channel = newChannel;
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1311,4 +1311,46 @@ private void insertAndFlush(TopicPartitionChannel channel, List<SinkRecord> reco
Thread.sleep(this.streamingBufferThreshold.getFlushTimeThresholdSeconds() + 1);
channel.insertBufferedRecordsIfFlushTimeThresholdReached();
}

@Test
public void assignANewChannel_whenNoOffsetIsPresentInSnowflake() {
// given
String noOffset = "-1";

SnowflakeStreamingIngestChannel originalChannel =
Mockito.mock(SnowflakeStreamingIngestChannel.class);
Mockito.when(originalChannel.getLatestCommittedOffsetToken())
.thenReturn(noOffset)
.thenThrow(new SFException(ErrorCode.CHANNEL_STATUS_INVALID));

SnowflakeStreamingIngestChannel reopenedChannel =
Mockito.mock(SnowflakeStreamingIngestChannel.class);
Mockito.when(reopenedChannel.getLatestCommittedOffsetToken()).thenReturn(noOffset);

Mockito.when(mockStreamingClient.openChannel(any(OpenChannelRequest.class)))
.thenReturn(originalChannel, reopenedChannel);

TopicPartitionChannel topicPartitionChannel =
createTopicPartitionChannel(
this.mockStreamingClient,
this.topicPartition,
TEST_CHANNEL_NAME,
TEST_TABLE_NAME,
this.enableSchematization,
this.streamingBufferThreshold,
this.sfConnectorConfig,
this.mockKafkaRecordErrorReporter,
this.mockSinkTaskContext,
this.mockSnowflakeConnectionService,
new RecordService(),
this.mockTelemetryService,
true,
null);

// when
topicPartitionChannel.getOffsetSafeToCommitToKafka();

// then
Assert.assertEquals(reopenedChannel, topicPartitionChannel.getChannel());
}
}

0 comments on commit b68d226

Please sign in to comment.