
Fix tombstone ingestion with schematization #700

Merged: 22 commits, Sep 20, 2023
@@ -152,7 +152,7 @@ static Map<String, String> getColumnTypes(SinkRecord record, List<String> column
private static Map<String, String> getSchemaMapFromRecord(SinkRecord record) {
Map<String, String> schemaMap = new HashMap<>();
Schema schema = record.valueSchema();
if (schema != null) {
if (schema != null && schema.fields() != null) {
Collaborator (Author): Added an additional null check to ensure we don't NPE in the following for loop.

for (Field field : schema.fields()) {
schemaMap.put(field.name(), convertToSnowflakeType(field.schema().type()));
}
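
For context on the guard above, here is a minimal, hypothetical sketch of the failure mode it protects against. The stubbed Schema, the record values, and the use of Mockito are illustrative assumptions, not code from this PR:

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;

public class NullFieldsGuardSketch {
  public static void main(String[] args) {
    // A Schema stub whose fields() returns null, as a converter-specific
    // schema implementation might report for a tombstone (assumed here).
    Schema schemaWithNullFields = mock(Schema.class);
    when(schemaWithNullFields.fields()).thenReturn(null);

    // A tombstone-style record: non-null key, null value, value schema set.
    SinkRecord record =
        new SinkRecord(
            "example_topic", 0, Schema.STRING_SCHEMA, "key", schemaWithNullFields, null, 0L);

    // Without the schema.fields() != null guard, iterating the fields would
    // throw a NullPointerException; with it, the schema map is left empty.
    Schema schema = record.valueSchema();
    if (schema != null && schema.fields() != null) {
      schema.fields().forEach(field -> System.out.println(field.name()));
    }
  }
}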
@@ -275,6 +275,13 @@ public Map<String, Object> getProcessedRecordForStreamingIngest(SinkRecord recor
private Map<String, Object> getMapFromJsonNodeForStreamingIngest(JsonNode node)
throws JsonProcessingException {
final Map<String, Object> streamingIngestRow = new HashMap<>();

// return empty if tombstone record
Contributor: You've probably tested this, but will all columns be NULL for that row?

Collaborator (Author): Yup, tombstone records only have a RECORD_METADATA column and the rest are all null; the key is in RECORD_METADATA. [screenshot]
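
To make the reply above concrete, a small illustrative sketch of what a tombstone looks like on the Kafka Connect side; the topic name, key, and offset are made up:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;

// A Kafka tombstone is a record with a non-null key and a null value. With
// schematization enabled, after this change it is ingested as a row whose
// only populated column is RECORD_METADATA (which carries the key); every
// schematized value column is NULL.
SinkRecord tombstone =
    new SinkRecord(
        "example_topic",      // topic (hypothetical)
        0,                    // partition
        Schema.STRING_SCHEMA, // key schema
        "user-123",           // key, surfaced inside RECORD_METADATA
        null,                 // no value schema
        null,                 // null value is what makes this a tombstone
        42L);                 // offset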

if (node.size() == 0
&& this.behaviorOnNullValues == SnowflakeSinkConnectorConfig.BehaviorOnNullValues.DEFAULT) {
Collaborator (Author): This is the crux of the schematization tombstone change.

return streamingIngestRow;
}

Iterator<String> columnNames = node.fieldNames();
while (columnNames.hasNext()) {
String columnName = columnNames.next();
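
The empty-row short circuit above only applies when behavior.on.null.values is left at its DEFAULT setting. Below is a minimal sketch of the two relevant sink-connector properties, assuming the property names documented for this connector (verify against your connector version); IGNORE would drop tombstones instead of ingesting them:

import java.util.HashMap;
import java.util.Map;

// Hypothetical configuration fragment for a streaming connector that ingests
// tombstones as rows with only RECORD_METADATA populated.
Map<String, String> connectorConfig = new HashMap<>();
connectorConfig.put("snowflake.ingestion.method", "snowpipe_streaming");
connectorConfig.put("behavior.on.null.values", "DEFAULT");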
@@ -559,6 +566,7 @@ public boolean shouldSkipNullValue(
// get valueSchema
Schema valueSchema = record.valueSchema();
if (valueSchema instanceof SnowflakeJsonSchema) {
// TODO SNOW-916052: will not skip if record.value() == null
// we can conclude this is a custom/KC defined converter.
// i.e one of SFJson, SFAvro and SFAvroWithSchemaRegistry Converter
if (record.value() instanceof SnowflakeRecordContent) {
src/test/java/com/snowflake/kafka/connector/ConnectorConfigTest.java (149 changes: 59 additions, 90 deletions)
@@ -21,53 +21,19 @@
import org.junit.Test;

public class ConnectorConfigTest {
// subset of valid community converters
public static final List<Converter> COMMUNITY_CONVERTER_SUBSET =
Arrays.asList(
new org.apache.kafka.connect.storage.StringConverter(),
new org.apache.kafka.connect.json.JsonConverter(),
new io.confluent.connect.avro.AvroConverter());

public enum CommunityConverterSubset {
STRING_CONVERTER(
"org.apache.kafka.connect.storage.StringConverter",
new org.apache.kafka.connect.storage.StringConverter()),
JSON_CONVERTER(
"org.apache.kafka.connect.json.JsonConverter",
new org.apache.kafka.connect.json.JsonConverter()),
AVRO_CONVERTER(
"io.confluent.connect.avro.AvroConverter", new io.confluent.connect.avro.AvroConverter());

private final String name;
public final Converter converter;

CommunityConverterSubset(String name, Converter converter) {
this.name = name;
this.converter = converter;
}

public String toString() {
return this.name;
}
};

public enum CustomSfConverter {
JSON_CONVERTER(
"com.snowflake.kafka.connector.records.SnowflakeJsonConverter",
new com.snowflake.kafka.connector.records.SnowflakeJsonConverter()),
AVRO_CONVERTER_WITHOUT_SCHEMA_REGISTRY(
"com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry",
new com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry()),
AVRO_CONVERTER(
"com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
new com.snowflake.kafka.connector.records.SnowflakeAvroConverter());

private final String name;
public final Converter converter;

CustomSfConverter(String name, Converter converter) {
this.name = name;
this.converter = converter;
}

public String toString() {
return this.name;
}
}
// custom snowflake converters, not currently allowed for streaming
public static final List<Converter> CUSTOM_SNOWFLAKE_CONVERTERS =
Arrays.asList(
new com.snowflake.kafka.connector.records.SnowflakeJsonConverter(),
new com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry(),
new com.snowflake.kafka.connector.records.SnowflakeAvroConverter());

@Test
public void testConfig() {
@@ -705,19 +671,21 @@ public void testValidKeyAndValueConvertersForStreamingSnowpipe() {
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");

Arrays.stream(CommunityConverterSubset.values())
.forEach(
converter -> {
config.put(SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD, converter.name);
Utils.validateConfig(config);
});
COMMUNITY_CONVERTER_SUBSET.forEach(
converter -> {
config.put(
SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD,
converter.getClass().toString());
Utils.validateConfig(config);
});

Arrays.stream(CommunityConverterSubset.values())
.forEach(
converter -> {
config.put(SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD, converter.name);
Utils.validateConfig(config);
});
COMMUNITY_CONVERTER_SUBSET.forEach(
converter -> {
config.put(
SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
converter.getClass().toString());
Utils.validateConfig(config);
});
}

@Test
@@ -728,22 +696,23 @@ public void testInvalidKeyConvertersForStreamingSnowpipe() {
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");

Arrays.stream(CustomSfConverter.values())
.forEach(
converter -> {
try {
config.put(SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD, converter.name);
config.put(
SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
"org.apache.kafka.connect.storage.StringConverter");
CUSTOM_SNOWFLAKE_CONVERTERS.forEach(
converter -> {
try {
config.put(
SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD,
converter.getClass().toString());
config.put(
SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
"org.apache.kafka.connect.storage.StringConverter");

Utils.validateConfig(config);
} catch (SnowflakeKafkaConnectorException exception) {
assert exception
.getMessage()
.contains(SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD);
}
});
Utils.validateConfig(config);
} catch (SnowflakeKafkaConnectorException exception) {
assert exception
.getMessage()
.contains(SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD);
}
});
}

@Test
@@ -754,23 +723,23 @@ public void testInvalidValueConvertersForStreamingSnowpipe() {
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");

Arrays.stream(CustomSfConverter.values())
.forEach(
converter -> {
try {
config.put(
SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD,
"org.apache.kafka.connect.storage.StringConverter");
config.put(
SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD, converter.name);

Utils.validateConfig(config);
} catch (SnowflakeKafkaConnectorException exception) {
assert exception
.getMessage()
.contains(SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD);
}
});
CUSTOM_SNOWFLAKE_CONVERTERS.forEach(
converter -> {
try {
config.put(
SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD,
"org.apache.kafka.connect.storage.StringConverter");
config.put(
SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
converter.getClass().toString());

Utils.validateConfig(config);
} catch (SnowflakeKafkaConnectorException exception) {
assert exception
.getMessage()
.contains(SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD);
}
});
}

@Test