From 0112d6a8af82f792bc99451f91047eab8f9f5252 Mon Sep 17 00:00:00 2001 From: Wojciech Trefon Date: Fri, 4 Oct 2024 16:50:17 +0200 Subject: [PATCH] Create metadata column if not exists --- .../internal/SnowflakeConnectionService.java | 7 ++ .../SnowflakeConnectionServiceV1.java | 37 ++++-- .../connector/internal/SnowflakeErrors.java | 8 +- .../streaming/SnowflakeSinkServiceV2.java | 3 +- .../streaming/iceberg/IcebergDDLTypes.java | 49 ++++++++ .../streaming/iceberg/IcebergInitService.java | 1 + .../iceberg/IcebergTableSchemaValidator.java | 37 ++++-- .../iceberg/IcebergInitServiceIT.java | 22 +++- .../iceberg/IcebergInitServiceTest.java | 2 + .../IcebergTableSchemaValidatorIT.java | 114 +++++++++++++----- 10 files changed, 222 insertions(+), 58 deletions(-) create mode 100644 src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergDDLTypes.java diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java index 852a8d84a..b9f81d94e 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java @@ -326,6 +326,13 @@ ChannelMigrateOffsetTokenResponseDTO migrateStreamingChannelOffsetToken( */ void initializeMetadataColumnTypeForIceberg(String tableName); + /** + * Add the RECORD_METADATA column to the iceberg table if it does not exist. + * + * @param tableName iceberg table name + */ + void addMetadataColumnForIcebergIfNotExists(String tableName); + /** * Calls describe table statement and returns all columns and corresponding types. * diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java index 80f5ddab2..d876afc3d 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java @@ -2,6 +2,7 @@ import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_CONTENT; import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_METADATA; +import static com.snowflake.kafka.connector.streaming.iceberg.IcebergDDLTypes.ICEBERG_METADATA_OBJECT_SCHEMA; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -164,23 +165,35 @@ public void createTableWithOnlyMetadataColumn(final String tableName) { LOGGER.info("Created table {} with only RECORD_METADATA column", tableName); } + @Override + public void addMetadataColumnForIcebergIfNotExists(String tableName) { + checkConnection(); + InternalUtils.assertNotEmpty("tableName", tableName); + String query = + "ALTER ICEBERG TABLE identifier(?) ADD COLUMN IF NOT EXISTS RECORD_METADATA " + + ICEBERG_METADATA_OBJECT_SCHEMA; + try { + PreparedStatement stmt = conn.prepareStatement(query); + stmt.setString(1, tableName); + stmt.execute(); + stmt.close(); + } catch (SQLException e) { + LOGGER.error( + "Couldn't alter table {} add RECORD_METADATA column to align with iceberg format", + tableName); + throw SnowflakeErrors.ERROR_2019.getException(e); + } + LOGGER.info( + "alter table {} add RECORD_METADATA column to align with iceberg format", tableName); + } + @Override public void initializeMetadataColumnTypeForIceberg(String tableName) { checkConnection(); InternalUtils.assertNotEmpty("tableName", tableName); String query = - "ALTER ICEBERG TABLE identifier(?) ALTER COLUMN RECORD_METADATA SET DATA TYPE OBJECT(" - + "offset INTEGER," - + "topic STRING," - + "partition INTEGER," - + "key STRING," - + "schema_id INTEGER," - + "key_schema_id INTEGER," - + "CreateTime BIGINT," - + "LogAppendTime BIGINT," - + "SnowflakeConnectorPushTime BIGINT," - + "headers MAP(VARCHAR, VARCHAR)" - + ")"; + "ALTER ICEBERG TABLE identifier(?) ALTER COLUMN RECORD_METADATA SET DATA TYPE " + + ICEBERG_METADATA_OBJECT_SCHEMA; try { PreparedStatement stmt = conn.prepareStatement(query); stmt.setString(1, tableName); diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java index 8771186b0..4406cd608 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java @@ -220,8 +220,12 @@ public enum SnowflakeErrors { ERROR_2018( "2018", "Failed to alter RECORD_METADATA column type for iceberg", - "Failed to alter RECORD_METADATA column type to required format for iceberg, please check" - + " column exists."), + "Failed to alter RECORD_METADATA column type to required format for iceberg."), + ERROR_2019( + "2019", + "Failed to add RECORD_METADATA column for iceberg", + "Failed to add RECORD_METADATA column with required format for iceberg."), + // Snowpipe related issues 3--- ERROR_3001("3001", "Failed to ingest file", "Exception reported by Ingest SDK"), diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java index d0f1a0762..f783aa879 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java @@ -274,7 +274,8 @@ public void startPartitions( private void perTopicActionsOnStartPartitions(String topic, Map topic2Table) { String tableName = Utils.tableName(topic, topic2Table); if (Utils.isIcebergEnabled(connectorConfig)) { - icebergTableSchemaValidator.validateTable(tableName, Utils.role(connectorConfig)); + icebergTableSchemaValidator.validateTable( + tableName, Utils.role(connectorConfig), enableSchematization); icebergInitService.initializeIcebergTableProperties(tableName); } else { createTableIfNotExists(tableName); diff --git a/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergDDLTypes.java b/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergDDLTypes.java new file mode 100644 index 000000000..5b08b71f0 --- /dev/null +++ b/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergDDLTypes.java @@ -0,0 +1,49 @@ +package com.snowflake.kafka.connector.streaming.iceberg; + +public class IcebergDDLTypes { + public static String INTEGER = "INTEGER"; + public static String STRING = "STRING"; + public static String BIGINT = "BIGINT"; + public static String MAP = "MAP"; + public static String OBJECT = "OBJECT"; + public static String VARCHAR = "VARCHAR"; + + public static String ICEBERG_METADATA_OBJECT_SCHEMA = + OBJECT + + "(" + + "offset " + + INTEGER + + "," + + "topic " + + STRING + + "," + + "partition " + + INTEGER + + "," + + "key " + + STRING + + "," + + "schema_id " + + INTEGER + + "," + + "key_schema_id " + + INTEGER + + "," + + "CreateTime " + + BIGINT + + "," + + "LogAppendTime " + + BIGINT + + "," + + "SnowflakeConnectorPushTime " + + BIGINT + + "," + + "headers " + + MAP + + "(" + + VARCHAR + + "," + + VARCHAR + + ")" + + ")"; +} diff --git a/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitService.java b/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitService.java index e86606b26..67742b0af 100644 --- a/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitService.java +++ b/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitService.java @@ -15,6 +15,7 @@ public IcebergInitService(SnowflakeConnectionService snowflakeConnectionService) public void initializeIcebergTableProperties(String tableName) { LOGGER.info("Initializing properties for Iceberg table: {}", tableName); + snowflakeConnectionService.addMetadataColumnForIcebergIfNotExists(tableName); snowflakeConnectionService.initializeMetadataColumnTypeForIceberg(tableName); } } diff --git a/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java b/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java index e2f73a206..7a6fb9f5e 100644 --- a/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java +++ b/src/main/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidator.java @@ -7,6 +7,7 @@ import com.snowflake.kafka.connector.internal.SnowflakeErrors; import java.util.List; import java.util.Objects; +import java.util.Optional; /** Performs validations of Iceberg table schema on the connector startup. */ public class IcebergTableSchemaValidator { @@ -24,29 +25,51 @@ public IcebergTableSchemaValidator(SnowflakeConnectionService snowflakeConnectio * * @param tableName table to be validated * @param role role used for validation + * @param schemaEvolutionEnabled whether schema evolution is enabled */ - public void validateTable(String tableName, String role) { + public void validateTable(String tableName, String role, boolean schemaEvolutionEnabled) { List columns = snowflakeConnectionService .describeTable(tableName) .orElseThrow(() -> SnowflakeErrors.ERROR_0032.getException("table_not_found")); - DescribeTableRow metadata = + Optional metadata = columns.stream() .filter(c -> Objects.equals(c.getColumn(), TABLE_COLUMN_METADATA)) - .findFirst() - .orElseThrow( - () -> SnowflakeErrors.ERROR_0032.getException("record_metadata_not_found")); + .findFirst(); - if (!isOfStructuredObjectType(metadata)) { - throw SnowflakeErrors.ERROR_0032.getException("invalid_metadata_type"); + if (schemaEvolutionEnabled) { + validateSchemaEvolutionScenario(tableName, role, metadata); + } else { + validateNoSchemaEvolutionScenario(metadata); } + } + + private void validateSchemaEvolutionScenario( + String tableName, String role, Optional metadata) { + metadata.ifPresent( + m -> { + // if metadata column exists it must be of type OBJECT(), if not exists we create on our + // own this column + if (!isOfStructuredObjectType(m)) { + throw SnowflakeErrors.ERROR_0032.getException("invalid_metadata_type"); + } + }); if (!snowflakeConnectionService.hasSchemaEvolutionPermission(tableName, role)) { throw SnowflakeErrors.ERROR_0032.getException("schema_evolution_not_enabled"); } } + private static void validateNoSchemaEvolutionScenario(Optional metadata) { + if (!metadata.isPresent()) { + throw SnowflakeErrors.ERROR_0032.getException("metadata_column_not_found"); + } + if (!isOfStructuredObjectType(metadata.get())) { + throw SnowflakeErrors.ERROR_0032.getException("invalid_metadata_type"); + } + } + private static boolean isOfStructuredObjectType(DescribeTableRow metadata) { return metadata.getType().startsWith(SF_STRUCTURED_OBJECT); } diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitServiceIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitServiceIT.java index ce274a520..7f70f20c8 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitServiceIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitServiceIT.java @@ -64,12 +64,26 @@ void shouldThrowExceptionWhenTableDoesNotExist() { } @Test - void shouldThrowExceptionWhenRecordMetadataDoesNotExist() { + void shouldCreateMetadataWhenColumnNotExists() { // given createIcebergTableWithColumnClause(tableName, "some_column VARCHAR"); - // expect - assertThatThrownBy(() -> icebergInitService.initializeIcebergTableProperties(tableName)) - .isInstanceOf(SnowflakeKafkaConnectorException.class); + // when + icebergInitService.initializeIcebergTableProperties(tableName); + + // then + assertThat(describeRecordMetadataType(tableName)) + .isEqualTo( + "OBJECT(offset NUMBER(10,0), " + + "topic VARCHAR(16777216), " + + "partition NUMBER(10,0), " + + "key VARCHAR(16777216), " + + "schema_id NUMBER(10,0), " + + "key_schema_id NUMBER(10,0), " + + "CreateTime NUMBER(19,0), " + + "LogAppendTime NUMBER(19,0), " + + "SnowflakeConnectorPushTime NUMBER(19,0), " + + "headers MAP(VARCHAR(16777216), " + + "VARCHAR(16777216)))"); } } diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitServiceTest.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitServiceTest.java index b8d7cb576..3fa035421 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitServiceTest.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitServiceTest.java @@ -15,6 +15,8 @@ class IcebergInitServiceTest { @Test void testInitializeIcebergTableProperties() { icebergInitService.initializeIcebergTableProperties("test_table"); + + verify(mockConnection).addMetadataColumnForIcebergIfNotExists("test_table"); verify(mockConnection).initializeMetadataColumnTypeForIceberg("test_table"); verifyNoMoreInteractions(mockConnection); } diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java index 9c4250740..c4ac3f05d 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergTableSchemaValidatorIT.java @@ -6,6 +6,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; public class IcebergTableSchemaValidatorIT extends BaseIcebergIT { @@ -33,44 +34,93 @@ public void tearDown() { dropIcebergTable(tableName); } - @Test - public void shouldValidateExpectedIcebergTableSchema() { - // given - createIcebergTable(tableName); - enableSchemaEvolution(tableName); + @Nested + class SchemaEvolutionEnabled { + public static final boolean SCHEMA_EVOLUTION = true; - // when, then - schemaValidator.validateTable(tableName, TEST_ROLE); - } + @Test + public void shouldValidateExpectedIcebergTableSchema() { + // given + createIcebergTable(tableName); + enableSchemaEvolution(tableName); - @Test - public void shouldThrowExceptionWhenTableDoesNotExist() { - Assertions.assertThrows( - SnowflakeKafkaConnectorException.class, - () -> schemaValidator.validateTable(tableName, TEST_ROLE)); - } + // when, then + schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION); + } + + @Test + public void shouldThrowExceptionWhenTableDoesNotExist() { + Assertions.assertThrows( + SnowflakeKafkaConnectorException.class, + () -> schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION)); + } - @Test - public void shouldThrowExceptionWhenRecordMetadataDoesNotExist() { - // given - createIcebergTableWithColumnClause(tableName, "some_column VARCHAR"); - enableSchemaEvolution(tableName); + @Test + public void shouldNotThrowExceptionWhenColumnRecordMetadataDoesNotExist() { + // given + createIcebergTableWithColumnClause(tableName, "some_column VARCHAR"); + enableSchemaEvolution(tableName); - // expect - Assertions.assertThrows( - SnowflakeKafkaConnectorException.class, - () -> schemaValidator.validateTable(tableName, TEST_ROLE)); + // expect + schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION); + } + + @Test + public void shouldThrowExceptionWhenRecordMetadataHasInvalidType() { + // given + createIcebergTableWithColumnClause(tableName, "record_metadata MAP(VARCHAR, VARCHAR)"); + enableSchemaEvolution(tableName); + + // expect + Assertions.assertThrows( + SnowflakeKafkaConnectorException.class, + () -> schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION)); + } } - @Test - public void shouldThrowExceptionWhenRecordMetadataHasInvalidType() { - // given - createIcebergTableWithColumnClause(tableName, "record_metadata MAP(VARCHAR, VARCHAR)"); - enableSchemaEvolution(tableName); + @Nested + class SchemaEvolutionNotEnabled { + public static final boolean SCHEMA_EVOLUTION = false; + + @Test + public void shouldValidateExpectedIcebergTableSchema() { + // given + createIcebergTable(tableName); + enableSchemaEvolution(tableName); + + // when, then + schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION); + } + + @Test + public void shouldThrowExceptionWhenTableDoesNotExist() { + Assertions.assertThrows( + SnowflakeKafkaConnectorException.class, + () -> schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION)); + } + + @Test + public void shouldThrowExceptionWhenRecordMetadataDoesNotExist() { + // given + createIcebergTableWithColumnClause(tableName, "some_column VARCHAR"); + enableSchemaEvolution(tableName); + + // expect + Assertions.assertThrows( + SnowflakeKafkaConnectorException.class, + () -> schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION)); + } + + @Test + public void shouldThrowExceptionWhenRecordMetadataHasInvalidType() { + // given + createIcebergTableWithColumnClause(tableName, "record_metadata MAP(VARCHAR, VARCHAR)"); + enableSchemaEvolution(tableName); - // expect - Assertions.assertThrows( - SnowflakeKafkaConnectorException.class, - () -> schemaValidator.validateTable(tableName, TEST_ROLE)); + // expect + Assertions.assertThrows( + SnowflakeKafkaConnectorException.class, + () -> schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION)); + } } }