Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1680410 adjust validations and create metadata column if not exists #946

Merged
merged 5 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,13 @@
import static com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig.SNOWPIPE_STREAMING;

import com.google.common.collect.ImmutableMap;
import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
import com.snowflake.kafka.connector.internal.streaming.StreamingConfigValidator;
import java.util.HashMap;
import java.util.Map;

/** Validates dependencies between parameters in Iceberg mode. */
public class IcebergConfigValidator implements StreamingConfigValidator {

private static final String NO_SCHEMATIZATION_ERROR_MESSAGE =
"Ingestion to Iceberg table requires " + ENABLE_SCHEMATIZATION_CONFIG + " set to true";
private static final String INCOMPATIBLE_INGESTION_METHOD =
"Ingestion to Iceberg table is supported only for Snowpipe Streaming";

Expand All @@ -28,15 +24,9 @@ public ImmutableMap<String, String> validate(Map<String, String> inputConfig) {

Map<String, String> validationErrors = new HashMap<>();

boolean isSchematizationEnabled =
Boolean.parseBoolean(
inputConfig.get(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG));
IngestionMethodConfig ingestionMethod =
IngestionMethodConfig.valueOf(inputConfig.get(INGESTION_METHOD_OPT).toUpperCase());

if (!isSchematizationEnabled) {
validationErrors.put(ENABLE_SCHEMATIZATION_CONFIG, NO_SCHEMATIZATION_ERROR_MESSAGE);
}
if (ingestionMethod != SNOWPIPE_STREAMING) {
validationErrors.put(INGESTION_METHOD_OPT, INCOMPATIBLE_INGESTION_METHOD);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,13 @@ ChannelMigrateOffsetTokenResponseDTO migrateStreamingChannelOffsetToken(
*/
void initializeMetadataColumnTypeForIceberg(String tableName);

/**
* Add the RECORD_METADATA column to the iceberg table if it does not exist.
*
* @param tableName iceberg table name
*/
void addMetadataColumnForIcebergIfNotExists(String tableName);

/**
* Calls describe table statement and returns all columns and corresponding types.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_CONTENT;
import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_METADATA;
import static com.snowflake.kafka.connector.streaming.iceberg.IcebergDDLTypes.ICEBERG_METADATA_OBJECT_SCHEMA;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -164,23 +165,35 @@ public void createTableWithOnlyMetadataColumn(final String tableName) {
LOGGER.info("Created table {} with only RECORD_METADATA column", tableName);
}

@Override
public void addMetadataColumnForIcebergIfNotExists(String tableName) {
checkConnection();
InternalUtils.assertNotEmpty("tableName", tableName);
String query =
"ALTER ICEBERG TABLE identifier(?) ADD COLUMN IF NOT EXISTS RECORD_METADATA "
+ ICEBERG_METADATA_OBJECT_SCHEMA;
try {
PreparedStatement stmt = conn.prepareStatement(query);
stmt.setString(1, tableName);
stmt.execute();
stmt.close();
} catch (SQLException e) {
LOGGER.error(
"Couldn't alter table {} add RECORD_METADATA column to align with iceberg format",
tableName);
throw SnowflakeErrors.ERROR_2019.getException(e);
}
LOGGER.info(
"alter table {} add RECORD_METADATA column to align with iceberg format", tableName);
}

@Override
public void initializeMetadataColumnTypeForIceberg(String tableName) {
checkConnection();
InternalUtils.assertNotEmpty("tableName", tableName);
String query =
"ALTER ICEBERG TABLE identifier(?) ALTER COLUMN RECORD_METADATA SET DATA TYPE OBJECT("
+ "offset INTEGER,"
+ "topic STRING,"
+ "partition INTEGER,"
+ "key STRING,"
+ "schema_id INTEGER,"
+ "key_schema_id INTEGER,"
+ "CreateTime BIGINT,"
+ "LogAppendTime BIGINT,"
+ "SnowflakeConnectorPushTime BIGINT,"
+ "headers MAP(VARCHAR, VARCHAR)"
+ ")";
"ALTER ICEBERG TABLE identifier(?) ALTER COLUMN RECORD_METADATA SET DATA TYPE "
+ ICEBERG_METADATA_OBJECT_SCHEMA;
try {
PreparedStatement stmt = conn.prepareStatement(query);
stmt.setString(1, tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,12 @@ public enum SnowflakeErrors {
ERROR_2018(
"2018",
"Failed to alter RECORD_METADATA column type for iceberg",
"Failed to alter RECORD_METADATA column type to required format for iceberg, please check"
+ " column exists."),
"Failed to alter RECORD_METADATA column type to required format for iceberg."),
ERROR_2019(
"2019",
"Failed to add RECORD_METADATA column for iceberg",
"Failed to add RECORD_METADATA column with required format for iceberg."),

// Snowpipe related issues 3---
ERROR_3001("3001", "Failed to ingest file", "Exception reported by Ingest SDK"),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,8 @@ public void startPartitions(
private void perTopicActionsOnStartPartitions(String topic, Map<String, String> topic2Table) {
String tableName = Utils.tableName(topic, topic2Table);
if (Utils.isIcebergEnabled(connectorConfig)) {
icebergTableSchemaValidator.validateTable(tableName, Utils.role(connectorConfig));
icebergTableSchemaValidator.validateTable(
tableName, Utils.role(connectorConfig), enableSchematization);
icebergInitService.initializeIcebergTableProperties(tableName);
} else {
createTableIfNotExists(tableName);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.snowflake.kafka.connector.streaming.iceberg;

public class IcebergDDLTypes {
public static String INTEGER = "INTEGER";
sfc-gh-wtrefon marked this conversation as resolved.
Show resolved Hide resolved
public static String STRING = "STRING";
public static String BIGINT = "BIGINT";
public static String MAP = "MAP";
public static String OBJECT = "OBJECT";
public static String VARCHAR = "VARCHAR";

public static String ICEBERG_METADATA_OBJECT_SCHEMA =
sfc-gh-wtrefon marked this conversation as resolved.
Show resolved Hide resolved
OBJECT
+ "("
+ "offset "
+ INTEGER
+ ","
+ "topic "
+ STRING
+ ","
+ "partition "
+ INTEGER
+ ","
+ "key "
+ STRING
+ ","
+ "schema_id "
+ INTEGER
+ ","
+ "key_schema_id "
+ INTEGER
+ ","
+ "CreateTime "
+ BIGINT
+ ","
+ "LogAppendTime "
+ BIGINT
+ ","
+ "SnowflakeConnectorPushTime "
+ BIGINT
+ ","
+ "headers "
+ MAP
+ "("
+ VARCHAR
+ ","
+ VARCHAR
+ ")"
+ ")";
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public IcebergInitService(SnowflakeConnectionService snowflakeConnectionService)

public void initializeIcebergTableProperties(String tableName) {
LOGGER.info("Initializing properties for Iceberg table: {}", tableName);
snowflakeConnectionService.addMetadataColumnForIcebergIfNotExists(tableName);
snowflakeConnectionService.initializeMetadataColumnTypeForIceberg(tableName);
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package com.snowflake.kafka.connector.streaming.iceberg;

import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_CONTENT;
import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_METADATA;

import com.snowflake.kafka.connector.internal.DescribeTableRow;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
import com.snowflake.kafka.connector.internal.SnowflakeErrors;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

/** Performs validations of Iceberg table schema on the connector startup. */
public class IcebergTableSchemaValidator {
Expand All @@ -24,29 +26,54 @@ public IcebergTableSchemaValidator(SnowflakeConnectionService snowflakeConnectio
*
* @param tableName table to be validated
* @param role role used for validation
* @param schemaEvolutionEnabled whether schema evolution is enabled
*/
public void validateTable(String tableName, String role) {
public void validateTable(String tableName, String role, boolean schemaEvolutionEnabled) {
List<DescribeTableRow> columns =
snowflakeConnectionService
.describeTable(tableName)
.orElseThrow(() -> SnowflakeErrors.ERROR_0032.getException("table_not_found"));

DescribeTableRow metadata =
Optional<DescribeTableRow> metadata =
columns.stream()
.filter(c -> Objects.equals(c.getColumn(), TABLE_COLUMN_METADATA))
.findFirst()
.orElseThrow(
() -> SnowflakeErrors.ERROR_0032.getException("record_metadata_not_found"));
.findFirst();

if (!isOfStructuredObjectType(metadata)) {
throw SnowflakeErrors.ERROR_0032.getException("invalid_metadata_type");
metadata.ifPresent(
m -> {
// if metadata column exists it must be of type OBJECT(), if not exists we create on our
// own this column
if (!isOfStructuredObjectType(m)) {
throw SnowflakeErrors.ERROR_0032.getException("invalid_metadata_type");
}
});

if (schemaEvolutionEnabled) {
validateSchemaEvolutionScenario(tableName, role);
} else {
validateNoSchemaEvolutionScenario(columns);
}
}

private void validateSchemaEvolutionScenario(String tableName, String role) {
if (!snowflakeConnectionService.hasSchemaEvolutionPermission(tableName, role)) {
throw SnowflakeErrors.ERROR_0032.getException("schema_evolution_not_enabled");
}
}

private static void validateNoSchemaEvolutionScenario(List<DescribeTableRow> columns) {
Optional<DescribeTableRow> recordContent =
columns.stream()
.filter(c -> Objects.equals(c.getColumn(), TABLE_COLUMN_CONTENT))
.findFirst();
if (!recordContent.isPresent()) {
sfc-gh-wtrefon marked this conversation as resolved.
Show resolved Hide resolved
throw SnowflakeErrors.ERROR_0032.getException("record_content_column_not_found");
}
if (!isOfStructuredObjectType(recordContent.get())) {
throw SnowflakeErrors.ERROR_0032.getException("invalid_record_content_type");
}
}

private static boolean isOfStructuredObjectType(DescribeTableRow metadata) {
return metadata.getType().startsWith(SF_STRUCTURED_OBJECT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@ public void shouldReturnErrorOnInvalidConfig(Map<String, String> config, String
public static Stream<Arguments> validConfigs() {
return Stream.of(
Arguments.of(SnowflakeSinkConnectorConfigBuilder.snowpipeConfig().build()),
Arguments.of(SnowflakeSinkConnectorConfigBuilder.icebergConfig().build()));
Arguments.of(SnowflakeSinkConnectorConfigBuilder.icebergConfig().build()),
Arguments.of(
SnowflakeSinkConnectorConfigBuilder.icebergConfig()
.withSchematizationEnabled(false)
.build(),
ENABLE_SCHEMATIZATION_CONFIG));
sfc-gh-wtrefon marked this conversation as resolved.
Show resolved Hide resolved
}

public static Stream<Arguments> invalidConfigs() {
Expand All @@ -49,11 +54,6 @@ public static Stream<Arguments> invalidConfigs() {
SnowflakeSinkConnectorConfigBuilder.icebergConfig()
.withIngestionMethod(IngestionMethodConfig.SNOWPIPE)
.build(),
INGESTION_METHOD_OPT),
Arguments.of(
SnowflakeSinkConnectorConfigBuilder.icebergConfig()
.withSchematizationEnabled(false)
.build(),
ENABLE_SCHEMATIZATION_CONFIG));
INGESTION_METHOD_OPT));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ protected static void createIcebergTable(String tableName) {
createIcebergTableWithColumnClause(tableName, "record_metadata object()");
}

protected static void createIcebergTableNoSchemaEvolution(String tableName) {
createIcebergTableWithColumnClause(
tableName, "record_metadata object(), record_content object()");
}

protected static void createIcebergTableWithColumnClause(String tableName, String columnClause) {
String query =
"create or replace iceberg table identifier(?) ("
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,26 @@ void shouldThrowExceptionWhenTableDoesNotExist() {
}

@Test
void shouldThrowExceptionWhenRecordMetadataDoesNotExist() {
void shouldCreateMetadataWhenColumnNotExists() {
// given
createIcebergTableWithColumnClause(tableName, "some_column VARCHAR");

// expect
assertThatThrownBy(() -> icebergInitService.initializeIcebergTableProperties(tableName))
.isInstanceOf(SnowflakeKafkaConnectorException.class);
// when
icebergInitService.initializeIcebergTableProperties(tableName);

// then
assertThat(describeRecordMetadataType(tableName))
.isEqualTo(
"OBJECT(offset NUMBER(10,0), "
+ "topic VARCHAR(16777216), "
+ "partition NUMBER(10,0), "
+ "key VARCHAR(16777216), "
+ "schema_id NUMBER(10,0), "
+ "key_schema_id NUMBER(10,0), "
+ "CreateTime NUMBER(19,0), "
+ "LogAppendTime NUMBER(19,0), "
+ "SnowflakeConnectorPushTime NUMBER(19,0), "
+ "headers MAP(VARCHAR(16777216), "
+ "VARCHAR(16777216)))");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ class IcebergInitServiceTest {
@Test
void testInitializeIcebergTableProperties() {
icebergInitService.initializeIcebergTableProperties("test_table");

verify(mockConnection).addMetadataColumnForIcebergIfNotExists("test_table");
verify(mockConnection).initializeMetadataColumnTypeForIceberg("test_table");
verifyNoMoreInteractions(mockConnection);
}
Expand Down
Loading
Loading