-
Notifications
You must be signed in to change notification settings - Fork 99
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix tombstone ingestion with schematization #700
Conversation
@@ -152,7 +152,7 @@ static Map<String, String> getColumnTypes(SinkRecord record, List<String> column | |||
private static Map<String, String> getSchemaMapFromRecord(SinkRecord record) { | |||
Map<String, String> schemaMap = new HashMap<>(); | |||
Schema schema = record.valueSchema(); | |||
if (schema != null) { | |||
if (schema != null && schema.fields() != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added additional null check to ensure we don't NPE in the next for loop
|
||
// return empty if tombstone record | ||
if (node.size() == 0 | ||
&& this.behaviorOnNullValues == SnowflakeSinkConnectorConfig.BehaviorOnNullValues.DEFAULT) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
crux of the schematization tombstone change
"snowflake.database.name":"SNOWFLAKE_DATABASE", | ||
"snowflake.schema.name":"SNOWFLAKE_SCHEMA", | ||
"key.converter":"org.apache.kafka.connect.storage.StringConverter", | ||
"value.converter": "org.apache.kafka.connect.json.JsonConverter", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using community JsonConverter instead of Snowflake customer converter due to this bug
@@ -31,10 +31,19 @@ def send(self): | |||
print("Sending in Partition:" + str(p)) | |||
key = [] | |||
value = [] | |||
for e in range(self.recordNum): | |||
for e in range(self.recordNum - 1): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why did we change this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we are sending the tombstone record as the last record. The schema evolution tests rely on exactly recordNum rows being sent (throws NonRetryableError). Additionally a majority of the tests rely on row count as a buffer threshold, so sending the tombstone record as an additional row would increase test runtime
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, thanks.
why do we have to make changes in normal snowpipe streaming tests? we can do this change in tombstone specific tests right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes i could add a tombstone specific test, but i didn't want to extend the e2e runtime. plus tombstone ingestion is enabled by default, so i think it makes sense to put it in the normal e2e tests
happy to copy this into a new test if you prefer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh I see, thanks for explanation, i missed it is default enabled. please add a comment wherever possible. Thanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point it is confusing, added comments to the tests
Can you add a description on what bug is fixed? |
Updated the description. The fixed bug is to enable tombstone ingestion with schematization |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm!
Left a suggestion for adding comment about tombstone since it wont be obvious by reading test code why we expect one less record. But thanks for adding all those tests. Good stuff.
Lets also get an approval from @sfc-gh-tzhang related to schematization
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks!
@@ -275,6 +275,13 @@ public Map<String, Object> getProcessedRecordForStreamingIngest(SinkRecord recor | |||
private Map<String, Object> getMapFromJsonNodeForStreamingIngest(JsonNode node) | |||
throws JsonProcessingException { | |||
final Map<String, Object> streamingIngestRow = new HashMap<>(); | |||
|
|||
// return empty if tombstone record |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You probably has tested this, but all columns will be NULL for that row?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we want to fix this as well |
I'll do this in a followup PR, I want to reduce existing tombstone behavior changes |
Test changes