-
Notifications
You must be signed in to change notification settings - Fork 100
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix tombstone ingestion with schematization #700
Changes from 16 commits
408dc10
3921ec5
f98645b
3fabaf8
b10e905
16b959a
190b1a6
1744ea7
323c9c9
e4a1a9f
6d02129
f035f98
3d3c538
ad3f516
8c6a708
2a9f42b
5c699b2
22d5284
6c5546c
8b94889
445d4ee
a2a6279
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -275,6 +275,13 @@ public Map<String, Object> getProcessedRecordForStreamingIngest(SinkRecord recor | |
private Map<String, Object> getMapFromJsonNodeForStreamingIngest(JsonNode node) | ||
throws JsonProcessingException { | ||
final Map<String, Object> streamingIngestRow = new HashMap<>(); | ||
|
||
// return empty if tombstone record | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You probably has tested this, but all columns will be NULL for that row? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
if (node.size() == 0 | ||
&& this.behaviorOnNullValues == SnowflakeSinkConnectorConfig.BehaviorOnNullValues.DEFAULT) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. crux of the schematization tombstone change |
||
return streamingIngestRow; | ||
} | ||
|
||
Iterator<String> columnNames = node.fieldNames(); | ||
while (columnNames.hasNext()) { | ||
String columnName = columnNames.next(); | ||
|
@@ -559,6 +566,7 @@ public boolean shouldSkipNullValue( | |
// get valueSchema | ||
Schema valueSchema = record.valueSchema(); | ||
if (valueSchema instanceof SnowflakeJsonSchema) { | ||
// TODO SNOW-916052: will not skip if record.value() == null | ||
// we can conclude this is a custom/KC defined converter. | ||
// i.e one of SFJson, SFAvro and SFAvroWithSchemaRegistry Converter | ||
if (record.value() instanceof SnowflakeRecordContent) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added additional null check to ensure we don't NPE in the next for loop