diff --git a/src/main/java/com/snowflake/kafka/connector/config/IcebergConfigValidator.java b/src/main/java/com/snowflake/kafka/connector/config/IcebergConfigValidator.java
index ecc8c8ff8..670ec7508 100644
--- a/src/main/java/com/snowflake/kafka/connector/config/IcebergConfigValidator.java
+++ b/src/main/java/com/snowflake/kafka/connector/config/IcebergConfigValidator.java
@@ -4,7 +4,6 @@
 import static com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig.SNOWPIPE_STREAMING;
 
 import com.google.common.collect.ImmutableMap;
-import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
 import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
 import com.snowflake.kafka.connector.internal.streaming.StreamingConfigValidator;
 import java.util.HashMap;
@@ -12,9 +11,6 @@
 
 /** Validates dependencies between parameters in Iceberg mode. */
 public class IcebergConfigValidator implements StreamingConfigValidator {
-
-  private static final String NO_SCHEMATIZATION_ERROR_MESSAGE =
-      "Ingestion to Iceberg table requires " + ENABLE_SCHEMATIZATION_CONFIG + " set to true";
 
   private static final String INCOMPATIBLE_INGESTION_METHOD =
       "Ingestion to Iceberg table is supported only for Snowpipe Streaming";
@@ -28,15 +24,9 @@
   public ImmutableMap<String, String> validate(Map<String, String> inputConfig) {
     Map<String, String> validationErrors = new HashMap<>();
 
-    boolean isSchematizationEnabled =
-        Boolean.parseBoolean(
-            inputConfig.get(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG));
     IngestionMethodConfig ingestionMethod =
         IngestionMethodConfig.valueOf(inputConfig.get(INGESTION_METHOD_OPT).toUpperCase());
 
-    if (!isSchematizationEnabled) {
-      validationErrors.put(ENABLE_SCHEMATIZATION_CONFIG, NO_SCHEMATIZATION_ERROR_MESSAGE);
-    }
     if (ingestionMethod != SNOWPIPE_STREAMING) {
       validationErrors.put(INGESTION_METHOD_OPT, INCOMPATIBLE_INGESTION_METHOD);
     }
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java
index 852a8d84a..b9f81d94e 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java
@@ -326,6 +326,13 @@ ChannelMigrateOffsetTokenResponseDTO migrateStreamingChannelOffsetToken(
    */
   void initializeMetadataColumnTypeForIceberg(String tableName);
 
+  /**
+   * Adds the RECORD_METADATA column to the Iceberg table if it does not exist.
+   *
+   * @param tableName iceberg table name
+   */
+  void addMetadataColumnForIcebergIfNotExists(String tableName);
+
   /**
    * Calls describe table statement and returns all columns and corresponding types.
    *
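With the schematization requirement dropped, the validator's only hard constraint for Iceberg is the ingestion method. A minimal sketch of the resulting behavior (this assumes Iceberg mode is enabled elsewhere in the config; that guard lives in lines not shown in this hunk):

    Map<String, String> config = new HashMap<>();
    config.put(INGESTION_METHOD_OPT, "snowpipe_streaming");
    config.put(ENABLE_SCHEMATIZATION_CONFIG, "false"); // no longer rejected in Iceberg mode

    ImmutableMap<String, String> errors = new IcebergConfigValidator().validate(config);
    // errors is empty; with "snowpipe" as the ingestion method it would instead
    // contain INGESTION_METHOD_OPT -> INCOMPATIBLE_INGESTION_METHOD.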
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java
index 80f5ddab2..d876afc3d 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java
@@ -2,6 +2,7 @@
 import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_CONTENT;
 import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_METADATA;
+import static com.snowflake.kafka.connector.streaming.iceberg.IcebergDDLTypes.ICEBERG_METADATA_OBJECT_SCHEMA;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -164,23 +165,35 @@ public void createTableWithOnlyMetadataColumn(final String tableName) {
     LOGGER.info("Created table {} with only RECORD_METADATA column", tableName);
   }
 
+  @Override
+  public void addMetadataColumnForIcebergIfNotExists(String tableName) {
+    checkConnection();
+    InternalUtils.assertNotEmpty("tableName", tableName);
+    String query =
+        "ALTER ICEBERG TABLE identifier(?) ADD COLUMN IF NOT EXISTS RECORD_METADATA "
+            + ICEBERG_METADATA_OBJECT_SCHEMA;
+    try {
+      PreparedStatement stmt = conn.prepareStatement(query);
+      stmt.setString(1, tableName);
+      stmt.execute();
+      stmt.close();
+    } catch (SQLException e) {
+      LOGGER.error(
+          "Couldn't add RECORD_METADATA column to table {} to align with iceberg format",
+          tableName);
+      throw SnowflakeErrors.ERROR_2019.getException(e);
+    }
+    LOGGER.info(
+        "Added RECORD_METADATA column to table {} to align with iceberg format", tableName);
+  }
+
   @Override
   public void initializeMetadataColumnTypeForIceberg(String tableName) {
     checkConnection();
     InternalUtils.assertNotEmpty("tableName", tableName);
     String query =
-        "ALTER ICEBERG TABLE identifier(?) ALTER COLUMN RECORD_METADATA SET DATA TYPE OBJECT("
-            + "offset INTEGER,"
-            + "topic STRING,"
-            + "partition INTEGER,"
-            + "key STRING,"
-            + "schema_id INTEGER,"
-            + "key_schema_id INTEGER,"
-            + "CreateTime BIGINT,"
-            + "LogAppendTime BIGINT,"
-            + "SnowflakeConnectorPushTime BIGINT,"
-            + "headers MAP(VARCHAR, VARCHAR)"
-            + ")";
+        "ALTER ICEBERG TABLE identifier(?) ALTER COLUMN RECORD_METADATA SET DATA TYPE "
+            + ICEBERG_METADATA_OBJECT_SCHEMA;
     try {
       PreparedStatement stmt = conn.prepareStatement(query);
       stmt.setString(1, tableName);
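The new method follows the pattern of its siblings: bind the table name through identifier(?) and run a single DDL statement. One nit worth noting: the PreparedStatement is only closed on the success path, so it leaks if execute() throws. A hedged alternative sketch (same behavior otherwise) using try-with-resources:

    try (PreparedStatement stmt = conn.prepareStatement(query)) {
      stmt.setString(1, tableName);
      stmt.execute();
    } catch (SQLException e) {
      LOGGER.error(
          "Couldn't add RECORD_METADATA column to table {} to align with iceberg format",
          tableName);
      throw SnowflakeErrors.ERROR_2019.getException(e);
    }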
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java
index 8771186b0..4406cd608 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java
@@ -220,8 +220,12 @@ public enum SnowflakeErrors {
   ERROR_2018(
       "2018",
       "Failed to alter RECORD_METADATA column type for iceberg",
-      "Failed to alter RECORD_METADATA column type to required format for iceberg, please check"
-          + " column exists."),
+      "Failed to alter RECORD_METADATA column type to required format for iceberg."),
+  ERROR_2019(
+      "2019",
+      "Failed to add RECORD_METADATA column for iceberg",
+      "Failed to add RECORD_METADATA column with required format for iceberg."),
+
   // Snowpipe related issues 3---
   ERROR_3001("3001", "Failed to ingest file", "Exception reported by Ingest SDK"),
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java
index d0f1a0762..f783aa879 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java
@@ -274,7 +274,8 @@ public void startPartitions(
   private void perTopicActionsOnStartPartitions(String topic, Map<String, String> topic2Table) {
     String tableName = Utils.tableName(topic, topic2Table);
     if (Utils.isIcebergEnabled(connectorConfig)) {
-      icebergTableSchemaValidator.validateTable(tableName, Utils.role(connectorConfig));
+      icebergTableSchemaValidator.validateTable(
+          tableName, Utils.role(connectorConfig), enableSchematization);
       icebergInitService.initializeIcebergTableProperties(tableName);
     } else {
       createTableIfNotExists(tableName);
diff --git a/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergDDLTypes.java b/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergDDLTypes.java
new file mode 100644
index 000000000..d7b4dfa4e
--- /dev/null
+++ b/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergDDLTypes.java
@@ -0,0 +1,18 @@
+package com.snowflake.kafka.connector.streaming.iceberg;
+
+public class IcebergDDLTypes {
+
+  public static final String ICEBERG_METADATA_OBJECT_SCHEMA =
+      "OBJECT("
+          + "offset INTEGER,"
+          + "topic STRING,"
+          + "partition INTEGER,"
+          + "key STRING,"
+          + "schema_id INTEGER,"
+          + "key_schema_id INTEGER,"
+          + "CreateTime BIGINT,"
+          + "LogAppendTime BIGINT,"
+          + "SnowflakeConnectorPushTime BIGINT,"
+          + "headers MAP(VARCHAR, VARCHAR)"
+          + ")";
+}
diff --git a/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitService.java b/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitService.java
index e86606b26..67742b0af 100644
--- a/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitService.java
+++ b/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitService.java
@@ -15,6 +15,7 @@ public IcebergInitService(SnowflakeConnectionService snowflakeConnectionService)
 
   public void initializeIcebergTableProperties(String tableName) {
     LOGGER.info("Initializing properties for Iceberg table: {}", tableName);
+    snowflakeConnectionService.addMetadataColumnForIcebergIfNotExists(tableName);
    snowflakeConnectionService.initializeMetadataColumnTypeForIceberg(tableName);
   }
 }
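Taken together, initializeIcebergTableProperties now drives both statements off the shared ICEBERG_METADATA_OBJECT_SCHEMA constant, so a column added by the connector and a pre-existing one converge on the same type. Expanded, the generated DDL is roughly (OBJECT body abbreviated with an ellipsis):

    ALTER ICEBERG TABLE identifier(?) ADD COLUMN IF NOT EXISTS RECORD_METADATA OBJECT(offset INTEGER, topic STRING, ..., headers MAP(VARCHAR, VARCHAR))
    ALTER ICEBERG TABLE identifier(?) ALTER COLUMN RECORD_METADATA SET DATA TYPE OBJECT(offset INTEGER, topic STRING, ..., headers MAP(VARCHAR, VARCHAR))

Running both on every connector start should be effectively idempotent: the ADD COLUMN is skipped when the column already exists, and the SET DATA TYPE simply re-asserts the target type either way.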
diff --git a/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java b/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java
index e2f73a206..2cfa3db98 100644
--- a/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java
+++ b/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java
@@ -1,5 +1,6 @@
 package com.snowflake.kafka.connector.streaming.iceberg;
 
+import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_CONTENT;
 import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_METADATA;
 
 import com.snowflake.kafka.connector.internal.DescribeTableRow;
@@ -7,6 +8,7 @@
 import com.snowflake.kafka.connector.internal.SnowflakeErrors;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 
 /** Performs validations of Iceberg table schema on the connector startup. */
 public class IcebergTableSchemaValidator {
@@ -24,29 +26,54 @@ public IcebergTableSchemaValidator(SnowflakeConnectio
    *
    * @param tableName table to be validated
    * @param role role used for validation
+   * @param schemaEvolutionEnabled whether schema evolution is enabled
    */
-  public void validateTable(String tableName, String role) {
+  public void validateTable(String tableName, String role, boolean schemaEvolutionEnabled) {
     List<DescribeTableRow> columns =
         snowflakeConnectionService
             .describeTable(tableName)
             .orElseThrow(() -> SnowflakeErrors.ERROR_0032.getException("table_not_found"));
 
-    DescribeTableRow metadata =
+    Optional<DescribeTableRow> metadata =
         columns.stream()
             .filter(c -> Objects.equals(c.getColumn(), TABLE_COLUMN_METADATA))
-            .findFirst()
-            .orElseThrow(
-                () -> SnowflakeErrors.ERROR_0032.getException("record_metadata_not_found"));
+            .findFirst();
+
+    metadata.ifPresent(
+        m -> {
+          // if the metadata column exists it must be a structured OBJECT type; if it is
+          // missing, the connector creates it itself during table initialization
+          if (!isOfStructuredObjectType(m)) {
+            throw SnowflakeErrors.ERROR_0032.getException("invalid_metadata_type");
+          }
+        });
 
-    if (!isOfStructuredObjectType(metadata)) {
-      throw SnowflakeErrors.ERROR_0032.getException("invalid_metadata_type");
+    if (schemaEvolutionEnabled) {
+      validateSchemaEvolutionScenario(tableName, role);
+    } else {
+      validateNoSchemaEvolutionScenario(columns);
     }
+  }
 
+  private void validateSchemaEvolutionScenario(String tableName, String role) {
     if (!snowflakeConnectionService.hasSchemaEvolutionPermission(tableName, role)) {
       throw SnowflakeErrors.ERROR_0032.getException("schema_evolution_not_enabled");
     }
   }
 
+  private static void validateNoSchemaEvolutionScenario(List<DescribeTableRow> columns) {
+    DescribeTableRow recordContent =
+        columns.stream()
+            .filter(c -> Objects.equals(c.getColumn(), TABLE_COLUMN_CONTENT))
+            .findFirst()
+            .orElseThrow(
+                () -> SnowflakeErrors.ERROR_0032.getException("record_content_column_not_found"));
+
+    if (!isOfStructuredObjectType(recordContent)) {
+      throw SnowflakeErrors.ERROR_0032.getException("invalid_record_content_type");
+    }
+  }
+
   private static boolean isOfStructuredObjectType(DescribeTableRow metadata) {
     return metadata.getType().startsWith(SF_STRUCTURED_OBJECT);
   }
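The validation matrix after this change: RECORD_METADATA may be absent in both modes (the init service adds it), but if present it must be a structured OBJECT. Beyond that, the two modes diverge. A hedged usage sketch (connectionService, table and role names are illustrative, not taken from this diff):

    IcebergTableSchemaValidator validator = new IcebergTableSchemaValidator(connectionService);

    // Schema evolution enabled: the role must hold schema evolution permission
    // on the table; RECORD_CONTENT is not required, since it will be evolved in.
    validator.validateTable("EVOLVING_TABLE", "KAFKA_CONNECT_ROLE", true);

    // Schema evolution disabled: RECORD_CONTENT must already exist as a
    // structured OBJECT, because the connector will not add or retype it.
    validator.validateTable("STATIC_TABLE", "KAFKA_CONNECT_ROLE", false);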
diff --git a/src/test/java/com/snowflake/kafka/connector/config/IcebergConfigValidationTest.java b/src/test/java/com/snowflake/kafka/connector/config/IcebergConfigValidationTest.java
index 421555428..6bcd570d0 100644
--- a/src/test/java/com/snowflake/kafka/connector/config/IcebergConfigValidationTest.java
+++ b/src/test/java/com/snowflake/kafka/connector/config/IcebergConfigValidationTest.java
@@ -1,6 +1,5 @@
 package com.snowflake.kafka.connector.config;
 
-import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG;
 import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT;
 
 import com.google.common.collect.ImmutableMap;
@@ -40,7 +39,11 @@ public void shouldReturnErrorOnInvalidConfig(Map<String, String> config, String
   public static Stream<Arguments> validConfigs() {
     return Stream.of(
         Arguments.of(SnowflakeSinkConnectorConfigBuilder.snowpipeConfig().build()),
-        Arguments.of(SnowflakeSinkConnectorConfigBuilder.icebergConfig().build()));
+        Arguments.of(SnowflakeSinkConnectorConfigBuilder.icebergConfig().build()),
+        Arguments.of(
+            SnowflakeSinkConnectorConfigBuilder.icebergConfig()
+                .withSchematizationEnabled(false)
+                .build()));
   }
 
   public static Stream<Arguments> invalidConfigs() {
@@ -49,11 +52,6 @@
         Arguments.of(
             SnowflakeSinkConnectorConfigBuilder.icebergConfig()
                 .withIngestionMethod(IngestionMethodConfig.SNOWPIPE)
                 .build(),
-            INGESTION_METHOD_OPT),
-        Arguments.of(
-            SnowflakeSinkConnectorConfigBuilder.icebergConfig()
-                .withSchematizationEnabled(false)
-                .build(),
-            ENABLE_SCHEMATIZATION_CONFIG));
+            INGESTION_METHOD_OPT));
   }
 }
diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/BaseIcebergIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/BaseIcebergIT.java
index 00d15edfb..7570ea973 100644
--- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/BaseIcebergIT.java
+++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/BaseIcebergIT.java
@@ -27,6 +27,11 @@ protected static void createIcebergTable(String tableName) {
     createIcebergTableWithColumnClause(tableName, "record_metadata object()");
   }
 
+  protected static void createIcebergTableNoSchemaEvolution(String tableName) {
+    createIcebergTableWithColumnClause(
+        tableName, "record_metadata object(), record_content object()");
+  }
+
   protected static void createIcebergTableWithColumnClause(String tableName, String columnClause) {
     String query =
         "create or replace iceberg table identifier(?) ("
(" diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitServiceIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitServiceIT.java index ce274a520..7f70f20c8 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitServiceIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitServiceIT.java @@ -64,12 +64,26 @@ void shouldThrowExceptionWhenTableDoesNotExist() { } @Test - void shouldThrowExceptionWhenRecordMetadataDoesNotExist() { + void shouldCreateMetadataWhenColumnNotExists() { // given createIcebergTableWithColumnClause(tableName, "some_column VARCHAR"); - // expect - assertThatThrownBy(() -> icebergInitService.initializeIcebergTableProperties(tableName)) - .isInstanceOf(SnowflakeKafkaConnectorException.class); + // when + icebergInitService.initializeIcebergTableProperties(tableName); + + // then + assertThat(describeRecordMetadataType(tableName)) + .isEqualTo( + "OBJECT(offset NUMBER(10,0), " + + "topic VARCHAR(16777216), " + + "partition NUMBER(10,0), " + + "key VARCHAR(16777216), " + + "schema_id NUMBER(10,0), " + + "key_schema_id NUMBER(10,0), " + + "CreateTime NUMBER(19,0), " + + "LogAppendTime NUMBER(19,0), " + + "SnowflakeConnectorPushTime NUMBER(19,0), " + + "headers MAP(VARCHAR(16777216), " + + "VARCHAR(16777216)))"); } } diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitServiceTest.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitServiceTest.java index b8d7cb576..3fa035421 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitServiceTest.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitServiceTest.java @@ -15,6 +15,8 @@ class IcebergInitServiceTest { @Test void testInitializeIcebergTableProperties() { icebergInitService.initializeIcebergTableProperties("test_table"); + + verify(mockConnection).addMetadataColumnForIcebergIfNotExists("test_table"); verify(mockConnection).initializeMetadataColumnTypeForIceberg("test_table"); verifyNoMoreInteractions(mockConnection); } diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java index 9c4250740..2f659e5e0 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java @@ -6,7 +6,10 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; public class IcebergTableSchemaValidatorIT extends BaseIcebergIT { @@ -33,44 +36,100 @@ public void tearDown() { dropIcebergTable(tableName); } - @Test - public void shouldValidateExpectedIcebergTableSchema() { - // given - createIcebergTable(tableName); - enableSchemaEvolution(tableName); + @Nested + class SchemaEvolutionEnabled { + public static final boolean SCHEMA_EVOLUTION = true; - // when, then - schemaValidator.validateTable(tableName, TEST_ROLE); - } + @Test + public void shouldValidateExpectedIcebergTableSchema() { + // given + createIcebergTable(tableName); + enableSchemaEvolution(tableName); - 
diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java
index 9c4250740..2f659e5e0 100644
--- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java
+++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java
@@ -6,7 +6,10 @@
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 public class IcebergTableSchemaValidatorIT extends BaseIcebergIT {
 
@@ -33,44 +36,100 @@ public void tearDown() {
     dropIcebergTable(tableName);
   }
 
-  @Test
-  public void shouldValidateExpectedIcebergTableSchema() {
-    // given
-    createIcebergTable(tableName);
-    enableSchemaEvolution(tableName);
+  @Nested
+  class SchemaEvolutionEnabled {
+    public static final boolean SCHEMA_EVOLUTION = true;
 
-    // when, then
-    schemaValidator.validateTable(tableName, TEST_ROLE);
-  }
+    @Test
+    public void shouldValidateExpectedIcebergTableSchema() {
+      // given
+      createIcebergTable(tableName);
+      enableSchemaEvolution(tableName);
 
-  @Test
-  public void shouldThrowExceptionWhenTableDoesNotExist() {
-    Assertions.assertThrows(
-        SnowflakeKafkaConnectorException.class,
-        () -> schemaValidator.validateTable(tableName, TEST_ROLE));
-  }
+      // when, then
+      schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION);
+    }
 
-  @Test
-  public void shouldThrowExceptionWhenRecordMetadataDoesNotExist() {
-    // given
-    createIcebergTableWithColumnClause(tableName, "some_column VARCHAR");
-    enableSchemaEvolution(tableName);
+    @Test
+    public void shouldNotThrowExceptionWhenColumnRecordMetadataDoesNotExist() {
+      // given
+      createIcebergTableWithColumnClause(tableName, "some_column VARCHAR");
+      enableSchemaEvolution(tableName);
 
-    // expect
-    Assertions.assertThrows(
-        SnowflakeKafkaConnectorException.class,
-        () -> schemaValidator.validateTable(tableName, TEST_ROLE));
+      // expect
+      schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION);
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenRecordMetadataHasInvalidType() {
+      // given
+      createIcebergTableWithColumnClause(tableName, "record_metadata MAP(VARCHAR, VARCHAR)");
+      enableSchemaEvolution(tableName);
+
+      // expect
+      Assertions.assertThrows(
+          SnowflakeKafkaConnectorException.class,
+          () -> schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION));
+    }
   }
 
-  @Test
-  public void shouldThrowExceptionWhenRecordMetadataHasInvalidType() {
-    // given
-    createIcebergTableWithColumnClause(tableName, "record_metadata MAP(VARCHAR, VARCHAR)");
-    enableSchemaEvolution(tableName);
+  @Nested
+  class SchemaEvolutionNotEnabled {
+    public static final boolean SCHEMA_EVOLUTION = false;
+
+    @Test
+    public void shouldValidateExpectedIcebergTableSchema() {
+      // given
+      createIcebergTableNoSchemaEvolution(tableName);
+
+      // when, then
+      schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION);
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenTableDoesNotExist() {
+      Assertions.assertThrows(
+          SnowflakeKafkaConnectorException.class,
+          () -> schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION));
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenRecordContentDoesNotExist() {
+      // given
+      createIcebergTableWithColumnClause(tableName, "some_column VARCHAR");
+
+      // expect
+      Assertions.assertThrows(
+          SnowflakeKafkaConnectorException.class,
+          () -> schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION));
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenRecordContentHasInvalidType() {
+      // given
+      createIcebergTableWithColumnClause(tableName, "record_content MAP(VARCHAR, VARCHAR)");
+
+      // expect
+      Assertions.assertThrows(
+          SnowflakeKafkaConnectorException.class,
+          () -> schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION));
+    }
+
+    @Test
+    public void shouldNotThrowExceptionWhenColumnRecordMetadataDoesNotExist() {
+      // given
+      createIcebergTableWithColumnClause(tableName, "record_content object()");
+
+      // expect
+      schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION);
+    }
+  }
 
-    // expect
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void shouldThrowExceptionWhenTableDoesNotExist(boolean schemaEvolution) {
     Assertions.assertThrows(
         SnowflakeKafkaConnectorException.class,
-        () -> schemaValidator.validateTable(tableName, TEST_ROLE));
+        () -> schemaValidator.validateTable(tableName, TEST_ROLE, schemaEvolution));
   }
 }