Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1759821 - [GitHub] Too many Warnings: "might have an offset token mismatch based on the provided offset token verification logic" #971

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,14 @@ public String validateConfig(Map<String, String> config) {
"Streaming client Channel migration is only available with {}.",
IngestionMethodConfig.SNOWPIPE_STREAMING.toString()));
}
if (config.containsKey(
SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG)) {
invalidConfigParams.put(
SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG,
Utils.formatString(
"Streaming channel offset verification function is only available with {}.",
IngestionMethodConfig.SNOWPIPE_STREAMING.toString()));
}
}

if (config.containsKey(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,16 @@ public class SnowflakeSinkConnectorConfig {
+ " format is deprecated and V1 will be used always, disabling this config could have"
+ " ramifications. Please consult Snowflake support before setting this to false.";

// Settings for the streaming channel offset verification function: the property key, its
// display name, its default value and its help text. The default keeps verification enabled.
public static final String ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG =
"enable.streaming.channel.offset.verification";
public static final String ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_DISPLAY =
"Enable streaming channel offset verification function";
public static final boolean ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_DEFAULT = true;
public static final String ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_DOC =
"Whether to enable streaming channel offset verification function. The function checks only"
+ " for incremental offsets (might contain gaps) and might signal false positives in case"
+ " of SMT. Can only be set if Streaming Snowpipe is enabled";

public static final String ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS =
"enable.task.fail.on.authorization.errors";
public static final boolean ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS_DEFAULT = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,16 @@ public static ConfigDef getConfig() {
ICEBERG_ENABLED_DEFAULT_VALUE,
ConfigDef.Importance.HIGH,
"When set to true the connector will ingest data into the Iceberg table. Check the"
+ " official Snowflake documentation for the prerequisites.");
+ " official Snowflake documentation for the prerequisites.")
.define(
ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG,
ConfigDef.Type.BOOLEAN,
ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_DEFAULT,
ConfigDef.Importance.LOW,
ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_DOC,
CONNECTOR_CONFIG_DOC,
11,
ConfigDef.Width.NONE,
ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_DISPLAY);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ public ImmutableMap<String, String> validate(Map<String, String> inputConfig) {
ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG,
inputConfig.get(ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG));
}
if (inputConfig.containsKey(ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG)) {
BOOLEAN_VALIDATOR.ensureValid(
ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG,
inputConfig.get(ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG));
}

if (inputConfig.containsKey(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER)) {
BOOLEAN_VALIDATOR.ensureValid(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,14 @@ public class StreamingUtils {
public static final String STREAMING_CONSTANT_OAUTH_CLIENT_SECRET = "oauth_client_secret";
public static final String STREAMING_CONSTANT_OAUTH_REFRESH_TOKEN = "oauth_refresh_token";

// Offset verification function: checks that the current batch's start offset is strictly
// greater than the previous batch's end offset. Offsets only have to be increasing; gaps
// between batches are accepted. Note that there are some false positives when SMT is used.
//
// The check is skipped (returns true) when either compared token is null. curBatchEndOffset
// and rowCount are not consulted. Tokens are parsed with Long.parseLong, so a non-numeric
// token raises NumberFormatException.
public static final OffsetTokenVerificationFunction offsetTokenVerificationFunction =
    (prevBatchEndOffset, curBatchStartOffset, curBatchEndOffset, rowCount) -> {
      if (prevBatchEndOffset != null && curBatchStartOffset != null) {
        long curStart = Long.parseLong(curBatchStartOffset);
        long prevEnd = Long.parseLong(prevBatchEndOffset);
        // Only ordering matters: a strict "prevEnd + 1" comparison would reject legitimate gaps.
        return curStart > prevEnd;
      }
      return true;
    };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,50 @@ public void testEnableStreamingChannelMigrationConfig_invalidWithSnowpipe() {
SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG);
}

@Test
public void testEnableStreamingChannelOffsetVerificationConfig() {
  // A streaming config with the offset verification flag switched on is handed straight to the
  // validator; the test passes when validateConfig returns without throwing.
  connectorConfigValidator.validateConfig(
      SnowflakeSinkConnectorConfigBuilder.streamingConfig()
          .withRole("ACCOUNTADMIN")
          .withChannelOffsetTokenVerificationFunctionEnabled(true)
          .build());
}

@Test
public void testEnableStreamingChannelOffsetVerificationConfig_invalidWithSnowpipe() {
  // given: a SNOWPIPE config that also sets the offset verification flag
  String offendingKey =
      SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG;
  Map<String, String> snowpipeConfig =
      SnowflakeSinkConnectorConfigBuilder.snowpipeConfig()
          .withChannelOffsetTokenVerificationFunctionEnabled(true)
          .build();

  // when, then: validation fails and the error message names the offending property
  assertThatThrownBy(() -> connectorConfigValidator.validateConfig(snowpipeConfig))
      .isInstanceOf(SnowflakeKafkaConnectorException.class)
      .hasMessageContaining(offendingKey);
}

@Test
public void
    testEnableStreamingChannelOffsetVerificationConfig_invalidBooleanValue_WithSnowpipeStreaming() {
  // given: a streaming config whose verification flag holds a non-boolean string
  String verificationKey =
      SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG;
  Map<String, String> streamingConfig =
      SnowflakeSinkConnectorConfigBuilder.streamingConfig().build();
  streamingConfig.put(verificationKey, "INVALID");

  // when, then: validation fails and the error message names the offending property
  assertThatThrownBy(() -> connectorConfigValidator.validateConfig(streamingConfig))
      .isInstanceOf(SnowflakeKafkaConnectorException.class)
      .hasMessageContaining(verificationKey);
}

@Test
public void testStreamingProviderOverrideConfig_invalidWithSnowpipe() {
Map<String, String> config = getConfig();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.snowflake.kafka.connector.config;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG;
import static com.snowflake.kafka.connector.Utils.*;
import static com.snowflake.kafka.connector.Utils.SF_DATABASE;
Expand All @@ -24,6 +25,10 @@ public static SnowflakeSinkConnectorConfigBuilder snowpipeConfig() {
return commonRequiredFields().withIngestionMethod(IngestionMethodConfig.SNOWPIPE);
}

/**
 * Creates a builder that starts from {@code commonRequiredFields()} and has its ingestion method
 * set to {@code IngestionMethodConfig.SNOWPIPE_STREAMING}.
 *
 * @return the builder, ready for further {@code with...} calls or {@code build()}
 */
public static SnowflakeSinkConnectorConfigBuilder streamingConfig() {
return commonRequiredFields().withIngestionMethod(IngestionMethodConfig.SNOWPIPE_STREAMING);
}

public static SnowflakeSinkConnectorConfigBuilder icebergConfig() {
return commonRequiredFields()
.withIcebergEnabled()
Expand Down Expand Up @@ -113,6 +118,12 @@ public SnowflakeSinkConnectorConfigBuilder withSchematizationEnabled(boolean ena
return this;
}

/**
 * Stores the streaming channel offset verification flag in the config under construction.
 *
 * @param enabled value to store; written as the string {@code "true"} or {@code "false"}
 * @return this builder, for chaining
 */
public SnowflakeSinkConnectorConfigBuilder withChannelOffsetTokenVerificationFunctionEnabled(
    boolean enabled) {
  final String flagValue = String.valueOf(enabled);
  this.config.put(ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG, flagValue);
  return this;
}

/**
 * Finishes the builder.
 *
 * @return the config map held by this builder (the same map instance, not a copy)
 */
public Map<String, String> build() {
  return this.config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -895,9 +895,9 @@ public void testInsertRowsWithSchemaEvolution() throws Exception {
null,
this.schemaEvolutionService);

final int noOfRecords = 3;
final int noOfRecords = 2;
List<SinkRecord> records =
TestUtils.createNativeJsonSinkRecords(0, noOfRecords, TOPIC, PARTITION);
TestUtils.createNativeJsonSinkRecords(1, noOfRecords, TOPIC, PARTITION);

for (int idx = 0; idx < records.size(); idx++) {
topicPartitionChannel.insertRecord(records.get(idx), idx == 0);
Expand Down Expand Up @@ -1283,17 +1283,18 @@ public void testOffsetTokenVerificationFunction() {
Assert.assertTrue(StreamingUtils.offsetTokenVerificationFunction.verify("1", "2", "4", 2));
Assert.assertTrue(StreamingUtils.offsetTokenVerificationFunction.verify("1", "2", null, 1));
Assert.assertTrue(StreamingUtils.offsetTokenVerificationFunction.verify(null, null, null, 0));
Assert.assertFalse(StreamingUtils.offsetTokenVerificationFunction.verify("1", "3", "4", 3));
Assert.assertTrue(StreamingUtils.offsetTokenVerificationFunction.verify("1", "3", "4", 3));
Assert.assertFalse(StreamingUtils.offsetTokenVerificationFunction.verify("2", "1", "4", 3));
}

@Test
public void assignANewChannelAfterTheSetupIsFullyDone() throws Exception {
// given
String noOffset = "-1";

SnowflakeStreamingIngestChannel channel1 = Mockito.mock(SnowflakeStreamingIngestChannel.class);
Mockito.when(channel1.getLatestCommittedOffsetToken())
.thenReturn("0")
.thenReturn(noOffset)
.thenThrow(new SFException(ErrorCode.CHANNEL_STATUS_INVALID));

Mockito.when(channel1.insertRow(anyMap(), anyString()))
Expand Down
Loading