Commit

Create metadata column if not exists
sfc-gh-wtrefon committed Oct 4, 2024
1 parent 3bf31ff commit 0112d6a
Showing 10 changed files with 222 additions and 58 deletions.
@@ -326,6 +326,13 @@ ChannelMigrateOffsetTokenResponseDTO migrateStreamingChannelOffsetToken(
*/
void initializeMetadataColumnTypeForIceberg(String tableName);

/**
* Add the RECORD_METADATA column to the iceberg table if it does not exist.
*
* @param tableName iceberg table name
*/
void addMetadataColumnForIcebergIfNotExists(String tableName);

/**
* Calls describe table statement and returns all columns and corresponding types.
*
@@ -2,6 +2,7 @@

import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_CONTENT;
import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_METADATA;
import static com.snowflake.kafka.connector.streaming.iceberg.IcebergDDLTypes.ICEBERG_METADATA_OBJECT_SCHEMA;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -164,23 +165,35 @@ public void createTableWithOnlyMetadataColumn(final String tableName) {
LOGGER.info("Created table {} with only RECORD_METADATA column", tableName);
}

  @Override
  public void addMetadataColumnForIcebergIfNotExists(String tableName) {
    checkConnection();
    InternalUtils.assertNotEmpty("tableName", tableName);
    String query =
        "ALTER ICEBERG TABLE identifier(?) ADD COLUMN IF NOT EXISTS RECORD_METADATA "
            + ICEBERG_METADATA_OBJECT_SCHEMA;
    // try-with-resources closes the statement even when execute() throws
    try (PreparedStatement stmt = conn.prepareStatement(query)) {
      stmt.setString(1, tableName);
      stmt.execute();
    } catch (SQLException e) {
      LOGGER.error(
          "Couldn't alter table {} to add RECORD_METADATA column to align with iceberg format",
          tableName);
      throw SnowflakeErrors.ERROR_2019.getException(e);
    }
    LOGGER.info(
        "Altered table {} to add RECORD_METADATA column to align with iceberg format", tableName);
  }

@Override
public void initializeMetadataColumnTypeForIceberg(String tableName) {
checkConnection();
InternalUtils.assertNotEmpty("tableName", tableName);
String query =
"ALTER ICEBERG TABLE identifier(?) ALTER COLUMN RECORD_METADATA SET DATA TYPE OBJECT("
+ "offset INTEGER,"
+ "topic STRING,"
+ "partition INTEGER,"
+ "key STRING,"
+ "schema_id INTEGER,"
+ "key_schema_id INTEGER,"
+ "CreateTime BIGINT,"
+ "LogAppendTime BIGINT,"
+ "SnowflakeConnectorPushTime BIGINT,"
+ "headers MAP(VARCHAR, VARCHAR)"
+ ")";
"ALTER ICEBERG TABLE identifier(?) ALTER COLUMN RECORD_METADATA SET DATA TYPE "
+ ICEBERG_METADATA_OBJECT_SCHEMA;
try {
PreparedStatement stmt = conn.prepareStatement(query);
stmt.setString(1, tableName);
@@ -220,8 +220,12 @@ public enum SnowflakeErrors {
ERROR_2018(
"2018",
"Failed to alter RECORD_METADATA column type for iceberg",
"Failed to alter RECORD_METADATA column type to required format for iceberg, please check"
+ " column exists."),
"Failed to alter RECORD_METADATA column type to required format for iceberg."),
ERROR_2019(
"2019",
"Failed to add RECORD_METADATA column for iceberg",
"Failed to add RECORD_METADATA column with required format for iceberg."),

// Snowpipe related issues 3---
ERROR_3001("3001", "Failed to ingest file", "Exception reported by Ingest SDK"),

@@ -274,7 +274,8 @@ public void startPartitions(
private void perTopicActionsOnStartPartitions(String topic, Map<String, String> topic2Table) {
String tableName = Utils.tableName(topic, topic2Table);
if (Utils.isIcebergEnabled(connectorConfig)) {
-      icebergTableSchemaValidator.validateTable(tableName, Utils.role(connectorConfig));
+      icebergTableSchemaValidator.validateTable(
+          tableName, Utils.role(connectorConfig), enableSchematization);
icebergInitService.initializeIcebergTableProperties(tableName);
} else {
createTableIfNotExists(tableName);
@@ -0,0 +1,49 @@
package com.snowflake.kafka.connector.streaming.iceberg;

public class IcebergDDLTypes {
  public static final String INTEGER = "INTEGER";
  public static final String STRING = "STRING";
  public static final String BIGINT = "BIGINT";
  public static final String MAP = "MAP";
  public static final String OBJECT = "OBJECT";
  public static final String VARCHAR = "VARCHAR";

  public static final String ICEBERG_METADATA_OBJECT_SCHEMA =
OBJECT
+ "("
+ "offset "
+ INTEGER
+ ","
+ "topic "
+ STRING
+ ","
+ "partition "
+ INTEGER
+ ","
+ "key "
+ STRING
+ ","
+ "schema_id "
+ INTEGER
+ ","
+ "key_schema_id "
+ INTEGER
+ ","
+ "CreateTime "
+ BIGINT
+ ","
+ "LogAppendTime "
+ BIGINT
+ ","
+ "SnowflakeConnectorPushTime "
+ BIGINT
+ ","
+ "headers "
+ MAP
+ "("
+ VARCHAR
+ ","
+ VARCHAR
+ ")"
+ ")";
}
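Because the constant is assembled from many small concatenations, its final value is hard to read at a glance. The following standalone sketch (not part of the commit; the class name is illustrative) prints the expanded string, derived by hand from the concatenation above, so the resulting DDL fragment can be seen in one piece:

// Standalone illustration: the expanded value of ICEBERG_METADATA_OBJECT_SCHEMA.
public class IcebergMetadataSchemaSketch {
  public static void main(String[] args) {
    String expanded =
        "OBJECT(offset INTEGER,topic STRING,partition INTEGER,key STRING,"
            + "schema_id INTEGER,key_schema_id INTEGER,CreateTime BIGINT,"
            + "LogAppendTime BIGINT,SnowflakeConnectorPushTime BIGINT,"
            + "headers MAP(VARCHAR,VARCHAR))";
    System.out.println(expanded);
  }
}

This is the fragment appended to both ALTER ICEBERG TABLE statements shown earlier in this diff.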
@@ -15,6 +15,7 @@ public IcebergInitService(SnowflakeConnectionService snowflakeConnectionService)

public void initializeIcebergTableProperties(String tableName) {
LOGGER.info("Initializing properties for Iceberg table: {}", tableName);
snowflakeConnectionService.addMetadataColumnForIcebergIfNotExists(tableName);
snowflakeConnectionService.initializeMetadataColumnTypeForIceberg(tableName);
}
}
@@ -7,6 +7,7 @@
import com.snowflake.kafka.connector.internal.SnowflakeErrors;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

/** Performs validations of Iceberg table schema on the connector startup. */
public class IcebergTableSchemaValidator {
@@ -24,29 +25,51 @@ public IcebergTableSchemaValidator(SnowflakeConnectionService snowflakeConnectio
*
* @param tableName table to be validated
* @param role role used for validation
* @param schemaEvolutionEnabled whether schema evolution is enabled
*/
-  public void validateTable(String tableName, String role) {
+  public void validateTable(String tableName, String role, boolean schemaEvolutionEnabled) {
List<DescribeTableRow> columns =
snowflakeConnectionService
.describeTable(tableName)
.orElseThrow(() -> SnowflakeErrors.ERROR_0032.getException("table_not_found"));

-    DescribeTableRow metadata =
+    Optional<DescribeTableRow> metadata =
         columns.stream()
             .filter(c -> Objects.equals(c.getColumn(), TABLE_COLUMN_METADATA))
-            .findFirst()
-            .orElseThrow(
-                () -> SnowflakeErrors.ERROR_0032.getException("record_metadata_not_found"));
+            .findFirst();

-    if (!isOfStructuredObjectType(metadata)) {
-      throw SnowflakeErrors.ERROR_0032.getException("invalid_metadata_type");
+    if (schemaEvolutionEnabled) {
+      validateSchemaEvolutionScenario(tableName, role, metadata);
+    } else {
+      validateNoSchemaEvolutionScenario(metadata);
}
}

private void validateSchemaEvolutionScenario(
String tableName, String role, Optional<DescribeTableRow> metadata) {
metadata.ifPresent(
m -> {
          // if the metadata column already exists, it must be of structured OBJECT type;
          // if it does not exist, the connector will create it during initialization
if (!isOfStructuredObjectType(m)) {
throw SnowflakeErrors.ERROR_0032.getException("invalid_metadata_type");
}
});

if (!snowflakeConnectionService.hasSchemaEvolutionPermission(tableName, role)) {
throw SnowflakeErrors.ERROR_0032.getException("schema_evolution_not_enabled");
}
}

private static void validateNoSchemaEvolutionScenario(Optional<DescribeTableRow> metadata) {
if (!metadata.isPresent()) {
throw SnowflakeErrors.ERROR_0032.getException("metadata_column_not_found");
}
if (!isOfStructuredObjectType(metadata.get())) {
throw SnowflakeErrors.ERROR_0032.getException("invalid_metadata_type");
}
}

private static boolean isOfStructuredObjectType(DescribeTableRow metadata) {
return metadata.getType().startsWith(SF_STRUCTURED_OBJECT);
}
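Read together, the two private methods implement a small decision matrix: with schema evolution enabled, a missing RECORD_METADATA column is tolerated (the connector adds it during initialization) but an existing one must be a structured OBJECT; with schema evolution disabled, the column must both exist and be a structured OBJECT. A self-contained sketch of that logic (simplified stand-in types, the schema-evolution permission check omitted; these are not the connector's actual classes):

import java.util.Optional;

public class ValidationMatrixSketch {
  // Stand-in for the SF_STRUCTURED_OBJECT prefix check above (assumed prefix).
  static boolean isStructuredObject(String columnType) {
    return columnType.startsWith("OBJECT(");
  }

  static void validate(Optional<String> metadataType, boolean schemaEvolutionEnabled) {
    if (schemaEvolutionEnabled) {
      // A missing column is fine; an existing column must be structured OBJECT.
      metadataType.ifPresent(
          type -> {
            if (!isStructuredObject(type)) {
              throw new IllegalStateException("invalid_metadata_type");
            }
          });
    } else {
      // The column must exist and be structured OBJECT.
      if (!metadataType.isPresent()) {
        throw new IllegalStateException("metadata_column_not_found");
      }
      if (!isStructuredObject(metadataType.get())) {
        throw new IllegalStateException("invalid_metadata_type");
      }
    }
  }

  public static void main(String[] args) {
    validate(Optional.empty(), true); // passes: column will be created later
    validate(Optional.of("OBJECT(offset NUMBER(10,0))"), false); // passes
  }
}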
@@ -64,12 +64,26 @@ void shouldThrowExceptionWhenTableDoesNotExist() {
}

@Test
-  void shouldThrowExceptionWhenRecordMetadataDoesNotExist() {
+  void shouldCreateMetadataWhenColumnNotExists() {
// given
createIcebergTableWithColumnClause(tableName, "some_column VARCHAR");

-    // expect
-    assertThatThrownBy(() -> icebergInitService.initializeIcebergTableProperties(tableName))
-        .isInstanceOf(SnowflakeKafkaConnectorException.class);
+    // when
+    icebergInitService.initializeIcebergTableProperties(tableName);
+
+    // then
+    assertThat(describeRecordMetadataType(tableName))
+        .isEqualTo(
+            "OBJECT(offset NUMBER(10,0), "
+                + "topic VARCHAR(16777216), "
+                + "partition NUMBER(10,0), "
+                + "key VARCHAR(16777216), "
+                + "schema_id NUMBER(10,0), "
+                + "key_schema_id NUMBER(10,0), "
+                + "CreateTime NUMBER(19,0), "
+                + "LogAppendTime NUMBER(19,0), "
+                + "SnowflakeConnectorPushTime NUMBER(19,0), "
+                + "headers MAP(VARCHAR(16777216), "
+                + "VARCHAR(16777216)))");
}
}
@@ -15,6 +15,8 @@ class IcebergInitServiceTest {
@Test
void testInitializeIcebergTableProperties() {
icebergInitService.initializeIcebergTableProperties("test_table");

verify(mockConnection).addMetadataColumnForIcebergIfNotExists("test_table");
verify(mockConnection).initializeMetadataColumnTypeForIceberg("test_table");
verifyNoMoreInteractions(mockConnection);
}
@@ -6,6 +6,7 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

public class IcebergTableSchemaValidatorIT extends BaseIcebergIT {
@@ -33,44 +34,93 @@ public void tearDown() {
dropIcebergTable(tableName);
}

-  @Test
-  public void shouldValidateExpectedIcebergTableSchema() {
-    // given
-    createIcebergTable(tableName);
-    enableSchemaEvolution(tableName);
-
-    // when, then
-    schemaValidator.validateTable(tableName, TEST_ROLE);
-  }
-
-  @Test
-  public void shouldThrowExceptionWhenTableDoesNotExist() {
-    Assertions.assertThrows(
-        SnowflakeKafkaConnectorException.class,
-        () -> schemaValidator.validateTable(tableName, TEST_ROLE));
-  }
-
-  @Test
-  public void shouldThrowExceptionWhenRecordMetadataDoesNotExist() {
-    // given
-    createIcebergTableWithColumnClause(tableName, "some_column VARCHAR");
-    enableSchemaEvolution(tableName);
-
-    // expect
-    Assertions.assertThrows(
-        SnowflakeKafkaConnectorException.class,
-        () -> schemaValidator.validateTable(tableName, TEST_ROLE));
-  }
-
-  @Test
-  public void shouldThrowExceptionWhenRecordMetadataHasInvalidType() {
-    // given
-    createIcebergTableWithColumnClause(tableName, "record_metadata MAP(VARCHAR, VARCHAR)");
-    enableSchemaEvolution(tableName);
-
-    // expect
-    Assertions.assertThrows(
-        SnowflakeKafkaConnectorException.class,
-        () -> schemaValidator.validateTable(tableName, TEST_ROLE));
-  }
+  @Nested
+  class SchemaEvolutionEnabled {
+    public static final boolean SCHEMA_EVOLUTION = true;
+
+    @Test
+    public void shouldValidateExpectedIcebergTableSchema() {
+      // given
+      createIcebergTable(tableName);
+      enableSchemaEvolution(tableName);
+
+      // when, then
+      schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION);
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenTableDoesNotExist() {
+      Assertions.assertThrows(
+          SnowflakeKafkaConnectorException.class,
+          () -> schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION));
+    }
+
+    @Test
+    public void shouldNotThrowExceptionWhenColumnRecordMetadataDoesNotExist() {
+      // given
+      createIcebergTableWithColumnClause(tableName, "some_column VARCHAR");
+      enableSchemaEvolution(tableName);
+
+      // expect
+      schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION);
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenRecordMetadataHasInvalidType() {
+      // given
+      createIcebergTableWithColumnClause(tableName, "record_metadata MAP(VARCHAR, VARCHAR)");
+      enableSchemaEvolution(tableName);
+
+      // expect
+      Assertions.assertThrows(
+          SnowflakeKafkaConnectorException.class,
+          () -> schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION));
+    }
+  }
+
+  @Nested
+  class SchemaEvolutionNotEnabled {
+    public static final boolean SCHEMA_EVOLUTION = false;
+
+    @Test
+    public void shouldValidateExpectedIcebergTableSchema() {
+      // given
+      createIcebergTable(tableName);
+      enableSchemaEvolution(tableName);
+
+      // when, then
+      schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION);
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenTableDoesNotExist() {
+      Assertions.assertThrows(
+          SnowflakeKafkaConnectorException.class,
+          () -> schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION));
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenRecordMetadataDoesNotExist() {
+      // given
+      createIcebergTableWithColumnClause(tableName, "some_column VARCHAR");
+      enableSchemaEvolution(tableName);
+
+      // expect
+      Assertions.assertThrows(
+          SnowflakeKafkaConnectorException.class,
+          () -> schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION));
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenRecordMetadataHasInvalidType() {
+      // given
+      createIcebergTableWithColumnClause(tableName, "record_metadata MAP(VARCHAR, VARCHAR)");
+      enableSchemaEvolution(tableName);
+
+      // expect
+      Assertions.assertThrows(
+          SnowflakeKafkaConnectorException.class,
+          () -> schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION));
+    }
+  }
}
