
Fix tombstone ingestion with schematization (snowflakedb#700)
sfc-gh-rcheng authored and khsoneji committed Oct 12, 2023
1 parent 2b33c63 commit fcf81fe
Showing 14 changed files with 572 additions and 266 deletions.
@@ -152,7 +152,7 @@ static Map<String, String> getColumnTypes(SinkRecord record, List<String> column
private static Map<String, String> getSchemaMapFromRecord(SinkRecord record) {
Map<String, String> schemaMap = new HashMap<>();
Schema schema = record.valueSchema();
- if (schema != null) {
+ if (schema != null && schema.fields() != null) {
for (Field field : schema.fields()) {
schemaMap.put(field.name(), convertToSnowflakeType(field.schema().type()));
}
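For context, here is a minimal, self-contained sketch of the guard added above. The Schema and Field types below are simplified stand-ins, not the Kafka Connect interfaces, and the tombstone interpretation is an assumption based on this commit's title: without the extra fields() null check, a value schema that reports no fields, as a schematized tombstone record apparently can, would throw a NullPointerException in the for-each loop.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SchemaGuardSketch {

  // Simplified stand-ins; not the Kafka Connect Schema/Field interfaces.
  interface Field {
    String name();
    String type();
  }

  interface Schema {
    List<Field> fields();
  }

  static Map<String, String> schemaMapFromValueSchema(Schema schema) {
    Map<String, String> schemaMap = new HashMap<>();
    // Guard both the schema and its field list: a schema whose fields() is null
    // previously caused a NullPointerException in the loop below.
    if (schema != null && schema.fields() != null) {
      for (Field field : schema.fields()) {
        schemaMap.put(field.name(), field.type());
      }
    }
    return schemaMap;
  }

  public static void main(String[] args) {
    System.out.println(schemaMapFromValueSchema(null));       // {}
    System.out.println(schemaMapFromValueSchema(() -> null)); // {} instead of an NPE
  }
}
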
@@ -275,6 +275,13 @@ public Map<String, Object> getProcessedRecordForStreamingIngest(SinkRecord recor
private Map<String, Object> getMapFromJsonNodeForStreamingIngest(JsonNode node)
throws JsonProcessingException {
final Map<String, Object> streamingIngestRow = new HashMap<>();

+ // return empty if tombstone record
+ if (node.size() == 0
+ && this.behaviorOnNullValues == SnowflakeSinkConnectorConfig.BehaviorOnNullValues.DEFAULT) {
+ return streamingIngestRow;
+ }

Iterator<String> columnNames = node.fieldNames();
while (columnNames.hasNext()) {
String columnName = columnNames.next();
@@ -559,6 +566,7 @@ public boolean shouldSkipNullValue(
// get valueSchema
Schema valueSchema = record.valueSchema();
if (valueSchema instanceof SnowflakeJsonSchema) {
+ // TODO SNOW-916052: will not skip if record.value() == null
// we can conclude this is a custom/KC defined converter.
// i.e one of SFJson, SFAvro and SFAvroWithSchemaRegistry Converter
if (record.value() instanceof SnowflakeRecordContent) {
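To make the new early return concrete, here is a hedged, self-contained sketch of the tombstone path. BehaviorOnNullValues below is a local stand-in for the connector's config enum and the column handling in the loop is simplified; the point is only that an empty JsonNode, which is how a tombstone value surfaces here, maps to an empty ingest row under the DEFAULT null-value behavior instead of failing.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class TombstoneRowSketch {

  // Local stand-in for the connector's behavior-on-null-values setting.
  enum BehaviorOnNullValues { DEFAULT, IGNORE }

  static Map<String, Object> rowFromJson(JsonNode node, BehaviorOnNullValues behavior) {
    Map<String, Object> row = new HashMap<>();
    // Tombstones arrive as an empty node; under DEFAULT behavior they become an
    // empty row rather than an error or a skipped record.
    if (node.size() == 0 && behavior == BehaviorOnNullValues.DEFAULT) {
      return row;
    }
    Iterator<String> columnNames = node.fieldNames();
    while (columnNames.hasNext()) {
      String columnName = columnNames.next();
      row.put(columnName, node.get(columnName).toString()); // simplified value handling
    }
    return row;
  }

  public static void main(String[] args) {
    ObjectMapper mapper = new ObjectMapper();
    JsonNode tombstone = mapper.createObjectNode(); // size() == 0
    System.out.println(rowFromJson(tombstone, BehaviorOnNullValues.DEFAULT)); // {}
  }
}
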
149 changes: 59 additions & 90 deletions src/test/java/com/snowflake/kafka/connector/ConnectorConfigTest.java
@@ -26,53 +26,19 @@
import static org.junit.Assert.assertThrows;

public class ConnectorConfigTest {
+ // subset of valid community converters
+ public static final List<Converter> COMMUNITY_CONVERTER_SUBSET =
+ Arrays.asList(
+ new org.apache.kafka.connect.storage.StringConverter(),
+ new org.apache.kafka.connect.json.JsonConverter(),
+ new io.confluent.connect.avro.AvroConverter());

- public enum CommunityConverterSubset {
- STRING_CONVERTER(
- "org.apache.kafka.connect.storage.StringConverter",
- new org.apache.kafka.connect.storage.StringConverter()),
- JSON_CONVERTER(
- "org.apache.kafka.connect.json.JsonConverter",
- new org.apache.kafka.connect.json.JsonConverter()),
- AVRO_CONVERTER(
- "io.confluent.connect.avro.AvroConverter", new io.confluent.connect.avro.AvroConverter());
-
- private final String name;
- public final Converter converter;
-
- CommunityConverterSubset(String name, Converter converter) {
- this.name = name;
- this.converter = converter;
- }
-
- public String toString() {
- return this.name;
- }
- };
-
- public enum CustomSfConverter {
- JSON_CONVERTER(
- "com.snowflake.kafka.connector.records.SnowflakeJsonConverter",
- new com.snowflake.kafka.connector.records.SnowflakeJsonConverter()),
- AVRO_CONVERTER_WITHOUT_SCHEMA_REGISTRY(
- "com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry",
- new com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry()),
- AVRO_CONVERTER(
- "com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
- new com.snowflake.kafka.connector.records.SnowflakeAvroConverter());
-
- private final String name;
- public final Converter converter;
-
- CustomSfConverter(String name, Converter converter) {
- this.name = name;
- this.converter = converter;
- }
-
- public String toString() {
- return this.name;
- }
- }
+ // custom snowflake converters, not currently allowed for streaming
+ public static final List<Converter> CUSTOM_SNOWFLAKE_CONVERTERS =
+ Arrays.asList(
+ new com.snowflake.kafka.connector.records.SnowflakeJsonConverter(),
+ new com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry(),
+ new com.snowflake.kafka.connector.records.SnowflakeAvroConverter());

@Test
public void testConfig() {
@@ -726,19 +692,21 @@ public void testValidKeyAndValueConvertersForStreamingSnowpipe() {
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");

- Arrays.stream(CommunityConverterSubset.values())
- .forEach(
- converter -> {
- config.put(SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD, converter.name);
- Utils.validateConfig(config);
- });
+ COMMUNITY_CONVERTER_SUBSET.forEach(
+ converter -> {
+ config.put(
+ SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD,
+ converter.getClass().toString());
+ Utils.validateConfig(config);
+ });

- Arrays.stream(CommunityConverterSubset.values())
- .forEach(
- converter -> {
- config.put(SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD, converter.name);
- Utils.validateConfig(config);
- });
+ COMMUNITY_CONVERTER_SUBSET.forEach(
+ converter -> {
+ config.put(
+ SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
+ converter.getClass().toString());
+ Utils.validateConfig(config);
+ });
}
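One small aside on the class-name strings these refactored tests now derive from converter instances rather than from enum fields (the snippet below is a generic Java illustration, not connector code): Class.toString() prefixes the fully qualified name with "class ", whereas getName() returns the bare name, which is worth keeping in mind wherever a validator compares exact class names.

import org.apache.kafka.connect.json.JsonConverter;

public class ConverterNameSketch {
  public static void main(String[] args) {
    JsonConverter converter = new JsonConverter();
    // Bare fully qualified name, suitable for exact matching against a config value.
    System.out.println(converter.getClass().getName());
    // -> org.apache.kafka.connect.json.JsonConverter

    // toString() adds a "class " prefix, so it only suits substring-style checks.
    System.out.println(converter.getClass().toString());
    // -> class org.apache.kafka.connect.json.JsonConverter
  }
}
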

@Test
@@ -749,22 +717,23 @@ public void testInvalidKeyConvertersForStreamingSnowpipe() {
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");

- Arrays.stream(CustomSfConverter.values())
- .forEach(
- converter -> {
- try {
- config.put(SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD, converter.name);
- config.put(
- SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
- "org.apache.kafka.connect.storage.StringConverter");
+ CUSTOM_SNOWFLAKE_CONVERTERS.forEach(
+ converter -> {
+ try {
+ config.put(
+ SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD,
+ converter.getClass().toString());
+ config.put(
+ SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
+ "org.apache.kafka.connect.storage.StringConverter");

- Utils.validateConfig(config);
- } catch (SnowflakeKafkaConnectorException exception) {
- assert exception
- .getMessage()
- .contains(SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD);
- }
- });
+ Utils.validateConfig(config);
+ } catch (SnowflakeKafkaConnectorException exception) {
+ assert exception
+ .getMessage()
+ .contains(SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD);
+ }
+ });
}

@Test
@@ -775,23 +744,23 @@ public void testInvalidValueConvertersForStreamingSnowpipe() {
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");

- Arrays.stream(CustomSfConverter.values())
- .forEach(
- converter -> {
- try {
- config.put(
- SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD,
- "org.apache.kafka.connect.storage.StringConverter");
- config.put(
- SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD, converter.name);
-
- Utils.validateConfig(config);
- } catch (SnowflakeKafkaConnectorException exception) {
- assert exception
- .getMessage()
- .contains(SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD);
- }
- });
+ CUSTOM_SNOWFLAKE_CONVERTERS.forEach(
+ converter -> {
+ try {
+ config.put(
+ SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD,
+ "org.apache.kafka.connect.storage.StringConverter");
+ config.put(
+ SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
+ converter.getClass().toString());
+
+ Utils.validateConfig(config);
+ } catch (SnowflakeKafkaConnectorException exception) {
+ assert exception
+ .getMessage()
+ .contains(SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD);
+ }
+ });
}

@Test
