Add validations for record content

sfc-gh-wtrefon committed Oct 4, 2024
1 parent 0112d6a commit 4540ad0

Showing 5 changed files with 54 additions and 33 deletions.
@@ -4,17 +4,13 @@
 import static com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig.SNOWPIPE_STREAMING;
 
 import com.google.common.collect.ImmutableMap;
-import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
 import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
 import com.snowflake.kafka.connector.internal.streaming.StreamingConfigValidator;
 import java.util.HashMap;
 import java.util.Map;
 
 /** Validates dependencies between parameters in Iceberg mode. */
 public class IcebergConfigValidator implements StreamingConfigValidator {
 
-  private static final String NO_SCHEMATIZATION_ERROR_MESSAGE =
-      "Ingestion to Iceberg table requires " + ENABLE_SCHEMATIZATION_CONFIG + " set to true";
   private static final String INCOMPATIBLE_INGESTION_METHOD =
       "Ingestion to Iceberg table is supported only for Snowpipe Streaming";
 
@@ -28,15 +24,9 @@ public ImmutableMap<String, String> validate(Map<String, String> inputConfig) {
 
     Map<String, String> validationErrors = new HashMap<>();
 
-    boolean isSchematizationEnabled =
-        Boolean.parseBoolean(
-            inputConfig.get(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG));
     IngestionMethodConfig ingestionMethod =
         IngestionMethodConfig.valueOf(inputConfig.get(INGESTION_METHOD_OPT).toUpperCase());
 
-    if (!isSchematizationEnabled) {
-      validationErrors.put(ENABLE_SCHEMATIZATION_CONFIG, NO_SCHEMATIZATION_ERROR_MESSAGE);
-    }
     if (ingestionMethod != SNOWPIPE_STREAMING) {
       validationErrors.put(INGESTION_METHOD_OPT, INCOMPATIBLE_INGESTION_METHOD);
     }
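After this change, Iceberg ingestion no longer requires schematization to be enabled; the validator only rejects a non-streaming ingestion method. A minimal sketch of the new behavior (not part of the commit; it assumes the connector's usual static imports and a config map that already marks Iceberg mode as enabled, since that check sits outside the hunk shown above):

    Map<String, String> config = new HashMap<>();
    // Key referenced via the connector's own constant rather than a literal.
    config.put(INGESTION_METHOD_OPT, "snowpipe_streaming");

    ImmutableMap<String, String> errors = new IcebergConfigValidator().validate(config);
    // Schematization is no longer validated, so leaving it disabled produces no
    // error; only a non-streaming method such as "snowpipe" is still rejected.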
@@ -1,5 +1,6 @@
 package com.snowflake.kafka.connector.streaming.iceberg;
 
+import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_CONTENT;
 import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_METADATA;
 
 import com.snowflake.kafka.connector.internal.DescribeTableRow;
@@ -38,15 +39,6 @@ public void validateTable(String tableName, String role, boolean schemaEvolutionEnabled) {
             .filter(c -> Objects.equals(c.getColumn(), TABLE_COLUMN_METADATA))
             .findFirst();
 
-    if (schemaEvolutionEnabled) {
-      validateSchemaEvolutionScenario(tableName, role, metadata);
-    } else {
-      validateNoSchemaEvolutionScenario(metadata);
-    }
-  }
-
-  private void validateSchemaEvolutionScenario(
-      String tableName, String role, Optional<DescribeTableRow> metadata) {
     metadata.ifPresent(
         m -> {
           // if metadata column exists it must be of type OBJECT(), if not exists we create on our
@@ -56,17 +48,29 @@
           }
         });
 
+    if (schemaEvolutionEnabled) {
+      validateSchemaEvolutionScenario(tableName, role);
+    } else {
+      validateNoSchemaEvolutionScenario(columns);
+    }
+  }
+
+  private void validateSchemaEvolutionScenario(String tableName, String role) {
     if (!snowflakeConnectionService.hasSchemaEvolutionPermission(tableName, role)) {
       throw SnowflakeErrors.ERROR_0032.getException("schema_evolution_not_enabled");
    }
   }
 
-  private static void validateNoSchemaEvolutionScenario(Optional<DescribeTableRow> metadata) {
-    if (!metadata.isPresent()) {
-      throw SnowflakeErrors.ERROR_0032.getException("metadata_column_not_found");
+  private static void validateNoSchemaEvolutionScenario(List<DescribeTableRow> columns) {
+    Optional<DescribeTableRow> recordContent =
+        columns.stream()
+            .filter(c -> Objects.equals(c.getColumn(), TABLE_COLUMN_CONTENT))
+            .findFirst();
+    if (!recordContent.isPresent()) {
+      throw SnowflakeErrors.ERROR_0032.getException("record_content_column_not_found");
     }
-    if (!isOfStructuredObjectType(metadata.get())) {
-      throw SnowflakeErrors.ERROR_0032.getException("invalid_metadata_type");
+    if (!isOfStructuredObjectType(recordContent.get())) {
+      throw SnowflakeErrors.ERROR_0032.getException("invalid_record_content_type");
     }
   }
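The restructuring moves the metadata-type check ahead of the scenario split, reduces schema-evolution validation to a permission check, and makes the no-evolution path validate the record_content column instead of record_metadata. A hedged usage sketch of the new contract (validator wiring assumed; names taken from the diff and from the integration tests further down):

    // With schema evolution disabled, the target table must already contain a
    // structured record_content column; record_metadata may be absent, and if
    // present it must still be of type OBJECT().
    schemaValidator.validateTable(tableName, role, /* schemaEvolutionEnabled= */ false);

    // On violation this throws SnowflakeKafkaConnectorException via
    // SnowflakeErrors.ERROR_0032: "record_content_column_not_found" when the
    // column is missing, "invalid_record_content_type" when it is not OBJECT().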
@@ -40,7 +40,12 @@ public void shouldReturnErrorOnInvalidConfig(Map<String, String> config, String
   public static Stream<Arguments> validConfigs() {
     return Stream.of(
         Arguments.of(SnowflakeSinkConnectorConfigBuilder.snowpipeConfig().build()),
-        Arguments.of(SnowflakeSinkConnectorConfigBuilder.icebergConfig().build()));
+        Arguments.of(SnowflakeSinkConnectorConfigBuilder.icebergConfig().build()),
+        Arguments.of(
+            SnowflakeSinkConnectorConfigBuilder.icebergConfig()
+                .withSchematizationEnabled(false)
+                .build(),
+            ENABLE_SCHEMATIZATION_CONFIG));
   }
 
   public static Stream<Arguments> invalidConfigs() {
@@ -49,11 +54,6 @@ public static Stream<Arguments> invalidConfigs() {
         SnowflakeSinkConnectorConfigBuilder.icebergConfig()
             .withIngestionMethod(IngestionMethodConfig.SNOWPIPE)
             .build(),
-            INGESTION_METHOD_OPT),
-        Arguments.of(
-            SnowflakeSinkConnectorConfigBuilder.icebergConfig()
-                .withSchematizationEnabled(false)
-                .build(),
-            ENABLE_SCHEMATIZATION_CONFIG));
+            INGESTION_METHOD_OPT));
   }
 }
@@ -27,6 +27,11 @@ protected static void createIcebergTable(String tableName) {
     createIcebergTableWithColumnClause(tableName, "record_metadata object()");
   }
 
+  protected static void createIcebergTableNoSchemaEvolution(String tableName) {
+    createIcebergTableWithColumnClause(
+        tableName, "record_metadata object(), record_content object()");
+  }
+
   protected static void createIcebergTableWithColumnClause(String tableName, String columnClause) {
     String query =
         "create or replace iceberg table identifier(?) ("
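Here object() declares the column type the validator accepts (per the comment in the validator above, the column "must be of type OBJECT()"). Roughly the statement the new helper ends up issuing (a sketch; everything after the column list comes from createIcebergTableWithColumnClause and is outside this diff, so it is elided):

    String query =
        "create or replace iceberg table identifier(?) ("
            + "record_metadata object(), record_content object()"
            + /* remaining clauses elided */ " ...";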
@@ -85,7 +85,7 @@ class SchemaEvolutionNotEnabled {
   @Test
   public void shouldValidateExpectedIcebergTableSchema() {
     // given
-    createIcebergTable(tableName);
+    createIcebergTableNoSchemaEvolution(tableName);
     enableSchemaEvolution(tableName);
 
     // when, then
@@ -100,7 +100,7 @@ public void shouldThrowExceptionWhenTableDoesNotExist() {
   }
 
   @Test
-  public void shouldThrowExceptionWhenRecordMetadataDoesNotExist() {
+  public void shouldThrowExceptionWhenRecordContentDoesNotExist() {
     // given
     createIcebergTableWithColumnClause(tableName, "some_column VARCHAR");
     enableSchemaEvolution(tableName);
@@ -111,6 +111,28 @@ public void shouldThrowExceptionWhenRecordMetadataDoesNotExist() {
         () -> schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION));
   }
 
+  @Test
+  public void shouldThrowExceptionWhenRecordContentHasInvalidType() {
+    // given
+    createIcebergTableWithColumnClause(tableName, "record_content MAP(VARCHAR, VARCHAR)");
+    enableSchemaEvolution(tableName);
+
+    // expect
+    Assertions.assertThrows(
+        SnowflakeKafkaConnectorException.class,
+        () -> schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION));
+  }
+
+  @Test
+  public void shouldNotThrowExceptionWhenColumnRecordMetadataDoesNotExist() {
+    // given
+    createIcebergTableWithColumnClause(tableName, "record_content object()");
+    enableSchemaEvolution(tableName);
+
+    // expect
+    schemaValidator.validateTable(tableName, TEST_ROLE, SCHEMA_EVOLUTION);
+  }
+
   @Test
   public void shouldThrowExceptionWhenRecordMetadataHasInvalidType() {
     // given