PROD-39429 Implement migrate sys func from new channel (Format V2) to old channel (V1) - Push to Main #751
Conversation
}
if (migrateOffsetTokenResultFromSysFunc == null) {
  LOGGER.warn(
      "No result found in Migrating OffsetToken through System Function for tableName:{},"
What does "no result found" mean? No destination channel found, no source channel found, or both?
If this is not expected, should we throw an exception?
This is not expected, and I thought we decided not to throw any exceptions, i.e. swallow everything and continue using the old channel.
I can see the concern, though. At this point it comes down to whether we want to halt ingestion or ignore the exception and keep moving forward with the old channel. Halting ingestion could be better, since it lets us know something is wrong rather than continuing with the old channel with ramifications we don't understand.
WDYT Toby?
I agree that it's better to swallow this exception here, but we should have something to track the error - maybe an alert on the server side if we don't return an offset on the function call, or a new telemetry event similar to reportKafkaFatalError (if we aren't expecting too many hits on this error).
Good point, both.
I think we should throw an exception if this is not expected. I can add a server-side incident and also report telemetry.
For an unexpected exception, I am throwing a runtime exception. PTAL! It's better to fail here IMO. Thanks folks!
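For context, a minimal sketch of what that failure path could look like; the field and method names used here (LOGGER, telemetryService, reportKafkaConnectFatalError) are assumptions, not necessarily the connector's actual API:

// Sketch only: fail fast and emit telemetry when the system function returns no row,
// instead of silently falling back to the old channel.
if (migrateOffsetTokenResultFromSysFunc == null) {
  final String errorMsg =
      String.format(
          "No result found while migrating the offset token through the system function"
              + " for table %s, sourceChannel %s, destinationChannel %s",
          tableName, sourceChannelName, destinationChannelName);
  LOGGER.warn(errorMsg);
  telemetryService.reportKafkaConnectFatalError(errorMsg); // hypothetical telemetry hook
  throw new RuntimeException(errorMsg); // halt ingestion rather than continue with stale state
}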
"Migrating OffsetToken for a SourceChannel:{} in table:{} failed due to:{}", | ||
sourceChannelName, | ||
fullyQualifiedTableName, | ||
e.getMessage()); |
getMessage might be null for some exceptions; could we do better?
Might be worth logging the stack trace too. Let me add that.
Done!
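For reference, a minimal sketch of logging the failure with the full stack trace, assuming an SLF4J-style logger that accepts a trailing Throwable (the connector's own LOGGER wrapper may differ):

// Passing the Throwable itself lets the logging framework print the full stack trace
// and avoids relying on e.getMessage(), which can be null for some exceptions.
LOGGER.warn(
    String.format(
        "Migrating OffsetToken for a SourceChannel:%s in table:%s failed",
        sourceChannelName, fullyQualifiedTableName),
    e);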
@@ -88,17 +88,17 @@ public void testSinkTaskInvalidRecord_InMemoryDLQ() throws Exception {
new TopicPartitionChannel(
    mockStreamingClient,
    topicPartition,
    SnowflakeSinkServiceV2.partitionChannelKey(TEST_CONNECTOR_NAME, topicName, partition),
Do we have any tests that can actually test the end-to-end migration?
All end-to-end tests will eventually go through this path, right? Since it defaults to true.
It would be hard to test this scenario end to end since we wouldn't be able to upgrade the version. Could we add an IT (maybe in SnowflakeSinkServiceV2IT) that explicitly opens and ingests to a channel with the V2 name, shuts it down, then creates a new sink and runs through the channel migration?
Not 100% sure this is possible, but maybe we can stick a mock stub in for the client to hard-code the channel name during open?
Good idea to ingest through a TopicPartitionChannel with the V2 name and then let it go through migration. I will add IT tests.
I think I see the issue: the IT tests are showing up in the other PR: https://github.com/snowflakedb/snowflake-kafka-connector/pull/750/files
I might have messed up the merge conflicts.
Ported the IT tests here and added a few more tests in the connection IT tests.
Overall LGTM, but I would like an IT or e2e test that explicitly opens and ingests to a V2 channel and confirms that it is migrated later on.
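A rough sketch of the kind of IT being requested; every helper and method name below (createChannelWithV2Name, ingestRows, createSinkService, startPartition, getOffset, sendRecords) is a placeholder, not necessarily the connector's real API:

@Test
public void testOffsetTokenMigrationFromV2ToV1Channel() throws Exception {
  // 1. Open a channel under the deprecated V2 name format and ingest a few rows,
  //    so an offset token exists under the V2 name.
  SnowflakeStreamingIngestChannel v2Channel = createChannelWithV2Name(topicPartition);
  ingestRows(v2Channel, 10);
  v2Channel.close().get();

  // 2. Start a fresh sink service; opening the partition should run the channel
  //    offset token migration from the V2 name to the V1 name.
  SnowflakeSinkService service = createSinkService();
  service.startPartition(tableName, topicPartition);

  // 3. The V1 channel should resume from the offset committed under the V2 name
  //    (the exact expected value depends on the offset-token semantics), and
  //    ingestion should keep working after the migration.
  Assert.assertEquals(expectedOffsetAfterMigration, service.getOffset(topicPartition));
  sendRecords(service, 5);
}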
public static final boolean ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DEFAULT = true;
public static final String ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DOC =
    "This config is used to enable/disable streaming channel offset migration logic. If true, we"
        + " will migrate offset token from channel name format V2 to name format v1.";
Nit: let's add something about how V2 is deprecated. Otherwise customers may disable this because V2 sounds fancier than V1.
Good point, let me add!
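A possible wording for the doc string that calls out the deprecation (sketch only, not the final text):

public static final String ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DOC =
    "This config is used to enable/disable streaming channel offset migration logic. If true,"
        + " the connector migrates the offset token from the deprecated channel name format V2"
        + " back to the original format V1. Format V2 is deprecated, so this should normally"
        + " stay enabled.";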
@@ -278,6 +285,14 @@ public TopicPartitionChannel(

this.enableSchemaEvolution = this.enableSchematization && hasSchemaEvolutionPermission;

this.channelNameFormatV2 =
Let's move this channelNameFormatV2 into the if statement? I don't think V2 is needed after the channel is migrated.
It was private final, so I had to set it outside in the constructor. Let me think about whether it is needed as an instance variable (I had plans to modify it in tests but haven't added any IT tests).
Done!
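A minimal sketch of what moving it out of the instance fields could look like; the flag, helper, and connection-service method names below are assumptions based on the snippets in this PR:

// Compute the V2-format name only inside the migration branch, since it is not
// needed once the channel has been migrated to the V1 name.
if (enableChannelOffsetTokenMigration) { // hypothetical flag backed by the new config
  final String channelNameFormatV2 =
      generateChannelNameFormatV2(this.channelNameFormatV1, connectorName); // hypothetical helper
  this.conn.migrateStreamingChannelOffsetToken(
      this.tableName, channelNameFormatV2, this.channelNameFormatV1);
}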
Sorry, the merge conflicts didn't show the IT tests.
Left some comments, PTAL, otherwise LGTM!
LOGGER.info(
    "Migrate OffsetToken response for table:{}, sourceChannel:{}, destinationChannel:{}"
        + " is:{}",
    tableName,
    sourceChannelName,
    destinationChannelName,
    channelMigrateOffsetTokenResponseDTO);
This log might be confusing to customers; could we only log when a migration is actually being done? Or do we even need this client-side log, given that we have a bunch of server-side logging in place?
This should be fine; it happens only once per channel during open partition, so not very often. I would like to keep it, since it helps us debug any customer issues.
Thanks for the change! The ITs are pretty cool - but small request to try ingestion after the migration as well
String migrateOffsetTokenResultFromSysFunc = null;
if (resultSet.next()) {
  migrateOffsetTokenResultFromSysFunc = resultSet.getString(1 /*Only one column*/);
Out of scope for this PR - do we have any guarantees that this DTO won't change on the server side? A comment, a test, or something on the server side to warn against changes, since we now throw the exception on failure.
Yeah, I think the JSON exception will help with that. I think I have a comment on the server side saying to be cautious about making changes to the interface, but let me try to mimic what happens if the response is changed. I could probably handle new fields in the object mapper.
Moved this into its own method so that we can test it. If the response changes from the server side (which it shouldn't), these test methods will start failing.
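A sketch of such a parsing helper, assuming Jackson is used for the DTO; configuring the mapper to ignore unknown properties would keep additive server-side changes from breaking the client:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

// Sketch only: parse the system function's JSON result into the response DTO.
static ChannelMigrateOffsetTokenResponseDTO getChannelMigrateOffsetTokenResponseDTO(
    String migrateOffsetTokenResultFromSysFunc) throws JsonProcessingException {
  ObjectMapper mapper = new ObjectMapper();
  // Tolerate new fields added on the server side instead of failing deserialization.
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
  return mapper.readValue(
      migrateOffsetTokenResultFromSysFunc, ChannelMigrateOffsetTokenResponseDTO.class);
}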
PR #750, but pushed to main. Copying it as is.
This is a long-term implementation for the potential data duplication introduced by the new channel name format (channelNameFormatV2).
Added tests in TopicPartitionChannel.
Notes:
End-to-end tests:
Use 2.1.0.
Use two connectors: this uses channel name format V2.
Stop.
Replace the jar (2.1.1) and restart.
The migration runs, and you can only see the old channel format.
Stop again and restart on 2.1.1.
Nothing happens, and we get a valid response saying the channelNameFormatV2 channel doesn't exist.