From 56f96bb5e1b397ceb045103dbd24777d543f9b01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Bobowski?= <145468486+sfc-gh-mbobowski@users.noreply.github.com> Date: Mon, 26 Aug 2024 09:42:58 +0200 Subject: [PATCH] SNOW-1623269 Fail sink task on authorization exception from Snowflake (#916) --- .../SnowflakeSinkConnectorConfig.java | 13 +++- .../kafka/connector/SnowflakeSinkTask.java | 8 +++ ...SinkTaskAuthorizationExceptionTracker.java | 54 +++++++++++++++ .../connector/internal/SnowflakeErrors.java | 4 ++ ...TaskAuthorizationExceptionTrackerTest.java | 69 +++++++++++++++++++ 5 files changed, 147 insertions(+), 1 deletion(-) create mode 100644 src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTaskAuthorizationExceptionTracker.java create mode 100644 src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskAuthorizationExceptionTrackerTest.java diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java index 482963f5e..5c3c45335 100644 --- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java +++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java @@ -214,6 +214,10 @@ public class SnowflakeSinkConnectorConfig { + " format is deprecated and V1 will be used always, disabling this config could have" + " ramifications. Please consult Snowflake support before setting this to false."; + public static final String ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS = + "enable.task.fail.on.authorization.errors"; + public static final boolean ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS_DEFAULT = false; + // MDC logging header public static final String ENABLE_MDC_LOGGING_CONFIG = "enable.mdc.logging"; public static final String ENABLE_MDC_LOGGING_DISPLAY = "Enable MDC logging"; @@ -714,7 +718,14 @@ static ConfigDef newConfigDef() { CONNECTOR_CONFIG, 9, ConfigDef.Width.NONE, - ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DISPLAY); + ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DISPLAY) + .define( + ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS, + Type.BOOLEAN, + ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS_DEFAULT, + Importance.LOW, + "If set to true the Connector will fail its tasks when authorization error from" + + " Snowflake occurred"); } public static class TopicToTableValidator implements ConfigDef.Validator { diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java index 66a06fcd7..ca516db10 100644 --- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java +++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTask.java @@ -89,6 +89,9 @@ public class SnowflakeSinkTask extends SinkTask { private IngestionMethodConfig ingestionMethodConfig; + private final SnowflakeSinkTaskAuthorizationExceptionTracker authorizationExceptionTracker = + new SnowflakeSinkTaskAuthorizationExceptionTracker(); + /** default constructor, invoked by kafka connect framework */ public SnowflakeSinkTask() { DYNAMIC_LOGGER = new KCLogger(this.getClass().getName()); @@ -156,6 +159,8 @@ public void start(final Map parsedConfig) { // generate topic to table map this.topic2table = getTopicToTableMap(parsedConfig); + this.authorizationExceptionTracker.updateStateOnTaskStart(parsedConfig); + // generate metadataConfig table SnowflakeMetadataConfig metadataConfig = new SnowflakeMetadataConfig(parsedConfig); @@ -294,6 +299,8 @@ public void close(final Collection partitions) { */ @Override public void put(final Collection records) { + this.authorizationExceptionTracker.throwExceptionIfAuthorizationFailed(); + final long recordSize = records.size(); if (enableRebalancing && recordSize > 0) { processRebalancingTest(); @@ -345,6 +352,7 @@ public Map preCommit( } }); } catch (Exception e) { + this.authorizationExceptionTracker.reportPrecommitException(e); this.DYNAMIC_LOGGER.error("PreCommit error: {} ", e.getMessage()); } diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTaskAuthorizationExceptionTracker.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTaskAuthorizationExceptionTracker.java new file mode 100644 index 000000000..b6931192a --- /dev/null +++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkTaskAuthorizationExceptionTracker.java @@ -0,0 +1,54 @@ +package com.snowflake.kafka.connector; + +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS; +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS_DEFAULT; +import static com.snowflake.kafka.connector.internal.SnowflakeErrors.ERROR_1005; + +import java.util.Map; + +/** + * When the user rotates Snowflake key that is stored in an external file the Connector hangs and + * does not mark its tasks as failed. To fix this corner case we need to track the authorization + * exception thrown during preCommit() and stop tasks during put(). + * + *

Note that exceptions thrown during preCommit() are swallowed by Kafka Connect and will not + * cause task failure. + */ +public class SnowflakeSinkTaskAuthorizationExceptionTracker { + + private static final String AUTHORIZATION_EXCEPTION_MESSAGE = "Authorization failed after retry"; + + private boolean authorizationTaskFailureEnabled; + private boolean authorizationErrorReported; + + public SnowflakeSinkTaskAuthorizationExceptionTracker() { + this.authorizationTaskFailureEnabled = true; + this.authorizationErrorReported = false; + } + + public void updateStateOnTaskStart(Map taskConfig) { + authorizationTaskFailureEnabled = + Boolean.parseBoolean( + taskConfig.getOrDefault( + ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS, + Boolean.toString(ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS_DEFAULT))); + } + + /** + * Check if the thrown exception is related to authorization + * + * @param ex - any exception that occurred during preCommit + */ + public void reportPrecommitException(Exception ex) { + if (ex.getMessage().contains(AUTHORIZATION_EXCEPTION_MESSAGE)) { + authorizationErrorReported = true; + } + } + + /** Throw exception if authorization has failed before */ + public void throwExceptionIfAuthorizationFailed() { + if (authorizationTaskFailureEnabled && authorizationErrorReported) { + throw ERROR_1005.getException(); + } + } +} diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java index 14cbe2d51..aaffcb174 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java @@ -155,6 +155,10 @@ public enum SnowflakeErrors { "Either the current connection is closed or hasn't connect to snowflake" + " server"), ERROR_1004( "1004", "Fetching OAuth token fail", "Fail to get OAuth token from authorization server"), + ERROR_1005( + "1005", + "Task failed due to authorization error", + "Set `enable.task.fail.on.authorization.errors=false` to avoid this behavior"), // SQL issues 2--- ERROR_2001( "2001", "Failed to prepare SQL statement", "SQL Exception, reported by Snowflake JDBC"), diff --git a/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskAuthorizationExceptionTrackerTest.java b/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskAuthorizationExceptionTrackerTest.java new file mode 100644 index 000000000..650bb5732 --- /dev/null +++ b/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskAuthorizationExceptionTrackerTest.java @@ -0,0 +1,69 @@ +package com.snowflake.kafka.connector; + +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS; + +import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException; +import com.snowflake.kafka.connector.internal.TestUtils; +import java.util.Map; +import java.util.stream.Stream; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +class SnowflakeSinkTaskAuthorizationExceptionTrackerTest { + + @Test + public void shouldThrowExceptionOnAuthorizationError() { + // given + SnowflakeSinkTaskAuthorizationExceptionTracker tracker = + new SnowflakeSinkTaskAuthorizationExceptionTracker(); + Map config = TestUtils.getConfig(); + config.put(ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS, "true"); + tracker.updateStateOnTaskStart(config); + + // when + tracker.reportPrecommitException(new Exception("Authorization failed after retry")); + + // then + Assertions.assertThrows( + SnowflakeKafkaConnectorException.class, tracker::throwExceptionIfAuthorizationFailed); + } + + @Test + public void shouldNotThrowExceptionWhenNoExceptionReported() { + // given + SnowflakeSinkTaskAuthorizationExceptionTracker tracker = + new SnowflakeSinkTaskAuthorizationExceptionTracker(); + Map config = TestUtils.getConfig(); + config.put(ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS, "true"); + tracker.updateStateOnTaskStart(config); + + // expect + Assertions.assertDoesNotThrow(tracker::throwExceptionIfAuthorizationFailed); + } + + @ParameterizedTest + @MethodSource("noExceptionConditions") + public void shouldNotThrowException(boolean enabled, String exceptionMessage) { + // given + SnowflakeSinkTaskAuthorizationExceptionTracker tracker = + new SnowflakeSinkTaskAuthorizationExceptionTracker(); + Map config = TestUtils.getConfig(); + config.put(ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS, Boolean.toString(enabled)); + tracker.updateStateOnTaskStart(config); + + // when + tracker.reportPrecommitException(new Exception(exceptionMessage)); + + // then + Assertions.assertDoesNotThrow(tracker::throwExceptionIfAuthorizationFailed); + } + + public static Stream noExceptionConditions() { + return Stream.of( + Arguments.of(false, "Authorization failed after retry"), + Arguments.of(true, "NullPointerException")); + } +}