-
Notifications
You must be signed in to change notification settings - Fork 990
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[core] Support RollbackAction and Procedure with timestamp #4333
Conversation
@@ -51,7 +56,16 @@ public void run() throws Exception { | |||
} | |||
|
|||
if (version.chars().allMatch(Character::isDigit)) { | |||
table.rollbackTo(Long.parseLong(version)); | |||
if (isTimestamp != null && isTimestamp) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
boolean
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here may not be boolean, because this argument may be null.
fileStoreTable | ||
.snapshotManager() | ||
.earlierOrEqualTimeMills(Long.parseLong(version)) | ||
.id()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Method invocation 'id' may produce 'NullPointerException'
@@ -30,6 +30,8 @@ public class RollbackToActionFactory implements ActionFactory { | |||
|
|||
private static final String VERSION = "version"; | |||
|
|||
private static final String IS_TIMESTAMP = "is_timestamp"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should also modify doc.
} else { | ||
FileStoreTable fileStoreTable = (FileStoreTable) table; | ||
fileStoreTable.rollbackTo( | ||
fileStoreTable.snapshotManager().earlierOrEqualTimeMills(timestamp).id()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Method invocation 'id' may produce 'NullPointerException'
@@ -32,14 +33,18 @@ public class RollbackToAction extends TableActionBase { | |||
|
|||
private final String version; | |||
|
|||
private final Boolean isTimestamp; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not use long timestamp uniformly in flink action && flink procedure && spark procedure?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Firstly I also want to change it like you suggested, but consider older version spark user their procedure sql would not be compatible, so keep it can use in older version first. @wwj6591812
@@ -51,7 +56,16 @@ public void run() throws Exception { | |||
} | |||
|
|||
if (version.chars().allMatch(Character::isDigit)) { | |||
table.rollbackTo(Long.parseLong(version)); | |||
if (isTimestamp != null && isTimestamp) { | |||
FileStoreTable fileStoreTable = (FileStoreTable) table; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move line 60 from here to line 57 like spark procedure.
@@ -68,17 +68,31 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest { | |||
inputData.addData((2, "b")) | |||
stream.processAllAvailable() | |||
checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) | |||
val ts = System.currentTimeMillis |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
inline this
Row(true) :: Nil) | ||
checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) | ||
|
||
// rollback to timestamp |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rollback with timestamp
Id you finish, please delete "[WIP]" in title. |
Thanks @wwj6591812 had addressed. |
+1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you just introduce a rollback_to_timestamp
procedure?
OK it is a better way,would change it @JingsongLi thanks |
Had link this pr to #4410, close the pr firstly~ @JingsongLi @wwj6591812 |
Purpose
Currently Rollback cannot rollback according timestamp which can make user more easy to rollback,this pr is aim to support it.
due to difference in spark and flink args do two ways to support it.
Linked issue: close #xxx
Tests
API and Format
Documentation