Skip to content

Commit

Permalink
SNOW-1623269 Fail sink task on authorization exception from Snowflake (
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-mbobowski authored Aug 26, 2024
1 parent d55cd76 commit 56f96bb
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ public class SnowflakeSinkConnectorConfig {
+ " format is deprecated and V1 will be used always, disabling this config could have"
+ " ramifications. Please consult Snowflake support before setting this to false.";

public static final String ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS =
"enable.task.fail.on.authorization.errors";
public static final boolean ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS_DEFAULT = false;

// MDC logging header
public static final String ENABLE_MDC_LOGGING_CONFIG = "enable.mdc.logging";
public static final String ENABLE_MDC_LOGGING_DISPLAY = "Enable MDC logging";
Expand Down Expand Up @@ -714,7 +718,14 @@ static ConfigDef newConfigDef() {
CONNECTOR_CONFIG,
9,
ConfigDef.Width.NONE,
ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DISPLAY);
ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DISPLAY)
.define(
ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS,
Type.BOOLEAN,
ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS_DEFAULT,
Importance.LOW,
"If set to true the Connector will fail its tasks when authorization error from"
+ " Snowflake occurred");
}

public static class TopicToTableValidator implements ConfigDef.Validator {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ public class SnowflakeSinkTask extends SinkTask {

private IngestionMethodConfig ingestionMethodConfig;

private final SnowflakeSinkTaskAuthorizationExceptionTracker authorizationExceptionTracker =
new SnowflakeSinkTaskAuthorizationExceptionTracker();

/** default constructor, invoked by kafka connect framework */
public SnowflakeSinkTask() {
DYNAMIC_LOGGER = new KCLogger(this.getClass().getName());
Expand Down Expand Up @@ -156,6 +159,8 @@ public void start(final Map<String, String> parsedConfig) {
// generate topic to table map
this.topic2table = getTopicToTableMap(parsedConfig);

this.authorizationExceptionTracker.updateStateOnTaskStart(parsedConfig);

// generate metadataConfig table
SnowflakeMetadataConfig metadataConfig = new SnowflakeMetadataConfig(parsedConfig);

Expand Down Expand Up @@ -294,6 +299,8 @@ public void close(final Collection<TopicPartition> partitions) {
*/
@Override
public void put(final Collection<SinkRecord> records) {
this.authorizationExceptionTracker.throwExceptionIfAuthorizationFailed();

final long recordSize = records.size();
if (enableRebalancing && recordSize > 0) {
processRebalancingTest();
Expand Down Expand Up @@ -345,6 +352,7 @@ public Map<TopicPartition, OffsetAndMetadata> preCommit(
}
});
} catch (Exception e) {
this.authorizationExceptionTracker.reportPrecommitException(e);
this.DYNAMIC_LOGGER.error("PreCommit error: {} ", e.getMessage());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.snowflake.kafka.connector;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS_DEFAULT;
import static com.snowflake.kafka.connector.internal.SnowflakeErrors.ERROR_1005;

import java.util.Map;

/**
* When the user rotates Snowflake key that is stored in an external file the Connector hangs and
* does not mark its tasks as failed. To fix this corner case we need to track the authorization
* exception thrown during preCommit() and stop tasks during put().
*
* <p>Note that exceptions thrown during preCommit() are swallowed by Kafka Connect and will not
* cause task failure.
*/
public class SnowflakeSinkTaskAuthorizationExceptionTracker {

private static final String AUTHORIZATION_EXCEPTION_MESSAGE = "Authorization failed after retry";

private boolean authorizationTaskFailureEnabled;
private boolean authorizationErrorReported;

public SnowflakeSinkTaskAuthorizationExceptionTracker() {
this.authorizationTaskFailureEnabled = true;
this.authorizationErrorReported = false;
}

public void updateStateOnTaskStart(Map<String, String> taskConfig) {
authorizationTaskFailureEnabled =
Boolean.parseBoolean(
taskConfig.getOrDefault(
ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS,
Boolean.toString(ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS_DEFAULT)));
}

/**
* Check if the thrown exception is related to authorization
*
* @param ex - any exception that occurred during preCommit
*/
public void reportPrecommitException(Exception ex) {
if (ex.getMessage().contains(AUTHORIZATION_EXCEPTION_MESSAGE)) {
authorizationErrorReported = true;
}
}

/** Throw exception if authorization has failed before */
public void throwExceptionIfAuthorizationFailed() {
if (authorizationTaskFailureEnabled && authorizationErrorReported) {
throw ERROR_1005.getException();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ public enum SnowflakeErrors {
"Either the current connection is closed or hasn't connect to snowflake" + " server"),
ERROR_1004(
"1004", "Fetching OAuth token fail", "Fail to get OAuth token from authorization server"),
ERROR_1005(
"1005",
"Task failed due to authorization error",
"Set `enable.task.fail.on.authorization.errors=false` to avoid this behavior"),
// SQL issues 2---
ERROR_2001(
"2001", "Failed to prepare SQL statement", "SQL Exception, reported by Snowflake JDBC"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.snowflake.kafka.connector;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS;

import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException;
import com.snowflake.kafka.connector.internal.TestUtils;
import java.util.Map;
import java.util.stream.Stream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

class SnowflakeSinkTaskAuthorizationExceptionTrackerTest {

@Test
public void shouldThrowExceptionOnAuthorizationError() {
// given
SnowflakeSinkTaskAuthorizationExceptionTracker tracker =
new SnowflakeSinkTaskAuthorizationExceptionTracker();
Map<String, String> config = TestUtils.getConfig();
config.put(ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS, "true");
tracker.updateStateOnTaskStart(config);

// when
tracker.reportPrecommitException(new Exception("Authorization failed after retry"));

// then
Assertions.assertThrows(
SnowflakeKafkaConnectorException.class, tracker::throwExceptionIfAuthorizationFailed);
}

@Test
public void shouldNotThrowExceptionWhenNoExceptionReported() {
// given
SnowflakeSinkTaskAuthorizationExceptionTracker tracker =
new SnowflakeSinkTaskAuthorizationExceptionTracker();
Map<String, String> config = TestUtils.getConfig();
config.put(ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS, "true");
tracker.updateStateOnTaskStart(config);

// expect
Assertions.assertDoesNotThrow(tracker::throwExceptionIfAuthorizationFailed);
}

@ParameterizedTest
@MethodSource("noExceptionConditions")
public void shouldNotThrowException(boolean enabled, String exceptionMessage) {
// given
SnowflakeSinkTaskAuthorizationExceptionTracker tracker =
new SnowflakeSinkTaskAuthorizationExceptionTracker();
Map<String, String> config = TestUtils.getConfig();
config.put(ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS, Boolean.toString(enabled));
tracker.updateStateOnTaskStart(config);

// when
tracker.reportPrecommitException(new Exception(exceptionMessage));

// then
Assertions.assertDoesNotThrow(tracker::throwExceptionIfAuthorizationFailed);
}

public static Stream<Arguments> noExceptionConditions() {
return Stream.of(
Arguments.of(false, "Authorization failed after retry"),
Arguments.of(true, "NullPointerException"));
}
}

0 comments on commit 56f96bb

Please sign in to comment.