Snowflake v2.4.1 upgrade #26

Open: wants to merge 11 commits into base streamkap-main

Changes from all commits
5 changes: 4 additions & 1 deletion .github/dependabot.yaml
@@ -6,4 +6,7 @@ updates:
   - package-ecosystem: "maven" # See documentation for possible values
     directory: "/" # Location of package manifests
     schedule:
-      interval: "weekly"
+      interval: "weekly"
+    ignore:
+      - dependency-name: "org.apache.kafka:*"
+        update-types: ["version-update:semver-major"]
13 changes: 10 additions & 3 deletions pom.xml
@@ -1,7 +1,7 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
 ~ /*
-~ * Copyright (c) 2019 - 2020 Snowflake Computing Inc. All rights reserved.
+~ * Copyright (c) 2019 - 2024 Snowflake Computing Inc. All rights reserved.
 ~ */
 -->
 
@@ -12,7 +12,7 @@
 
   <groupId>com.snowflake</groupId>
   <artifactId>snowflake-kafka-connector</artifactId>
-  <version>2.4.0</version>
+  <version>2.4.1</version>
   <packaging>jar</packaging>
   <name>Snowflake Kafka Connector</name>
   <description>Snowflake Kafka Connect Sink Connector</description>
@@ -363,7 +363,7 @@
     <dependency>
       <groupId>net.snowflake</groupId>
       <artifactId>snowflake-ingest-sdk</artifactId>
-      <version>2.2.0</version>
+      <version>2.2.2</version>
       <exclusions>
         <exclusion>
           <groupId>net.snowflake</groupId>
@@ -572,6 +572,13 @@
     </dependency>
 
     <!--Kafka JSON converter for SMT unit test-->
+    <dependency>
+      <groupId>com.streamkap</groupId>
+      <artifactId>streamkap-kafka-connect-utilities</artifactId>
+      <version>0.0.1</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>connect-json</artifactId>
6 changes: 3 additions & 3 deletions pom_confluent.xml
@@ -1,7 +1,7 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
 ~ /*
-~ * Copyright (c) 2019 - 2020 Snowflake Computing Inc. All rights reserved.
+~ * Copyright (c) 2019 - 2024 Snowflake Computing Inc. All rights reserved.
 ~ */
 -->
 
@@ -12,7 +12,7 @@
 
   <groupId>com.snowflake</groupId>
   <artifactId>snowflake-kafka-connector</artifactId>
-  <version>2.4.0</version>
+  <version>2.4.1</version>
   <packaging>jar</packaging>
   <name>Snowflake Kafka Connector</name>
   <description>Snowflake Kafka Connect Sink Connector</description>
@@ -510,7 +510,7 @@
     <dependency>
       <groupId>net.snowflake</groupId>
       <artifactId>snowflake-ingest-sdk</artifactId>
-      <version>2.2.0</version>
+      <version>2.2.2</version>
       <exclusions>
         <exclusion>
           <groupId>net.snowflake</groupId>
@@ -294,18 +294,20 @@ public Config validate(Map<String, String> connectorConfigs) {
       return result;
     }
 
-    // Disabling config validation due to ENG-789/Auto create schema feature for Append SF connector
-    /*try {
-      testConnection.schemaExists(connectorConfigs.get(Utils.SF_SCHEMA));
-    } catch (SnowflakeKafkaConnectorException e) {
-      LOGGER.error("Validate Error msg:{}, errorCode:{}", e.getMessage(), e.getCode());
-      if (e.getCode().equals("2001")) {
-        Utils.updateConfigErrorMessage(result, Utils.SF_SCHEMA, " schema does not exist");
-      } else {
-        throw e;
-      }
-      return result;
-    }*/
+    boolean createSchemaAuto = Boolean.parseBoolean(connectorConfigs.getOrDefault(Utils.CREATE_SCHEMA_AUTO,"false"));
+    if(!createSchemaAuto) {
+      try {
+        testConnection.schemaExists(connectorConfigs.get(Utils.SF_SCHEMA));
+      } catch (SnowflakeKafkaConnectorException e) {
+        LOGGER.error("Validate Error msg:{}, errorCode:{}", e.getMessage(), e.getCode());
+        if (e.getCode().equals("2001")) {
+          Utils.updateConfigErrorMessage(result, Utils.SF_SCHEMA, " schema does not exist");
+        } else {
+          throw e;
+        }
+        return result;
+      }
+    }
 
     LOGGER.info("Validated config with no error");
     return result;
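The new create.schema.auto flag gates the schema-existence check: when it is set to true, validate() no longer fails on a missing schema, and the schema is created at task startup instead (see the SnowflakeSinkTask changes below). A minimal standalone sketch of the gate, assuming the schema config key is snowflake.schema.name (the class and values are illustrative):

import java.util.HashMap;
import java.util.Map;

public class CreateSchemaAutoExample {
  public static void main(String[] args) {
    Map<String, String> connectorConfigs = new HashMap<>();
    connectorConfigs.put("snowflake.schema.name", "STREAMKAP"); // assumed key, illustrative value
    connectorConfigs.put("create.schema.auto", "true");         // Utils.CREATE_SCHEMA_AUTO

    boolean createSchemaAuto =
        Boolean.parseBoolean(connectorConfigs.getOrDefault("create.schema.auto", "false"));
    if (!createSchemaAuto) {
      // validate() would call testConnection.schemaExists(...) here
      System.out.println("validate(): checking that the schema exists");
    } else {
      System.out.println("validate(): skipping the check; start() creates the schema");
    }
  }
}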
@@ -104,6 +104,9 @@ public class SnowflakeSinkConnectorConfig {
   // JDBC trace Info (environment variable)
   public static final String SNOWFLAKE_JDBC_TRACE = "JDBC_TRACE";
 
+  // JDBC properties map
+  public static final String SNOWFLAKE_JDBC_MAP = "snowflake.jdbc.map";
+
   // Snowflake Metadata Flags
   private static final String SNOWFLAKE_METADATA_FLAGS = "Snowflake Metadata Flags";
   public static final String SNOWFLAKE_METADATA_CREATETIME = "snowflake.metadata.createtime";
@@ -218,6 +221,10 @@ public class SnowflakeSinkConnectorConfig {
           + " format is deprecated and V1 will be used always, disabling this config could have"
           + " ramifications. Please consult Snowflake support before setting this to false.";
 
+  public static final String ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS =
+      "enable.task.fail.on.authorization.errors";
+  public static final boolean ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS_DEFAULT = false;
+
   // MDC logging header
   public static final String ENABLE_MDC_LOGGING_CONFIG = "enable.mdc.logging";
   public static final String ENABLE_MDC_LOGGING_DISPLAY = "Enable MDC logging";
@@ -721,7 +728,14 @@ public static ConfigDef newConfigDef() {
             CONNECTOR_CONFIG,
             9,
             ConfigDef.Width.NONE,
-            ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DISPLAY);
+            ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DISPLAY)
+        .define(
+            ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS,
+            Type.BOOLEAN,
+            ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS_DEFAULT,
+            Importance.LOW,
+            "If set to true the Connector will fail its tasks when authorization error from"
+                + " Snowflake occurred");
   }
 
   public static class TopicToTableValidator implements ConfigDef.Validator {
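The new define() entry registers enable.task.fail.on.authorization.errors so the flag is type-checked and defaulted like any other connector property. A standalone sketch exercising Kafka's ConfigDef directly (toy class; the real code chains the definition onto the connector's existing ConfigDef as shown above):

import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;

public class AuthorizationFlagDefineSketch {
  public static void main(String[] args) {
    ConfigDef def =
        new ConfigDef()
            .define(
                "enable.task.fail.on.authorization.errors",
                ConfigDef.Type.BOOLEAN,
                false, // ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS_DEFAULT
                ConfigDef.Importance.LOW,
                "If set to true the Connector will fail its tasks when authorization error"
                    + " from Snowflake occurred");

    // A user config that omits the key falls back to the default:
    Map<String, Object> parsed = def.parse(Map.of());
    System.out.println(parsed.get("enable.task.fail.on.authorization.errors")); // false
  }
}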
15 changes: 13 additions & 2 deletions src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java
@@ -92,6 +92,9 @@ public class SnowflakeSinkTask extends SinkTask {
 
   private StreamkapQueryTemplate streamkapQueryTemplate = new StreamkapQueryTemplate();
 
+  private final SnowflakeSinkTaskAuthorizationExceptionTracker authorizationExceptionTracker =
+      new SnowflakeSinkTaskAuthorizationExceptionTracker();
+
   /** default constructor, invoked by kafka connect framework */
   public SnowflakeSinkTask() {
     DYNAMIC_LOGGER = new KCLogger(this.getClass().getName());
@@ -159,6 +162,8 @@ public void start(final Map<String, String> parsedConfig) {
     // generate topic to table map
     this.topic2table = getTopicToTableMap(parsedConfig);
 
+    this.authorizationExceptionTracker.updateStateOnTaskStart(parsedConfig);
+
     // generate metadataConfig table
     SnowflakeMetadataConfig metadataConfig = new SnowflakeMetadataConfig(parsedConfig);
 
@@ -229,8 +234,11 @@ public void start(final Map<String, String> parsedConfig) {
             .setErrorReporter(kafkaRecordErrorReporter)
             .setSinkTaskContext(this.context)
             .build();
-    createSchemaIfNotExists(getConnection(),
-        parsedConfig.get(SnowflakeSinkConnectorConfig.SNOWFLAKE_SCHEMA));
+
+    if(Boolean.parseBoolean(parsedConfig.getOrDefault(Utils.CREATE_SCHEMA_AUTO,"false"))) {
+      createSchemaIfNotExists(getConnection(),
+          parsedConfig.get(SnowflakeSinkConnectorConfig.SNOWFLAKE_SCHEMA));
+    }
     this.streamkapQueryTemplate = StreamkapQueryTemplate.buildStreamkapQueryTemplateFromConfig(parsedConfig);
 
     DYNAMIC_LOGGER.info(
@@ -306,6 +314,8 @@ public void close(final Collection<TopicPartition> partitions) {
    */
   @Override
   public void put(final Collection<SinkRecord> records) {
+    this.authorizationExceptionTracker.throwExceptionIfAuthorizationFailed();
+
     final long recordSize = records.size();
     if (enableRebalancing && recordSize > 0) {
       processRebalancingTest();
@@ -359,6 +369,7 @@ public Map<TopicPartition, OffsetAndMetadata> preCommit(
         }
       });
     } catch (Exception e) {
+      this.authorizationExceptionTracker.reportPrecommitException(e);
       this.DYNAMIC_LOGGER.error("PreCommit error: {} ", e.getMessage());
     }
 
54 changes: 54 additions & 0 deletions src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTaskAuthorizationExceptionTracker.java
@@ -0,0 +1,54 @@
package com.snowflake.kafka.connector;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS_DEFAULT;
import static com.snowflake.kafka.connector.internal.SnowflakeErrors.ERROR_1005;

import java.util.Map;

/**
 * When the user rotates the Snowflake key that is stored in an external file, the Connector hangs
 * and does not mark its tasks as failed. To fix this corner case we need to track the
 * authorization exception thrown during preCommit() and stop tasks during put().
 *
 * <p>Note that exceptions thrown during preCommit() are swallowed by Kafka Connect and will not
 * cause task failure.
 */
public class SnowflakeSinkTaskAuthorizationExceptionTracker {

  private static final String AUTHORIZATION_EXCEPTION_MESSAGE = "Authorization failed after retry";

  private boolean authorizationTaskFailureEnabled;
  private boolean authorizationErrorReported;

  public SnowflakeSinkTaskAuthorizationExceptionTracker() {
    this.authorizationTaskFailureEnabled = true;
    this.authorizationErrorReported = false;
  }

  public void updateStateOnTaskStart(Map<String, String> taskConfig) {
    authorizationTaskFailureEnabled =
        Boolean.parseBoolean(
            taskConfig.getOrDefault(
                ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS,
                Boolean.toString(ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS_DEFAULT)));
  }

  /**
   * Check if the thrown exception is related to authorization
   *
   * @param ex - any exception that occurred during preCommit
   */
  public void reportPrecommitException(Exception ex) {
    if (ex.getMessage().contains(AUTHORIZATION_EXCEPTION_MESSAGE)) {
      authorizationErrorReported = true;
    }
  }

  /** Throw exception if authorization has failed before */
  public void throwExceptionIfAuthorizationFailed() {
    if (authorizationTaskFailureEnabled && authorizationErrorReported) {
      throw ERROR_1005.getException();
    }
  }
}
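Taken together with the SnowflakeSinkTask changes above, the intended lifecycle is: start() reads the flag, preCommit() reports any swallowed authorization exception, and the next put() fails the task. A sketch of that flow (the exception text beyond the marker string is illustrative):

package com.snowflake.kafka.connector;

import java.util.Map;

public class AuthorizationTrackerFlowExample {
  public static void main(String[] args) {
    SnowflakeSinkTaskAuthorizationExceptionTracker tracker =
        new SnowflakeSinkTaskAuthorizationExceptionTracker();

    // start(): the task opts in to failing on authorization errors.
    tracker.updateStateOnTaskStart(Map.of("enable.task.fail.on.authorization.errors", "true"));

    // preCommit(): Kafka Connect swallows the exception, but the tracker records it.
    tracker.reportPrecommitException(new Exception("Authorization failed after retry: key rotated"));

    // Next put(): the tracker now fails the task by throwing ERROR_1005.
    tracker.throwExceptionIfAuthorizationFailed();
  }
}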
5 changes: 3 additions & 2 deletions src/main/java/com/snowflake/kafka/connector/Utils.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019 Snowflake Inc. All rights reserved.
+ * Copyright (c) 2024 Snowflake Inc. All rights reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
@@ -65,7 +65,7 @@
 public class Utils {
 
   // Connector version, change every release
-  public static final String VERSION = "2.4.0";
+  public static final String VERSION = "2.4.1";
 
   // connector parameter list
   public static final String NAME = "name";
@@ -136,6 +136,7 @@ public class Utils {
   public static final String TOPICS_MAP_CONF = "topics.config.map";
   public static final String SCHEMA_CHANGE_CHECK_MS = "schema.changes.check.interval.ms";
   public static final String APPLY_DYNAMIC_TABLE_SCRIPT_CONF = "apply.dynamic.table.script";
+  public static final String CREATE_SCHEMA_AUTO = "create.schema.auto";
 
   private static final KCLogger LOGGER = new KCLogger(Utils.class.getName());
 
@@ -60,7 +60,7 @@ static void assertNotEmpty(String name, Object value) {
     if (value == null || (value instanceof String && value.toString().isEmpty())) {
       switch (name.toLowerCase()) {
         case "schemaname":
-          throw SnowflakeErrors.ERROR_0031.getException();
+          throw SnowflakeErrors.ERROR_S0031.getException();
         case "tablename":
           throw SnowflakeErrors.ERROR_0005.getException();
         case "stagename":
@@ -308,6 +317,17 @@ protected static Properties generateProxyParametersIfRequired(Map<String, String
     return proxyProperties;
   }
 
+  protected static Properties parseJdbcPropertiesMap(Map<String, String> conf) {
+    String jdbcConfigMapInput = conf.get(SnowflakeSinkConnectorConfig.SNOWFLAKE_JDBC_MAP);
+    if (jdbcConfigMapInput == null) {
+      return new Properties();
+    }
+    Map<String, String> jdbcMap = Utils.parseCommaSeparatedKeyValuePairs(jdbcConfigMapInput);
+    Properties properties = new Properties();
+    properties.putAll(jdbcMap);
+    return properties;
+  }
+
   /**
    * convert ingest status to ingested file status
    *
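Utils.parseCommaSeparatedKeyValuePairs is not part of this diff; a standalone sketch of the parsing it presumably performs on the snowflake.jdbc.map value, assuming comma-separated key:value pairs (the JDBC property names are illustrative):

import java.util.Properties;

public class JdbcMapFormatExample {
  public static void main(String[] args) {
    // Assumed format of the snowflake.jdbc.map config value:
    String jdbcConfigMapInput = "networkTimeout:100,queryTimeout:200";

    Properties properties = new Properties();
    for (String pair : jdbcConfigMapInput.split(",")) {
      String[] kv = pair.split(":", 2); // key before the first colon, value after
      properties.put(kv[0].trim(), kv[1].trim());
    }
    System.out.println(properties); // e.g. {networkTimeout=100, queryTimeout=200}
  }
}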
86 changes: 86 additions & 0 deletions src/main/java/com/snowflake/kafka/connector/internal/JdbcProperties.java
@@ -0,0 +1,86 @@
package com.snowflake.kafka.connector.internal;

import java.util.Properties;

/** Wrapper class for all snowflake jdbc properties */
public class JdbcProperties {

  /** All jdbc properties including proxyProperties */
  private final Properties properties;
  /** Proxy related properties */
  private final Properties proxyProperties;

  private JdbcProperties(Properties combinedProperties, Properties proxyProperties) {
    this.properties = combinedProperties;
    this.proxyProperties = proxyProperties;
  }

  public Properties getProperties() {
    return properties;
  }

  public String getProperty(String key) {
    return properties.getProperty(key);
  }

  public Object get(String key) {
    return properties.get(key);
  }

  public Properties getProxyProperties() {
    return proxyProperties;
  }

  /**
   * Combine all jdbc related properties. Throws error if jdbcPropertiesMap overrides any property
   * defined in connectionProperties or proxyProperties.
   *
   * @param connectionProperties snowflake.database.name, snowflake.schema.name,
   *     snowflake.private.key etc.
   * @param proxyProperties jvm.proxy.xxx
   * @param jdbcPropertiesMap snowflake.jdbc.map
   */
  static JdbcProperties create(
      Properties connectionProperties, Properties proxyProperties, Properties jdbcPropertiesMap) {
    InternalUtils.assertNotEmpty("connectionProperties", connectionProperties);
    proxyProperties = setEmptyIfNull(proxyProperties);
    jdbcPropertiesMap = setEmptyIfNull(jdbcPropertiesMap);

    Properties proxyAndConnection = mergeProperties(connectionProperties, proxyProperties);
    detectOverrides(proxyAndConnection, jdbcPropertiesMap);

    Properties combinedProperties = mergeProperties(proxyAndConnection, jdbcPropertiesMap);

    return new JdbcProperties(combinedProperties, proxyProperties);
  }

  /** Test method */
  static JdbcProperties create(Properties connectionProperties) {
    return create(connectionProperties, new Properties(), new Properties());
  }

  private static void detectOverrides(Properties proxyAndConnection, Properties jdbcPropertiesMap) {
    jdbcPropertiesMap.forEach(
        (k, v) -> {
          if (proxyAndConnection.containsKey(k)) {
            throw SnowflakeErrors.ERROR_0031.getException("Duplicated property: " + k);
          }
        });
  }

  private static Properties mergeProperties(
      Properties connectionProperties, Properties proxyProperties) {
    Properties mergedProperties = new Properties();
    mergedProperties.putAll(connectionProperties);
    mergedProperties.putAll(proxyProperties);
    return mergedProperties;
  }

  /** Parsing methods do not return null. However, it's better to be perfectly sure. */
  private static Properties setEmptyIfNull(Properties properties) {
    if (properties != null) {
      return properties;
    }
    return new Properties();
  }
}
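A sketch of JdbcProperties.create() from the caller's side, showing the override detection (placed in the same package because create() is package-private; property keys and values are illustrative):

package com.snowflake.kafka.connector.internal;

import java.util.Properties;

public class JdbcPropertiesExample {
  public static void main(String[] args) {
    Properties connection = new Properties();
    connection.put("user", "STREAMKAP_USER"); // illustrative connection property

    Properties jdbcMap = new Properties(); // parsed from snowflake.jdbc.map
    jdbcMap.put("insecureMode", "true"); // new key: merged into the combined properties
    // jdbcMap.put("user", "OTHER"); // duplicate of a connection key: throws ERROR_0031

    JdbcProperties merged = JdbcProperties.create(connection, new Properties(), jdbcMap);
    System.out.println(merged.getProperty("insecureMode")); // prints "true"
  }
}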