From 0112d6a8af82f792bc99451f91047eab8f9f5252 Mon Sep 17 00:00:00 2001
From: Wojciech Trefon
Date: Fri, 4 Oct 2024 16:50:17 +0200
Subject: [PATCH 1/5] Create metadata column if not exists

---
 .../internal/SnowflakeConnectionService.java  |   7 ++
 .../SnowflakeConnectionServiceV1.java         |  37 ++++--
 .../connector/internal/SnowflakeErrors.java   |   8 +-
 .../streaming/SnowflakeSinkServiceV2.java     |   3 +-
 .../streaming/iceberg/IcebergDDLTypes.java    |  49 ++++++++
 .../streaming/iceberg/IcebergInitService.java |   1 +
 .../iceberg/IcebergTableSchemaValidator.java  |  37 ++++--
 .../iceberg/IcebergInitServiceIT.java         |  22 +++-
 .../iceberg/IcebergInitServiceTest.java       |   2 +
 .../IcebergTableSchemaValidatorIT.java        | 114 +++++++++++++-----
 10 files changed, 222 insertions(+), 58 deletions(-)
 create mode 100644 src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergDDLTypes.java

diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java
index 852a8d84a..b9f81d94e 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java
@@ -326,6 +326,13 @@ ChannelMigrateOffsetTokenResponseDTO migrateStreamingChannelOffsetToken(
    */
   void initializeMetadataColumnTypeForIceberg(String tableName);
 
+  /**
+   * Adds the RECORD_METADATA column to the Iceberg table if it does not exist.
+   *
+   * @param tableName Iceberg table name
+   */
+  void addMetadataColumnForIcebergIfNotExists(String tableName);
+
   /**
    * Calls describe table statement and returns all columns and corresponding types.
    *
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java
index 80f5ddab2..d876afc3d 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java
@@ -2,6 +2,7 @@
 
 import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_CONTENT;
 import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_METADATA;
+import static com.snowflake.kafka.connector.streaming.iceberg.IcebergDDLTypes.ICEBERG_METADATA_OBJECT_SCHEMA;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -164,23 +165,35 @@ public void createTableWithOnlyMetadataColumn(final String tableName) {
     LOGGER.info("Created table {} with only RECORD_METADATA column", tableName);
   }
 
+  @Override
+  public void addMetadataColumnForIcebergIfNotExists(String tableName) {
+    checkConnection();
+    InternalUtils.assertNotEmpty("tableName", tableName);
+    String query =
+        "ALTER ICEBERG TABLE identifier(?) ADD COLUMN IF NOT EXISTS RECORD_METADATA "
+            + ICEBERG_METADATA_OBJECT_SCHEMA;
+    try {
+      PreparedStatement stmt = conn.prepareStatement(query);
+      stmt.setString(1, tableName);
+      stmt.execute();
+      stmt.close();
+    } catch (SQLException e) {
+      LOGGER.error(
+          "Couldn't alter table {} to add the RECORD_METADATA column required for iceberg format",
+          tableName);
+      throw SnowflakeErrors.ERROR_2019.getException(e);
+    }
+    LOGGER.info(
+        "Ensured table {} has a RECORD_METADATA column aligned with iceberg format", tableName);
+  }
+
   @Override
   public void initializeMetadataColumnTypeForIceberg(String tableName) {
     checkConnection();
     InternalUtils.assertNotEmpty("tableName", tableName);
     String query =
-        "ALTER ICEBERG TABLE identifier(?) ALTER COLUMN RECORD_METADATA SET DATA TYPE OBJECT("
-            + "offset INTEGER,"
-            + "topic STRING,"
-            + "partition INTEGER,"
-            + "key STRING,"
-            + "schema_id INTEGER,"
-            + "key_schema_id INTEGER,"
-            + "CreateTime BIGINT,"
-            + "LogAppendTime BIGINT,"
-            + "SnowflakeConnectorPushTime BIGINT,"
-            + "headers MAP(VARCHAR, VARCHAR)"
-            + ")";
+        "ALTER ICEBERG TABLE identifier(?) ALTER COLUMN RECORD_METADATA SET DATA TYPE "
+            + ICEBERG_METADATA_OBJECT_SCHEMA;
     try {
       PreparedStatement stmt = conn.prepareStatement(query);
       stmt.setString(1, tableName);
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java
index 8771186b0..4406cd608 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java
@@ -220,8 +220,12 @@ public enum SnowflakeErrors {
   ERROR_2018(
       "2018",
       "Failed to alter RECORD_METADATA column type for iceberg",
-      "Failed to alter RECORD_METADATA column type to required format for iceberg, please check"
-          + " column exists."),
+      "Failed to alter RECORD_METADATA column type to required format for iceberg."),
+  ERROR_2019(
+      "2019",
+      "Failed to add RECORD_METADATA column for iceberg",
+      "Failed to add RECORD_METADATA column with required format for iceberg."),
+
   // Snowpipe related issues 3---
   ERROR_3001("3001", "Failed to ingest file", "Exception reported by Ingest SDK"),
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java
index d0f1a0762..f783aa879 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java
@@ -274,7 +274,8 @@ public void startPartitions(
   private void perTopicActionsOnStartPartitions(String topic, Map<String, String> topic2Table) {
     String tableName = Utils.tableName(topic, topic2Table);
     if (Utils.isIcebergEnabled(connectorConfig)) {
-      icebergTableSchemaValidator.validateTable(tableName, Utils.role(connectorConfig));
+      icebergTableSchemaValidator.validateTable(
+          tableName, Utils.role(connectorConfig), enableSchematization);
       icebergInitService.initializeIcebergTableProperties(tableName);
     } else {
       createTableIfNotExists(tableName);
diff --git a/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergDDLTypes.java b/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergDDLTypes.java
new file mode 100644
index 000000000..5b08b71f0
--- /dev/null
+++ b/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergDDLTypes.java
@@ -0,0 +1,49 @@
+package com.snowflake.kafka.connector.streaming.iceberg;
+
+public class IcebergDDLTypes {
+  public static String INTEGER = "INTEGER";
+  public static String STRING = "STRING";
+  public static String BIGINT = "BIGINT";
+  public static String MAP = "MAP";
+  public static String OBJECT = "OBJECT";
+  public static String VARCHAR = "VARCHAR";
+
+  public static String ICEBERG_METADATA_OBJECT_SCHEMA =
+      OBJECT
+          + "("
+          + "offset "
+          + INTEGER
+          + ","
+          + "topic "
+          + STRING
+          + ","
+          + "partition "
+          + INTEGER
+          + ","
+          + "key "
+          + STRING
+          + ","
+          + "schema_id "
+          + INTEGER
+          + ","
+          + "key_schema_id "
+          + INTEGER
+          + ","
+          + "CreateTime "
+          + BIGINT
+          + ","
+          + "LogAppendTime "
+          + BIGINT
+          + ","
+          + "SnowflakeConnectorPushTime "
+          + BIGINT
+          + ","
+          + "headers "
+          + MAP
+          + "("
+          + VARCHAR
+          + ","
+          + VARCHAR
+          + ")"
+          + ")";
+}
diff --git a/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitService.java b/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitService.java
index e86606b26..67742b0af 100644
--- a/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitService.java
+++ b/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitService.java
@@ -15,6 +15,7 @@ public IcebergInitService(SnowflakeConnectionService snowflakeConnectionService)
 
   public void initializeIcebergTableProperties(String tableName) {
     LOGGER.info("Initializing properties for Iceberg table: {}", tableName);
+    snowflakeConnectionService.addMetadataColumnForIcebergIfNotExists(tableName);
     snowflakeConnectionService.initializeMetadataColumnTypeForIceberg(tableName);
   }
 }
diff --git a/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java b/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java
index e2f73a206..7a6fb9f5e 100644
--- a/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java
+++ b/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java
@@ -7,6 +7,7 @@
 import com.snowflake.kafka.connector.internal.SnowflakeErrors;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 
 /** Performs validations of Iceberg table schema on the connector startup. */
 public class IcebergTableSchemaValidator {
@@ -24,29 +25,51 @@ public IcebergTableSchemaValidator(SnowflakeConnectio
    *
    * @param tableName table to be validated
    * @param role role used for validation
+   * @param schemaEvolutionEnabled whether schema evolution is enabled
    */
-  public void validateTable(String tableName, String role) {
+  public void validateTable(String tableName, String role, boolean schemaEvolutionEnabled) {
     List<DescribeTableRow> columns =
         snowflakeConnectionService
             .describeTable(tableName)
            .orElseThrow(() -> SnowflakeErrors.ERROR_0032.getException("table_not_found"));
 
-    DescribeTableRow metadata =
+    Optional<DescribeTableRow> metadata =
         columns.stream()
             .filter(c -> Objects.equals(c.getColumn(), TABLE_COLUMN_METADATA))
-            .findFirst()
-            .orElseThrow(
-                () -> SnowflakeErrors.ERROR_0032.getException("record_metadata_not_found"));
+            .findFirst();
 
-    if (!isOfStructuredObjectType(metadata)) {
-      throw SnowflakeErrors.ERROR_0032.getException("invalid_metadata_type");
+    if (schemaEvolutionEnabled) {
+      validateSchemaEvolutionScenario(tableName, role, metadata);
+    } else {
+      validateNoSchemaEvolutionScenario(metadata);
     }
+  }
+
+  private void validateSchemaEvolutionScenario(
+      String tableName, String role, Optional<DescribeTableRow> metadata) {
+    metadata.ifPresent(
+        m -> {
+          // if the metadata column exists it must be of structured OBJECT type; if it
+          // does not exist, the connector will create it
+          if (!isOfStructuredObjectType(m)) {
+            throw SnowflakeErrors.ERROR_0032.getException("invalid_metadata_type");
+          }
+        });
 
     if (!snowflakeConnectionService.hasSchemaEvolutionPermission(tableName, role)) {
       throw SnowflakeErrors.ERROR_0032.getException("schema_evolution_not_enabled");
     }
   }
 
+  private static void validateNoSchemaEvolutionScenario(Optional<DescribeTableRow> metadata) {
+    if (!metadata.isPresent()) {
+      throw SnowflakeErrors.ERROR_0032.getException("metadata_column_not_found");
+    }
+    if (!isOfStructuredObjectType(metadata.get())) {
+      throw SnowflakeErrors.ERROR_0032.getException("invalid_metadata_type");
+    }
+  }
+
   private static boolean isOfStructuredObjectType(DescribeTableRow metadata) {
     return metadata.getType().startsWith(SF_STRUCTURED_OBJECT);
   }
diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitServiceIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitServiceIT.java
index ce274a520..7f70f20c8 100644
--- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitServiceIT.java
+++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitServiceIT.java
@@ -64,12 +64,26 @@ void shouldThrowExceptionWhenTableDoesNotExist() {
   }
 
   @Test
-  void shouldThrowExceptionWhenRecordMetadataDoesNotExist() {
+  void shouldCreateMetadataWhenColumnNotExists() {
     // given
     createIcebergTableWithColumnClause(tableName, "some_column VARCHAR");
 
-    // expect
-    assertThatThrownBy(() -> icebergInitService.initializeIcebergTableProperties(tableName))
-        .isInstanceOf(SnowflakeKafkaConnectorException.class);
+    // when
+    icebergInitService.initializeIcebergTableProperties(tableName);
+
+    // then
+    assertThat(describeRecordMetadataType(tableName))
+        .isEqualTo(
+            "OBJECT(offset NUMBER(10,0), "
+                + "topic VARCHAR(16777216), "
+                + "partition NUMBER(10,0), "
+                + "key VARCHAR(16777216), "
+                + "schema_id NUMBER(10,0), "
+                + "key_schema_id NUMBER(10,0), "
+                + "CreateTime NUMBER(19,0), "
+                + "LogAppendTime NUMBER(19,0), "
+                + "SnowflakeConnectorPushTime NUMBER(19,0), "
+                + "headers MAP(VARCHAR(16777216), "
+                + "VARCHAR(16777216)))");
   }
 }
diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitServiceTest.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitServiceTest.java
index b8d7cb576..3fa035421 100644
--- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitServiceTest.java
+++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitServiceTest.java
@@ -15,6 +15,8 @@ class IcebergInitServiceTest {
   @Test
   void testInitializeIcebergTableProperties() {
     icebergInitService.initializeIcebergTableProperties("test_table");
+
+    verify(mockConnection).addMetadataColumnForIcebergIfNotExists("test_table");
     verify(mockConnection).initializeMetadataColumnTypeForIceberg("test_table");
     verifyNoMoreInteractions(mockConnection);
   }
diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java
index 9c4250740..c4ac3f05d 100644
--- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java
+++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java
@@ -6,6 +6,7 @@
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 
 public class IcebergTableSchemaValidatorIT extends BaseIcebergIT {
@@ -33,44 +34,93 @@ public void tearDown() {
     dropIcebergTable(tableName);
   }
 
-  @Test
-  public void shouldValidateExpectedIcebergTableSchema() {
-    // given
-    createIcebergTable(tableName);
-    enableSchemaEvolution(tableName);
+  @Nested
+  class SchemaEvolutionEnabled {
+    public static final boolean SCHEMA_EVOLUTION = true;
 
-    // when, then
-    schemaValidator.validateTable(tableName, TEST_ROLE);
-  }
+    @Test
+    public void shouldValidateExpectedIcebergTableSchema() {
+      // given
+      createIcebergTable(tableName);
+      enableSchemaEvolution(tableName);
 
-  @Test
-  public void shouldThrowExceptionWhenTableDoesNotExist() {
-    Assertions.assertThrows(
-        SnowflakeKafkaConnectorException.class,
-        () -> schemaValidator.validateTable(tableName, TEST_ROLE));
-  }
+      // when, then
+      schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION);
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenTableDoesNotExist() {
+      Assertions.assertThrows(
+          SnowflakeKafkaConnectorException.class,
+          () -> schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION));
+    }
 
-  @Test
-  public void shouldThrowExceptionWhenRecordMetadataDoesNotExist() {
-    // given
-    createIcebergTableWithColumnClause(tableName, "some_column VARCHAR");
-    enableSchemaEvolution(tableName);
+    @Test
+    public void shouldNotThrowExceptionWhenColumnRecordMetadataDoesNotExist() {
+      // given
+      createIcebergTableWithColumnClause(tableName, "some_column VARCHAR");
+      enableSchemaEvolution(tableName);
 
-    // expect
-    Assertions.assertThrows(
-        SnowflakeKafkaConnectorException.class,
-        () -> schemaValidator.validateTable(tableName, TEST_ROLE));
+      // expect
+      schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION);
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenRecordMetadataHasInvalidType() {
+      // given
+      createIcebergTableWithColumnClause(tableName, "record_metadata MAP(VARCHAR, VARCHAR)");
+      enableSchemaEvolution(tableName);
+
+      // expect
+      Assertions.assertThrows(
+          SnowflakeKafkaConnectorException.class,
+          () -> schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION));
+    }
   }
 
-  @Test
-  public void shouldThrowExceptionWhenRecordMetadataHasInvalidType() {
-    // given
-    createIcebergTableWithColumnClause(tableName, "record_metadata MAP(VARCHAR, VARCHAR)");
-    enableSchemaEvolution(tableName);
+  @Nested
+  class SchemaEvolutionNotEnabled {
+    public static final boolean SCHEMA_EVOLUTION = false;
+
+    @Test
+    public void shouldValidateExpectedIcebergTableSchema() {
+      // given
+      createIcebergTable(tableName);
+      enableSchemaEvolution(tableName);
+
+      // when, then
+      schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION);
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenTableDoesNotExist() {
+      Assertions.assertThrows(
+          SnowflakeKafkaConnectorException.class,
+          () -> schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION));
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenRecordMetadataDoesNotExist() {
+      // given
+      createIcebergTableWithColumnClause(tableName, "some_column VARCHAR");
+      enableSchemaEvolution(tableName);
+
+      // expect
+      Assertions.assertThrows(
+          SnowflakeKafkaConnectorException.class,
+          () -> schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION));
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenRecordMetadataHasInvalidType() {
+      // given
+      createIcebergTableWithColumnClause(tableName, "record_metadata MAP(VARCHAR, VARCHAR)");
+      enableSchemaEvolution(tableName);
 
-    // expect
-    Assertions.assertThrows(
-        SnowflakeKafkaConnectorException.class,
-        () -> schemaValidator.validateTable(tableName, TEST_ROLE));
+      // expect
+      Assertions.assertThrows(
+          SnowflakeKafkaConnectorException.class,
+          () -> schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION));
+    }
   }
 }
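
Note: with this patch applied, IcebergInitService issues the two statements below for
every destination table on startup. This is a rendered sketch only: identifier(?) is
bound to the actual table name at execution time, MY_ICEBERG_TABLE is an illustrative
name, and whitespace is added for readability (the concatenated string contains none).

    ALTER ICEBERG TABLE MY_ICEBERG_TABLE ADD COLUMN IF NOT EXISTS RECORD_METADATA
        OBJECT(offset INTEGER, topic STRING, partition INTEGER, key STRING,
               schema_id INTEGER, key_schema_id INTEGER, CreateTime BIGINT,
               LogAppendTime BIGINT, SnowflakeConnectorPushTime BIGINT,
               headers MAP(VARCHAR, VARCHAR));

    ALTER ICEBERG TABLE MY_ICEBERG_TABLE ALTER COLUMN RECORD_METADATA SET DATA TYPE
        OBJECT(offset INTEGER, topic STRING, partition INTEGER, key STRING,
               schema_id INTEGER, key_schema_id INTEGER, CreateTime BIGINT,
               LogAppendTime BIGINT, SnowflakeConnectorPushTime BIGINT,
               headers MAP(VARCHAR, VARCHAR));
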
From 4540ad0309a79256d3431b12c1ae1f61ff2021df Mon Sep 17 00:00:00 2001
From: Wojciech Trefon
Date: Fri, 4 Oct 2024 17:26:12 +0200
Subject: [PATCH 2/5] Add validations for record content

---
 .../config/IcebergConfigValidator.java        | 10 ------
 .../iceberg/IcebergTableSchemaValidator.java  | 32 +++++++++++--------
 .../config/IcebergConfigValidationTest.java   | 14 ++++----
 .../streaming/iceberg/BaseIcebergIT.java      |  5 +++
 .../IcebergTableSchemaValidatorIT.java        | 26 +++++++++++++--
 5 files changed, 54 insertions(+), 33 deletions(-)

diff --git a/src/main/java/com/snowflake/kafka/connector/config/IcebergConfigValidator.java b/src/main/java/com/snowflake/kafka/connector/config/IcebergConfigValidator.java
index ecc8c8ff8..670ec7508 100644
--- a/src/main/java/com/snowflake/kafka/connector/config/IcebergConfigValidator.java
+++ b/src/main/java/com/snowflake/kafka/connector/config/IcebergConfigValidator.java
@@ -4,7 +4,6 @@
 import static com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig.SNOWPIPE_STREAMING;
 
 import com.google.common.collect.ImmutableMap;
-import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
 import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
 import com.snowflake.kafka.connector.internal.streaming.StreamingConfigValidator;
 import java.util.HashMap;
@@ -12,9 +11,6 @@
 
 /** Validates dependencies between parameters in Iceberg mode. */
 public class IcebergConfigValidator implements StreamingConfigValidator {
-
-  private static final String NO_SCHEMATIZATION_ERROR_MESSAGE =
-      "Ingestion to Iceberg table requires " + ENABLE_SCHEMATIZATION_CONFIG + " set to true";
 
   private static final String INCOMPATIBLE_INGESTION_METHOD =
       "Ingestion to Iceberg table is supported only for Snowpipe Streaming";
@@ -28,15 +24,9 @@ public ImmutableMap<String, String> validate(Map<String, String> inputConfig) {
 
     Map<String, String> validationErrors = new HashMap<>();
 
-    boolean isSchematizationEnabled =
-        Boolean.parseBoolean(
-            inputConfig.get(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG));
     IngestionMethodConfig ingestionMethod =
         IngestionMethodConfig.valueOf(inputConfig.get(INGESTION_METHOD_OPT).toUpperCase());
 
-    if (!isSchematizationEnabled) {
-      validationErrors.put(ENABLE_SCHEMATIZATION_CONFIG, NO_SCHEMATIZATION_ERROR_MESSAGE);
-    }
     if (ingestionMethod != SNOWPIPE_STREAMING) {
       validationErrors.put(INGESTION_METHOD_OPT, INCOMPATIBLE_INGESTION_METHOD);
     }
diff --git a/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java b/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java
index 7a6fb9f5e..2148dd8de 100644
--- a/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java
+++ b/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java
@@ -1,5 +1,6 @@
 package com.snowflake.kafka.connector.streaming.iceberg;
 
+import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_CONTENT;
 import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_METADATA;
 
 import com.snowflake.kafka.connector.internal.DescribeTableRow;
@@ -38,15 +39,6 @@ public void validateTable(String tableName, String role, boolean schemaEvolution
             .filter(c -> Objects.equals(c.getColumn(), TABLE_COLUMN_METADATA))
             .findFirst();
 
-    if (schemaEvolutionEnabled) {
-      validateSchemaEvolutionScenario(tableName, role, metadata);
-    } else {
-      validateNoSchemaEvolutionScenario(metadata);
-    }
-  }
-
-  private void validateSchemaEvolutionScenario(
-      String tableName, String role, Optional<DescribeTableRow> metadata) {
     metadata.ifPresent(
         m -> {
           // if the metadata column exists it must be of structured OBJECT type; if it
           // does not exist, the connector will create it
           if (!isOfStructuredObjectType(m)) {
             throw SnowflakeErrors.ERROR_0032.getException("invalid_metadata_type");
           }
         });
 
+    if (schemaEvolutionEnabled) {
+      validateSchemaEvolutionScenario(tableName, role);
+    } else {
+      validateNoSchemaEvolutionScenario(columns);
+    }
+  }
+
+  private void validateSchemaEvolutionScenario(String tableName, String role) {
     if (!snowflakeConnectionService.hasSchemaEvolutionPermission(tableName, role)) {
       throw SnowflakeErrors.ERROR_0032.getException("schema_evolution_not_enabled");
     }
   }
 
-  private static void validateNoSchemaEvolutionScenario(Optional<DescribeTableRow> metadata) {
-    if (!metadata.isPresent()) {
-      throw SnowflakeErrors.ERROR_0032.getException("metadata_column_not_found");
+  private static void validateNoSchemaEvolutionScenario(List<DescribeTableRow> columns) {
+    Optional<DescribeTableRow> recordContent =
+        columns.stream()
+            .filter(c -> Objects.equals(c.getColumn(), TABLE_COLUMN_CONTENT))
+            .findFirst();
+    if (!recordContent.isPresent()) {
+      throw SnowflakeErrors.ERROR_0032.getException("record_content_column_not_found");
     }
-    if (!isOfStructuredObjectType(metadata.get())) {
-      throw SnowflakeErrors.ERROR_0032.getException("invalid_metadata_type");
+    if (!isOfStructuredObjectType(recordContent.get())) {
+      throw SnowflakeErrors.ERROR_0032.getException("invalid_record_content_type");
     }
   }
diff --git a/src/test/java/com/snowflake/kafka/connector/config/IcebergConfigValidationTest.java b/src/test/java/com/snowflake/kafka/connector/config/IcebergConfigValidationTest.java
index 421555428..147dfee3a 100644
--- a/src/test/java/com/snowflake/kafka/connector/config/IcebergConfigValidationTest.java
+++ b/src/test/java/com/snowflake/kafka/connector/config/IcebergConfigValidationTest.java
@@ -40,7 +40,12 @@ public void shouldReturnErrorOnInvalidConfig(Map<String, String> config, String
   public static Stream<Arguments> validConfigs() {
     return Stream.of(
         Arguments.of(SnowflakeSinkConnectorConfigBuilder.snowpipeConfig().build()),
-        Arguments.of(SnowflakeSinkConnectorConfigBuilder.icebergConfig().build()));
+        Arguments.of(SnowflakeSinkConnectorConfigBuilder.icebergConfig().build()),
+        Arguments.of(
+            SnowflakeSinkConnectorConfigBuilder.icebergConfig()
+                .withSchematizationEnabled(false)
+                .build(),
+            ENABLE_SCHEMATIZATION_CONFIG));
   }
 
   public static Stream<Arguments> invalidConfigs() {
@@ -49,11 +54,6 @@ public static Stream<Arguments> invalidConfigs() {
         SnowflakeSinkConnectorConfigBuilder.icebergConfig()
             .withIngestionMethod(IngestionMethodConfig.SNOWPIPE)
             .build(),
-        INGESTION_METHOD_OPT),
-        Arguments.of(
-            SnowflakeSinkConnectorConfigBuilder.icebergConfig()
-                .withSchematizationEnabled(false)
-                .build(),
-            ENABLE_SCHEMATIZATION_CONFIG));
+        INGESTION_METHOD_OPT));
   }
 }
diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/BaseIcebergIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/BaseIcebergIT.java
index 00d15edfb..7570ea973 100644
--- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/BaseIcebergIT.java
+++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/BaseIcebergIT.java
@@ -27,6 +27,11 @@ protected static void createIcebergTable(String tableName) {
     createIcebergTableWithColumnClause(tableName, "record_metadata object()");
   }
 
+  protected static void createIcebergTableNoSchemaEvolution(String tableName) {
+    createIcebergTableWithColumnClause(
+        tableName, "record_metadata object(), record_content object()");
+  }
+
   protected static void createIcebergTableWithColumnClause(String tableName, String columnClause) {
     String query = "create or replace iceberg table identifier(?) ("
diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java
index c4ac3f05d..05af8350e 100644
--- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java
+++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java
@@ -85,7 +85,7 @@ class SchemaEvolutionNotEnabled {
     @Test
     public void shouldValidateExpectedIcebergTableSchema() {
       // given
-      createIcebergTable(tableName);
+      createIcebergTableNoSchemaEvolution(tableName);
       enableSchemaEvolution(tableName);
 
       // when, then
@@ -100,7 +100,7 @@ public void shouldThrowExceptionWhenTableDoesNotExist() {
     }
 
     @Test
-    public void shouldThrowExceptionWhenRecordMetadataDoesNotExist() {
+    public void shouldThrowExceptionWhenRecordContentDoesNotExist() {
       // given
       createIcebergTableWithColumnClause(tableName, "some_column VARCHAR");
       enableSchemaEvolution(tableName);
@@ -111,6 +111,28 @@ public void shouldThrowExceptionWhenRecordMetadataDoesNotExist() {
           () -> schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION));
     }
 
+    @Test
+    public void shouldThrowExceptionWhenRecordContentHasInvalidType() {
+      // given
+      createIcebergTableWithColumnClause(tableName, "record_content MAP(VARCHAR, VARCHAR)");
+      enableSchemaEvolution(tableName);
+
+      // expect
+      Assertions.assertThrows(
+          SnowflakeKafkaConnectorException.class,
+          () -> schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION));
+    }
+
+    @Test
+    public void shouldNotThrowExceptionWhenColumnRecordMetadataDoesNotExist() {
+      // given
+      createIcebergTableWithColumnClause(tableName, "record_content object()");
+      enableSchemaEvolution(tableName);
+
+      // expect
+      schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION);
+    }
+
     @Test
     public void shouldThrowExceptionWhenRecordMetadataHasInvalidType() {
       // given
"partition INTEGER," + + "key STRING," + + "schema_id INTEGER," + + "key_schema_id INTEGER," + + "CreateTime BIGINT," + + "LogAppendTime BIGINT," + + "SnowflakeConnectorPushTime BIGINT," + + "headers MAP(VARCHAR, VARCHAR)" + ")"; } From 7b1452b58ef2e8ef4774afb662c25c07da696dd5 Mon Sep 17 00:00:00 2001 From: Wojciech Trefon Date: Mon, 7 Oct 2024 16:12:14 +0200 Subject: [PATCH 4/5] CR changes --- .../config/IcebergConfigValidationTest.java | 4 +-- .../IcebergTableSchemaValidatorIT.java | 31 ++++++------------- 2 files changed, 10 insertions(+), 25 deletions(-) diff --git a/src/test/java/com/snowflake/kafka/connector/config/IcebergConfigValidationTest.java b/src/test/java/com/snowflake/kafka/connector/config/IcebergConfigValidationTest.java index 147dfee3a..6bcd570d0 100644 --- a/src/test/java/com/snowflake/kafka/connector/config/IcebergConfigValidationTest.java +++ b/src/test/java/com/snowflake/kafka/connector/config/IcebergConfigValidationTest.java @@ -1,6 +1,5 @@ package com.snowflake.kafka.connector.config; -import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT; import com.google.common.collect.ImmutableMap; @@ -44,8 +43,7 @@ public static Stream validConfigs() { Arguments.of( SnowflakeSinkConnectorConfigBuilder.icebergConfig() .withSchematizationEnabled(false) - .build(), - ENABLE_SCHEMATIZATION_CONFIG)); + .build())); } public static Stream invalidConfigs() { diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java index 05af8350e..2f659e5e0 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java @@ -8,6 +8,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; public class IcebergTableSchemaValidatorIT extends BaseIcebergIT { @@ -48,13 +50,6 @@ public void shouldValidateExpectedIcebergTableSchema() { schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION); } - @Test - public void shouldThrowExceptionWhenTableDoesNotExist() { - Assertions.assertThrows( - SnowflakeKafkaConnectorException.class, - () -> schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION)); - } - @Test public void shouldNotThrowExceptionWhenColumnRecordMetadataDoesNotExist() { // given @@ -86,7 +81,6 @@ class SchemaEvolutionNotEnabled { public void shouldValidateExpectedIcebergTableSchema() { // given createIcebergTableNoSchemaEvolution(tableName); - enableSchemaEvolution(tableName); // when, then schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION); @@ -103,7 +97,6 @@ public void shouldThrowExceptionWhenTableDoesNotExist() { public void shouldThrowExceptionWhenRecordContentDoesNotExist() { // given createIcebergTableWithColumnClause(tableName, "some_column VARCHAR"); - enableSchemaEvolution(tableName); // expect Assertions.assertThrows( @@ -115,7 +108,6 @@ public void shouldThrowExceptionWhenRecordContentDoesNotExist() { public void shouldThrowExceptionWhenRecordContentHasInvalidType() { // given createIcebergTableWithColumnClause(tableName, "record_content 
MAP(VARCHAR, VARCHAR)"); - enableSchemaEvolution(tableName); // expect Assertions.assertThrows( @@ -127,22 +119,17 @@ public void shouldThrowExceptionWhenRecordContentHasInvalidType() { public void shouldNotThrowExceptionWhenColumnRecordMetadataDoesNotExist() { // given createIcebergTableWithColumnClause(tableName, "record_content object()"); - enableSchemaEvolution(tableName); // expect schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION); } + } - @Test - public void shouldThrowExceptionWhenRecordMetadataHasInvalidType() { - // given - createIcebergTableWithColumnClause(tableName, "record_metadata MAP(VARCHAR, VARCHAR)"); - enableSchemaEvolution(tableName); - - // expect - Assertions.assertThrows( - SnowflakeKafkaConnectorException.class, - () -> schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION)); - } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void shouldThrowExceptionWhenTableDoesNotExist(boolean schemaEvolution) { + Assertions.assertThrows( + SnowflakeKafkaConnectorException.class, + () -> schemaValidator.validateTable(tableName, TEST_ROLE, schemaEvolution)); } } From da3bc910706dc24098f80ebc538b479d723c125d Mon Sep 17 00:00:00 2001 From: Wojciech Trefon Date: Mon, 7 Oct 2024 16:15:49 +0200 Subject: [PATCH 5/5] CR changes2 --- .../iceberg/IcebergTableSchemaValidator.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java b/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java index 2148dd8de..2cfa3db98 100644 --- a/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java +++ b/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java @@ -62,14 +62,14 @@ private void validateSchemaEvolutionScenario(String tableName, String role) { } private static void validateNoSchemaEvolutionScenario(List columns) { - Optional recordContent = + DescribeTableRow recordContent = columns.stream() .filter(c -> Objects.equals(c.getColumn(), TABLE_COLUMN_CONTENT)) - .findFirst(); - if (!recordContent.isPresent()) { - throw SnowflakeErrors.ERROR_0032.getException("record_content_column_not_found"); - } - if (!isOfStructuredObjectType(recordContent.get())) { + .findFirst() + .orElseThrow( + () -> SnowflakeErrors.ERROR_0032.getException("record_content_column_not_found")); + + if (!isOfStructuredObjectType(recordContent)) { throw SnowflakeErrors.ERROR_0032.getException("invalid_record_content_type"); } }
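
Note: after the full series, a table that passes validation with schema evolution
disabled needs only the column clause used by createIcebergTableNoSchemaEvolution.
A sketch, with the remaining CREATE ICEBERG TABLE options (external volume, catalog,
base location) being deployment-specific and omitted here:

    create or replace iceberg table identifier(?) (
        record_metadata object(),  -- optional: the connector adds and retypes it if missing
        record_content object()    -- required when schema evolution is disabled
    ) ...;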