SNOW-1692749: no schema evolution it + refactor #960
Conversation
void shouldInsertRecords(String description, String message, boolean withSchema)
    throws Exception {
  service.insert(
      Arrays.asList(
          createKafkaRecord(message, 0, withSchema), createKafkaRecord(message, 1, withSchema)));
  waitForOffset(2);
  service.insert(Collections.singletonList(createKafkaRecord(message, 2, withSchema)));
  waitForOffset(3);
}
I don't compare the values yet; I'll do that in the next PR (keeping PRs smaller). I think there is a small bug in the ingest SDK where int64 values are not saved properly.
        RECORD_CONTENT_OBJECT_SCHEMA);
  }

  private static final String RECORD_CONTENT_OBJECT_SCHEMA =
Shouldn't we move it to the top of the class?
  waitForOffset(3);
}

private static Stream<Arguments> prepareData() {
I don't think we have strict guidelines on this, but what do you think about keeping the @MethodSource provider method just above the test method, so we have a clear view of the data and the test scenario together?
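A minimal sketch of the suggested layout, assuming a JUnit 5 parameterized test; the method names mirror the snippets in this PR, but the provider body and arguments here are illustrative assumptions, not the actual test data:

```java
import java.util.stream.Stream;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

class IcebergIngestionIT {

  // Provider kept directly above the test that consumes it,
  // so the data and the scenario read as one unit.
  private static Stream<Arguments> prepareData() {
    return Stream.of(
        Arguments.of("with schema", "{\"id\": 1}", true),
        Arguments.of("without schema", "{\"id\": 1}", false));
  }

  @ParameterizedTest(name = "{0}")
  @MethodSource("prepareData")
  void shouldInsertRecords(String description, String message, boolean withSchema) {
    // test body elided
  }
}
```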
service.insert(
    Arrays.asList(
        createKafkaRecord(message, 0, withSchema), createKafkaRecord(message, 1, withSchema)));
waitForOffset(2);
Just a question: as we inserted messages with offsets 0 and 1, why do we wait for offset 2? According to the Javadoc of com.snowflake.kafka.connector.internal.SnowflakeSinkService#getOffset:

"retrieve offset of last loaded record for given pipe name"

so I would expect to see 1 here.
Because the consumer offset points to the offset from which reading should resume. It is always one greater than the last offset consumed.
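A minimal sketch of that convention in plain Java (not the connector's actual code): the offset to commit is the next offset to read, i.e. last consumed + 1, so after loading records 0 and 1 the reported offset is 2.

```java
import java.util.List;

public class OffsetConvention {

  /** Returns the offset to commit: one past the last consumed record offset. */
  static long offsetToCommit(List<Long> consumedOffsets) {
    long last = consumedOffsets.get(consumedOffsets.size() - 1);
    return last + 1; // next offset the consumer should start reading from
  }

  public static void main(String[] args) {
    // Records with offsets 0 and 1 were loaded...
    long committed = offsetToCommit(List.of(0L, 1L));
    // ...so the service reports 2, which is why the test waits for offset 2.
    System.out.println(committed); // prints 2
  }
}
```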
.../java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java
Just a few ordering suggestions.
Overview
SNOW-1692749

Pre-review checklist
- This change is protected by the snowflake.ingestion.method parameter: Yes
- Added end to end and Unit Tests: No
- Suggest why it is not param protected