From 23d53bd87cefaba4238753aded638f8a5f0f1011 Mon Sep 17 00:00:00 2001 From: tsreaper Date: Thu, 25 Jul 2024 17:03:20 +0800 Subject: [PATCH 1/6] [core] Support reading partition from fallback branch when not found in current branch --- docs/content/maintenance/manage-branches.md | 69 ++++ .../generated/core_configuration.html | 6 + .../java/org/apache/paimon/CoreOptions.java | 8 + .../privilege/PrivilegedFileStoreTable.java | 74 +--- .../paimon/table/AbstractFileStoreTable.java | 6 + .../paimon/table/DelegatedFileStoreTable.java | 268 +++++++++++++++ .../table/FallbackReadFileStoreTable.java | 322 ++++++++++++++++++ .../paimon/table/FileStoreTableFactory.java | 25 +- .../apache/paimon/table/ReadonlyTable.java | 9 + .../java/org/apache/paimon/table/Table.java | 9 +- .../table/source/AbstractDataTableScan.java | 6 + .../paimon/table/source/InnerTableScan.java | 6 + .../table/source/snapshot/SnapshotReader.java | 2 + .../source/snapshot/SnapshotReaderImpl.java | 6 + .../paimon/table/system/AuditLogTable.java | 12 + .../apache/paimon/utils/BranchManager.java | 42 +++ .../procedure/CreateBranchProcedure.java | 23 ++ .../apache/paimon/flink/BranchSqlITCase.java | 59 +++- 18 files changed, 878 insertions(+), 74 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java diff --git a/docs/content/maintenance/manage-branches.md b/docs/content/maintenance/manage-branches.md index 4343214f2c28..c54121f73e38 100644 --- a/docs/content/maintenance/manage-branches.md +++ b/docs/content/maintenance/manage-branches.md @@ -159,3 +159,72 @@ Run the following command: {{< /tab >}} {{< /tabs >}} + +### Batch Reading from Fallback Branch + +You can set the table option `scan.fallback-branch` +so that when a batch job reads from the current branch, if a partition does not exist, +the reader will try to read this partition from the fallback branch. +For streaming read jobs, this feature is currently not supported, and will only produce results from the current branch. + +What's the use case of this feature? Say you have created a Paimon table partitioned by date. +You have a long-running streaming job which inserts records into Paimon, so that today's data can be queried in time. +You also have a batch job which runs at every night to insert corrected records of yesterday into Paimon, +so that the preciseness of the data can be promised. + +When you query from this Paimon table, you would like to first read from the results of batch job. +But if a partition (for example, today's partition) does not exist in its result, +then you would like to read from the results of streaming job. +In this case, you can create a branch for streaming job, and set `scan.fallback-branch` to this streaming branch. + +Let's look at an example. + +{{< tabs "read-fallback-branch" >}} + +{{< tab "Flink" >}} + +```sql +-- create Paimon table +CREATE TABLE T ( + dt STRING NOT NULL, + name STRING NOT NULL, + amount BIGINT +) PARTITIONED BY (dt); + +-- create a branch for streaming job +-- you can even specify primary keys and bucket number, even if the original branch has no primary key +-- in this example, we create a new branch `test`, uses `dt` and `name` as primary keys, has a bucket number of 2, and will copy the table options from the original branch +CALL sys.create_branch('default.T', 'test', 'dt, name', 2, true); + +-- set fallback branch +ALTER TABLE T SET ( + 'scan.fallback-branch' = 'test' +); + +-- set changelog producer for the streaming branch, in case a streaming job would like to read from it in the future +ALTER TABLE `T$branch_test` SET ( + 'changelog-producer' = 'lookup' +); + +-- write records into the streaming branch +INSERT INTO `T$branch_test` VALUES ('20240725', 'apple', 4), ('20240726', 'cherry', 3), ('20240726', 'pear', 6); + +-- write records into the default branch +INSERT INTO T VALUES ('20240725', 'apple', 5), ('20240725', 'banana', 7); + +SELECT * FROM T; +/* ++------------------+------------------+--------+ +| dt | name | amount | ++------------------+------------------+--------+ +| 20240725 | apple | 5 | +| 20240725 | banana | 7 | +| 20240726 | cherry | 3 | +| 20240726 | pear | 6 | ++------------------+------------------+--------+ +*/ +``` + +{{< /tab >}} + +{{< /tabs >}} diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index d6eb502353f5..446931acacda 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -581,6 +581,12 @@ Long End condition "watermark" for bounded streaming mode. Stream reading will end when a larger watermark snapshot is encountered. + +
scan.fallback-branch
+ (none) + String + When a batch job queries from a table, if a partition does not exist in the current branch, the reader will try to get this partition from this fallback branch. +
scan.file-creation-time-millis
(none) diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 685634e2650a..17bff3653229 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1275,6 +1275,14 @@ public class CoreOptions implements Serializable { "The maximum number of concurrent deleting files. " + "By default is the number of processors available to the Java virtual machine."); + public static final ConfigOption SCAN_FALLBACK_BRANCH = + key("scan.fallback-branch") + .stringType() + .noDefaultValue() + .withDescription( + "When a batch job queries from a table, if a partition does not exist in the current branch, " + + "the reader will try to get this partition from this fallback branch."); + private final Options options; public CoreOptions(Map options) { diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java index 45ce6f0bf540..d590eb3708df 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java @@ -18,20 +18,15 @@ package org.apache.paimon.privilege; -import org.apache.paimon.CoreOptions; import org.apache.paimon.FileStore; import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.fs.FileIO; -import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.ManifestCacheFilter; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.stats.Statistics; -import org.apache.paimon.table.BucketMode; -import org.apache.paimon.table.CatalogEnvironment; +import org.apache.paimon.table.DelegatedFileStoreTable; import org.apache.paimon.table.ExpireSnapshots; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.query.LocalTableQuery; -import org.apache.paimon.table.sink.RowKeyExtractor; import org.apache.paimon.table.sink.TableCommitImpl; import org.apache.paimon.table.sink.TableWriteImpl; import org.apache.paimon.table.sink.WriteSelector; @@ -40,55 +35,32 @@ import org.apache.paimon.table.source.StreamDataTableScan; import org.apache.paimon.table.source.snapshot.SnapshotReader; import org.apache.paimon.utils.BranchManager; -import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; import java.time.Duration; import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.OptionalLong; /** {@link FileStoreTable} with privilege checks. */ -public class PrivilegedFileStoreTable implements FileStoreTable { +public class PrivilegedFileStoreTable extends DelegatedFileStoreTable { - private final FileStoreTable wrapped; private final PrivilegeChecker privilegeChecker; private final Identifier identifier; public PrivilegedFileStoreTable( FileStoreTable wrapped, PrivilegeChecker privilegeChecker, Identifier identifier) { - this.wrapped = wrapped; + super(wrapped); this.privilegeChecker = privilegeChecker; this.identifier = identifier; } - @Override - public String name() { - return wrapped.name(); - } - - @Override - public String fullName() { - return wrapped.fullName(); - } - @Override public SnapshotReader newSnapshotReader() { privilegeChecker.assertCanSelect(identifier); return wrapped.newSnapshotReader(); } - @Override - public CoreOptions coreOptions() { - return wrapped.coreOptions(); - } - - @Override - public SnapshotManager snapshotManager() { - return wrapped.snapshotManager(); - } - @Override public TagManager tagManager() { privilegeChecker.assertCanInsert(identifier); @@ -102,36 +74,11 @@ public BranchManager branchManager() { return wrapped.branchManager(); } - @Override - public Path location() { - return wrapped.location(); - } - - @Override - public FileIO fileIO() { - return wrapped.fileIO(); - } - - @Override - public TableSchema schema() { - return wrapped.schema(); - } - @Override public FileStore store() { return new PrivilegedFileStore<>(wrapped.store(), privilegeChecker, identifier); } - @Override - public BucketMode bucketMode() { - return wrapped.bucketMode(); - } - - @Override - public CatalogEnvironment catalogEnvironment() { - return wrapped.catalogEnvironment(); - } - @Override public Optional statistics() { privilegeChecker.assertCanSelect(identifier); @@ -144,11 +91,6 @@ public FileStoreTable copy(Map dynamicOptions) { wrapped.copy(dynamicOptions), privilegeChecker, identifier); } - @Override - public OptionalLong latestSnapshotId() { - return wrapped.latestSnapshotId(); - } - @Override public FileStoreTable copy(TableSchema newTableSchema) { return new PrivilegedFileStoreTable( @@ -299,16 +241,6 @@ public LocalTableQuery newLocalTableQuery() { return wrapped.newLocalTableQuery(); } - @Override - public boolean supportStreamingReadOverwrite() { - return wrapped.supportStreamingReadOverwrite(); - } - - @Override - public RowKeyExtractor createRowKeyExtractor() { - return wrapped.createRowKeyExtractor(); - } - @Override public boolean equals(Object o) { if (this == o) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index d30fd73081a1..2aac9c4ceb94 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -562,6 +562,12 @@ public void createBranch(String branchName) { branchManager().createBranch(branchName); } + @Override + public void createBranch( + String branchName, List primaryKeys, int bucket, boolean copyOptions) { + branchManager().createBranch(branchName, primaryKeys, bucket, copyOptions); + } + @Override public void createBranch(String branchName, long snapshotId) { branchManager().createBranch(branchName, snapshotId); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java new file mode 100644 index 000000000000..22b314f2aa2a --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.FileStore; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.ManifestCacheFilter; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.stats.Statistics; +import org.apache.paimon.table.query.LocalTableQuery; +import org.apache.paimon.table.sink.RowKeyExtractor; +import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.table.sink.TableWriteImpl; +import org.apache.paimon.table.sink.WriteSelector; +import org.apache.paimon.table.source.DataTableScan; +import org.apache.paimon.table.source.InnerTableRead; +import org.apache.paimon.table.source.StreamDataTableScan; +import org.apache.paimon.table.source.snapshot.SnapshotReader; +import org.apache.paimon.utils.BranchManager; +import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.TagManager; + +import java.time.Duration; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalLong; + +/** Delegated {@link FileStoreTable}. */ +public abstract class DelegatedFileStoreTable implements FileStoreTable { + + protected final FileStoreTable wrapped; + + public DelegatedFileStoreTable(FileStoreTable wrapped) { + this.wrapped = wrapped; + } + + @Override + public String name() { + return wrapped.name(); + } + + @Override + public String fullName() { + return wrapped.fullName(); + } + + @Override + public SnapshotReader newSnapshotReader() { + return wrapped.newSnapshotReader(); + } + + @Override + public CoreOptions coreOptions() { + return wrapped.coreOptions(); + } + + @Override + public SnapshotManager snapshotManager() { + return wrapped.snapshotManager(); + } + + @Override + public TagManager tagManager() { + return wrapped.tagManager(); + } + + @Override + public BranchManager branchManager() { + return wrapped.branchManager(); + } + + @Override + public Path location() { + return wrapped.location(); + } + + @Override + public FileIO fileIO() { + return wrapped.fileIO(); + } + + @Override + public TableSchema schema() { + return wrapped.schema(); + } + + @Override + public FileStore store() { + return wrapped.store(); + } + + @Override + public BucketMode bucketMode() { + return wrapped.bucketMode(); + } + + @Override + public CatalogEnvironment catalogEnvironment() { + return wrapped.catalogEnvironment(); + } + + @Override + public Optional statistics() { + return wrapped.statistics(); + } + + @Override + public OptionalLong latestSnapshotId() { + return wrapped.latestSnapshotId(); + } + + @Override + public void rollbackTo(long snapshotId) { + wrapped.rollbackTo(snapshotId); + } + + @Override + public void createTag(String tagName) { + wrapped.createTag(tagName); + } + + @Override + public void createTag(String tagName, long fromSnapshotId) { + wrapped.createTag(tagName, fromSnapshotId); + } + + @Override + public void createTag(String tagName, Duration timeRetained) { + wrapped.createTag(tagName, timeRetained); + } + + @Override + public void createTag(String tagName, long fromSnapshotId, Duration timeRetained) { + wrapped.createTag(tagName, fromSnapshotId, timeRetained); + } + + @Override + public void deleteTag(String tagName) { + wrapped.deleteTag(tagName); + } + + @Override + public void rollbackTo(String tagName) { + wrapped.rollbackTo(tagName); + } + + @Override + public void createBranch(String branchName) { + wrapped.createBranch(branchName); + } + + @Override + public void createBranch( + String branchName, List primaryKeys, int bucket, boolean copyOptions) { + wrapped.createBranch(branchName, primaryKeys, bucket, copyOptions); + } + + @Override + public void createBranch(String branchName, long snapshotId) { + wrapped.createBranch(branchName, snapshotId); + } + + @Override + public void createBranch(String branchName, String tagName) { + wrapped.createBranch(branchName, tagName); + } + + @Override + public void deleteBranch(String branchName) { + wrapped.deleteBranch(branchName); + } + + @Override + public void fastForward(String branchName) { + wrapped.fastForward(branchName); + } + + @Override + public ExpireSnapshots newExpireSnapshots() { + return wrapped.newExpireSnapshots(); + } + + @Override + public ExpireSnapshots newExpireChangelog() { + return wrapped.newExpireChangelog(); + } + + @Override + public DataTableScan newScan() { + return wrapped.newScan(); + } + + @Override + public StreamDataTableScan newStreamScan() { + return wrapped.newStreamScan(); + } + + @Override + public InnerTableRead newRead() { + return wrapped.newRead(); + } + + @Override + public Optional newWriteSelector() { + return wrapped.newWriteSelector(); + } + + @Override + public TableWriteImpl newWrite(String commitUser) { + return wrapped.newWrite(commitUser); + } + + @Override + public TableWriteImpl newWrite(String commitUser, ManifestCacheFilter manifestFilter) { + return wrapped.newWrite(commitUser, manifestFilter); + } + + @Override + public TableCommitImpl newCommit(String commitUser) { + return wrapped.newCommit(commitUser); + } + + @Override + public LocalTableQuery newLocalTableQuery() { + return wrapped.newLocalTableQuery(); + } + + @Override + public boolean supportStreamingReadOverwrite() { + return wrapped.supportStreamingReadOverwrite(); + } + + @Override + public RowKeyExtractor createRowKeyExtractor() { + return wrapped.createRowKeyExtractor(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DelegatedFileStoreTable that = (DelegatedFileStoreTable) o; + return Objects.equals(wrapped, that.wrapped); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java new file mode 100644 index 000000000000..d254c6880613 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.disk.IOManager; +import org.apache.paimon.metrics.MetricRegistry; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.source.DataFilePlan; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.DataTableScan; +import org.apache.paimon.table.source.InnerTableRead; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.table.source.TableScan; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Filter; +import org.apache.paimon.utils.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * A {@link FileStoreTable} which mainly read from the current branch. However, if the current + * branch does not have a partition, it will read that partition from the fallback branch. + */ +public class FallbackReadFileStoreTable extends DelegatedFileStoreTable { + + private final FileStoreTable fallback; + + public FallbackReadFileStoreTable(FileStoreTable main, FileStoreTable fallback) { + super(main); + this.fallback = fallback; + + String mainBranch = main.coreOptions().branch(); + String fallbackBranch = fallback.coreOptions().branch(); + RowType mainRowType = main.schema().logicalRowType(); + RowType fallbackRowType = fallback.schema().logicalRowType(); + Preconditions.checkArgument( + mainRowType.equals(fallbackRowType), + "Branch %s and %s does not have the same row type.\n" + + "Row type of branch %s is %s.\n" + + "Row type of branch %s is %s.", + mainBranch, + fallbackBranch, + mainBranch, + mainRowType, + fallbackBranch, + fallbackRowType); + + List mainPrimaryKeys = main.schema().primaryKeys(); + List fallbackPrimaryKeys = fallback.schema().primaryKeys(); + if (!mainPrimaryKeys.isEmpty()) { + if (fallbackPrimaryKeys.isEmpty()) { + throw new IllegalArgumentException( + "Branch " + + mainBranch + + " has primary keys while fallback branch " + + fallbackBranch + + " does not. This is not allowed."); + } + Preconditions.checkArgument( + mainPrimaryKeys.equals(fallbackPrimaryKeys), + "Branch %s and %s both have primary keys but are not the same.\n" + + "Primary keys of %s are %s.\n" + + "Primary keys of %s are %s.", + mainBranch, + fallbackBranch, + mainBranch, + mainPrimaryKeys, + fallbackBranch, + fallbackPrimaryKeys); + } + } + + @Override + public FileStoreTable copy(Map dynamicOptions) { + return new FallbackReadFileStoreTable( + wrapped.copy(dynamicOptions), + fallback.copy(rewriteFallbackOptions(dynamicOptions))); + } + + @Override + public FileStoreTable copy(TableSchema newTableSchema) { + return new FallbackReadFileStoreTable( + wrapped.copy(newTableSchema), + fallback.copy( + newTableSchema.copy(rewriteFallbackOptions(newTableSchema.options())))); + } + + @Override + public FileStoreTable copyWithoutTimeTravel(Map dynamicOptions) { + return new FallbackReadFileStoreTable( + wrapped.copyWithoutTimeTravel(dynamicOptions), + fallback.copyWithoutTimeTravel(rewriteFallbackOptions(dynamicOptions))); + } + + @Override + public FileStoreTable copyWithLatestSchema() { + return new FallbackReadFileStoreTable( + wrapped.copyWithLatestSchema(), fallback.copyWithLatestSchema()); + } + + private Map rewriteFallbackOptions(Map options) { + Map result = new HashMap<>(options); + + // snapshot ids may be different between the main branch and the fallback branch, + // so we need to convert main branch snapshot id to millisecond, + // then convert millisecond to fallback branch snapshot id + String scanSnapshotIdOptionKey = CoreOptions.SCAN_SNAPSHOT_ID.key(); + if (options.containsKey(scanSnapshotIdOptionKey)) { + long id = Long.parseLong(options.get(scanSnapshotIdOptionKey)); + long millis = wrapped.snapshotManager().snapshot(id).timeMillis(); + Snapshot fallbackSnapshot = fallback.snapshotManager().earlierOrEqualTimeMills(millis); + long fallbackId; + if (fallbackSnapshot == null) { + fallbackId = Snapshot.FIRST_SNAPSHOT_ID; + } else { + fallbackId = fallbackSnapshot.id(); + } + result.put(scanSnapshotIdOptionKey, String.valueOf(fallbackId)); + } + + // bucket number of main branch and fallback branch are very likely different, + // so we remove bucket in options to use fallback branch's bucket number + result.remove(CoreOptions.BUCKET.key()); + + return result; + } + + @Override + public DataTableScan newScan() { + return new Scan(); + } + + private class Scan implements DataTableScan { + + private final DataTableScan mainScan; + private final DataTableScan fallbackScan; + + private Scan() { + this.mainScan = wrapped.newScan(); + this.fallbackScan = fallback.newScan(); + } + + @Override + public Scan withShard(int indexOfThisSubtask, int numberOfParallelSubtasks) { + mainScan.withShard(indexOfThisSubtask, numberOfParallelSubtasks); + fallbackScan.withShard(indexOfThisSubtask, numberOfParallelSubtasks); + return this; + } + + @Override + public Scan withFilter(Predicate predicate) { + mainScan.withFilter(predicate); + fallbackScan.withFilter(predicate); + return this; + } + + @Override + public Scan withLimit(int limit) { + mainScan.withLimit(limit); + fallbackScan.withLimit(limit); + return this; + } + + @Override + public Scan withPartitionFilter(Map partitionSpec) { + mainScan.withPartitionFilter(partitionSpec); + fallbackScan.withPartitionFilter(partitionSpec); + return this; + } + + @Override + public Scan withPartitionFilter(List partitions) { + mainScan.withPartitionFilter(partitions); + fallbackScan.withPartitionFilter(partitions); + return this; + } + + @Override + public Scan withBucketFilter(Filter bucketFilter) { + mainScan.withBucketFilter(bucketFilter); + fallbackScan.withBucketFilter(bucketFilter); + return this; + } + + @Override + public Scan withLevelFilter(Filter levelFilter) { + mainScan.withLevelFilter(levelFilter); + fallbackScan.withLevelFilter(levelFilter); + return this; + } + + @Override + public Scan withMetricsRegistry(MetricRegistry metricRegistry) { + mainScan.withMetricsRegistry(metricRegistry); + fallbackScan.withMetricsRegistry(metricRegistry); + return this; + } + + @Override + public TableScan.Plan plan() { + List splits = new ArrayList<>(); + Set completePartitions = new HashSet<>(); + for (Split split : mainScan.plan().splits()) { + DataSplit dataSplit = (DataSplit) split; + splits.add(dataSplit); + completePartitions.add(dataSplit.partition()); + } + + List remainingPartitions = + fallbackScan.listPartitions().stream() + .filter(p -> !completePartitions.contains(p)) + .collect(Collectors.toList()); + if (!remainingPartitions.isEmpty()) { + fallbackScan.withPartitionFilter(remainingPartitions); + for (Split split : fallbackScan.plan().splits()) { + splits.add((DataSplit) split); + } + } + return new DataFilePlan(splits); + } + + @Override + public List listPartitions() { + Set partitions = new LinkedHashSet<>(mainScan.listPartitions()); + partitions.addAll(fallbackScan.listPartitions()); + return new ArrayList<>(partitions); + } + } + + @Override + public InnerTableRead newRead() { + return new Read(); + } + + private class Read implements InnerTableRead { + + private final InnerTableRead mainRead; + private final InnerTableRead fallbackRead; + + private Read() { + this.mainRead = wrapped.newRead(); + this.fallbackRead = fallback.newRead(); + } + + @Override + public InnerTableRead withFilter(Predicate predicate) { + mainRead.withFilter(predicate); + fallbackRead.withFilter(predicate); + return this; + } + + @Override + public InnerTableRead withProjection(int[][] projection) { + mainRead.withProjection(projection); + fallbackRead.withProjection(projection); + return this; + } + + @Override + public InnerTableRead forceKeepDelete() { + mainRead.forceKeepDelete(); + fallbackRead.forceKeepDelete(); + return this; + } + + @Override + public TableRead executeFilter() { + mainRead.executeFilter(); + fallbackRead.executeFilter(); + return this; + } + + @Override + public TableRead withIOManager(IOManager ioManager) { + mainRead.withIOManager(ioManager); + fallbackRead.withIOManager(ioManager); + return this; + } + + @Override + public RecordReader createReader(Split split) throws IOException { + DataSplit dataSplit = (DataSplit) split; + if (!dataSplit.dataFiles().isEmpty() + && dataSplit.dataFiles().get(0).minKey().getFieldCount() > 0) { + return fallbackRead.createReader(split); + } else { + return mainRead.createReader(split); + } + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java index e124fbb27cdf..74cc9fd35d78 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java @@ -25,6 +25,8 @@ import org.apache.paimon.options.Options; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.utils.StringUtils; import java.io.IOException; import java.io.UncheckedIOException; @@ -89,6 +91,27 @@ public static FileStoreTable create( fileIO, tablePath, tableSchema, catalogEnvironment) : new PrimaryKeyFileStoreTable( fileIO, tablePath, tableSchema, catalogEnvironment); - return table.copy(dynamicOptions.toMap()); + table = table.copy(dynamicOptions.toMap()); + + Options options = new Options(table.options()); + String fallbackBranch = options.get(CoreOptions.SCAN_FALLBACK_BRANCH); + if (!StringUtils.isNullOrWhitespaceOnly(fallbackBranch)) { + Options branchOptions = new Options(); + branchOptions.set(CoreOptions.BRANCH, fallbackBranch); + branchOptions.set(CoreOptions.SCAN_FALLBACK_BRANCH, ""); + FileStoreTable fallbackTable = + FileStoreTableFactory.create( + fileIO, + tablePath, + new SchemaManager(fileIO, tablePath, fallbackBranch).latest().get(), + branchOptions, + catalogEnvironment); + + Preconditions.checkArgument(!(table instanceof FallbackReadFileStoreTable)); + Preconditions.checkArgument(!(fallbackTable instanceof FallbackReadFileStoreTable)); + table = new FallbackReadFileStoreTable(table, fallbackTable); + } + + return table; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java index b9eeba398400..d6eabd3e7113 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java @@ -176,6 +176,15 @@ default void createBranch(String branchName) { this.getClass().getSimpleName())); } + @Override + default void createBranch( + String branchName, List primaryKeys, int bucket, boolean copyOptions) { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support create empty branch.", + this.getClass().getSimpleName())); + } + @Override default void createBranch(String branchName, long snapshotId) { throw new UnsupportedOperationException( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index 708e25c1e621..a5284df99beb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -112,10 +112,17 @@ default void deleteTags(String tagNames) { @Experimental void rollbackTo(String tagName); - /** Create a empty branch. */ + /** Create an empty branch. */ @Experimental void createBranch(String branchName); + /** + * Create an empty branch. Primary keys and bucket number can be different from the original + * branch. + */ + @Experimental + void createBranch(String branchName, List primaryKeys, int bucket, boolean copyOptions); + /** Create a branch from given snapshot. */ @Experimental void createBranch(String branchName, long snapshotId); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java index 426620b5189b..531c4945be22 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java @@ -85,6 +85,12 @@ public AbstractDataTableScan withPartitionFilter(Map partitionSp return this; } + @Override + public AbstractDataTableScan withPartitionFilter(List partitions) { + snapshotReader.withPartitionFilter(partitions); + return this; + } + @Override public AbstractDataTableScan withLevelFilter(Filter levelFilter) { snapshotReader.withLevelFilter(levelFilter); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java index 613b2efc26ef..00a4fc0cde18 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java @@ -18,10 +18,12 @@ package org.apache.paimon.table.source; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.metrics.MetricRegistry; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.utils.Filter; +import java.util.List; import java.util.Map; /** Inner {@link TableScan} contains filter push down. */ @@ -37,6 +39,10 @@ default InnerTableScan withPartitionFilter(Map partitionSpec) { return this; } + default InnerTableScan withPartitionFilter(List partitions) { + return this; + } + default InnerTableScan withBucketFilter(Filter bucketFilter) { return this; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java index 8db3effd1a89..c2439de55035 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java @@ -56,6 +56,8 @@ public interface SnapshotReader { SnapshotReader withPartitionFilter(Predicate predicate); + SnapshotReader withPartitionFilter(List partitions); + SnapshotReader withMode(ScanMode scanMode); SnapshotReader withLevelFilter(Filter levelFilter); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index 1b58fea9167c..25c3ffa96dfc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -157,6 +157,12 @@ public SnapshotReader withPartitionFilter(Predicate predicate) { return this; } + @Override + public SnapshotReader withPartitionFilter(List partitions) { + scan.withPartitionFilter(partitions); + return this; + } + @Override public SnapshotReader withFilter(Predicate predicate) { List partitionKeys = tableSchema.partitionKeys(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java index 0b922d77b831..7192c36303db 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java @@ -253,6 +253,12 @@ public SnapshotReader withPartitionFilter(Predicate predicate) { return this; } + @Override + public SnapshotReader withPartitionFilter(List partitions) { + snapshotReader.withPartitionFilter(partitions); + return this; + } + @Override public SnapshotReader withMode(ScanMode scanMode) { snapshotReader.withMode(scanMode); @@ -358,6 +364,12 @@ public InnerTableScan withPartitionFilter(Map partitionSpec) { return this; } + @Override + public InnerTableScan withPartitionFilter(List partitions) { + batchScan.withPartitionFilter(partitions); + return this; + } + @Override public InnerTableScan withBucketFilter(Filter bucketFilter) { batchScan.withBucketFilter(bucketFilter); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index 6ff8d4c2a2e0..9704d5d1eca3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -18,11 +18,13 @@ package org.apache.paimon.utils; +import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.branch.TableBranch; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; +import org.apache.paimon.options.Options; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; @@ -32,11 +34,15 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Comparator; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.PriorityQueue; +import java.util.Set; import java.util.SortedMap; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -133,6 +139,42 @@ branchName, branchPath(tablePath, branchName)), } } + public void createBranch( + String branchName, List primaryKeys, int bucket, boolean copyOptions) { + Path path = schemaManager.copyWithBranch(branchName).toSchemaPath(0); + TableSchema oldSchema = schemaManager.latest().get(); + + Set fieldNames = new HashSet<>(oldSchema.fieldNames()); + for (String primaryKey : primaryKeys) { + Preconditions.checkArgument( + fieldNames.contains(primaryKey), + "Field " + primaryKey + " does not exist in the table"); + } + + Options newOptions = new Options(); + newOptions.set(CoreOptions.BUCKET, bucket); + if (copyOptions) { + for (Map.Entry entry : oldSchema.options().entrySet()) { + newOptions.set(entry.getKey(), entry.getValue()); + } + } + + TableSchema schema = + new TableSchema( + 0, + oldSchema.fields(), + oldSchema.highestFieldId(), + oldSchema.partitionKeys(), + primaryKeys, + newOptions.toMap(), + oldSchema.comment()); + try { + fileIO.overwriteFileUtf8(path, schema.toString()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + public void createBranch(String branchName, long snapshotId) { checkArgument( !isMainBranch(branchName), diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java index 3d8ae49cce03..259f6338da04 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java @@ -25,6 +25,9 @@ import org.apache.commons.lang3.StringUtils; import org.apache.flink.table.procedure.ProcedureContext; +import java.util.Arrays; +import java.util.stream.Collectors; + /** * Create branch procedure for given tag. Usage: * @@ -70,4 +73,24 @@ private String[] innerCall(String tableId, String branchName, String tagName, lo } return new String[] {"Success"}; } + + public String[] call( + ProcedureContext procedureContext, + String tableId, + String branchName, + String primaryKeys, + int bucket, + boolean copyOptions) + throws Catalog.TableNotExistException { + Table table = catalog.getTable(Identifier.fromString(tableId)); + table.createBranch( + branchName, + Arrays.stream(primaryKeys.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toList()), + bucket, + copyOptions); + return new String[] {"Success"}; + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java index cddfaf936620..8fd76616cbe6 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java @@ -32,13 +32,13 @@ import java.util.List; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; /** IT cases for table with branches using SQL. */ public class BranchSqlITCase extends CatalogITCaseBase { @Test public void testAlterBranchTable() throws Exception { - sql( "CREATE TABLE T (" + " pt INT" @@ -313,6 +313,63 @@ public void testBranchFastForward() throws Exception { checkSnapshots(snapshotManager, 1, 2); } + @Test + public void testFallbackBranchBatchRead() throws Exception { + sql( + "CREATE TABLE t ( pt INT NOT NULL, k INT NOT NULL, v STRING ) PARTITIONED BY (pt) WITH ( 'bucket' = '-1' )"); + sql("CALL sys.create_branch('default.t', 'pk', 'pt, k', 2, true)"); + sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'pk' )"); + + sql("INSERT INTO t VALUES (1, 10, 'apple'), (1, 20, 'banana')"); + sql("INSERT INTO `t$branch_pk` VALUES (1, 10, 'cat'), (1, 20, 'dog')"); + assertThat(collectResult("SELECT v, k FROM t")) + .containsExactlyInAnyOrder("+I[apple, 10]", "+I[banana, 20]"); + + sql("INSERT INTO `t$branch_pk` VALUES (2, 10, 'tiger'), (2, 20, 'wolf')"); + assertThat(collectResult("SELECT v, k FROM t")) + .containsExactlyInAnyOrder( + "+I[apple, 10]", "+I[banana, 20]", "+I[tiger, 10]", "+I[wolf, 20]"); + assertThat(collectResult("SELECT v, k FROM t WHERE pt = 1")) + .containsExactlyInAnyOrder("+I[apple, 10]", "+I[banana, 20]"); + assertThat(collectResult("SELECT v, k FROM t WHERE pt = 2")) + .containsExactlyInAnyOrder("+I[tiger, 10]", "+I[wolf, 20]"); + + sql("INSERT INTO `t$branch_pk` VALUES (2, 10, 'lion')"); + assertThat(collectResult("SELECT v, k FROM t")) + .containsExactlyInAnyOrder( + "+I[apple, 10]", "+I[banana, 20]", "+I[lion, 10]", "+I[wolf, 20]"); + } + + @Test + public void testInvalidPrimaryKeys() { + sql( + "CREATE TABLE t ( pt INT NOT NULL, k INT NOT NULL, v STRING ) PARTITIONED BY (pt) WITH ( 'bucket' = '-1' )"); + + try { + sql("CALL sys.create_branch('default.t', 'pk', 'pt, invalid', 2, true)"); + fail("Expecting exceptions"); + } catch (Exception e) { + assertThat(e).hasRootCauseMessage("Field invalid does not exist in the table"); + } + } + + @Test + public void testDifferentRowTypes() throws Exception { + sql( + "CREATE TABLE t ( pt INT NOT NULL, k INT NOT NULL, v STRING ) PARTITIONED BY (pt) WITH ( 'bucket' = '-1' )"); + sql("CALL sys.create_branch('default.t', 'pk', 'pt, k', 2, true)"); + sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'pk' )"); + sql("ALTER TABLE `t$branch_pk` ADD (v2 INT)"); + + try { + sql("INSERT INTO t VALUES (1, 10, 'apple')"); + fail("Expecting exceptions"); + } catch (Exception e) { + assertThat(e) + .hasMessageContaining("Branch main and pk does not have the same row type"); + } + } + private List collectResult(String sql) throws Exception { List result = new ArrayList<>(); try (CloseableIterator it = tEnv.executeSql(sql).collect()) { From 5844a365cc33abb5fd1de9d2a550116d558cfce4 Mon Sep 17 00:00:00 2001 From: tsreaper Date: Thu, 25 Jul 2024 17:29:44 +0800 Subject: [PATCH 2/6] [fix] Update example --- docs/content/maintenance/manage-branches.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/content/maintenance/manage-branches.md b/docs/content/maintenance/manage-branches.md index c54121f73e38..d1a45e962262 100644 --- a/docs/content/maintenance/manage-branches.md +++ b/docs/content/maintenance/manage-branches.md @@ -207,7 +207,7 @@ ALTER TABLE `T$branch_test` SET ( ); -- write records into the streaming branch -INSERT INTO `T$branch_test` VALUES ('20240725', 'apple', 4), ('20240726', 'cherry', 3), ('20240726', 'pear', 6); +INSERT INTO `T$branch_test` VALUES ('20240725', 'apple', 4), ('20240725', 'peach', 10), ('20240726', 'cherry', 3), ('20240726', 'pear', 6); -- write records into the default branch INSERT INTO T VALUES ('20240725', 'apple', 5), ('20240725', 'banana', 7); From 6fb332bafca2720133a730e90dc6bf88c71e3469 Mon Sep 17 00:00:00 2001 From: tsreaper Date: Mon, 29 Jul 2024 13:55:52 +0800 Subject: [PATCH 3/6] [fix] Fix comments --- docs/content/maintenance/manage-branches.md | 14 ++++++++++++++ .../paimon/table/FallbackReadFileStoreTable.java | 3 +++ .../apache/paimon/table/FileStoreTableFactory.java | 4 ---- .../org/apache/paimon/flink/BranchSqlITCase.java | 4 ++++ 4 files changed, 21 insertions(+), 4 deletions(-) diff --git a/docs/content/maintenance/manage-branches.md b/docs/content/maintenance/manage-branches.md index d1a45e962262..87d0279719fb 100644 --- a/docs/content/maintenance/manage-branches.md +++ b/docs/content/maintenance/manage-branches.md @@ -223,6 +223,20 @@ SELECT * FROM T; | 20240726 | pear | 6 | +------------------+------------------+--------+ */ + +-- reset fallback branch +ALTER TABLE T RESET ( 'scan.fallback-branch' ); + +-- now it only reads from default branch +SELECT * FROM T; +/* ++------------------+------------------+--------+ +| dt | name | amount | ++------------------+------------------+--------+ +| 20240725 | apple | 5 | +| 20240725 | banana | 7 | ++------------------+------------------+--------+ +*/ ``` {{< /tab >}} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java index d254c6880613..d26ce955a597 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java @@ -60,6 +60,9 @@ public FallbackReadFileStoreTable(FileStoreTable main, FileStoreTable fallback) super(main); this.fallback = fallback; + Preconditions.checkArgument(!(main instanceof FallbackReadFileStoreTable)); + Preconditions.checkArgument(!(fallback instanceof FallbackReadFileStoreTable)); + String mainBranch = main.coreOptions().branch(); String fallbackBranch = fallback.coreOptions().branch(); RowType mainRowType = main.schema().logicalRowType(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java index 74cc9fd35d78..a8764a751cc6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java @@ -25,7 +25,6 @@ import org.apache.paimon.options.Options; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.StringUtils; import java.io.IOException; @@ -106,9 +105,6 @@ public static FileStoreTable create( new SchemaManager(fileIO, tablePath, fallbackBranch).latest().get(), branchOptions, catalogEnvironment); - - Preconditions.checkArgument(!(table instanceof FallbackReadFileStoreTable)); - Preconditions.checkArgument(!(fallbackTable instanceof FallbackReadFileStoreTable)); table = new FallbackReadFileStoreTable(table, fallbackTable); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java index 8fd76616cbe6..6ba0e4ebb93e 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java @@ -338,6 +338,10 @@ public void testFallbackBranchBatchRead() throws Exception { assertThat(collectResult("SELECT v, k FROM t")) .containsExactlyInAnyOrder( "+I[apple, 10]", "+I[banana, 20]", "+I[lion, 10]", "+I[wolf, 20]"); + + sql("ALTER TABLE t RESET ( 'scan.fallback-branch' )"); + assertThat(collectResult("SELECT v, k FROM t")) + .containsExactlyInAnyOrder("+I[apple, 10]", "+I[banana, 20]"); } @Test From 9ef845b33b6d06794024a0a2319a068480ee3ee0 Mon Sep 17 00:00:00 2001 From: tsreaper Date: Wed, 31 Jul 2024 10:38:04 +0800 Subject: [PATCH 4/6] [fix] Remove new create branch interface because we can change immutable options of empty branches --- docs/content/maintenance/manage-branches.md | 10 +++-- .../paimon/table/AbstractFileStoreTable.java | 6 --- .../paimon/table/DelegatedFileStoreTable.java | 7 ---- .../apache/paimon/table/ReadonlyTable.java | 9 ---- .../java/org/apache/paimon/table/Table.java | 7 ---- .../apache/paimon/utils/BranchManager.java | 42 ------------------- .../procedure/CreateBranchProcedure.java | 23 ---------- .../apache/paimon/flink/BranchSqlITCase.java | 33 +++++++++++++-- 8 files changed, 36 insertions(+), 101 deletions(-) diff --git a/docs/content/maintenance/manage-branches.md b/docs/content/maintenance/manage-branches.md index 87d0279719fb..3610de8f6acb 100644 --- a/docs/content/maintenance/manage-branches.md +++ b/docs/content/maintenance/manage-branches.md @@ -192,9 +192,13 @@ CREATE TABLE T ( ) PARTITIONED BY (dt); -- create a branch for streaming job --- you can even specify primary keys and bucket number, even if the original branch has no primary key --- in this example, we create a new branch `test`, uses `dt` and `name` as primary keys, has a bucket number of 2, and will copy the table options from the original branch -CALL sys.create_branch('default.T', 'test', 'dt, name', 2, true); +CALL sys.create_branch('default.T', 'test'); + +-- set primary key and bucket number for the branch +ALTER TABLE `T$branch_test` SET ( + 'primary-key' = 'dt,name', + 'bucket' = '2' +); -- set fallback branch ALTER TABLE T SET ( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 2aac9c4ceb94..d30fd73081a1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -562,12 +562,6 @@ public void createBranch(String branchName) { branchManager().createBranch(branchName); } - @Override - public void createBranch( - String branchName, List primaryKeys, int bucket, boolean copyOptions) { - branchManager().createBranch(branchName, primaryKeys, bucket, copyOptions); - } - @Override public void createBranch(String branchName, long snapshotId) { branchManager().createBranch(branchName, snapshotId); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java index 22b314f2aa2a..6a8e942401ba 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java @@ -39,7 +39,6 @@ import org.apache.paimon.utils.TagManager; import java.time.Duration; -import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.OptionalLong; @@ -168,12 +167,6 @@ public void createBranch(String branchName) { wrapped.createBranch(branchName); } - @Override - public void createBranch( - String branchName, List primaryKeys, int bucket, boolean copyOptions) { - wrapped.createBranch(branchName, primaryKeys, bucket, copyOptions); - } - @Override public void createBranch(String branchName, long snapshotId) { wrapped.createBranch(branchName, snapshotId); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java index d6eabd3e7113..b9eeba398400 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java @@ -176,15 +176,6 @@ default void createBranch(String branchName) { this.getClass().getSimpleName())); } - @Override - default void createBranch( - String branchName, List primaryKeys, int bucket, boolean copyOptions) { - throw new UnsupportedOperationException( - String.format( - "Readonly Table %s does not support create empty branch.", - this.getClass().getSimpleName())); - } - @Override default void createBranch(String branchName, long snapshotId) { throw new UnsupportedOperationException( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index a5284df99beb..62207f882a77 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -116,13 +116,6 @@ default void deleteTags(String tagNames) { @Experimental void createBranch(String branchName); - /** - * Create an empty branch. Primary keys and bucket number can be different from the original - * branch. - */ - @Experimental - void createBranch(String branchName, List primaryKeys, int bucket, boolean copyOptions); - /** Create a branch from given snapshot. */ @Experimental void createBranch(String branchName, long snapshotId); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index 9704d5d1eca3..6ff8d4c2a2e0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -18,13 +18,11 @@ package org.apache.paimon.utils; -import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.branch.TableBranch; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; -import org.apache.paimon.options.Options; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; @@ -34,15 +32,11 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Comparator; -import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.PriorityQueue; -import java.util.Set; import java.util.SortedMap; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -139,42 +133,6 @@ branchName, branchPath(tablePath, branchName)), } } - public void createBranch( - String branchName, List primaryKeys, int bucket, boolean copyOptions) { - Path path = schemaManager.copyWithBranch(branchName).toSchemaPath(0); - TableSchema oldSchema = schemaManager.latest().get(); - - Set fieldNames = new HashSet<>(oldSchema.fieldNames()); - for (String primaryKey : primaryKeys) { - Preconditions.checkArgument( - fieldNames.contains(primaryKey), - "Field " + primaryKey + " does not exist in the table"); - } - - Options newOptions = new Options(); - newOptions.set(CoreOptions.BUCKET, bucket); - if (copyOptions) { - for (Map.Entry entry : oldSchema.options().entrySet()) { - newOptions.set(entry.getKey(), entry.getValue()); - } - } - - TableSchema schema = - new TableSchema( - 0, - oldSchema.fields(), - oldSchema.highestFieldId(), - oldSchema.partitionKeys(), - primaryKeys, - newOptions.toMap(), - oldSchema.comment()); - try { - fileIO.overwriteFileUtf8(path, schema.toString()); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - public void createBranch(String branchName, long snapshotId) { checkArgument( !isMainBranch(branchName), diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java index 259f6338da04..3d8ae49cce03 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java @@ -25,9 +25,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.flink.table.procedure.ProcedureContext; -import java.util.Arrays; -import java.util.stream.Collectors; - /** * Create branch procedure for given tag. Usage: * @@ -73,24 +70,4 @@ private String[] innerCall(String tableId, String branchName, String tagName, lo } return new String[] {"Success"}; } - - public String[] call( - ProcedureContext procedureContext, - String tableId, - String branchName, - String primaryKeys, - int bucket, - boolean copyOptions) - throws Catalog.TableNotExistException { - Table table = catalog.getTable(Identifier.fromString(tableId)); - table.createBranch( - branchName, - Arrays.stream(primaryKeys.split(",")) - .map(String::trim) - .filter(s -> !s.isEmpty()) - .collect(Collectors.toList()), - bucket, - copyOptions); - return new String[] {"Success"}; - } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java index 6ba0e4ebb93e..f0ce66fbfac1 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java @@ -317,31 +317,56 @@ public void testBranchFastForward() throws Exception { public void testFallbackBranchBatchRead() throws Exception { sql( "CREATE TABLE t ( pt INT NOT NULL, k INT NOT NULL, v STRING ) PARTITIONED BY (pt) WITH ( 'bucket' = '-1' )"); - sql("CALL sys.create_branch('default.t', 'pk', 'pt, k', 2, true)"); + sql("INSERT INTO t VALUES (1, 10, 'apple'), (1, 20, 'banana')"); + + sql("CALL sys.create_branch('default.t', 'pk')"); + sql("ALTER TABLE `t$branch_pk` SET ( 'primary-key' = 'pt, k', 'bucket' = '2' )"); sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'pk' )"); - sql("INSERT INTO t VALUES (1, 10, 'apple'), (1, 20, 'banana')"); - sql("INSERT INTO `t$branch_pk` VALUES (1, 10, 'cat'), (1, 20, 'dog')"); + sql("INSERT INTO `t$branch_pk` VALUES (1, 20, 'cat'), (1, 30, 'dog')"); assertThat(collectResult("SELECT v, k FROM t")) .containsExactlyInAnyOrder("+I[apple, 10]", "+I[banana, 20]"); + assertThat(collectResult("SELECT v, k FROM `t$branch_pk`")) + .containsExactlyInAnyOrder("+I[cat, 20]", "+I[dog, 30]"); sql("INSERT INTO `t$branch_pk` VALUES (2, 10, 'tiger'), (2, 20, 'wolf')"); assertThat(collectResult("SELECT v, k FROM t")) .containsExactlyInAnyOrder( "+I[apple, 10]", "+I[banana, 20]", "+I[tiger, 10]", "+I[wolf, 20]"); + assertThat(collectResult("SELECT v, k FROM `t$branch_pk`")) + .containsExactlyInAnyOrder( + "+I[cat, 20]", "+I[dog, 30]", "+I[tiger, 10]", "+I[wolf, 20]"); assertThat(collectResult("SELECT v, k FROM t WHERE pt = 1")) .containsExactlyInAnyOrder("+I[apple, 10]", "+I[banana, 20]"); + assertThat(collectResult("SELECT v, k FROM `t$branch_pk` WHERE pt = 1")) + .containsExactlyInAnyOrder("+I[cat, 20]", "+I[dog, 30]"); assertThat(collectResult("SELECT v, k FROM t WHERE pt = 2")) .containsExactlyInAnyOrder("+I[tiger, 10]", "+I[wolf, 20]"); + assertThat(collectResult("SELECT v, k FROM `t$branch_pk` WHERE pt = 2")) + .containsExactlyInAnyOrder("+I[tiger, 10]", "+I[wolf, 20]"); sql("INSERT INTO `t$branch_pk` VALUES (2, 10, 'lion')"); assertThat(collectResult("SELECT v, k FROM t")) .containsExactlyInAnyOrder( "+I[apple, 10]", "+I[banana, 20]", "+I[lion, 10]", "+I[wolf, 20]"); + assertThat(collectResult("SELECT v, k FROM `t$branch_pk`")) + .containsExactlyInAnyOrder( + "+I[cat, 20]", "+I[dog, 30]", "+I[lion, 10]", "+I[wolf, 20]"); + + sql("INSERT OVERWRITE t PARTITION (pt = 1) VALUES (10, 'pear'), (20, 'mango')"); + assertThat(collectResult("SELECT v, k FROM t")) + .containsExactlyInAnyOrder( + "+I[pear, 10]", "+I[mango, 20]", "+I[lion, 10]", "+I[wolf, 20]"); + assertThat(collectResult("SELECT v, k FROM `t$branch_pk`")) + .containsExactlyInAnyOrder( + "+I[cat, 20]", "+I[dog, 30]", "+I[lion, 10]", "+I[wolf, 20]"); sql("ALTER TABLE t RESET ( 'scan.fallback-branch' )"); assertThat(collectResult("SELECT v, k FROM t")) - .containsExactlyInAnyOrder("+I[apple, 10]", "+I[banana, 20]"); + .containsExactlyInAnyOrder("+I[pear, 10]", "+I[mango, 20]"); + assertThat(collectResult("SELECT v, k FROM `t$branch_pk`")) + .containsExactlyInAnyOrder( + "+I[cat, 20]", "+I[dog, 30]", "+I[lion, 10]", "+I[wolf, 20]"); } @Test From e5c7376d66125f8c13f98c56d2f81d747a0458f4 Mon Sep 17 00:00:00 2001 From: tsreaper Date: Wed, 31 Jul 2024 10:52:21 +0800 Subject: [PATCH 5/6] [fix] Fix failed tests --- .../apache/paimon/flink/BranchSqlITCase.java | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java index f0ce66fbfac1..80ca03d8ce95 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java @@ -369,26 +369,14 @@ public void testFallbackBranchBatchRead() throws Exception { "+I[cat, 20]", "+I[dog, 30]", "+I[lion, 10]", "+I[wolf, 20]"); } - @Test - public void testInvalidPrimaryKeys() { - sql( - "CREATE TABLE t ( pt INT NOT NULL, k INT NOT NULL, v STRING ) PARTITIONED BY (pt) WITH ( 'bucket' = '-1' )"); - - try { - sql("CALL sys.create_branch('default.t', 'pk', 'pt, invalid', 2, true)"); - fail("Expecting exceptions"); - } catch (Exception e) { - assertThat(e).hasRootCauseMessage("Field invalid does not exist in the table"); - } - } - @Test public void testDifferentRowTypes() throws Exception { sql( "CREATE TABLE t ( pt INT NOT NULL, k INT NOT NULL, v STRING ) PARTITIONED BY (pt) WITH ( 'bucket' = '-1' )"); - sql("CALL sys.create_branch('default.t', 'pk', 'pt, k', 2, true)"); - sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'pk' )"); + sql("CALL sys.create_branch('default.t', 'pk')"); + sql("ALTER TABLE `t$branch_pk` SET ( 'primary-key' = 'pt, k', 'bucket' = '2' )"); sql("ALTER TABLE `t$branch_pk` ADD (v2 INT)"); + sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'pk' )"); try { sql("INSERT INTO t VALUES (1, 10, 'apple')"); From b23f17ba326c224d7e708ff9a3f61c23789ebeb5 Mon Sep 17 00:00:00 2001 From: tsreaper Date: Wed, 31 Jul 2024 13:45:19 +0800 Subject: [PATCH 6/6] [fix] Fix comments --- docs/content/maintenance/manage-branches.md | 8 ++---- .../paimon/table/FileStoreTableFactory.java | 26 +++++++++++++------ 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/docs/content/maintenance/manage-branches.md b/docs/content/maintenance/manage-branches.md index 3610de8f6acb..22ca9b8507d0 100644 --- a/docs/content/maintenance/manage-branches.md +++ b/docs/content/maintenance/manage-branches.md @@ -197,7 +197,8 @@ CALL sys.create_branch('default.T', 'test'); -- set primary key and bucket number for the branch ALTER TABLE `T$branch_test` SET ( 'primary-key' = 'dt,name', - 'bucket' = '2' + 'bucket' = '2', + 'changelog-producer' = 'lookup' ); -- set fallback branch @@ -205,11 +206,6 @@ ALTER TABLE T SET ( 'scan.fallback-branch' = 'test' ); --- set changelog producer for the streaming branch, in case a streaming job would like to read from it in the future -ALTER TABLE `T$branch_test` SET ( - 'changelog-producer' = 'lookup' -); - -- write records into the streaming branch INSERT INTO `T$branch_test` VALUES ('20240725', 'apple', 4), ('20240725', 'peach', 10), ('20240726', 'cherry', 3), ('20240726', 'pear', 6); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java index a8764a751cc6..58449c9d7656 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java @@ -85,21 +85,16 @@ public static FileStoreTable create( Options dynamicOptions, CatalogEnvironment catalogEnvironment) { FileStoreTable table = - tableSchema.primaryKeys().isEmpty() - ? new AppendOnlyFileStoreTable( - fileIO, tablePath, tableSchema, catalogEnvironment) - : new PrimaryKeyFileStoreTable( - fileIO, tablePath, tableSchema, catalogEnvironment); - table = table.copy(dynamicOptions.toMap()); + createWithoutFallbackBranch( + fileIO, tablePath, tableSchema, dynamicOptions, catalogEnvironment); Options options = new Options(table.options()); String fallbackBranch = options.get(CoreOptions.SCAN_FALLBACK_BRANCH); if (!StringUtils.isNullOrWhitespaceOnly(fallbackBranch)) { Options branchOptions = new Options(); branchOptions.set(CoreOptions.BRANCH, fallbackBranch); - branchOptions.set(CoreOptions.SCAN_FALLBACK_BRANCH, ""); FileStoreTable fallbackTable = - FileStoreTableFactory.create( + createWithoutFallbackBranch( fileIO, tablePath, new SchemaManager(fileIO, tablePath, fallbackBranch).latest().get(), @@ -110,4 +105,19 @@ public static FileStoreTable create( return table; } + + private static FileStoreTable createWithoutFallbackBranch( + FileIO fileIO, + Path tablePath, + TableSchema tableSchema, + Options dynamicOptions, + CatalogEnvironment catalogEnvironment) { + FileStoreTable table = + tableSchema.primaryKeys().isEmpty() + ? new AppendOnlyFileStoreTable( + fileIO, tablePath, tableSchema, catalogEnvironment) + : new PrimaryKeyFileStoreTable( + fileIO, tablePath, tableSchema, catalogEnvironment); + return table.copy(dynamicOptions.toMap()); + } }