From 3dabfc01e3a42fc44ca73f9d37b5477afef8f4d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Z=C4=85bek?= Date: Tue, 13 Aug 2024 14:12:30 +0000 Subject: [PATCH 1/9] SNOW-1571459 parse jbdc properties (#909) --- .../SnowflakeSinkConnectorConfig.java | 3 + .../connector/internal/InternalUtils.java | 11 +++ .../connector/internal/JdbcProperties.java | 86 +++++++++++++++++++ .../SnowflakeConnectionServiceFactory.java | 28 +++--- .../SnowflakeConnectionServiceV1.java | 42 +++------ .../connector/internal/SnowflakeErrors.java | 5 +- .../connector/internal/InternalUtilsTest.java | 18 +++- .../internal/JdbcPropertiesTest.java | 74 ++++++++++++++++ .../travis_correct_string_json.json | 3 +- .../travis_correct_string_proxy.json | 3 +- 10 files changed, 226 insertions(+), 47 deletions(-) create mode 100644 src/main/java/com/snowflake/kafka/connector/internal/JdbcProperties.java create mode 100644 src/test/java/com/snowflake/kafka/connector/internal/JdbcPropertiesTest.java diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java index f28b2fed2..482963f5e 100644 --- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java +++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java @@ -97,6 +97,9 @@ public class SnowflakeSinkConnectorConfig { // JDBC trace Info (environment variable) public static final String SNOWFLAKE_JDBC_TRACE = "JDBC_TRACE"; + // JDBC properties map + public static final String SNOWFLAKE_JDBC_MAP = "snowflake.jdbc.map"; + // Snowflake Metadata Flags private static final String SNOWFLAKE_METADATA_FLAGS = "Snowflake Metadata Flags"; public static final String SNOWFLAKE_METADATA_CREATETIME = "snowflake.metadata.createtime"; diff --git a/src/main/java/com/snowflake/kafka/connector/internal/InternalUtils.java b/src/main/java/com/snowflake/kafka/connector/internal/InternalUtils.java index ec07c8d34..0fb12d535 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/InternalUtils.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/InternalUtils.java @@ -306,6 +306,17 @@ protected static Properties generateProxyParametersIfRequired(Map conf) { + String jdbcConfigMapInput = conf.get(SnowflakeSinkConnectorConfig.SNOWFLAKE_JDBC_MAP); + if (jdbcConfigMapInput == null) { + return new Properties(); + } + Map jdbcMap = Utils.parseCommaSeparatedKeyValuePairs(jdbcConfigMapInput); + Properties properties = new Properties(); + properties.putAll(jdbcMap); + return properties; + } + /** * convert ingest status to ingested file status * diff --git a/src/main/java/com/snowflake/kafka/connector/internal/JdbcProperties.java b/src/main/java/com/snowflake/kafka/connector/internal/JdbcProperties.java new file mode 100644 index 000000000..38318033a --- /dev/null +++ b/src/main/java/com/snowflake/kafka/connector/internal/JdbcProperties.java @@ -0,0 +1,86 @@ +package com.snowflake.kafka.connector.internal; + +import java.util.Properties; + +/** Wrapper class for all snowflake jdbc properties */ +public class JdbcProperties { + + /** All jdbc properties including proxyProperties */ + private final Properties properties; + /** Proxy related properties */ + private final Properties proxyProperties; + + private JdbcProperties(Properties combinedProperties, Properties proxyProperties) { + this.properties = combinedProperties; + this.proxyProperties = proxyProperties; + } + + public Properties getProperties() { + return 
properties;
+  }
+
+  public String getProperty(String key) {
+    return properties.getProperty(key);
+  }
+
+  public Object get(String key) {
+    return properties.get(key);
+  }
+
+  public Properties getProxyProperties() {
+    return proxyProperties;
+  }
+
+  /**
+   * Combine all jdbc related properties. Throws error if jdbcPropertiesMap overrides any property
+   * defined in connectionProperties or proxyProperties.
+   *
+   * @param connectionProperties snowflake.database.name, snowflake.schema.name,
+   *     snowflake.private.key etc.
+   * @param proxyProperties jvm.proxy.xxx
+   * @param jdbcPropertiesMap snowflake.jdbc.map
+   */
+  static JdbcProperties create(
+      Properties connectionProperties, Properties proxyProperties, Properties jdbcPropertiesMap) {
+    InternalUtils.assertNotEmpty("connectionProperties", connectionProperties);
+    proxyProperties = setEmptyIfNull(proxyProperties);
+    jdbcPropertiesMap = setEmptyIfNull(jdbcPropertiesMap);
+
+    Properties proxyAndConnection = mergeProperties(connectionProperties, proxyProperties);
+    detectOverrides(proxyAndConnection, jdbcPropertiesMap);
+
+    Properties combinedProperties = mergeProperties(proxyAndConnection, jdbcPropertiesMap);
+
+    return new JdbcProperties(combinedProperties, proxyProperties);
+  }
+
+  /** Test method */
+  static JdbcProperties create(Properties connectionProperties) {
+    return create(connectionProperties, new Properties(), new Properties());
+  }
+
+  private static void detectOverrides(Properties proxyAndConnection, Properties jdbcPropertiesMap) {
+    jdbcPropertiesMap.forEach(
+        (k, v) -> {
+          if (proxyAndConnection.containsKey(k)) {
+            throw SnowflakeErrors.ERROR_0031.getException("Duplicated property: " + k);
+          }
+        });
+  }
+
+  private static Properties mergeProperties(
+      Properties connectionProperties, Properties proxyProperties) {
+    Properties mergedProperties = new Properties();
+    mergedProperties.putAll(connectionProperties);
+    mergedProperties.putAll(proxyProperties);
+    return mergedProperties;
+  }
+
+  /** Parsing methods do not return null. However, it's better to be perfectly sure.
*/ + private static Properties setEmptyIfNull(Properties properties) { + if (properties != null) { + return properties; + } + return new Properties(); + } +} diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceFactory.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceFactory.java index 52a531701..c7bc5bb51 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceFactory.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceFactory.java @@ -15,8 +15,8 @@ public static SnowflakeConnectionServiceBuilder builder() { } public static class SnowflakeConnectionServiceBuilder { - private Properties prop; - private Properties proxyProperties; + + private JdbcProperties jdbcProperties; private SnowflakeURL url; private String connectorName; private String taskID = "-1"; @@ -30,15 +30,15 @@ public static class SnowflakeConnectionServiceBuilder { private IngestionMethodConfig ingestionMethodConfig; @VisibleForTesting - public SnowflakeConnectionServiceBuilder setProperties(Properties prop) { - this.prop = prop; + public SnowflakeConnectionServiceBuilder setProperties(Properties connectionProperties) { + this.jdbcProperties = JdbcProperties.create(connectionProperties); this.ingestionMethodConfig = IngestionMethodConfig.SNOWPIPE; return this; } // For testing only public Properties getProperties() { - return this.prop; + return this.jdbcProperties.getProperties(); } public SnowflakeConnectionServiceBuilder setURL(SnowflakeURL url) { @@ -63,24 +63,24 @@ public SnowflakeConnectionServiceBuilder setProperties(Map conf) this.url = new SnowflakeURL(conf.get(Utils.SF_URL)); this.kafkaProvider = SnowflakeSinkConnectorConfig.KafkaProvider.of(conf.get(PROVIDER_CONFIG)).name(); - // TODO: Ideally only one property is required, but because we dont pass it around in JDBC and - // snowpipe SDK, - // it is better if we have two properties decoupled - // Right now, proxy parameters are picked from jvm system properties, in future they need to - // be decoupled - this.proxyProperties = InternalUtils.generateProxyParametersIfRequired(conf); this.connectorName = conf.get(Utils.NAME); this.ingestionMethodConfig = IngestionMethodConfig.determineIngestionMethod(conf); - this.prop = InternalUtils.createProperties(conf, this.url, ingestionMethodConfig); + + Properties proxyProperties = InternalUtils.generateProxyParametersIfRequired(conf); + Properties connectionProperties = + InternalUtils.createProperties(conf, this.url, ingestionMethodConfig); + Properties jdbcPropertiesMap = InternalUtils.parseJdbcPropertiesMap(conf); + this.jdbcProperties = + JdbcProperties.create(connectionProperties, proxyProperties, jdbcPropertiesMap); return this; } public SnowflakeConnectionService build() { - InternalUtils.assertNotEmpty("properties", prop); + InternalUtils.assertNotEmpty("jdbcProperties", jdbcProperties); InternalUtils.assertNotEmpty("url", url); InternalUtils.assertNotEmpty("connectorName", connectorName); return new SnowflakeConnectionServiceV1( - prop, url, connectorName, taskID, proxyProperties, kafkaProvider, ingestionMethodConfig); + jdbcProperties, url, connectorName, taskID, kafkaProvider, ingestionMethodConfig); } } } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java index 6bedd93e0..d2d9e1ef2 100644 --- 
a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java @@ -39,10 +39,8 @@ public class SnowflakeConnectionServiceV1 implements SnowflakeConnectionService private final SnowflakeTelemetryService telemetry; private final String connectorName; private final String taskID; - private final Properties prop; + private final JdbcProperties jdbcProperties; - // Placeholder for all proxy related properties set in the connector configuration - private final Properties proxyProperties; private final SnowflakeURL url; private final SnowflakeInternalStage internalStage; @@ -60,30 +58,27 @@ public class SnowflakeConnectionServiceV1 implements SnowflakeConnectionService private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); SnowflakeConnectionServiceV1( - Properties prop, + JdbcProperties jdbcProperties, SnowflakeURL url, String connectorName, String taskID, - Properties proxyProperties, String kafkaProvider, IngestionMethodConfig ingestionMethodConfig) { + this.jdbcProperties = jdbcProperties; this.connectorName = connectorName; this.taskID = taskID; this.url = url; - this.prop = prop; this.stageType = null; - this.proxyProperties = proxyProperties; this.kafkaProvider = kafkaProvider; + Properties proxyProperties = jdbcProperties.getProxyProperties(); + Properties combinedProperties = jdbcProperties.getProperties(); try { - if (proxyProperties != null && !proxyProperties.isEmpty()) { - Properties combinedProperties = - mergeProxyAndConnectionProperties(this.prop, this.proxyProperties); + if (!proxyProperties.isEmpty()) { LOGGER.debug("Proxy properties are set, passing in JDBC while creating the connection"); - this.conn = new SnowflakeDriver().connect(url.getJdbcUrl(), combinedProperties); } else { LOGGER.info("Establishing a JDBC connection with url:{}", url.getJdbcUrl()); - this.conn = new SnowflakeDriver().connect(url.getJdbcUrl(), prop); } + this.conn = new SnowflakeDriver().connect(url.getJdbcUrl(), combinedProperties); } catch (SQLException e) { throw SnowflakeErrors.ERROR_1001.getException(e); } @@ -99,17 +94,6 @@ public class SnowflakeConnectionServiceV1 implements SnowflakeConnectionService LOGGER.info("initialized the snowflake connection"); } - /* Merges the two properties. */ - private static Properties mergeProxyAndConnectionProperties( - Properties connectionProperties, Properties proxyProperties) { - assert connectionProperties != null; - assert proxyProperties != null; - Properties mergedProperties = new Properties(); - mergedProperties.putAll(connectionProperties); - mergedProperties.putAll(proxyProperties); - return mergedProperties; - } - @Override public void createTable(final String tableName, final boolean overwrite) { checkConnection(); @@ -931,19 +915,19 @@ public String getConnectorName() { public SnowflakeIngestionService buildIngestService( final String stageName, final String pipeName) { String account = url.getAccount(); - String user = prop.getProperty(InternalUtils.JDBC_USER); + String user = jdbcProperties.getProperty(InternalUtils.JDBC_USER); String userAgentSuffixInHttpRequest = String.format(USER_AGENT_SUFFIX_FORMAT, Utils.VERSION, kafkaProvider); String host = url.getUrlWithoutPort(); int port = url.getPort(); String connectionScheme = url.getScheme(); String fullPipeName = - prop.getProperty(InternalUtils.JDBC_DATABASE) + jdbcProperties.getProperty(InternalUtils.JDBC_DATABASE) + "." 
- + prop.getProperty(InternalUtils.JDBC_SCHEMA) + + jdbcProperties.getProperty(InternalUtils.JDBC_SCHEMA) + "." + pipeName; - PrivateKey privateKey = (PrivateKey) prop.get(InternalUtils.JDBC_PRIVATE_KEY); + PrivateKey privateKey = (PrivateKey) jdbcProperties.get(InternalUtils.JDBC_PRIVATE_KEY); return SnowflakeIngestionServiceFactory.builder( account, user, @@ -1027,9 +1011,9 @@ public ChannelMigrateOffsetTokenResponseDTO migrateStreamingChannelOffsetToken( InternalUtils.assertNotEmpty("sourceChannelName", sourceChannelName); InternalUtils.assertNotEmpty("destinationChannelName", destinationChannelName); String fullyQualifiedTableName = - prop.getProperty(InternalUtils.JDBC_DATABASE) + jdbcProperties.getProperty(InternalUtils.JDBC_DATABASE) + "." - + prop.getProperty(InternalUtils.JDBC_SCHEMA) + + jdbcProperties.getProperty(InternalUtils.JDBC_SCHEMA) + "." + tableName; String query = "select SYSTEM$SNOWPIPE_STREAMING_MIGRATE_CHANNEL_OFFSET_TOKEN((?), (?), (?));"; diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java index 14378a978..14cbe2d51 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java @@ -136,7 +136,10 @@ public enum SnowflakeErrors { String.format( "Failed to parse %s map", SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_CLIENT_PROVIDER_OVERRIDE_MAP)), - + ERROR_0031( + "0031", + "Failed to combine JDBC properties", + "One of snowflake.jdbc.map property overrides other jdbc property"), // Snowflake connection issues 1--- ERROR_1001( "1001", diff --git a/src/test/java/com/snowflake/kafka/connector/internal/InternalUtilsTest.java b/src/test/java/com/snowflake/kafka/connector/internal/InternalUtilsTest.java index dc6980c00..7d4fc54c1 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/InternalUtilsTest.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/InternalUtilsTest.java @@ -1,5 +1,7 @@ package com.snowflake.kafka.connector.internal; +import static org.junit.jupiter.api.Assertions.assertEquals; + import com.snowflake.kafka.connector.Utils; import com.snowflake.kafka.connector.mock.MockResultSetForSizeTest; import java.sql.ResultSet; @@ -8,7 +10,7 @@ import java.util.Map; import java.util.Properties; import net.snowflake.ingest.connection.IngestStatus; -import org.junit.Test; +import org.junit.jupiter.api.Test; public class InternalUtilsTest { @Test @@ -134,4 +136,18 @@ public void testResultSize() throws SQLException { resultSet = new MockResultSetForSizeTest(100); assert InternalUtils.resultSize(resultSet) == 100; } + + @Test + public void parseJdbcPropertiesMapTest() { + String key = "snowflake.jdbc.map"; + String input = + "isInsecureMode:true, disableSamlURLCheck:false, passcodeInPassword:on, foo:bar," + + " networkTimeout:100"; + Map config = new HashMap<>(); + config.put(key, input); + // when + Properties jdbcPropertiesMap = InternalUtils.parseJdbcPropertiesMap(config); + // then + assertEquals(jdbcPropertiesMap.size(), 5); + } } diff --git a/src/test/java/com/snowflake/kafka/connector/internal/JdbcPropertiesTest.java b/src/test/java/com/snowflake/kafka/connector/internal/JdbcPropertiesTest.java new file mode 100644 index 000000000..785d41934 --- /dev/null +++ b/src/test/java/com/snowflake/kafka/connector/internal/JdbcPropertiesTest.java @@ -0,0 +1,74 @@ +package com.snowflake.kafka.connector.internal; + +import static 
org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Properties; +import org.junit.jupiter.api.Test; + +public class JdbcPropertiesTest { + + @Test + public void shouldCombineProperties() { + // given + SnowflakeURL url = TestUtils.getUrl(); + Properties connection = InternalUtils.createProperties(TestUtils.getConf(), url); + + Properties proxy = new Properties(); + proxy.put("useProxy", "true"); + + Properties jdbcMap = new Properties(); + jdbcMap.put("insecureMode", "true"); + // when + JdbcProperties jdbcProperties = JdbcProperties.create(connection, proxy, jdbcMap); + // then + int givenPropertiesSize = connection.size() + proxy.size() + jdbcMap.size(); + int mergedPropertiesSize = jdbcProperties.getProperties().size(); + + assertEquals(givenPropertiesSize, mergedPropertiesSize); + } + + @Test + public void shouldThrowWhen_jdbcMap_overridesConnection() { + Properties connection = new Properties(); + connection.put("user", "test_user1"); + + Properties proxy = new Properties(); + + Properties jdbcMap = new Properties(); + jdbcMap.put("user", "test_user2"); + jdbcMap.put("insecureMode", "true"); + // expect + assertThatThrownBy(() -> JdbcProperties.create(connection, proxy, jdbcMap)) + .isInstanceOfSatisfying( + SnowflakeKafkaConnectorException.class, + ex -> { + // property key is printed not value + assertTrue(ex.getMessage().contains("user")); + assertEquals("0031", ex.getCode()); + }); + } + + @Test + public void shouldThrowWhen_jdbcMap_overridesProxy() { + Properties connection = new Properties(); + connection.put("user", "test_user1"); + + Properties proxy = new Properties(); + proxy.put("useProxy", "true"); + + Properties jdbcMap = new Properties(); + jdbcMap.put("useProxy", "true"); + jdbcMap.put("insecureMode", "false"); + // expect + assertThatThrownBy(() -> JdbcProperties.create(connection, proxy, jdbcMap)) + .isInstanceOfSatisfying( + SnowflakeKafkaConnectorException.class, + ex -> { + // property key is printed not value + assertTrue(ex.getMessage().contains("useProxy")); + assertEquals("0031", ex.getCode()); + }); + } +} diff --git a/test/rest_request_template/travis_correct_string_json.json b/test/rest_request_template/travis_correct_string_json.json index 0e935d4d1..a02de9889 100644 --- a/test/rest_request_template/travis_correct_string_json.json +++ b/test/rest_request_template/travis_correct_string_json.json @@ -10,6 +10,7 @@ "snowflake.database.name":"SNOWFLAKE_DATABASE", "snowflake.schema.name":"SNOWFLAKE_SCHEMA", "key.converter":"org.apache.kafka.connect.storage.StringConverter", - "value.converter":"com.snowflake.kafka.connector.records.SnowflakeJsonConverter" + "value.converter":"com.snowflake.kafka.connector.records.SnowflakeJsonConverter", + "snowflake.jdbc.map": "isInsecureMode : true, notYetExistingProp : true" } } diff --git a/test/rest_request_template/travis_correct_string_proxy.json b/test/rest_request_template/travis_correct_string_proxy.json index 9852f624f..5c5b62817 100644 --- a/test/rest_request_template/travis_correct_string_proxy.json +++ b/test/rest_request_template/travis_correct_string_proxy.json @@ -14,6 +14,7 @@ "jvm.proxy.host": "localhost", "jvm.proxy.port": "3128", "jvm.proxy.username": "admin", - "jvm.proxy.password": "test" + "jvm.proxy.password": "test", + "snowflake.jdbc.map": "isInsecureMode : true, notYetExistingProp : true" } } From b68d226b8c0c6c5739958fa357073971f0f58708 Mon Sep 17 
00:00:00 2001 From: Lex Shcharbaty <147443583+sfc-gh-lshcharbaty@users.noreply.github.com> Date: Tue, 20 Aug 2024 09:49:19 +0200 Subject: [PATCH 2/9] SNOW-1514185 Assign new channel when no offset is present in Snowflake (#913) --- .../BufferedTopicPartitionChannel.java | 1 + .../DirectTopicPartitionChannel.java | 1 + .../streaming/TopicPartitionChannelTest.java | 42 +++++++++++++++++++ 3 files changed, 44 insertions(+) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java index 47d36274c..737ce6cfe 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java @@ -905,6 +905,7 @@ private void resetChannelMetadataAfterRecovery( ? latestConsumerOffset.get() : offsetRecoveredFromSnowflake + 1L; if (offsetToResetInKafka == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) { + this.channel = newChannel; return; } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java index 3fa7e98e4..9e249e382 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java @@ -707,6 +707,7 @@ private void resetChannelMetadataAfterRecovery( ? currentConsumerGroupOffset.get() : offsetRecoveredFromSnowflake + 1L; if (offsetToResetInKafka == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) { + this.channel = newChannel; return; } diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java index c7584f02a..8f2eb9cbd 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java @@ -1311,4 +1311,46 @@ private void insertAndFlush(TopicPartitionChannel channel, List reco Thread.sleep(this.streamingBufferThreshold.getFlushTimeThresholdSeconds() + 1); channel.insertBufferedRecordsIfFlushTimeThresholdReached(); } + + @Test + public void assignANewChannel_whenNoOffsetIsPresentInSnowflake() { + // given + String noOffset = "-1"; + + SnowflakeStreamingIngestChannel originalChannel = + Mockito.mock(SnowflakeStreamingIngestChannel.class); + Mockito.when(originalChannel.getLatestCommittedOffsetToken()) + .thenReturn(noOffset) + .thenThrow(new SFException(ErrorCode.CHANNEL_STATUS_INVALID)); + + SnowflakeStreamingIngestChannel reopenedChannel = + Mockito.mock(SnowflakeStreamingIngestChannel.class); + Mockito.when(reopenedChannel.getLatestCommittedOffsetToken()).thenReturn(noOffset); + + Mockito.when(mockStreamingClient.openChannel(any(OpenChannelRequest.class))) + .thenReturn(originalChannel, reopenedChannel); + + TopicPartitionChannel topicPartitionChannel = + createTopicPartitionChannel( + this.mockStreamingClient, + this.topicPartition, + TEST_CHANNEL_NAME, + TEST_TABLE_NAME, + this.enableSchematization, + this.streamingBufferThreshold, + this.sfConnectorConfig, + this.mockKafkaRecordErrorReporter, + this.mockSinkTaskContext, + this.mockSnowflakeConnectionService, 
+ new RecordService(), + this.mockTelemetryService, + true, + null); + + // when + topicPartitionChannel.getOffsetSafeToCommitToKafka(); + + // then + Assert.assertEquals(reopenedChannel, topicPartitionChannel.getChannel()); + } } From d55cd768c75ea58a42002d3ebe848c4ca91e85fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Bobowski?= <145468486+sfc-gh-mbobowski@users.noreply.github.com> Date: Wed, 21 Aug 2024 09:34:11 +0200 Subject: [PATCH 3/9] NO-SNOW Ignore major updates on Kafka dependencies (#915) --- .github/dependabot.yaml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/dependabot.yaml b/.github/dependabot.yaml index cb22132d3..8ba754cfe 100644 --- a/.github/dependabot.yaml +++ b/.github/dependabot.yaml @@ -6,4 +6,7 @@ updates: - package-ecosystem: "maven" # See documentation for possible values directory: "/" # Location of package manifests schedule: - interval: "weekly" \ No newline at end of file + interval: "weekly" + ignore: + - dependency-name: "org.apache.kafka:*" + update-types: ["version-update:semver-major"] From 56f96bb5e1b397ceb045103dbd24777d543f9b01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Bobowski?= <145468486+sfc-gh-mbobowski@users.noreply.github.com> Date: Mon, 26 Aug 2024 09:42:58 +0200 Subject: [PATCH 4/9] SNOW-1623269 Fail sink task on authorization exception from Snowflake (#916) --- .../SnowflakeSinkConnectorConfig.java | 13 +++- .../kafka/connector/SnowflakeSinkTask.java | 8 +++ ...SinkTaskAuthorizationExceptionTracker.java | 54 +++++++++++++++ .../connector/internal/SnowflakeErrors.java | 4 ++ ...TaskAuthorizationExceptionTrackerTest.java | 69 +++++++++++++++++++ 5 files changed, 147 insertions(+), 1 deletion(-) create mode 100644 src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTaskAuthorizationExceptionTracker.java create mode 100644 src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskAuthorizationExceptionTrackerTest.java diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java index 482963f5e..5c3c45335 100644 --- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java +++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java @@ -214,6 +214,10 @@ public class SnowflakeSinkConnectorConfig { + " format is deprecated and V1 will be used always, disabling this config could have" + " ramifications. 
Please consult Snowflake support before setting this to false."; + public static final String ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS = + "enable.task.fail.on.authorization.errors"; + public static final boolean ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS_DEFAULT = false; + // MDC logging header public static final String ENABLE_MDC_LOGGING_CONFIG = "enable.mdc.logging"; public static final String ENABLE_MDC_LOGGING_DISPLAY = "Enable MDC logging"; @@ -714,7 +718,14 @@ static ConfigDef newConfigDef() { CONNECTOR_CONFIG, 9, ConfigDef.Width.NONE, - ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DISPLAY); + ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DISPLAY) + .define( + ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS, + Type.BOOLEAN, + ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS_DEFAULT, + Importance.LOW, + "If set to true the Connector will fail its tasks when authorization error from" + + " Snowflake occurred"); } public static class TopicToTableValidator implements ConfigDef.Validator { diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java index 66a06fcd7..ca516db10 100644 --- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java +++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java @@ -89,6 +89,9 @@ public class SnowflakeSinkTask extends SinkTask { private IngestionMethodConfig ingestionMethodConfig; + private final SnowflakeSinkTaskAuthorizationExceptionTracker authorizationExceptionTracker = + new SnowflakeSinkTaskAuthorizationExceptionTracker(); + /** default constructor, invoked by kafka connect framework */ public SnowflakeSinkTask() { DYNAMIC_LOGGER = new KCLogger(this.getClass().getName()); @@ -156,6 +159,8 @@ public void start(final Map parsedConfig) { // generate topic to table map this.topic2table = getTopicToTableMap(parsedConfig); + this.authorizationExceptionTracker.updateStateOnTaskStart(parsedConfig); + // generate metadataConfig table SnowflakeMetadataConfig metadataConfig = new SnowflakeMetadataConfig(parsedConfig); @@ -294,6 +299,8 @@ public void close(final Collection partitions) { */ @Override public void put(final Collection records) { + this.authorizationExceptionTracker.throwExceptionIfAuthorizationFailed(); + final long recordSize = records.size(); if (enableRebalancing && recordSize > 0) { processRebalancingTest(); @@ -345,6 +352,7 @@ public Map preCommit( } }); } catch (Exception e) { + this.authorizationExceptionTracker.reportPrecommitException(e); this.DYNAMIC_LOGGER.error("PreCommit error: {} ", e.getMessage()); } diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTaskAuthorizationExceptionTracker.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTaskAuthorizationExceptionTracker.java new file mode 100644 index 000000000..b6931192a --- /dev/null +++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTaskAuthorizationExceptionTracker.java @@ -0,0 +1,54 @@ +package com.snowflake.kafka.connector; + +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS; +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS_DEFAULT; +import static com.snowflake.kafka.connector.internal.SnowflakeErrors.ERROR_1005; + +import java.util.Map; + +/** + * When the user rotates Snowflake key that is stored in an external file the Connector hangs and + * does not mark its tasks as failed. 
To fix this corner case we need to track the authorization
+ * exception thrown during preCommit() and stop tasks during put().
+ *
+ * <p>
Note that exceptions thrown during preCommit() are swallowed by Kafka Connect and will not + * cause task failure. + */ +public class SnowflakeSinkTaskAuthorizationExceptionTracker { + + private static final String AUTHORIZATION_EXCEPTION_MESSAGE = "Authorization failed after retry"; + + private boolean authorizationTaskFailureEnabled; + private boolean authorizationErrorReported; + + public SnowflakeSinkTaskAuthorizationExceptionTracker() { + this.authorizationTaskFailureEnabled = true; + this.authorizationErrorReported = false; + } + + public void updateStateOnTaskStart(Map taskConfig) { + authorizationTaskFailureEnabled = + Boolean.parseBoolean( + taskConfig.getOrDefault( + ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS, + Boolean.toString(ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS_DEFAULT))); + } + + /** + * Check if the thrown exception is related to authorization + * + * @param ex - any exception that occurred during preCommit + */ + public void reportPrecommitException(Exception ex) { + if (ex.getMessage().contains(AUTHORIZATION_EXCEPTION_MESSAGE)) { + authorizationErrorReported = true; + } + } + + /** Throw exception if authorization has failed before */ + public void throwExceptionIfAuthorizationFailed() { + if (authorizationTaskFailureEnabled && authorizationErrorReported) { + throw ERROR_1005.getException(); + } + } +} diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java index 14cbe2d51..aaffcb174 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java @@ -155,6 +155,10 @@ public enum SnowflakeErrors { "Either the current connection is closed or hasn't connect to snowflake" + " server"), ERROR_1004( "1004", "Fetching OAuth token fail", "Fail to get OAuth token from authorization server"), + ERROR_1005( + "1005", + "Task failed due to authorization error", + "Set `enable.task.fail.on.authorization.errors=false` to avoid this behavior"), // SQL issues 2--- ERROR_2001( "2001", "Failed to prepare SQL statement", "SQL Exception, reported by Snowflake JDBC"), diff --git a/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskAuthorizationExceptionTrackerTest.java b/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskAuthorizationExceptionTrackerTest.java new file mode 100644 index 000000000..650bb5732 --- /dev/null +++ b/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskAuthorizationExceptionTrackerTest.java @@ -0,0 +1,69 @@ +package com.snowflake.kafka.connector; + +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS; + +import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException; +import com.snowflake.kafka.connector.internal.TestUtils; +import java.util.Map; +import java.util.stream.Stream; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +class SnowflakeSinkTaskAuthorizationExceptionTrackerTest { + + @Test + public void shouldThrowExceptionOnAuthorizationError() { + // given + SnowflakeSinkTaskAuthorizationExceptionTracker tracker = + new SnowflakeSinkTaskAuthorizationExceptionTracker(); + Map config = TestUtils.getConfig(); + 
config.put(ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS, "true"); + tracker.updateStateOnTaskStart(config); + + // when + tracker.reportPrecommitException(new Exception("Authorization failed after retry")); + + // then + Assertions.assertThrows( + SnowflakeKafkaConnectorException.class, tracker::throwExceptionIfAuthorizationFailed); + } + + @Test + public void shouldNotThrowExceptionWhenNoExceptionReported() { + // given + SnowflakeSinkTaskAuthorizationExceptionTracker tracker = + new SnowflakeSinkTaskAuthorizationExceptionTracker(); + Map config = TestUtils.getConfig(); + config.put(ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS, "true"); + tracker.updateStateOnTaskStart(config); + + // expect + Assertions.assertDoesNotThrow(tracker::throwExceptionIfAuthorizationFailed); + } + + @ParameterizedTest + @MethodSource("noExceptionConditions") + public void shouldNotThrowException(boolean enabled, String exceptionMessage) { + // given + SnowflakeSinkTaskAuthorizationExceptionTracker tracker = + new SnowflakeSinkTaskAuthorizationExceptionTracker(); + Map config = TestUtils.getConfig(); + config.put(ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS, Boolean.toString(enabled)); + tracker.updateStateOnTaskStart(config); + + // when + tracker.reportPrecommitException(new Exception(exceptionMessage)); + + // then + Assertions.assertDoesNotThrow(tracker::throwExceptionIfAuthorizationFailed); + } + + public static Stream noExceptionConditions() { + return Stream.of( + Arguments.of(false, "Authorization failed after retry"), + Arguments.of(true, "NullPointerException")); + } +} From 0b8af94f55385a7c9ddfd687eb5525f55a474254 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Bobowski?= <145468486+sfc-gh-mbobowski@users.noreply.github.com> Date: Mon, 9 Sep 2024 10:25:14 +0200 Subject: [PATCH 5/9] SNOW-1649161 Fix NaN value handling in schematization (#920) --- .../connector/records/RecordService.java | 11 +- .../internal/SchematizationTestUtils.java | 42 --- .../kafka/connector/internal/TestUtils.java | 20 +- ...lakeSinkServiceV2AvroSchematizationIT.java | 243 ++++++++++++++++++ .../streaming/SnowflakeSinkServiceV2IT.java | 99 ------- test/run_test_confluent.sh | 1 - test/test_suit/test_avrosr_avrosr.py | 8 +- .../test_schema_evolution_avro_sr.py | 9 +- .../test_snowpipe_streaming_string_avro_sr.py | 12 +- test/test_suites.py | 2 +- 10 files changed, 279 insertions(+), 168 deletions(-) create mode 100644 src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2AvroSchematizationIT.java diff --git a/src/main/java/com/snowflake/kafka/connector/records/RecordService.java b/src/main/java/com/snowflake/kafka/connector/records/RecordService.java index 4becdc448..e2b7bdd2f 100644 --- a/src/main/java/com/snowflake/kafka/connector/records/RecordService.java +++ b/src/main/java/com/snowflake/kafka/connector/records/RecordService.java @@ -41,6 +41,7 @@ import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ArrayNode; import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.JsonNodeFactory; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.NumericNode; import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.connect.data.ConnectSchema; @@ -277,7 +278,7 @@ private Map getMapFromJsonNodeForStreamingIngest(JsonNode node) } else if (columnNode.isNull()) { 
columnValue = null; } else { - columnValue = MAPPER.writeValueAsString(columnNode); + columnValue = writeValueAsStringOrNan(columnNode); } // while the value is always dumped into a string, the Streaming Ingest SDK // will transform the value according to its type in the table @@ -291,6 +292,14 @@ private Map getMapFromJsonNodeForStreamingIngest(JsonNode node) return streamingIngestRow; } + private String writeValueAsStringOrNan(JsonNode columnNode) throws JsonProcessingException { + if (columnNode instanceof NumericNode && ((NumericNode) columnNode).isNaN()) { + return "NaN"; + } else { + return MAPPER.writeValueAsString(columnNode); + } + } + /** For now there are two columns one is content and other is metadata. Both are Json */ private static class SnowflakeTableRow { // This can be a JsonNode but we will keep this as is. diff --git a/src/test/java/com/snowflake/kafka/connector/internal/SchematizationTestUtils.java b/src/test/java/com/snowflake/kafka/connector/internal/SchematizationTestUtils.java index ac33b3126..cf00ddb27 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/SchematizationTestUtils.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/SchematizationTestUtils.java @@ -5,26 +5,6 @@ public class SchematizationTestUtils { - public static final Map SF_AVRO_SCHEMA_FOR_TABLE_CREATION; - - static { - SF_AVRO_SCHEMA_FOR_TABLE_CREATION = new HashMap<>(); - SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("ID_INT8", "NUMBER"); - SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("ID_INT8_OPTIONAL", "NUMBER"); - SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("ID_INT16", "NUMBER"); - SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("ID_INT32", "NUMBER"); - SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("ID_INT64", "NUMBER"); - SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("FIRST_NAME", "VARCHAR"); - SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("RATING_FLOAT32", "FLOAT"); - SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("RATING_FLOAT64", "FLOAT"); - SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("APPROVAL", "BOOLEAN"); - SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("INFO_ARRAY_STRING", "ARRAY"); - SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("INFO_ARRAY_INT", "ARRAY"); - SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("INFO_ARRAY_JSON", "ARRAY"); - SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("INFO_MAP", "VARIANT"); - SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("RECORD_METADATA", "VARIANT"); - } - public static final Map SF_JSON_SCHEMA_FOR_TABLE_CREATION; static { @@ -43,28 +23,6 @@ public class SchematizationTestUtils { SF_JSON_SCHEMA_FOR_TABLE_CREATION.put("RECORD_METADATA", "VARIANT"); } - public static final Map CONTENT_FOR_AVRO_TABLE_CREATION; - - static { - CONTENT_FOR_AVRO_TABLE_CREATION = new HashMap<>(); - CONTENT_FOR_AVRO_TABLE_CREATION.put("ID_INT8", 0L); - CONTENT_FOR_AVRO_TABLE_CREATION.put("ID_INT8_OPTIONAL", null); - CONTENT_FOR_AVRO_TABLE_CREATION.put("ID_INT16", 42L); - CONTENT_FOR_AVRO_TABLE_CREATION.put("ID_INT32", 42L); - CONTENT_FOR_AVRO_TABLE_CREATION.put("ID_INT64", 42L); - CONTENT_FOR_AVRO_TABLE_CREATION.put("FIRST_NAME", "zekai"); - CONTENT_FOR_AVRO_TABLE_CREATION.put("RATING_FLOAT32", 0.99); - CONTENT_FOR_AVRO_TABLE_CREATION.put("RATING_FLOAT64", 0.99); - CONTENT_FOR_AVRO_TABLE_CREATION.put("APPROVAL", true); - CONTENT_FOR_AVRO_TABLE_CREATION.put("INFO_ARRAY_STRING", "[\"a\",\"b\"]"); - CONTENT_FOR_AVRO_TABLE_CREATION.put("INFO_ARRAY_INT", "[1,2]"); - CONTENT_FOR_AVRO_TABLE_CREATION.put( - "INFO_ARRAY_JSON", - "[null,\"{\\\"a\\\":1,\\\"b\\\":null,\\\"c\\\":null,\\\"d\\\":\\\"89asda9s0a\\\"}\"]"); - CONTENT_FOR_AVRO_TABLE_CREATION.put("INFO_MAP", 
"{\"field\":3}"); - CONTENT_FOR_AVRO_TABLE_CREATION.put("RECORD_METADATA", "RECORD_METADATA_PLACE_HOLDER"); - } - public static final Map CONTENT_FOR_JSON_TABLE_CREATION; static { diff --git a/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java b/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java index eda7545f2..610ede649 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java @@ -39,7 +39,6 @@ import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig; import com.snowflake.kafka.connector.Utils; import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig; -import com.snowflake.kafka.connector.internal.streaming.StreamingUtils; import com.snowflake.kafka.connector.records.SnowflakeJsonSchema; import com.snowflake.kafka.connector.records.SnowflakeRecordContent; import io.confluent.connect.avro.AvroConverter; @@ -78,8 +77,6 @@ import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; import net.snowflake.client.jdbc.internal.google.gson.JsonObject; import net.snowflake.client.jdbc.internal.google.gson.JsonParser; -import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient; -import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; @@ -887,13 +884,16 @@ public static void checkTableContentOneRow(String tableName, Map } } - public static SnowflakeStreamingIngestClient createStreamingClient( - Map config, String clientName) { - Properties clientProperties = new Properties(); - clientProperties.putAll(StreamingUtils.convertConfigForStreamingClient(new HashMap<>(config))); - return SnowflakeStreamingIngestClientFactory.builder(clientName) - .setProperties(clientProperties) - .build(); + public static Map getTableContentOneRow(String tableName) throws SQLException { + String getRowQuery = "select * from " + tableName + " limit 1"; + ResultSet result = executeQuery(getRowQuery); + result.next(); + + Map contentMap = new HashMap<>(); + for (int i = 0; i < result.getMetaData().getColumnCount(); i++) { + contentMap.put(result.getMetaData().getColumnName(i + 1), result.getObject(i + 1)); + } + return contentMap; } /** diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2AvroSchematizationIT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2AvroSchematizationIT.java new file mode 100644 index 000000000..93a6f3bc0 --- /dev/null +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2AvroSchematizationIT.java @@ -0,0 +1,243 @@ +package com.snowflake.kafka.connector.internal.streaming; + +import static com.snowflake.kafka.connector.internal.TestUtils.getTableContentOneRow; +import static com.snowflake.kafka.connector.internal.streaming.channel.TopicPartitionChannel.NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE; +import static org.awaitility.Awaitility.await; + +import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig; +import com.snowflake.kafka.connector.dlq.InMemoryKafkaRecordErrorReporter; +import com.snowflake.kafka.connector.internal.SnowflakeConnectionService; +import com.snowflake.kafka.connector.internal.SnowflakeSinkService; +import com.snowflake.kafka.connector.internal.SnowflakeSinkServiceFactory; +import 
com.snowflake.kafka.connector.internal.TestUtils; +import io.confluent.connect.avro.AvroConverter; +import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Stream; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +public class SnowflakeSinkServiceV2AvroSchematizationIT { + + private static final int PARTITION = 0; + private static final int START_OFFSET = 0; + + private static final String ID_INT8 = "ID_INT8"; + private static final String ID_INT8_OPTIONAL = "ID_INT8_OPTIONAL"; + private static final String ID_INT16 = "ID_INT16"; + private static final String ID_INT32 = "ID_INT32"; + private static final String ID_INT64 = "ID_INT64"; + private static final String FIRST_NAME = "FIRST_NAME"; + private static final String RATING_FLOAT32 = "RATING_FLOAT32"; + private static final String FLOAT_NAN = "FLOAT_NAN"; + private static final String RATING_FLOAT64 = "RATING_FLOAT64"; + private static final String APPROVAL = "APPROVAL"; + private static final String INFO_ARRAY_STRING = "INFO_ARRAY_STRING"; + private static final String INFO_ARRAY_INT = "INFO_ARRAY_INT"; + private static final String INFO_ARRAY_JSON = "INFO_ARRAY_JSON"; + private static final String INFO_MAP = "INFO_MAP"; + private static final String RECORD_METADATA = "RECORD_METADATA"; + + private static final Map EXPECTED_AVRO_SCHEMA = + new HashMap() { + { + put(ID_INT8, "NUMBER"); + put(ID_INT8_OPTIONAL, "NUMBER"); + put(ID_INT16, "NUMBER"); + put(ID_INT32, "NUMBER"); + put(ID_INT64, "NUMBER"); + put(FIRST_NAME, "VARCHAR"); + put(RATING_FLOAT32, "FLOAT"); + put(FLOAT_NAN, "FLOAT"); + put(RATING_FLOAT64, "FLOAT"); + put(APPROVAL, "BOOLEAN"); + put(INFO_ARRAY_STRING, "ARRAY"); + put(INFO_ARRAY_INT, "ARRAY"); + put(INFO_ARRAY_JSON, "ARRAY"); + put(INFO_MAP, "VARIANT"); + put(RECORD_METADATA, "VARIANT"); + } + }; + + private String table; + private SnowflakeConnectionService conn; + private String topic; + private TopicPartition topicPartition; + + private SnowflakeSinkService service; + + @BeforeEach + void before() { + table = TestUtils.randomTableName(); + topic = table; + conn = TestUtils.getConnectionServiceForStreaming(); + topicPartition = new TopicPartition(topic, PARTITION); + } + + @AfterEach + void after() { + service.closeAll(); + } + + @ParameterizedTest(name = "useSingleBuffer: {0}") + @MethodSource("singleBufferParameters") + public void testSchematizationWithTableCreationAndAvroInput(boolean useSingleBuffer) + throws Exception { + // given + conn.createTableWithOnlyMetadataColumn(table); + SinkRecord avroRecordValue = createSinkRecord(); + service = createService(useSingleBuffer); + + // when + // The first insert should fail and schema evolution will kick in to update the schema + 
service.insert(Collections.singletonList(avroRecordValue)); + + // then + waitUntilOffsetEquals(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE); + TestUtils.checkTableSchema(table, EXPECTED_AVRO_SCHEMA); + + // when + // Retry the insert should succeed now with the updated schema + service.insert(Collections.singletonList(avroRecordValue)); + + // then + waitUntilOffsetEquals(START_OFFSET + 1); + + Map actual = getTableContentOneRow(topic); + Assertions.assertEquals(actual.get(ID_INT8), 0L); + Assertions.assertNull(actual.get(ID_INT8_OPTIONAL)); + Assertions.assertEquals(actual.get(ID_INT16), 42L); + Assertions.assertEquals(actual.get(ID_INT32), 42L); + Assertions.assertEquals(actual.get(ID_INT64), 42L); + Assertions.assertEquals(actual.get(FIRST_NAME), "zekai"); + Assertions.assertEquals(actual.get(RATING_FLOAT32), 0.99); + Assertions.assertEquals( + actual.get(FLOAT_NAN), Double.NaN); // float is extended to double on SF side + Assertions.assertEquals(actual.get(RATING_FLOAT64), 0.99); + Assertions.assertEquals(actual.get(APPROVAL), true); + Assertions.assertEquals( + StringUtils.deleteWhitespace(actual.get(INFO_ARRAY_STRING).toString()), "[\"a\",\"b\"]"); + Assertions.assertEquals( + StringUtils.deleteWhitespace(actual.get(INFO_ARRAY_INT).toString()), "[1,2]"); + Assertions.assertEquals( + StringUtils.deleteWhitespace(actual.get(INFO_ARRAY_JSON).toString()), + "[null,\"{\\\"a\\\":1,\\\"b\\\":null,\\\"c\\\":null,\\\"d\\\":\\\"89asda9s0a\\\"}\"]"); + Assertions.assertEquals( + StringUtils.deleteWhitespace(actual.get(INFO_MAP).toString()), "{\"field\":3}"); + } + + private SnowflakeSinkService createService(boolean useSingleBuffer) { + Map config = prepareConfig(useSingleBuffer); + return SnowflakeSinkServiceFactory.builder( + conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config) + .setRecordNumber(1) + .setErrorReporter(new InMemoryKafkaRecordErrorReporter()) + .setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition))) + .addTask(table, new TopicPartition(topic, PARTITION)) + .build(); + } + + private SinkRecord createSinkRecord() { + Schema schema = prepareSchema(); + Struct data = prepareData(schema); + AvroConverter avroConverter = prepareAvroConverter(); + + byte[] converted = avroConverter.fromConnectData(topic, data.schema(), data); + conn.createTableWithOnlyMetadataColumn(table); + + SchemaAndValue avroInputValue = avroConverter.toConnectData(topic, converted); + + return new SinkRecord( + topic, + PARTITION, + Schema.STRING_SCHEMA, + "test", + avroInputValue.schema(), + avroInputValue.value(), + START_OFFSET); + } + + private AvroConverter prepareAvroConverter() { + SchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient(); + AvroConverter avroConverter = new AvroConverter(schemaRegistry); + avroConverter.configure( + Collections.singletonMap("schema.registry.url", "http://fake-url"), false); + return avroConverter; + } + + private Map prepareConfig(boolean useSingleBuffer) { + Map config = TestUtils.getConfForStreaming(useSingleBuffer); + config.put(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG, "true"); + config.put( + SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD, + "io.confluent.connect.avro.AvroConverter"); + config.put(SnowflakeSinkConnectorConfig.VALUE_SCHEMA_REGISTRY_CONFIG_FIELD, "http://fake-url"); + SnowflakeSinkConnectorConfig.setDefaultValues(config); + return config; + } + + private Schema prepareSchema() { + SchemaBuilder schemaBuilder = + SchemaBuilder.struct() + .field(ID_INT8, Schema.INT8_SCHEMA) + 
.field(ID_INT8_OPTIONAL, Schema.OPTIONAL_INT8_SCHEMA) + .field(ID_INT16, Schema.INT16_SCHEMA) + .field(ID_INT32, Schema.INT32_SCHEMA) + .field(ID_INT64, Schema.INT64_SCHEMA) + .field(FIRST_NAME, Schema.STRING_SCHEMA) + .field(RATING_FLOAT32, Schema.FLOAT32_SCHEMA) + .field(FLOAT_NAN, Schema.FLOAT32_SCHEMA) + .field(RATING_FLOAT64, Schema.FLOAT64_SCHEMA) + .field(APPROVAL, Schema.BOOLEAN_SCHEMA) + .field(INFO_ARRAY_STRING, SchemaBuilder.array(Schema.STRING_SCHEMA).build()) + .field(INFO_ARRAY_INT, SchemaBuilder.array(Schema.INT32_SCHEMA).build()) + .field(INFO_ARRAY_JSON, SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).build()) + .field(INFO_MAP, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build()); + return schemaBuilder.build(); + } + + private Struct prepareData(Schema schema) { + return new Struct(schema) + .put(ID_INT8, (byte) 0) + .put(ID_INT16, (short) 42) + .put(ID_INT32, 42) + .put(ID_INT64, 42L) + .put(FIRST_NAME, "zekai") + .put(RATING_FLOAT32, 0.99f) + .put(FLOAT_NAN, Float.NaN) + .put(RATING_FLOAT64, 0.99d) + .put(APPROVAL, true) + .put(INFO_ARRAY_STRING, Arrays.asList("a", "b")) + .put(INFO_ARRAY_INT, Arrays.asList(1, 2)) + .put( + INFO_ARRAY_JSON, + Arrays.asList(null, "{\"a\": 1, \"b\": null, \"c\": null, \"d\": \"89asda9s0a\"}")) + .put(INFO_MAP, Collections.singletonMap("field", 3)); + } + + private static Stream singleBufferParameters() { + return Stream.of(Arguments.of(false), Arguments.of(true)); + } + + private void waitUntilOffsetEquals(long expectedOffset) { + await() + .timeout(Duration.ofSeconds(60)) + .until(() -> service.getOffset(new TopicPartition(topic, PARTITION)) == expectedOffset); + } +} diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java index 726b0ac29..3f23a4fa6 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java @@ -1256,105 +1256,6 @@ public void testStreamingIngestionWithExactlyOnceSemanticsOverlappingOffsets( service2.closeAll(); } - @ParameterizedTest(name = "useSingleBuffer: {0}") - @MethodSource("singleBufferParameters") - public void testSchematizationWithTableCreationAndAvroInput(boolean useSingleBuffer) - throws Exception { - conn = getConn(false); - Map config = TestUtils.getConfForStreaming(useSingleBuffer); - config.put(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG, "true"); - config.put( - SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD, - "io.confluent.connect.avro.AvroConverter"); - config.put(SnowflakeSinkConnectorConfig.VALUE_SCHEMA_REGISTRY_CONFIG_FIELD, "http://fake-url"); - // get rid of these at the end - SnowflakeSinkConnectorConfig.setDefaultValues(config); - // avro - SchemaBuilder schemaBuilder = - SchemaBuilder.struct() - .field("id_int8", Schema.INT8_SCHEMA) - .field("id_int8_optional", Schema.OPTIONAL_INT8_SCHEMA) - .field("id_int16", Schema.INT16_SCHEMA) - .field("ID_INT32", Schema.INT32_SCHEMA) - .field("id_int64", Schema.INT64_SCHEMA) - .field("first_name", Schema.STRING_SCHEMA) - .field("rating_float32", Schema.FLOAT32_SCHEMA) - .field("rating_float64", Schema.FLOAT64_SCHEMA) - .field("approval", Schema.BOOLEAN_SCHEMA) - .field("info_array_string", SchemaBuilder.array(Schema.STRING_SCHEMA).build()) - .field("info_array_int", SchemaBuilder.array(Schema.INT32_SCHEMA).build()) - 
.field("info_array_json", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).build()) - .field( - "info_map", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build()); - - Struct original = - new Struct(schemaBuilder.build()) - .put("id_int8", (byte) 0) - .put("id_int16", (short) 42) - .put("ID_INT32", 42) - .put("id_int64", 42L) - .put("first_name", "zekai") - .put("rating_float32", 0.99f) - .put("rating_float64", 0.99d) - .put("approval", true) - .put("info_array_string", Arrays.asList("a", "b")) - .put("info_array_int", Arrays.asList(1, 2)) - .put( - "info_array_json", - Arrays.asList(null, "{\"a\": 1, \"b\": null, \"c\": null, \"d\": \"89asda9s0a\"}")) - .put("info_map", Collections.singletonMap("field", 3)); - - SchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient(); - AvroConverter avroConverter = new AvroConverter(schemaRegistry); - avroConverter.configure( - Collections.singletonMap("schema.registry.url", "http://fake-url"), false); - byte[] converted = avroConverter.fromConnectData(topic, original.schema(), original); - conn.createTableWithOnlyMetadataColumn(table); - - SchemaAndValue avroInputValue = avroConverter.toConnectData(topic, converted); - - long startOffset = 0; - - SinkRecord avroRecordValue = - new SinkRecord( - topic, - partition, - Schema.STRING_SCHEMA, - "test", - avroInputValue.schema(), - avroInputValue.value(), - startOffset); - - SnowflakeSinkService service = - SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config) - .setRecordNumber(1) - .setErrorReporter(new InMemoryKafkaRecordErrorReporter()) - .setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition))) - .addTask(table, new TopicPartition(topic, partition)) - .build(); - - // The first insert should fail and schema evolution will kick in to update the schema - service.insert(Collections.singletonList(avroRecordValue)); - TestUtils.assertWithRetry( - () -> - service.getOffset(new TopicPartition(topic, partition)) - == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE, - 20, - 5); - - TestUtils.checkTableSchema(table, SchematizationTestUtils.SF_AVRO_SCHEMA_FOR_TABLE_CREATION); - - // Retry the insert should succeed now with the updated schema - service.insert(Collections.singletonList(avroRecordValue)); - TestUtils.assertWithRetry( - () -> service.getOffset(new TopicPartition(topic, partition)) == startOffset + 1, 20, 5); - - TestUtils.checkTableContentOneRow( - table, SchematizationTestUtils.CONTENT_FOR_AVRO_TABLE_CREATION); - - service.closeAll(); - } - @ParameterizedTest(name = "useSingleBuffer: {0}") @MethodSource("singleBufferParameters") public void testSchematizationWithTableCreationAndJsonInput(boolean useSingleBuffer) diff --git a/test/run_test_confluent.sh b/test/run_test_confluent.sh index 46db5b043..e3e584e44 100755 --- a/test/run_test_confluent.sh +++ b/test/run_test_confluent.sh @@ -204,6 +204,5 @@ if [ $testError -ne 0 ]; then RED='\033[0;31m' NC='\033[0m' # No Color echo -e "${RED} There is error above this line ${NC}" - cat $APACHE_LOG_PATH/kc.log error_exit "=== test_verify.py failed ===" fi diff --git a/test/test_suit/test_avrosr_avrosr.py b/test/test_suit/test_avrosr_avrosr.py index a8db5bfa9..0ecd5cf8d 100644 --- a/test/test_suit/test_avrosr_avrosr.py +++ b/test/test_suit/test_avrosr_avrosr.py @@ -25,7 +25,9 @@ def __init__(self, driver, nameSalt): "fields":[ {"name":"id","type":"int"}, {"name":"firstName","type":"string"}, - {"name":"time","type":"int"} + {"name":"time","type":"int"}, + 
{"name":"someFloat","type":"float"}, + {"name":"someFloatNaN","type":"float"} ] } """ @@ -41,7 +43,7 @@ def send(self): for e in range(100): # avro data must follow the schema defined in ValueSchemaStr key.append({"id": e}) - value.append({"id": e, "firstName": "abc0", "time": 1835}) + value.append({"id": e, "firstName": "abc0", "time": 1835, "someFloat": 21.37, "someFloatNaN": "NaN"}) self.driver.sendAvroSRData( self.topic, value, self.valueSchema, key, self.keySchema) @@ -58,7 +60,7 @@ def verify(self, round): "Select * from {} limit 1".format(self.topic)).fetchone() goldMeta = r'{"CreateTime":\d*,"key":{"id":0},"key_schema_id":\d*,"offset":0,"partition":0,"schema_id":\d*,' \ r'"topic":"travis_correct_avrosr_avrosr_\w*"}' - goldContent = r'{"firstName":"abc0","id":0,"time":1835}' + goldContent = r'{"firstName":"abc0","id":0,"someFloat":21.37,"someFloatNaN":"NaN","time":1835}' self.driver.regexMatchOneLine(res, goldMeta, goldContent) self.driver.verifyStageIsCleaned(self.topic) diff --git a/test/test_suit/test_schema_evolution_avro_sr.py b/test/test_suit/test_schema_evolution_avro_sr.py index bf328325f..cce02574a 100644 --- a/test/test_suit/test_schema_evolution_avro_sr.py +++ b/test/test_suit/test_schema_evolution_avro_sr.py @@ -32,7 +32,8 @@ def __init__(self, driver, nameSalt): self.records.append({ 'PERFORMANCE_STRING': 'Excellent', 'RATING_DOUBLE': 0.99, - 'APPROVAL': True + 'APPROVAL': True, + 'SOME_FLOAT_NAN': "NaN" }) self.ValueSchemaStr = [] @@ -56,7 +57,8 @@ def __init__(self, driver, nameSalt): "fields":[ {"name":"RATING_DOUBLE","type":"float"}, {"name":"PERFORMANCE_STRING","type":"string"}, - {"name":"APPROVAL","type":"boolean"} + {"name":"APPROVAL","type":"boolean"}, + {"name":"SOME_FLOAT_NAN","type":"float"} ] } """) @@ -67,7 +69,8 @@ def __init__(self, driver, nameSalt): 'RATING_INT': 'NUMBER', 'RATING_DOUBLE': 'FLOAT', 'APPROVAL': 'BOOLEAN', - 'RECORD_METADATA': 'VARIANT' + 'SOME_FLOAT_NAN': 'FLOAT', + 'RECORD_METADATA': 'VARIANT', } self.gold_columns = [columnName for columnName in self.gold_type] diff --git a/test/test_suit/test_snowpipe_streaming_string_avro_sr.py b/test/test_suit/test_snowpipe_streaming_string_avro_sr.py index e7310770d..25c087339 100644 --- a/test/test_suit/test_snowpipe_streaming_string_avro_sr.py +++ b/test/test_suit/test_snowpipe_streaming_string_avro_sr.py @@ -21,7 +21,9 @@ def __init__(self, driver, nameSalt): "fields":[ {"name":"id","type":"int"}, {"name":"firstName","type":"string"}, - {"name":"time","type":"int"} + {"name":"time","type":"int"}, + {"name":"someFloat","type":"float"}, + {"name":"someFloatNaN","type":"float"} ] } """ @@ -34,19 +36,13 @@ def getConfigFileName(self): return self.fileName + ".json" def send(self): - # create topic with n partitions and only one replication factor - print("Partition count:" + str(self.partitionNum)) - print("Topic:", self.topic) - - self.driver.describeTopic(self.topic) for p in range(self.partitionNum): print("Sending in Partition:" + str(p)) key = [] value = [] - value = [] for e in range(self.recordNum): - value.append({"id": e, "firstName": "abc0", "time": 1835}) + value.append({"id": e, "firstName": "abc0", "time": 1835, "someFloat": 21.37, "someFloatNaN": "NaN"}) self.driver.sendAvroSRData(self.topic, value, self.valueSchema, key=[], key_schema="", partition=p) sleep(2) diff --git a/test/test_suites.py b/test/test_suites.py index eae8d0264..5b86f31d6 100644 --- a/test/test_suites.py +++ b/test/test_suites.py @@ -164,7 +164,7 @@ def create_end_to_end_test_suites(driver, nameSalt, 
schemaRegistryAddress, testS test_instance=TestSnowpipeStreamingStringJsonDLQ(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True )), - ("TestSnowpipeStreamingStringAvro", EndToEndTestSuite( + ("TestSnowpipeStreamingStringAvroSR", EndToEndTestSuite( test_instance=TestSnowpipeStreamingStringAvroSR(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=False )), From b07c9e4d87ec3f0205866b6d29564d700e5a7b0b Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Mon, 16 Sep 2024 13:30:41 -0700 Subject: [PATCH 6/9] NO-SNOW Release 2.4.1 (#923) --- pom.xml | 6 +++--- pom_confluent.xml | 6 +++--- src/main/java/com/snowflake/kafka/connector/Utils.java | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/pom.xml b/pom.xml index 2521ce5c1..adbf4853e 100644 --- a/pom.xml +++ b/pom.xml @@ -1,7 +1,7 @@ @@ -12,7 +12,7 @@ com.snowflake snowflake-kafka-connector - 2.4.0 + 2.4.1 jar Snowflake Kafka Connector Snowflake Kafka Connect Sink Connector @@ -361,7 +361,7 @@ net.snowflake snowflake-ingest-sdk - 2.2.0 + 2.2.2 net.snowflake diff --git a/pom_confluent.xml b/pom_confluent.xml index b3e182a67..b745b9a72 100644 --- a/pom_confluent.xml +++ b/pom_confluent.xml @@ -1,7 +1,7 @@ @@ -12,7 +12,7 @@ com.snowflake snowflake-kafka-connector - 2.4.0 + 2.4.1 jar Snowflake Kafka Connector Snowflake Kafka Connect Sink Connector @@ -502,7 +502,7 @@ net.snowflake snowflake-ingest-sdk - 2.2.0 + 2.2.2 net.snowflake diff --git a/src/main/java/com/snowflake/kafka/connector/Utils.java b/src/main/java/com/snowflake/kafka/connector/Utils.java index c0d021902..e82729613 100644 --- a/src/main/java/com/snowflake/kafka/connector/Utils.java +++ b/src/main/java/com/snowflake/kafka/connector/Utils.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019 Snowflake Inc. All rights reserved. + * Copyright (c) 2024 Snowflake Inc. All rights reserved. 
* * Licensed under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance @@ -65,7 +65,7 @@ public class Utils { // Connector version, change every release - public static final String VERSION = "2.4.0"; + public static final String VERSION = "2.4.1"; // connector parameter list public static final String NAME = "name"; From 493c23e86bd8d46798611559b3b3c139ff7bd966 Mon Sep 17 00:00:00 2001 From: wrehman-skap Date: Wed, 23 Oct 2024 11:00:37 +0100 Subject: [PATCH 7/9] ref: ENG-970/SF Schema Issue + schema auto config --- .../connector/SnowflakeSinkConnector.java | 24 ++++++++++--------- .../kafka/connector/SnowflakeSinkTask.java | 7 ++++-- .../com/snowflake/kafka/connector/Utils.java | 1 + .../SnowflakeConnectionServiceV1.java | 2 +- 4 files changed, 20 insertions(+), 14 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnector.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnector.java index b717d4d45..ef45c7f32 100644 --- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnector.java +++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnector.java @@ -294,18 +294,20 @@ public Config validate(Map connectorConfigs) { return result; } - // Disabling config validation due to ENG-789/Auto create schema feature for Append SF connector - /*try { - testConnection.schemaExists(connectorConfigs.get(Utils.SF_SCHEMA)); - } catch (SnowflakeKafkaConnectorException e) { - LOGGER.error("Validate Error msg:{}, errorCode:{}", e.getMessage(), e.getCode()); - if (e.getCode().equals("2001")) { - Utils.updateConfigErrorMessage(result, Utils.SF_SCHEMA, " schema does not exist"); - } else { - throw e; + boolean createSchemaAuto = Boolean.parseBoolean(connectorConfigs.getOrDefault(Utils.CREATE_SCHEMA_AUTO,"false")); + if(!createSchemaAuto) { + try { + testConnection.schemaExists(connectorConfigs.get(Utils.SF_SCHEMA)); + } catch (SnowflakeKafkaConnectorException e) { + LOGGER.error("Validate Error msg:{}, errorCode:{}", e.getMessage(), e.getCode()); + if (e.getCode().equals("2001")) { + Utils.updateConfigErrorMessage(result, Utils.SF_SCHEMA, " schema does not exist"); + } else { + throw e; + } + return result; } - return result; - }*/ + } LOGGER.info("Validated config with no error"); return result; diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java index bf9854f23..e07fa5b33 100644 --- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java +++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java @@ -229,8 +229,11 @@ public void start(final Map parsedConfig) { .setErrorReporter(kafkaRecordErrorReporter) .setSinkTaskContext(this.context) .build(); - createSchemaIfNotExists(getConnection(), - parsedConfig.get(SnowflakeSinkConnectorConfig.SNOWFLAKE_SCHEMA)); + + if(Boolean.parseBoolean(parsedConfig.getOrDefault(Utils.CREATE_SCHEMA_AUTO,"false"))) { + createSchemaIfNotExists(getConnection(), + parsedConfig.get(SnowflakeSinkConnectorConfig.SNOWFLAKE_SCHEMA)); + } this.streamkapQueryTemplate = StreamkapQueryTemplate.buildStreamkapQueryTemplateFromConfig(parsedConfig); DYNAMIC_LOGGER.info( diff --git a/src/main/java/com/snowflake/kafka/connector/Utils.java b/src/main/java/com/snowflake/kafka/connector/Utils.java index ee61733d9..289159bbc 100644 --- a/src/main/java/com/snowflake/kafka/connector/Utils.java +++ b/src/main/java/com/snowflake/kafka/connector/Utils.java @@ -136,6 
+136,7 @@ public class Utils { public static final String TOPICS_MAP_CONF = "topics.config.map"; public static final String SCHEMA_CHANGE_CHECK_MS = "schema.changes.check.interval.ms"; public static final String APPLY_DYNAMIC_TABLE_SCRIPT_CONF = "apply.dynamic.table.script"; + public static final String CREATE_SCHEMA_AUTO = "create.schema.auto"; private static final KCLogger LOGGER = new KCLogger(Utils.class.getName()); diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java index 05d236d38..dfdd91fe3 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java @@ -260,7 +260,7 @@ public void createStage(final String stageName) { public boolean schemaExist(final String schemaName) { checkConnection(); InternalUtils.assertNotEmpty("schemaName", schemaName); - String query = "desc schema identifier(?)"; + String query = "use schema identifier(?)"; PreparedStatement stmt = null; boolean exist; try { From fc3154d0391447965bfd337232e6d4a86e7a391b Mon Sep 17 00:00:00 2001 From: wrehman-skap Date: Tue, 19 Nov 2024 10:00:35 +0000 Subject: [PATCH 8/9] Fixing some upgrading issues --- .../com/snowflake/kafka/connector/internal/InternalUtils.java | 2 +- .../streaming/SnowflakeSinkServiceV2AvroSchematizationIT.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/InternalUtils.java b/src/main/java/com/snowflake/kafka/connector/internal/InternalUtils.java index 87188a129..3dd501d2b 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/InternalUtils.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/InternalUtils.java @@ -60,7 +60,7 @@ static void assertNotEmpty(String name, Object value) { if (value == null || (value instanceof String && value.toString().isEmpty())) { switch (name.toLowerCase()) { case "schemaname": - throw SnowflakeErrors.ERROR_0031.getException(); + throw SnowflakeErrors.ERROR_S0031.getException(); case "tablename": throw SnowflakeErrors.ERROR_0005.getException(); case "stagename": diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2AvroSchematizationIT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2AvroSchematizationIT.java index 93a6f3bc0..f503f2083 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2AvroSchematizationIT.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2AvroSchematizationIT.java @@ -100,7 +100,7 @@ void after() { public void testSchematizationWithTableCreationAndAvroInput(boolean useSingleBuffer) throws Exception { // given - conn.createTableWithOnlyMetadataColumn(table); + conn.createTableWithOnlyMetadataColumn(table, true); SinkRecord avroRecordValue = createSinkRecord(); service = createService(useSingleBuffer); @@ -159,7 +159,7 @@ private SinkRecord createSinkRecord() { AvroConverter avroConverter = prepareAvroConverter(); byte[] converted = avroConverter.fromConnectData(topic, data.schema(), data); - conn.createTableWithOnlyMetadataColumn(table); + conn.createTableWithOnlyMetadataColumn(table, true); SchemaAndValue avroInputValue = avroConverter.toConnectData(topic, converted); From 62709b72f5ac04d52a2ef80b411c50d97f627224 
Mon Sep 17 00:00:00 2001 From: wrehman-skap Date: Tue, 19 Nov 2024 15:26:02 +0000 Subject: [PATCH 9/9] upgrade to v2.4.1 - fixing SnowflakeStreamkapSinkIT unit test --- pom.xml | 7 +++++++ .../kafka/connector/SnowflakeStreamkapSinkIT.java | 10 ++++++++++ 2 files changed, 17 insertions(+) diff --git a/pom.xml b/pom.xml index 83b55bb1b..d508c9a40 100644 --- a/pom.xml +++ b/pom.xml @@ -572,6 +572,13 @@ + + com.streamkap + streamkap-kafka-connect-utilities + 0.0.1 + test-jar + test + org.apache.kafka connect-json diff --git a/src/test/java/com/snowflake/kafka/connector/SnowflakeStreamkapSinkIT.java b/src/test/java/com/snowflake/kafka/connector/SnowflakeStreamkapSinkIT.java index f58a8aec8..6fb9f254f 100644 --- a/src/test/java/com/snowflake/kafka/connector/SnowflakeStreamkapSinkIT.java +++ b/src/test/java/com/snowflake/kafka/connector/SnowflakeStreamkapSinkIT.java @@ -152,6 +152,16 @@ protected String getSchemaName() { return SCHEMA_NAME; } + @Override + protected String getIntColumnName(String colSuffix){ + return "IntColumn"+colSuffix; + } + + @Override + protected String getJsonColumnName(String jsonColumn, String jsonAttribute){ + return jsonColumn+":'"+ jsonAttribute +"'"; + } + @Override protected SinkRecord applyTransforms(SinkRecord record) { return renameAmbigiousFields.apply(record);
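
Note on patch 7/9 (ENG-970) above: it restores the schema-existence check in SnowflakeSinkConnector.validate() and the schema creation in SnowflakeSinkTask.start(), but gates both behind a new create.schema.auto property that defaults to "false". The plain-Java sketch below restates that gate in isolation; it is an illustration under assumptions, not connector code. SchemaOps, applySchemaPolicy, and CreateSchemaAutoSketch are invented names, while the config keys ("create.schema.auto", "snowflake.schema.name") follow the diff.

import java.util.Map;

public class CreateSchemaAutoSketch {

  // Stand-in for the connector's connection service; not a real interface in this repo.
  interface SchemaOps {
    boolean schemaExists(String schemaName);

    void createSchemaIfNotExists(String schemaName);
  }

  static void applySchemaPolicy(Map<String, String> config, SchemaOps ops) {
    // Same parsing pattern as the patch: absent or non-"true" values disable auto-creation.
    boolean createSchemaAuto =
        Boolean.parseBoolean(config.getOrDefault("create.schema.auto", "false"));
    String schemaName = config.get("snowflake.schema.name");

    if (createSchemaAuto) {
      // Auto mode: create the schema at task startup instead of rejecting the config.
      ops.createSchemaIfNotExists(schemaName);
    } else if (!ops.schemaExists(schemaName)) {
      // Default mode: a missing schema surfaces as a configuration error during validate().
      throw new IllegalStateException("Schema does not exist: " + schemaName);
    }
  }
}

With create.schema.auto unset or false the behavior matches the connector before the ENG-789 change (validation fails fast on a missing schema); setting it to true shifts responsibility to the task, which creates the schema at startup if it does not exist.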