[Feature] Support Flink read / write data branch #3029
@FangYongs @schnappi17 PTAL cc @JingsongLi
@FangYongs @schnappi17 When you have time, please take a look at this PR.
@FangYongs @schnappi17 Do we need to start an email discussion about this part of the optimization?
}

public static String branch(Map<String, String> options) {
    if (options.containsKey(BRANCH.key())) {
Just use options.get(); it will handle the default value.
I don't get why...
Sorry, my previous description was incorrect. Most of the arguments to this method come from 'org.apache.paimon.table.FileStoreTable#options', where the 'branch' option may be empty.
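To illustrate the reviewer's point in plain Java: when an option has a default value, a lookup with a fallback makes the explicit containsKey() check unnecessary. This is a minimal sketch that uses java.util.Map#getOrDefault in place of Paimon's Options/ConfigOption machinery; the class BranchOptionSketch, the key "branch", and the default "main" are assumptions for illustration, not code from the PR.

```java
import java.util.HashMap;
import java.util.Map;

class BranchOptionSketch {
    // Hypothetical key and default, assumed to mirror CoreOptions.BRANCH.
    static final String BRANCH_KEY = "branch";
    static final String DEFAULT_MAIN_BRANCH = "main";

    // Resolve the branch from raw table options. getOrDefault falls back to the
    // main branch when the key is absent, so no containsKey() check is needed.
    static String branch(Map<String, String> options) {
        return options.getOrDefault(BRANCH_KEY, DEFAULT_MAIN_BRANCH);
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        System.out.println(branch(options)); // prints "main"
        options.put(BRANCH_KEY, "dev");
        System.out.println(branch(options)); // prints "dev"
    }
}
```

Note that the fallback only applies when the key is missing; an option explicitly set to an empty string would still be returned as-is.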
@@ -78,61 +79,75 @@ public class SchemaManager implements Serializable {

private final FileIO fileIO;
private final Path tableRoot;
No need for redundant modifications.
+1
reverted
@@ -104,7 +104,8 @@ public TableSchema(
this.highestFieldId = highestFieldId;
this.partitionKeys = partitionKeys;
this.primaryKeys = primaryKeys;
this.options = Collections.unmodifiableMap(options);
Objects.requireNonNull(options);
@@ -139,13 +138,13 @@ public RowKeyExtractor createRowKeyExtractor() {

@Override
public SnapshotReader newSnapshotReader() {
    return newSnapshotReader(DEFAULT_MAIN_BRANCH);
    return newSnapshotReader(CoreOptions.branch(options()));
}

@Override
public SnapshotReader newSnapshotReader(String branchName) {
Seems the branchName here is not used.
Yes, I will open a separate PR to remove it everywhere in one go.
@JingsongLi Could you please take a look at this? Thanks a lot!
@FangYongs Please take a look as well. Thanks!
@@ -83,6 +86,7 @@ protected AbstractCatalog(FileIO fileIO, Options options) {
this.tableDefaultOptions =
        convertToPropertiesPrefixKey(options.toMap(), TABLE_DEFAULT_OPTION_PREFIX);
this.catalogOptions = options;
this.branchName = options.get(CoreOptions.BRANCH);
What is the scope of the branch option? Catalog or table? Why does the catalog need to know about it?
The scope is the table, but the branch must be specified when calling getTable() or manipulating the schema through the catalog.
In your code, the scope is the catalog. Please make the code consistent with the intended scope.
@@ -139,13 +138,13 @@ public RowKeyExtractor createRowKeyExtractor() {

@Override
public SnapshotReader newSnapshotReader() {
    return newSnapshotReader(DEFAULT_MAIN_BRANCH);
    return newSnapshotReader(CoreOptions.branch(options()));
Just use coreOptions()?
Yes, the table already specifies the branch during initialization.
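The direction agreed in this thread (the table fixes its branch when it is created, so the no-argument newSnapshotReader() simply reads it back through coreOptions()) can be sketched as follows. CoreOptionsSketch, SketchSnapshotReader, and SketchTable are hypothetical stand-ins for CoreOptions, SnapshotReader, and FileStoreTable; they do not reproduce Paimon's real signatures, and the "branch"/"main" key and default are assumptions.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for CoreOptions: typed access to raw table options.
final class CoreOptionsSketch {
    private final Map<String, String> options;

    CoreOptionsSketch(Map<String, String> options) {
        // Defensive, read-only copy of the table options.
        this.options = Collections.unmodifiableMap(new HashMap<>(options));
    }

    String branch() {
        return options.getOrDefault("branch", "main");
    }
}

// Hypothetical stand-in for SnapshotReader: only remembers which branch it reads.
final class SketchSnapshotReader {
    final String branch;

    SketchSnapshotReader(String branch) {
        this.branch = branch;
    }
}

final class SketchTable {
    private final CoreOptionsSketch coreOptions;

    SketchTable(Map<String, String> options) {
        // The branch is fixed once, when the table is created.
        this.coreOptions = new CoreOptionsSketch(options);
    }

    CoreOptionsSketch coreOptions() {
        return coreOptions;
    }

    // No branchName parameter: the reader follows the table's own branch.
    SketchSnapshotReader newSnapshotReader() {
        return new SketchSnapshotReader(coreOptions().branch());
    }
}
```

With this shape, a table created with options containing branch=dev yields a reader on "dev", and a table without the option yields a reader on "main".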
@@ -85,55 +92,74 @@ public Path changelogDirectory() {
    return new Path(tablePath + "/changelog");
}

public Path longLivedChangelogPath(long snapshotId) {
    return new Path(tablePath + "/changelog/" + CHANGELOG_PREFIX + snapshotId);
public Path changelogDirectory(String branchName) {
What is the design here? Why do we need to pass a branch when one is already set in the constructor?
I have mostly exposed each of these methods in two variants, with and without a branch parameter. The variants that take a branch are mainly for BranchManager and for test setup.
So, please modify BranchManager to use separate SnapshotManagers instead of using this method.
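A minimal sketch of the reviewer's suggestion: rather than one SnapshotManager offering changelogDirectory(String branchName)-style overloads, each manager is bound to a single branch, and branch-level code obtains a separate manager per branch. SketchSnapshotManager, SketchBranchManager, and forBranch() are hypothetical names, and the directory layout (main branch at the table root, other branches under <table>/branch/branch-<name>) is an assumption for illustration.

```java
import java.nio.file.Path;

// Hypothetical snapshot manager bound to exactly one branch at construction.
final class SketchSnapshotManager {
    private static final String DEFAULT_MAIN_BRANCH = "main";

    private final Path tableRoot;
    private final String branch;

    SketchSnapshotManager(Path tableRoot, String branch) {
        this.tableRoot = tableRoot;
        this.branch = branch;
    }

    // Assumed layout: the main branch lives at the table root, other branches
    // under <table>/branch/branch-<name>.
    Path branchRoot() {
        return DEFAULT_MAIN_BRANCH.equals(branch)
                ? tableRoot
                : tableRoot.resolve("branch").resolve("branch-" + branch);
    }

    // No branch parameter: the branch was fixed in the constructor.
    Path changelogDirectory() {
        return branchRoot().resolve("changelog");
    }

    // Returns a separate manager for another branch instead of a per-call overload.
    SketchSnapshotManager forBranch(String otherBranch) {
        return new SketchSnapshotManager(tableRoot, otherBranch);
    }
}

// Hypothetical branch manager that works through per-branch snapshot managers.
final class SketchBranchManager {
    private final SketchSnapshotManager mainSnapshots;

    SketchBranchManager(SketchSnapshotManager mainSnapshots) {
        this.mainSnapshots = mainSnapshots;
    }

    Path changelogDirectoryOf(String branch) {
        return mainSnapshots.forBranch(branch).changelogDirectory();
    }
}
```

The design choice is that only the branch-aware component ever names a branch explicitly; every SnapshotManager method keeps a branch-free signature.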
@JingsongLi PTAL
Looks good to me! @sunxiaojian thanks!
Purpose
Linked issue: close #2861
While implementing Flink reads and writes of branch data, I found that adding an interface such as 'TableCommitImpl newCommit(String commitUser, String branchName)' spreads the 'branchName' parameter widely through the code base. Every future feature would also have to consider whether it supports branches, which is a burden for developers. To spare developers from handling branch reads and writes whenever they add an interface, I made the following refactoring and optimizations:
The implementations of 'SnapshotManager snapshotManager()', 'TagManager tagManager()', and 'BranchManager branchManager()' now directly use the table's configured write branch, so new interfaces no longer need to declare a 'branch' parameter.
Add the necessary unit tests, such as CDC tests, covering Flink CDC writes to branches.
After this PR, remove interfaces that take a branchName parameter, such as 'TableCommitImpl newCommit(String commitUser, String branchName)'.
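The refactoring described above can be summarized with a before/after sketch. The types below (SketchCommit, SketchTableBefore, SketchTableAfter) are hypothetical and heavily simplified; they only illustrate how resolving the branch once at table construction removes the branchName parameter from factory methods such as newCommit(), and are not Paimon's actual API. The "branch" key and "main" default are assumptions.

```java
import java.util.Map;

// Hypothetical commit handle recording who commits to which branch.
final class SketchCommit {
    final String commitUser;
    final String branch;

    SketchCommit(String commitUser, String branch) {
        this.commitUser = commitUser;
        this.branch = branch;
    }
}

// Before: each factory method that touches data carries an extra branchName parameter.
interface SketchTableBefore {
    SketchCommit newCommit(String commitUser, String branchName);
}

// After: the branch is resolved once from the table options at construction time,
// so factory methods such as newCommit() no longer mention branches.
final class SketchTableAfter {
    private final String branch;

    SketchTableAfter(Map<String, String> options) {
        // Assumed key "branch" with default "main".
        this.branch = options.getOrDefault("branch", "main");
    }

    SketchCommit newCommit(String commitUser) {
        return new SketchCommit(commitUser, branch);
    }
}
```

Any factory method added later to SketchTableAfter inherits the table's branch automatically, which is the property the PR description is after.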
Tests
API and Format
Documentation