
Commit

SNOW-1680410 adjust validations and create metadata column if not exists (#946)
sfc-gh-wtrefon authored Oct 7, 2024
1 parent 3bf31ff commit 225d56f
Showing 13 changed files with 213 additions and 74 deletions.
@@ -4,17 +4,13 @@
import static com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig.SNOWPIPE_STREAMING;

import com.google.common.collect.ImmutableMap;
import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
import com.snowflake.kafka.connector.internal.streaming.StreamingConfigValidator;
import java.util.HashMap;
import java.util.Map;

/** Validates dependencies between parameters in Iceberg mode. */
public class IcebergConfigValidator implements StreamingConfigValidator {

private static final String NO_SCHEMATIZATION_ERROR_MESSAGE =
"Ingestion to Iceberg table requires " + ENABLE_SCHEMATIZATION_CONFIG + " set to true";
private static final String INCOMPATIBLE_INGESTION_METHOD =
"Ingestion to Iceberg table is supported only for Snowpipe Streaming";

@@ -28,15 +24,9 @@ public ImmutableMap<String, String> validate(Map<String, String> inputConfig) {

Map<String, String> validationErrors = new HashMap<>();

boolean isSchematizationEnabled =
Boolean.parseBoolean(
inputConfig.get(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG));
IngestionMethodConfig ingestionMethod =
IngestionMethodConfig.valueOf(inputConfig.get(INGESTION_METHOD_OPT).toUpperCase());

if (!isSchematizationEnabled) {
validationErrors.put(ENABLE_SCHEMATIZATION_CONFIG, NO_SCHEMATIZATION_ERROR_MESSAGE);
}
if (ingestionMethod != SNOWPIPE_STREAMING) {
validationErrors.put(INGESTION_METHOD_OPT, INCOMPATIBLE_INGESTION_METHOD);
}
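With this change, schematization is no longer a prerequisite for Iceberg ingestion: the validator only rejects ingestion methods other than Snowpipe Streaming. A minimal sketch of the relaxed behavior follows; the validator's package, the config literal, and the assertion are illustrative assumptions, and the elided top of validate() (outside this hunk) is assumed to recognize the config as Iceberg-enabled.

import com.google.common.collect.ImmutableMap;
import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.config.IcebergConfigValidator; // package assumed
import java.util.HashMap;
import java.util.Map;

class IcebergValidationSketch {
  static void demo() {
    Map<String, String> config = new HashMap<>();
    // Parses to IngestionMethodConfig.SNOWPIPE_STREAMING after toUpperCase().
    config.put(SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT, "snowpipe_streaming");
    // Schematization deliberately left disabled -- before this commit that alone
    // produced a validation error for Iceberg tables.
    ImmutableMap<String, String> errors = new IcebergConfigValidator().validate(config);
    // Expected now: no entry for ENABLE_SCHEMATIZATION_CONFIG.
    assert !errors.containsKey(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG);
  }
}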
@@ -326,6 +326,13 @@ ChannelMigrateOffsetTokenResponseDTO migrateStreamingChannelOffsetToken(
*/
void initializeMetadataColumnTypeForIceberg(String tableName);

/**
* Adds the RECORD_METADATA column to the Iceberg table if it does not exist.
*
* @param tableName iceberg table name
*/
void addMetadataColumnForIcebergIfNotExists(String tableName);

/**
* Calls describe table statement and returns all columns and corresponding types.
*
@@ -2,6 +2,7 @@

import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_CONTENT;
import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_METADATA;
import static com.snowflake.kafka.connector.streaming.iceberg.IcebergDDLTypes.ICEBERG_METADATA_OBJECT_SCHEMA;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -164,23 +165,35 @@ public void createTableWithOnlyMetadataColumn(final String tableName) {
LOGGER.info("Created table {} with only RECORD_METADATA column", tableName);
}

@Override
public void addMetadataColumnForIcebergIfNotExists(String tableName) {
checkConnection();
InternalUtils.assertNotEmpty("tableName", tableName);
String query =
"ALTER ICEBERG TABLE identifier(?) ADD COLUMN IF NOT EXISTS RECORD_METADATA "
+ ICEBERG_METADATA_OBJECT_SCHEMA;
try {
PreparedStatement stmt = conn.prepareStatement(query);
stmt.setString(1, tableName);
stmt.execute();
stmt.close();
} catch (SQLException e) {
LOGGER.error(
"Couldn't alter table {} add RECORD_METADATA column to align with iceberg format",
tableName);
throw SnowflakeErrors.ERROR_2019.getException(e);
}
LOGGER.info(
"Added RECORD_METADATA column to table {} (if missing) to align with the Iceberg format",
tableName);
}
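// For reference: with ICEBERG_METADATA_OBJECT_SCHEMA inlined and identifier(?) bound to
// an illustrative table name, the statement prepared above executes the equivalent of
//
//   ALTER ICEBERG TABLE my_table ADD COLUMN IF NOT EXISTS RECORD_METADATA OBJECT(
//     offset INTEGER, topic STRING, partition INTEGER, key STRING,
//     schema_id INTEGER, key_schema_id INTEGER, CreateTime BIGINT,
//     LogAppendTime BIGINT, SnowflakeConnectorPushTime BIGINT,
//     headers MAP(VARCHAR, VARCHAR))
//
// IF NOT EXISTS makes the call a no-op when the column is already present, which is what
// lets initializeIcebergTableProperties() invoke it unconditionally on startup.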

@Override
public void initializeMetadataColumnTypeForIceberg(String tableName) {
checkConnection();
InternalUtils.assertNotEmpty("tableName", tableName);
String query =
"ALTER ICEBERG TABLE identifier(?) ALTER COLUMN RECORD_METADATA SET DATA TYPE OBJECT("
+ "offset INTEGER,"
+ "topic STRING,"
+ "partition INTEGER,"
+ "key STRING,"
+ "schema_id INTEGER,"
+ "key_schema_id INTEGER,"
+ "CreateTime BIGINT,"
+ "LogAppendTime BIGINT,"
+ "SnowflakeConnectorPushTime BIGINT,"
+ "headers MAP(VARCHAR, VARCHAR)"
+ ")";
"ALTER ICEBERG TABLE identifier(?) ALTER COLUMN RECORD_METADATA SET DATA TYPE "
+ ICEBERG_METADATA_OBJECT_SCHEMA;
try {
PreparedStatement stmt = conn.prepareStatement(query);
stmt.setString(1, tableName);
@@ -220,8 +220,12 @@ public enum SnowflakeErrors {
ERROR_2018(
"2018",
"Failed to alter RECORD_METADATA column type for iceberg",
"Failed to alter RECORD_METADATA column type to required format for iceberg, please check"
+ " column exists."),
"Failed to alter RECORD_METADATA column type to required format for iceberg."),
ERROR_2019(
"2019",
"Failed to add RECORD_METADATA column for iceberg",
"Failed to add RECORD_METADATA column with required format for iceberg."),

// Snowpipe related issues 3---
ERROR_3001("3001", "Failed to ingest file", "Exception reported by Ingest SDK"),

@@ -274,7 +274,8 @@ public void startPartitions(
private void perTopicActionsOnStartPartitions(String topic, Map<String, String> topic2Table) {
String tableName = Utils.tableName(topic, topic2Table);
if (Utils.isIcebergEnabled(connectorConfig)) {
icebergTableSchemaValidator.validateTable(tableName, Utils.role(connectorConfig));
icebergTableSchemaValidator.validateTable(
tableName, Utils.role(connectorConfig), enableSchematization);
icebergInitService.initializeIcebergTableProperties(tableName);
} else {
createTableIfNotExists(tableName);
@@ -0,0 +1,18 @@
package com.snowflake.kafka.connector.streaming.iceberg;

public class IcebergDDLTypes {
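// Structured OBJECT schema applied to RECORD_METADATA on Iceberg tables; shared by the
// ADD COLUMN IF NOT EXISTS and ALTER COLUMN ... SET DATA TYPE statements in the
// connection service so the two DDL paths cannot drift apart.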

public static String ICEBERG_METADATA_OBJECT_SCHEMA =
"OBJECT("
+ "offset INTEGER,"
+ "topic STRING,"
+ "partition INTEGER,"
+ "key STRING,"
+ "schema_id INTEGER,"
+ "key_schema_id INTEGER,"
+ "CreateTime BIGINT,"
+ "LogAppendTime BIGINT,"
+ "SnowflakeConnectorPushTime BIGINT,"
+ "headers MAP(VARCHAR, VARCHAR)"
+ ")";
}
@@ -15,6 +15,7 @@ public IcebergInitService(SnowflakeConnectionService snowflakeConnectionService)

public void initializeIcebergTableProperties(String tableName) {
LOGGER.info("Initializing properties for Iceberg table: {}", tableName);
snowflakeConnectionService.addMetadataColumnForIcebergIfNotExists(tableName);
snowflakeConnectionService.initializeMetadataColumnTypeForIceberg(tableName);
}
}
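The call order above matters: the add-if-not-exists step guarantees RECORD_METADATA is present, so the subsequent type alteration can assume it exists, and the "column missing" and "column already there" startup paths converge. A minimal usage sketch under assumed wiring (the helper and table name are illustrative, not part of the commit):

import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
import com.snowflake.kafka.connector.streaming.iceberg.IcebergInitService;

class IcebergInitSketch {
  // Both steps are idempotent (ADD COLUMN IF NOT EXISTS no-ops when the column exists,
  // and SET DATA TYPE re-applies the same schema), so this is safe on every restart.
  static void initIcebergTable(SnowflakeConnectionService conn, String tableName) {
    new IcebergInitService(conn).initializeIcebergTableProperties(tableName);
  }
}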
@@ -1,12 +1,14 @@
package com.snowflake.kafka.connector.streaming.iceberg;

import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_CONTENT;
import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_METADATA;

import com.snowflake.kafka.connector.internal.DescribeTableRow;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
import com.snowflake.kafka.connector.internal.SnowflakeErrors;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

/** Performs validations of Iceberg table schema on the connector startup. */
public class IcebergTableSchemaValidator {
@@ -24,29 +26,54 @@ public IcebergTableSchemaValidator(SnowflakeConnectionService snowflakeConnectio
*
* @param tableName table to be validated
* @param role role used for validation
* @param schemaEvolutionEnabled whether schema evolution is enabled
*/
public void validateTable(String tableName, String role) {
public void validateTable(String tableName, String role, boolean schemaEvolutionEnabled) {
List<DescribeTableRow> columns =
snowflakeConnectionService
.describeTable(tableName)
.orElseThrow(() -> SnowflakeErrors.ERROR_0032.getException("table_not_found"));

DescribeTableRow metadata =
Optional<DescribeTableRow> metadata =
columns.stream()
.filter(c -> Objects.equals(c.getColumn(), TABLE_COLUMN_METADATA))
.findFirst()
.orElseThrow(
() -> SnowflakeErrors.ERROR_0032.getException("record_metadata_not_found"));
.findFirst();

metadata.ifPresent(
m -> {
// If the metadata column exists, it must be a structured OBJECT type; if it is
// missing, the connector creates it itself during table initialization.
if (!isOfStructuredObjectType(m)) {
throw SnowflakeErrors.ERROR_0032.getException("invalid_metadata_type");
}
});

if (!isOfStructuredObjectType(metadata)) {
throw SnowflakeErrors.ERROR_0032.getException("invalid_metadata_type");
if (schemaEvolutionEnabled) {
validateSchemaEvolutionScenario(tableName, role);
} else {
validateNoSchemaEvolutionScenario(columns);
}
}

private void validateSchemaEvolutionScenario(String tableName, String role) {
if (!snowflakeConnectionService.hasSchemaEvolutionPermission(tableName, role)) {
throw SnowflakeErrors.ERROR_0032.getException("schema_evolution_not_enabled");
}
}

private static void validateNoSchemaEvolutionScenario(List<DescribeTableRow> columns) {
DescribeTableRow recordContent =
columns.stream()
.filter(c -> Objects.equals(c.getColumn(), TABLE_COLUMN_CONTENT))
.findFirst()
.orElseThrow(
() -> SnowflakeErrors.ERROR_0032.getException("record_content_column_not_found"));

if (!isOfStructuredObjectType(recordContent)) {
throw SnowflakeErrors.ERROR_0032.getException("invalid_record_content_type");
}
}

private static boolean isOfStructuredObjectType(DescribeTableRow metadata) {
return metadata.getType().startsWith(SF_STRUCTURED_OBJECT);
}
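The validation now branches on schema evolution: with evolution enabled, the connector only needs the evolution permission on the table (a missing RECORD_METADATA column is tolerated, since it is added at init time), while with evolution disabled the table must already expose a structured RECORD_CONTENT column. A sketch of the two calls, with illustrative table, role, and variable names:

import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
import com.snowflake.kafka.connector.streaming.iceberg.IcebergTableSchemaValidator;

class ValidationPathsSketch {
  static void demo(SnowflakeConnectionService conn) {
    IcebergTableSchemaValidator validator = new IcebergTableSchemaValidator(conn);
    // Evolution on: the role must hold schema evolution permission on the table.
    validator.validateTable("events_tbl", "KAFKA_ROLE", true);
    // Evolution off: the table must already have a structured OBJECT() RECORD_CONTENT.
    validator.validateTable("events_tbl", "KAFKA_ROLE", false);
  }
}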
@@ -1,6 +1,5 @@
package com.snowflake.kafka.connector.config;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT;

import com.google.common.collect.ImmutableMap;
@@ -40,7 +39,11 @@ public void shouldReturnErrorOnInvalidConfig(Map<String, String> config, String
public static Stream<Arguments> validConfigs() {
return Stream.of(
Arguments.of(SnowflakeSinkConnectorConfigBuilder.snowpipeConfig().build()),
Arguments.of(SnowflakeSinkConnectorConfigBuilder.icebergConfig().build()));
Arguments.of(SnowflakeSinkConnectorConfigBuilder.icebergConfig().build()),
Arguments.of(
SnowflakeSinkConnectorConfigBuilder.icebergConfig()
.withSchematizationEnabled(false)
.build()));
}

public static Stream<Arguments> invalidConfigs() {
@@ -49,11 +52,6 @@ public static Stream<Arguments> invalidConfigs() {
SnowflakeSinkConnectorConfigBuilder.icebergConfig()
.withIngestionMethod(IngestionMethodConfig.SNOWPIPE)
.build(),
INGESTION_METHOD_OPT),
Arguments.of(
SnowflakeSinkConnectorConfigBuilder.icebergConfig()
.withSchematizationEnabled(false)
.build(),
ENABLE_SCHEMATIZATION_CONFIG));
INGESTION_METHOD_OPT));
}
}
@@ -27,6 +27,11 @@ protected static void createIcebergTable(String tableName) {
createIcebergTableWithColumnClause(tableName, "record_metadata object()");
}
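
// Mirrors the no-schema-evolution validation path: such a table must already carry
// structured record_metadata and record_content columns.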

protected static void createIcebergTableNoSchemaEvolution(String tableName) {
createIcebergTableWithColumnClause(
tableName, "record_metadata object(), record_content object()");
}

protected static void createIcebergTableWithColumnClause(String tableName, String columnClause) {
String query =
"create or replace iceberg table identifier(?) ("
@@ -64,12 +64,26 @@ void shouldThrowExceptionWhenTableDoesNotExist() {
}

@Test
void shouldThrowExceptionWhenRecordMetadataDoesNotExist() {
void shouldCreateMetadataWhenColumnNotExists() {
// given
createIcebergTableWithColumnClause(tableName, "some_column VARCHAR");

// expect
assertThatThrownBy(() -> icebergInitService.initializeIcebergTableProperties(tableName))
.isInstanceOf(SnowflakeKafkaConnectorException.class);
// when
icebergInitService.initializeIcebergTableProperties(tableName);

// then
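// DESCRIBE reports Snowflake's normalized rendering of the DDL types used in
// ICEBERG_METADATA_OBJECT_SCHEMA: INTEGER -> NUMBER(10,0), BIGINT -> NUMBER(19,0),
// STRING -> VARCHAR(16777216).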
assertThat(describeRecordMetadataType(tableName))
.isEqualTo(
"OBJECT(offset NUMBER(10,0), "
+ "topic VARCHAR(16777216), "
+ "partition NUMBER(10,0), "
+ "key VARCHAR(16777216), "
+ "schema_id NUMBER(10,0), "
+ "key_schema_id NUMBER(10,0), "
+ "CreateTime NUMBER(19,0), "
+ "LogAppendTime NUMBER(19,0), "
+ "SnowflakeConnectorPushTime NUMBER(19,0), "
+ "headers MAP(VARCHAR(16777216), "
+ "VARCHAR(16777216)))");
}
}
@@ -15,6 +15,8 @@ class IcebergInitServiceTest {
@Test
void testInitializeIcebergTableProperties() {
icebergInitService.initializeIcebergTableProperties("test_table");

verify(mockConnection).addMetadataColumnForIcebergIfNotExists("test_table");
verify(mockConnection).initializeMetadataColumnTypeForIceberg("test_table");
verifyNoMoreInteractions(mockConnection);
}