diff --git a/src/main/java/com/snowflake/kafka/connector/DefaultConnectorConfigValidator.java b/src/main/java/com/snowflake/kafka/connector/DefaultConnectorConfigValidator.java index 90c07e13b..f8f13e1ee 100644 --- a/src/main/java/com/snowflake/kafka/connector/DefaultConnectorConfigValidator.java +++ b/src/main/java/com/snowflake/kafka/connector/DefaultConnectorConfigValidator.java @@ -125,6 +125,14 @@ public String validateConfig(Map config) { "Streaming client Channel migration is only available with {}.", IngestionMethodConfig.SNOWPIPE_STREAMING.toString())); } + if (config.containsKey( + SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG)) { + invalidConfigParams.put( + SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG, + Utils.formatString( + "Streaming channel offset verification function is only available with {}.", + IngestionMethodConfig.SNOWPIPE_STREAMING.toString())); + } } if (config.containsKey(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP) diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java index b78eea533..d52b9d95a 100644 --- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java +++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java @@ -201,6 +201,16 @@ public class SnowflakeSinkConnectorConfig { + " format is deprecated and V1 will be used always, disabling this config could have" + " ramifications. Please consult Snowflake support before setting this to false."; + public static final String ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG = + "enable.streaming.channel.offset.verification"; + public static final String ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_DISPLAY = + "Enable streaming channel offset verification function"; + public static final boolean ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_DEFAULT = true; + public static final String ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_DOC = + "Whether to enable streaming channel offset verification function. The function checks only" + + " for incremental offsets (might contain gaps) and might signal false positives in case" + + " of SMT. Can only be set if Streaming Snowpipe is enabled"; + public static final String ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS = "enable.task.fail.on.authorization.errors"; public static final boolean ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS_DEFAULT = false; diff --git a/src/main/java/com/snowflake/kafka/connector/config/ConnectorConfigDefinition.java b/src/main/java/com/snowflake/kafka/connector/config/ConnectorConfigDefinition.java index aeb003d12..e9823eab3 100644 --- a/src/main/java/com/snowflake/kafka/connector/config/ConnectorConfigDefinition.java +++ b/src/main/java/com/snowflake/kafka/connector/config/ConnectorConfigDefinition.java @@ -478,6 +478,16 @@ public static ConfigDef getConfig() { ICEBERG_ENABLED_DEFAULT_VALUE, ConfigDef.Importance.HIGH, "When set to true the connector will ingest data into the Iceberg table. Check the" - + " official Snowflake documentation for the prerequisites."); + + " official Snowflake documentation for the prerequisites.") + .define( + ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG, + ConfigDef.Type.BOOLEAN, + ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_DEFAULT, + ConfigDef.Importance.LOW, + ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_DOC, + CONNECTOR_CONFIG_DOC, + 11, + ConfigDef.Width.NONE, + ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_DISPLAY); } } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DefaultStreamingConfigValidator.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DefaultStreamingConfigValidator.java index f9141433b..c2f1b3489 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DefaultStreamingConfigValidator.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DefaultStreamingConfigValidator.java @@ -79,6 +79,11 @@ public ImmutableMap validate(Map inputConfig) { ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG, inputConfig.get(ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG)); } + if (inputConfig.containsKey(ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG)) { + BOOLEAN_VALIDATOR.ensureValid( + ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG, + inputConfig.get(ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG)); + } if (inputConfig.containsKey(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER)) { BOOLEAN_VALIDATOR.ensureValid( diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingUtils.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingUtils.java index dea5b5ae0..f685116ab 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingUtils.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingUtils.java @@ -58,16 +58,14 @@ public class StreamingUtils { public static final String STREAMING_CONSTANT_OAUTH_CLIENT_SECRET = "oauth_client_secret"; public static final String STREAMING_CONSTANT_OAUTH_REFRESH_TOKEN = "oauth_refresh_token"; - // Offset verification function to verify that the current start offset has to be the previous end - // offset + 1, note that there are some false positives when SMT is used. + // Offset verification function to verify that the current start offset has to incremental, + // note that there are some false positives when SMT is used. public static final OffsetTokenVerificationFunction offsetTokenVerificationFunction = (prevBatchEndOffset, curBatchStartOffset, curBatchEndOffset, rowCount) -> { if (prevBatchEndOffset != null && curBatchStartOffset != null) { long curStart = Long.parseLong(curBatchStartOffset); long prevEnd = Long.parseLong(prevBatchEndOffset); - if (curStart != prevEnd + 1) { - return false; - } + return curStart > prevEnd; } return true; }; diff --git a/src/test/java/com/snowflake/kafka/connector/ConnectorConfigValidatorTest.java b/src/test/java/com/snowflake/kafka/connector/ConnectorConfigValidatorTest.java index f43116f46..080a5a91b 100644 --- a/src/test/java/com/snowflake/kafka/connector/ConnectorConfigValidatorTest.java +++ b/src/test/java/com/snowflake/kafka/connector/ConnectorConfigValidatorTest.java @@ -838,6 +838,50 @@ public void testEnableStreamingChannelMigrationConfig_invalidWithSnowpipe() { SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG); } + @Test + public void testEnableStreamingChannelOffsetVerificationConfig() { + // given + Map config = + SnowflakeSinkConnectorConfigBuilder.streamingConfig() + .withRole("ACCOUNTADMIN") + .withChannelOffsetTokenVerificationFunctionEnabled(true) + .build(); + + // when, then + connectorConfigValidator.validateConfig(config); + } + + @Test + public void testEnableStreamingChannelOffsetVerificationConfig_invalidWithSnowpipe() { + // given + Map config = + SnowflakeSinkConnectorConfigBuilder.snowpipeConfig() + .withChannelOffsetTokenVerificationFunctionEnabled(true) + .build(); + + // when, then + assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config)) + .isInstanceOf(SnowflakeKafkaConnectorException.class) + .hasMessageContaining( + SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG); + } + + @Test + public void + testEnableStreamingChannelOffsetVerificationConfig_invalidBooleanValue_WithSnowpipeStreaming() { + // given + Map config = SnowflakeSinkConnectorConfigBuilder.streamingConfig().build(); + config.put( + SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG, + "INVALID"); + + // when, then + assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config)) + .isInstanceOf(SnowflakeKafkaConnectorException.class) + .hasMessageContaining( + SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG); + } + @Test public void testStreamingProviderOverrideConfig_invalidWithSnowpipe() { Map config = getConfig(); diff --git a/src/test/java/com/snowflake/kafka/connector/config/SnowflakeSinkConnectorConfigBuilder.java b/src/test/java/com/snowflake/kafka/connector/config/SnowflakeSinkConnectorConfigBuilder.java index 21c98e75c..c43ed876a 100644 --- a/src/test/java/com/snowflake/kafka/connector/config/SnowflakeSinkConnectorConfigBuilder.java +++ b/src/test/java/com/snowflake/kafka/connector/config/SnowflakeSinkConnectorConfigBuilder.java @@ -1,5 +1,6 @@ package com.snowflake.kafka.connector.config; +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG; import static com.snowflake.kafka.connector.Utils.*; import static com.snowflake.kafka.connector.Utils.SF_DATABASE; @@ -24,6 +25,10 @@ public static SnowflakeSinkConnectorConfigBuilder snowpipeConfig() { return commonRequiredFields().withIngestionMethod(IngestionMethodConfig.SNOWPIPE); } + public static SnowflakeSinkConnectorConfigBuilder streamingConfig() { + return commonRequiredFields().withIngestionMethod(IngestionMethodConfig.SNOWPIPE_STREAMING); + } + public static SnowflakeSinkConnectorConfigBuilder icebergConfig() { return commonRequiredFields() .withIcebergEnabled() @@ -113,6 +118,12 @@ public SnowflakeSinkConnectorConfigBuilder withSchematizationEnabled(boolean ena return this; } + public SnowflakeSinkConnectorConfigBuilder withChannelOffsetTokenVerificationFunctionEnabled( + boolean enabled) { + config.put(ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG, Boolean.toString(enabled)); + return this; + } + public Map build() { return config; } diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java index 93210c828..8318bdc9c 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java @@ -896,9 +896,9 @@ public void testInsertRowsWithSchemaEvolution() throws Exception { null, this.schemaEvolutionService); - final int noOfRecords = 3; + final int noOfRecords = 2; List records = - TestUtils.createNativeJsonSinkRecords(0, noOfRecords, TOPIC, PARTITION); + TestUtils.createNativeJsonSinkRecords(1, noOfRecords, TOPIC, PARTITION); for (int idx = 0; idx < records.size(); idx++) { topicPartitionChannel.insertRecord(records.get(idx), idx == 0); @@ -1284,17 +1284,18 @@ public void testOffsetTokenVerificationFunction() { Assert.assertTrue(StreamingUtils.offsetTokenVerificationFunction.verify("1", "2", "4", 2)); Assert.assertTrue(StreamingUtils.offsetTokenVerificationFunction.verify("1", "2", null, 1)); Assert.assertTrue(StreamingUtils.offsetTokenVerificationFunction.verify(null, null, null, 0)); - Assert.assertFalse(StreamingUtils.offsetTokenVerificationFunction.verify("1", "3", "4", 3)); + Assert.assertTrue(StreamingUtils.offsetTokenVerificationFunction.verify("1", "3", "4", 3)); Assert.assertFalse(StreamingUtils.offsetTokenVerificationFunction.verify("2", "1", "4", 3)); } @Test public void assignANewChannelAfterTheSetupIsFullyDone() throws Exception { // given + String noOffset = "-1"; SnowflakeStreamingIngestChannel channel1 = Mockito.mock(SnowflakeStreamingIngestChannel.class); Mockito.when(channel1.getLatestCommittedOffsetToken()) - .thenReturn("0") + .thenReturn(noOffset) .thenThrow(new SFException(ErrorCode.CHANNEL_STATUS_INVALID)); Mockito.when(channel1.insertRow(anyMap(), anyString()))