SNOW-1514185: Do not assign the reopened channel unless kafka offsets are fully reset (#875)
sfc-gh-akowalczyk authored Jul 11, 2024
1 parent ee83e22 commit 05dcbdf
Showing 3 changed files with 112 additions and 24 deletions.
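
Before the diffs, here is a minimal, self-contained sketch of the ordering problem the commit addresses. Every name below (RecoveryOrderingSketch, Channel, the fields) is an illustrative stand-in, not the connector's real API; the actual change, shown in the diffs that follow, applies the same idea inside TopicPartitionChannel.

import java.util.concurrent.locks.ReentrantLock;

// Illustrative stand-in types and names: NOT the connector's real API.
// Shows why the reopened channel must be published only after the offset
// bookkeeping has been reset.
class RecoveryOrderingSketch {
  interface Channel {
    String getLatestCommittedOffsetToken();
  }

  private final ReentrantLock bufferLock = new ReentrantLock();
  private volatile Channel channel; // read concurrently by the insert path
  private volatile boolean needToSkipCurrentBatch;
  private volatile long offsetPersistedInSnowflake;

  // Pre-fix shape: the new channel becomes visible to other threads before
  // the Kafka offset state is reset, so inserts can race the recovery.
  long recoverBeforeFix(Channel reopened) {
    this.channel = reopened; // published too early
    long recovered = parse(reopened.getLatestCommittedOffsetToken());
    applyReset(recovered); // another thread may already have inserted rows
    return recovered;
  }

  // Post-fix shape: keep the reopened channel in a local, fetch the offset
  // from it, and assign the field inside the same critical section that
  // resets the offset bookkeeping.
  long recoverAfterFix(Channel reopened) {
    long recovered = parse(reopened.getLatestCommittedOffsetToken());
    bufferLock.lock();
    try {
      offsetPersistedInSnowflake = recovered;
      needToSkipCurrentBatch = true; // leftover buffered rows get re-ingested
      this.channel = reopened; // published only once state is consistent
    } finally {
      bufferLock.unlock();
    }
    return recovered;
  }

  private static long parse(String token) {
    return token == null ? -1L : Long.parseLong(token);
  }

  private void applyReset(long recovered) {
    offsetPersistedInSnowflake = recovered;
    needToSkipCurrentBatch = true;
  }
}

Keeping the reopened channel in a local variable until the reset completes means the concurrent insert path can never observe a channel whose Kafka offsets are still stale.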
File 1 of 3:

@@ -853,9 +853,17 @@ public long fetchOffsetTokenWithRetry() {
    */
   private long streamingApiFallbackSupplier(
       final StreamingApiFallbackInvoker streamingApiFallbackInvoker) {
-    final long offsetRecoveredFromSnowflake =
-        getRecoveredOffsetFromSnowflake(streamingApiFallbackInvoker);
-    resetChannelMetadataAfterRecovery(streamingApiFallbackInvoker, offsetRecoveredFromSnowflake);
+    SnowflakeStreamingIngestChannel newChannel = reopenChannel(streamingApiFallbackInvoker);
+
+    LOGGER.warn(
+        "{} Fetching offsetToken after re-opening the channel:{}",
+        streamingApiFallbackInvoker,
+        this.getChannelNameFormatV1());
+    long offsetRecoveredFromSnowflake = fetchLatestOffsetFromChannel(newChannel);
+
+    resetChannelMetadataAfterRecovery(
+        streamingApiFallbackInvoker, offsetRecoveredFromSnowflake, newChannel);
+
     return offsetRecoveredFromSnowflake;
   }

@@ -871,10 +879,12 @@ private long streamingApiFallbackSupplier(
    *     for logging mainly.
    * @param offsetRecoveredFromSnowflake offset number found in snowflake for this
    *     channel(partition)
+   * @param newChannel a channel to assign to the current instance
    */
   private void resetChannelMetadataAfterRecovery(
       final StreamingApiFallbackInvoker streamingApiFallbackInvoker,
-      final long offsetRecoveredFromSnowflake) {
+      final long offsetRecoveredFromSnowflake,
+      SnowflakeStreamingIngestChannel newChannel) {
     if (offsetRecoveredFromSnowflake == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
       LOGGER.info(
           "{} Channel:{}, offset token is NULL, will use the consumer offset managed by the"

@@ -915,6 +925,7 @@ private void resetChannelMetadataAfterRecovery(
       // Set the flag so that any leftover rows in the buffer should be skipped, it will be
       // re-ingested since the offset in kafka was reset
       needToSkipCurrentBatch = true;
+      this.channel = newChannel;
     } finally {
       this.bufferLock.unlock();
     }

@@ -935,16 +946,11 @@ private void resetChannelMetadataAfterRecovery(
    * @param streamingApiFallbackInvoker Streaming API which invoked this function.
    * @return offset which was last present in Snowflake
    */
-  private long getRecoveredOffsetFromSnowflake(
+  private SnowflakeStreamingIngestChannel reopenChannel(
       final StreamingApiFallbackInvoker streamingApiFallbackInvoker) {
     LOGGER.warn(
         "{} Re-opening channel:{}", streamingApiFallbackInvoker, this.getChannelNameFormatV1());
-    this.channel = Preconditions.checkNotNull(openChannelForTable());
-    LOGGER.warn(
-        "{} Fetching offsetToken after re-opening the channel:{}",
-        streamingApiFallbackInvoker,
-        this.getChannelNameFormatV1());
-    return fetchLatestCommittedOffsetFromSnowflake();
+    return Preconditions.checkNotNull(openChannelForTable());
   }

   /**

@@ -958,11 +964,16 @@ private long getRecoveredOffsetFromSnowflake(
    *     snowflake.
    */
   private long fetchLatestCommittedOffsetFromSnowflake() {
+    SnowflakeStreamingIngestChannel channelToGetOffset = this.channel;
+    return fetchLatestOffsetFromChannel(channelToGetOffset);
+  }
+
+  private long fetchLatestOffsetFromChannel(SnowflakeStreamingIngestChannel channel) {
     LOGGER.debug(
         "Fetching last committed offset for partition channel:{}", this.getChannelNameFormatV1());
     String offsetToken = null;
     try {
-      offsetToken = this.channel.getLatestCommittedOffsetToken();
+      offsetToken = channel.getLatestCommittedOffsetToken();
       LOGGER.info(
           "Fetched offsetToken for channelName:{}, offset:{}",
           this.getChannelNameFormatV1(),
File 2 of 3:

@@ -640,9 +640,17 @@ public long fetchOffsetTokenWithRetry() {
    */
   private long streamingApiFallbackSupplier(
       final StreamingApiFallbackInvoker streamingApiFallbackInvoker) {
-    final long offsetRecoveredFromSnowflake =
-        getRecoveredOffsetFromSnowflake(streamingApiFallbackInvoker);
-    resetChannelMetadataAfterRecovery(streamingApiFallbackInvoker, offsetRecoveredFromSnowflake);
+    SnowflakeStreamingIngestChannel newChannel = reopenChannel(streamingApiFallbackInvoker);
+
+    LOGGER.warn(
+        "{} Fetching offsetToken after re-opening the channel:{}",
+        streamingApiFallbackInvoker,
+        this.getChannelNameFormatV1());
+    long offsetRecoveredFromSnowflake = fetchLatestOffsetFromChannel(newChannel);
+
+    resetChannelMetadataAfterRecovery(
+        streamingApiFallbackInvoker, offsetRecoveredFromSnowflake, newChannel);
+
     return offsetRecoveredFromSnowflake;
   }

@@ -658,10 +666,12 @@ private long streamingApiFallbackSupplier(
    *     for logging mainly.
    * @param offsetRecoveredFromSnowflake offset number found in snowflake for this
    *     channel(partition)
+   * @param newChannel a channel to assign to the current instance
    */
   private void resetChannelMetadataAfterRecovery(
       final StreamingApiFallbackInvoker streamingApiFallbackInvoker,
-      final long offsetRecoveredFromSnowflake) {
+      final long offsetRecoveredFromSnowflake,
+      SnowflakeStreamingIngestChannel newChannel) {
     if (offsetRecoveredFromSnowflake == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
       LOGGER.info(
           "{} Channel:{}, offset token is NULL",

@@ -689,6 +699,7 @@ private void resetChannelMetadataAfterRecovery(
       // Set the flag so that any leftover rows in the buffer should be skipped, it will be
       // re-ingested since the offset in kafka was reset
       needToSkipCurrentBatch = true;
+      this.channel = newChannel;

       LOGGER.warn(
           "{} Channel:{}, setting sinkTaskOffset to {}, offsetPersistedInSnowflake to {},"

@@ -709,16 +720,11 @@ private long getRecoveredOffsetFromSnowflake(
    * @param streamingApiFallbackInvoker Streaming API which invoked this function.
    * @return offset which was last present in Snowflake
    */
-  private long getRecoveredOffsetFromSnowflake(
+  private SnowflakeStreamingIngestChannel reopenChannel(
       final StreamingApiFallbackInvoker streamingApiFallbackInvoker) {
     LOGGER.warn(
         "{} Re-opening channel:{}", streamingApiFallbackInvoker, this.getChannelNameFormatV1());
-    this.channel = Preconditions.checkNotNull(openChannelForTable());
-    LOGGER.warn(
-        "{} Fetching offsetToken after re-opening the channel:{}",
-        streamingApiFallbackInvoker,
-        this.getChannelNameFormatV1());
-    return fetchLatestCommittedOffsetFromSnowflake();
+    return Preconditions.checkNotNull(openChannelForTable());
   }

   /**

@@ -734,9 +740,14 @@ private long getRecoveredOffsetFromSnowflake(
   private long fetchLatestCommittedOffsetFromSnowflake() {
     LOGGER.debug(
         "Fetching last committed offset for partition channel:{}", this.getChannelNameFormatV1());
+    SnowflakeStreamingIngestChannel channelToGetOffset = this.channel;
+    return fetchLatestOffsetFromChannel(channelToGetOffset);
+  }
+
+  private long fetchLatestOffsetFromChannel(SnowflakeStreamingIngestChannel channel) {
     String offsetToken = null;
     try {
-      offsetToken = this.channel.getLatestCommittedOffsetToken();
+      offsetToken = channel.getLatestCommittedOffsetToken();
       LOGGER.info(
           "Fetched offsetToken for channelName:{}, offset:{}",
           this.getChannelNameFormatV1(),
File 3 of 3:

@@ -10,6 +10,8 @@
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyIterable;
 import static org.mockito.ArgumentMatchers.anyMap;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;

@@ -1245,4 +1247,68 @@ public void testOffsetTokenVerificationFunction() {
     Assert.assertFalse(StreamingUtils.offsetTokenVerificationFunction.verify("1", "3", "4", 3));
     Assert.assertFalse(StreamingUtils.offsetTokenVerificationFunction.verify("2", "1", "4", 3));
   }
+
+  @Test
+  public void assignANewChannelAfterTheSetupIsFullyDone() throws Exception {
+    // given
+
+    SnowflakeStreamingIngestChannel channel1 = Mockito.mock(SnowflakeStreamingIngestChannel.class);
+    Mockito.when(channel1.getLatestCommittedOffsetToken())
+        .thenReturn("0")
+        .thenThrow(new SFException(ErrorCode.CHANNEL_STATUS_INVALID));
+
+    Mockito.when(channel1.insertRow(anyMap(), anyString()))
+        .thenThrow(new SFException(ErrorCode.CHANNEL_STATUS_INVALID));
+    Mockito.when(channel1.insertRows(anyIterable(), anyString(), anyString()))
+        .thenThrow(new SFException(ErrorCode.CHANNEL_STATUS_INVALID));
+
+    SnowflakeStreamingIngestChannel channel2 = Mockito.mock(SnowflakeStreamingIngestChannel.class);
+    Mockito.when(channel2.getLatestCommittedOffsetToken())
+        .thenThrow(new SFException(ErrorCode.IO_ERROR));
+    Mockito.when(channel2.insertRow(anyMap(), anyString()))
+        .thenReturn(new InsertValidationResponse());
+    Mockito.when(channel2.insertRows(anyIterable(), anyString(), anyString()))
+        .thenReturn(new InsertValidationResponse());
+
+    Mockito.when(mockStreamingClient.openChannel(any(OpenChannelRequest.class)))
+        .thenReturn(channel1, channel2);
+
+    TopicPartitionChannel topicPartitionChannel =
+        createTopicPartitionChannel(
+            this.mockStreamingClient,
+            this.topicPartition,
+            TEST_CHANNEL_NAME,
+            TEST_TABLE_NAME,
+            this.enableSchematization,
+            this.streamingBufferThreshold,
+            this.sfConnectorConfig,
+            this.mockKafkaRecordErrorReporter,
+            this.mockSinkTaskContext,
+            this.mockSnowflakeConnectionService,
+            new RecordService(),
+            this.mockTelemetryService,
+            true,
+            null);
+
+    // expect
+    Assert.assertThrows(
+        SFException.class, () -> topicPartitionChannel.getOffsetSafeToCommitToKafka());
+
+    // when
+    List<SinkRecord> records = TestUtils.createJsonStringSinkRecords(0, 2, TOPIC, PARTITION);
+
+    // expect
+    Assert.assertThrows(SFException.class, () -> insertAndFlush(topicPartitionChannel, records));
+  }
+
+  private void insertAndFlush(TopicPartitionChannel channel, List<SinkRecord> records)
+      throws InterruptedException {
+    for (int idx = 0; idx < records.size(); idx++) {
+      channel.insertRecord(records.get(idx), idx == 0);
+    }
+
+    // expect
+    Thread.sleep(this.streamingBufferThreshold.getFlushTimeThresholdSeconds() + 1);
+    channel.insertBufferedRecordsIfFlushTimeThresholdReached();
+  }
 }
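
A note on how the new test works: Mockito's consecutive stubbing (thenReturn(channel1, channel2)) makes openChannel hand out the stale channel1 first and channel2 on the reopen during the streaming-API fallback. channel1 fails every call with CHANNEL_STATUS_INVALID to force that fallback, while channel2 deliberately fails getLatestCommittedOffsetToken with IO_ERROR, so the recovery can never finish its offset reset. Both assertions therefore expect the SFException to surface, which, judging by the test name, verifies that a reopened channel is only assigned once its setup is fully done.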
