From e917af004d76366080e3ca014d49f683386aad98 Mon Sep 17 00:00:00 2001
From: tsreaper
Date: Wed, 31 Jul 2024 14:36:37 +0800
Subject: [PATCH] [core] Support reading partition from fallback branch when not found in current branch (#3816)

---
 docs/content/maintenance/manage-branches.md   |  83 +++++
 .../generated/core_configuration.html         |   6 +
 .../java/org/apache/paimon/CoreOptions.java   |   8 +
 .../privilege/PrivilegedFileStoreTable.java   |  74 +---
 .../paimon/table/DelegatedFileStoreTable.java | 261 ++++++++++++++
 .../table/FallbackReadFileStoreTable.java     | 325 ++++++++++++++++++
 .../paimon/table/FileStoreTableFactory.java   |  29 ++
 .../java/org/apache/paimon/table/Table.java   |   2 +-
 .../table/source/AbstractDataTableScan.java   |   6 +
 .../paimon/table/source/InnerTableScan.java   |   6 +
 .../table/source/snapshot/SnapshotReader.java |   2 +
 .../source/snapshot/SnapshotReaderImpl.java   |   6 +
 .../paimon/table/system/AuditLogTable.java    |  12 +
 .../apache/paimon/flink/BranchSqlITCase.java  |  76 +++-
 14 files changed, 823 insertions(+), 73 deletions(-)
 create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
 create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java

diff --git a/docs/content/maintenance/manage-branches.md b/docs/content/maintenance/manage-branches.md
index 4343214f2c28..22ca9b8507d0 100644
--- a/docs/content/maintenance/manage-branches.md
+++ b/docs/content/maintenance/manage-branches.md
@@ -159,3 +159,86 @@ Run the following command:
 {{< /tab >}}
 
 {{< /tabs >}}
+
+### Batch Reading from Fallback Branch
+
+You can set the table option `scan.fallback-branch`
+so that when a batch job reads from the current branch and a partition does not exist there,
+the reader will try to read that partition from the fallback branch instead.
+Streaming read jobs do not support this feature yet; they only produce results from the current branch.
+
+What is the use case of this feature? Say you have created a Paimon table partitioned by date.
+You have a long-running streaming job which inserts records into Paimon, so that today's data can be queried in time.
+You also have a batch job which runs every night to insert corrected records for the previous day into Paimon,
+so that the correctness of the data is guaranteed.
+
+When you query this Paimon table, you would like to read the results of the batch job first.
+But if a partition (for example, today's partition) does not exist in its results,
+you would like to read the results of the streaming job instead.
+In this case, you can create a branch for the streaming job and set `scan.fallback-branch` to this streaming branch.
+
+Let's look at an example.
+ +{{< tabs "read-fallback-branch" >}} + +{{< tab "Flink" >}} + +```sql +-- create Paimon table +CREATE TABLE T ( + dt STRING NOT NULL, + name STRING NOT NULL, + amount BIGINT +) PARTITIONED BY (dt); + +-- create a branch for streaming job +CALL sys.create_branch('default.T', 'test'); + +-- set primary key and bucket number for the branch +ALTER TABLE `T$branch_test` SET ( + 'primary-key' = 'dt,name', + 'bucket' = '2', + 'changelog-producer' = 'lookup' +); + +-- set fallback branch +ALTER TABLE T SET ( + 'scan.fallback-branch' = 'test' +); + +-- write records into the streaming branch +INSERT INTO `T$branch_test` VALUES ('20240725', 'apple', 4), ('20240725', 'peach', 10), ('20240726', 'cherry', 3), ('20240726', 'pear', 6); + +-- write records into the default branch +INSERT INTO T VALUES ('20240725', 'apple', 5), ('20240725', 'banana', 7); + +SELECT * FROM T; +/* ++------------------+------------------+--------+ +| dt | name | amount | ++------------------+------------------+--------+ +| 20240725 | apple | 5 | +| 20240725 | banana | 7 | +| 20240726 | cherry | 3 | +| 20240726 | pear | 6 | ++------------------+------------------+--------+ +*/ + +-- reset fallback branch +ALTER TABLE T RESET ( 'scan.fallback-branch' ); + +-- now it only reads from default branch +SELECT * FROM T; +/* ++------------------+------------------+--------+ +| dt | name | amount | ++------------------+------------------+--------+ +| 20240725 | apple | 5 | +| 20240725 | banana | 7 | ++------------------+------------------+--------+ +*/ +``` + +{{< /tab >}} + +{{< /tabs >}} diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index d6eb502353f5..446931acacda 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -581,6 +581,12 @@ Long End condition "watermark" for bounded streaming mode. Stream reading will end when a larger watermark snapshot is encountered. + +
<td><h5>scan.fallback-branch</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>When a batch job queries from a table, if a partition does not exist in the current branch, the reader will try to get this partition from this fallback branch.</td>
+        </tr>
         <tr>
             <td><h5>scan.file-creation-time-millis</h5></td>
(none) diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 685634e2650a..17bff3653229 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1275,6 +1275,14 @@ public class CoreOptions implements Serializable { "The maximum number of concurrent deleting files. " + "By default is the number of processors available to the Java virtual machine."); + public static final ConfigOption SCAN_FALLBACK_BRANCH = + key("scan.fallback-branch") + .stringType() + .noDefaultValue() + .withDescription( + "When a batch job queries from a table, if a partition does not exist in the current branch, " + + "the reader will try to get this partition from this fallback branch."); + private final Options options; public CoreOptions(Map options) { diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java index 45ce6f0bf540..d590eb3708df 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java @@ -18,20 +18,15 @@ package org.apache.paimon.privilege; -import org.apache.paimon.CoreOptions; import org.apache.paimon.FileStore; import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.fs.FileIO; -import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.ManifestCacheFilter; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.stats.Statistics; -import org.apache.paimon.table.BucketMode; -import org.apache.paimon.table.CatalogEnvironment; +import org.apache.paimon.table.DelegatedFileStoreTable; import org.apache.paimon.table.ExpireSnapshots; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.query.LocalTableQuery; -import org.apache.paimon.table.sink.RowKeyExtractor; import org.apache.paimon.table.sink.TableCommitImpl; import org.apache.paimon.table.sink.TableWriteImpl; import org.apache.paimon.table.sink.WriteSelector; @@ -40,55 +35,32 @@ import org.apache.paimon.table.source.StreamDataTableScan; import org.apache.paimon.table.source.snapshot.SnapshotReader; import org.apache.paimon.utils.BranchManager; -import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; import java.time.Duration; import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.OptionalLong; /** {@link FileStoreTable} with privilege checks. 
*/ -public class PrivilegedFileStoreTable implements FileStoreTable { +public class PrivilegedFileStoreTable extends DelegatedFileStoreTable { - private final FileStoreTable wrapped; private final PrivilegeChecker privilegeChecker; private final Identifier identifier; public PrivilegedFileStoreTable( FileStoreTable wrapped, PrivilegeChecker privilegeChecker, Identifier identifier) { - this.wrapped = wrapped; + super(wrapped); this.privilegeChecker = privilegeChecker; this.identifier = identifier; } - @Override - public String name() { - return wrapped.name(); - } - - @Override - public String fullName() { - return wrapped.fullName(); - } - @Override public SnapshotReader newSnapshotReader() { privilegeChecker.assertCanSelect(identifier); return wrapped.newSnapshotReader(); } - @Override - public CoreOptions coreOptions() { - return wrapped.coreOptions(); - } - - @Override - public SnapshotManager snapshotManager() { - return wrapped.snapshotManager(); - } - @Override public TagManager tagManager() { privilegeChecker.assertCanInsert(identifier); @@ -102,36 +74,11 @@ public BranchManager branchManager() { return wrapped.branchManager(); } - @Override - public Path location() { - return wrapped.location(); - } - - @Override - public FileIO fileIO() { - return wrapped.fileIO(); - } - - @Override - public TableSchema schema() { - return wrapped.schema(); - } - @Override public FileStore store() { return new PrivilegedFileStore<>(wrapped.store(), privilegeChecker, identifier); } - @Override - public BucketMode bucketMode() { - return wrapped.bucketMode(); - } - - @Override - public CatalogEnvironment catalogEnvironment() { - return wrapped.catalogEnvironment(); - } - @Override public Optional statistics() { privilegeChecker.assertCanSelect(identifier); @@ -144,11 +91,6 @@ public FileStoreTable copy(Map dynamicOptions) { wrapped.copy(dynamicOptions), privilegeChecker, identifier); } - @Override - public OptionalLong latestSnapshotId() { - return wrapped.latestSnapshotId(); - } - @Override public FileStoreTable copy(TableSchema newTableSchema) { return new PrivilegedFileStoreTable( @@ -299,16 +241,6 @@ public LocalTableQuery newLocalTableQuery() { return wrapped.newLocalTableQuery(); } - @Override - public boolean supportStreamingReadOverwrite() { - return wrapped.supportStreamingReadOverwrite(); - } - - @Override - public RowKeyExtractor createRowKeyExtractor() { - return wrapped.createRowKeyExtractor(); - } - @Override public boolean equals(Object o) { if (this == o) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java new file mode 100644 index 000000000000..6a8e942401ba --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.FileStore; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.ManifestCacheFilter; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.stats.Statistics; +import org.apache.paimon.table.query.LocalTableQuery; +import org.apache.paimon.table.sink.RowKeyExtractor; +import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.table.sink.TableWriteImpl; +import org.apache.paimon.table.sink.WriteSelector; +import org.apache.paimon.table.source.DataTableScan; +import org.apache.paimon.table.source.InnerTableRead; +import org.apache.paimon.table.source.StreamDataTableScan; +import org.apache.paimon.table.source.snapshot.SnapshotReader; +import org.apache.paimon.utils.BranchManager; +import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.TagManager; + +import java.time.Duration; +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalLong; + +/** Delegated {@link FileStoreTable}. */ +public abstract class DelegatedFileStoreTable implements FileStoreTable { + + protected final FileStoreTable wrapped; + + public DelegatedFileStoreTable(FileStoreTable wrapped) { + this.wrapped = wrapped; + } + + @Override + public String name() { + return wrapped.name(); + } + + @Override + public String fullName() { + return wrapped.fullName(); + } + + @Override + public SnapshotReader newSnapshotReader() { + return wrapped.newSnapshotReader(); + } + + @Override + public CoreOptions coreOptions() { + return wrapped.coreOptions(); + } + + @Override + public SnapshotManager snapshotManager() { + return wrapped.snapshotManager(); + } + + @Override + public TagManager tagManager() { + return wrapped.tagManager(); + } + + @Override + public BranchManager branchManager() { + return wrapped.branchManager(); + } + + @Override + public Path location() { + return wrapped.location(); + } + + @Override + public FileIO fileIO() { + return wrapped.fileIO(); + } + + @Override + public TableSchema schema() { + return wrapped.schema(); + } + + @Override + public FileStore store() { + return wrapped.store(); + } + + @Override + public BucketMode bucketMode() { + return wrapped.bucketMode(); + } + + @Override + public CatalogEnvironment catalogEnvironment() { + return wrapped.catalogEnvironment(); + } + + @Override + public Optional statistics() { + return wrapped.statistics(); + } + + @Override + public OptionalLong latestSnapshotId() { + return wrapped.latestSnapshotId(); + } + + @Override + public void rollbackTo(long snapshotId) { + wrapped.rollbackTo(snapshotId); + } + + @Override + public void createTag(String tagName) { + wrapped.createTag(tagName); + } + + @Override + public void createTag(String tagName, long fromSnapshotId) { + wrapped.createTag(tagName, fromSnapshotId); + } + + @Override + public void createTag(String tagName, Duration timeRetained) { + wrapped.createTag(tagName, timeRetained); + } + + @Override + public void createTag(String tagName, long 
fromSnapshotId, Duration timeRetained) { + wrapped.createTag(tagName, fromSnapshotId, timeRetained); + } + + @Override + public void deleteTag(String tagName) { + wrapped.deleteTag(tagName); + } + + @Override + public void rollbackTo(String tagName) { + wrapped.rollbackTo(tagName); + } + + @Override + public void createBranch(String branchName) { + wrapped.createBranch(branchName); + } + + @Override + public void createBranch(String branchName, long snapshotId) { + wrapped.createBranch(branchName, snapshotId); + } + + @Override + public void createBranch(String branchName, String tagName) { + wrapped.createBranch(branchName, tagName); + } + + @Override + public void deleteBranch(String branchName) { + wrapped.deleteBranch(branchName); + } + + @Override + public void fastForward(String branchName) { + wrapped.fastForward(branchName); + } + + @Override + public ExpireSnapshots newExpireSnapshots() { + return wrapped.newExpireSnapshots(); + } + + @Override + public ExpireSnapshots newExpireChangelog() { + return wrapped.newExpireChangelog(); + } + + @Override + public DataTableScan newScan() { + return wrapped.newScan(); + } + + @Override + public StreamDataTableScan newStreamScan() { + return wrapped.newStreamScan(); + } + + @Override + public InnerTableRead newRead() { + return wrapped.newRead(); + } + + @Override + public Optional newWriteSelector() { + return wrapped.newWriteSelector(); + } + + @Override + public TableWriteImpl newWrite(String commitUser) { + return wrapped.newWrite(commitUser); + } + + @Override + public TableWriteImpl newWrite(String commitUser, ManifestCacheFilter manifestFilter) { + return wrapped.newWrite(commitUser, manifestFilter); + } + + @Override + public TableCommitImpl newCommit(String commitUser) { + return wrapped.newCommit(commitUser); + } + + @Override + public LocalTableQuery newLocalTableQuery() { + return wrapped.newLocalTableQuery(); + } + + @Override + public boolean supportStreamingReadOverwrite() { + return wrapped.supportStreamingReadOverwrite(); + } + + @Override + public RowKeyExtractor createRowKeyExtractor() { + return wrapped.createRowKeyExtractor(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DelegatedFileStoreTable that = (DelegatedFileStoreTable) o; + return Objects.equals(wrapped, that.wrapped); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java new file mode 100644 index 000000000000..d26ce955a597 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.disk.IOManager; +import org.apache.paimon.metrics.MetricRegistry; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.source.DataFilePlan; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.DataTableScan; +import org.apache.paimon.table.source.InnerTableRead; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.table.source.TableScan; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Filter; +import org.apache.paimon.utils.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * A {@link FileStoreTable} which mainly read from the current branch. However, if the current + * branch does not have a partition, it will read that partition from the fallback branch. + */ +public class FallbackReadFileStoreTable extends DelegatedFileStoreTable { + + private final FileStoreTable fallback; + + public FallbackReadFileStoreTable(FileStoreTable main, FileStoreTable fallback) { + super(main); + this.fallback = fallback; + + Preconditions.checkArgument(!(main instanceof FallbackReadFileStoreTable)); + Preconditions.checkArgument(!(fallback instanceof FallbackReadFileStoreTable)); + + String mainBranch = main.coreOptions().branch(); + String fallbackBranch = fallback.coreOptions().branch(); + RowType mainRowType = main.schema().logicalRowType(); + RowType fallbackRowType = fallback.schema().logicalRowType(); + Preconditions.checkArgument( + mainRowType.equals(fallbackRowType), + "Branch %s and %s does not have the same row type.\n" + + "Row type of branch %s is %s.\n" + + "Row type of branch %s is %s.", + mainBranch, + fallbackBranch, + mainBranch, + mainRowType, + fallbackBranch, + fallbackRowType); + + List mainPrimaryKeys = main.schema().primaryKeys(); + List fallbackPrimaryKeys = fallback.schema().primaryKeys(); + if (!mainPrimaryKeys.isEmpty()) { + if (fallbackPrimaryKeys.isEmpty()) { + throw new IllegalArgumentException( + "Branch " + + mainBranch + + " has primary keys while fallback branch " + + fallbackBranch + + " does not. 
This is not allowed."); + } + Preconditions.checkArgument( + mainPrimaryKeys.equals(fallbackPrimaryKeys), + "Branch %s and %s both have primary keys but are not the same.\n" + + "Primary keys of %s are %s.\n" + + "Primary keys of %s are %s.", + mainBranch, + fallbackBranch, + mainBranch, + mainPrimaryKeys, + fallbackBranch, + fallbackPrimaryKeys); + } + } + + @Override + public FileStoreTable copy(Map dynamicOptions) { + return new FallbackReadFileStoreTable( + wrapped.copy(dynamicOptions), + fallback.copy(rewriteFallbackOptions(dynamicOptions))); + } + + @Override + public FileStoreTable copy(TableSchema newTableSchema) { + return new FallbackReadFileStoreTable( + wrapped.copy(newTableSchema), + fallback.copy( + newTableSchema.copy(rewriteFallbackOptions(newTableSchema.options())))); + } + + @Override + public FileStoreTable copyWithoutTimeTravel(Map dynamicOptions) { + return new FallbackReadFileStoreTable( + wrapped.copyWithoutTimeTravel(dynamicOptions), + fallback.copyWithoutTimeTravel(rewriteFallbackOptions(dynamicOptions))); + } + + @Override + public FileStoreTable copyWithLatestSchema() { + return new FallbackReadFileStoreTable( + wrapped.copyWithLatestSchema(), fallback.copyWithLatestSchema()); + } + + private Map rewriteFallbackOptions(Map options) { + Map result = new HashMap<>(options); + + // snapshot ids may be different between the main branch and the fallback branch, + // so we need to convert main branch snapshot id to millisecond, + // then convert millisecond to fallback branch snapshot id + String scanSnapshotIdOptionKey = CoreOptions.SCAN_SNAPSHOT_ID.key(); + if (options.containsKey(scanSnapshotIdOptionKey)) { + long id = Long.parseLong(options.get(scanSnapshotIdOptionKey)); + long millis = wrapped.snapshotManager().snapshot(id).timeMillis(); + Snapshot fallbackSnapshot = fallback.snapshotManager().earlierOrEqualTimeMills(millis); + long fallbackId; + if (fallbackSnapshot == null) { + fallbackId = Snapshot.FIRST_SNAPSHOT_ID; + } else { + fallbackId = fallbackSnapshot.id(); + } + result.put(scanSnapshotIdOptionKey, String.valueOf(fallbackId)); + } + + // bucket number of main branch and fallback branch are very likely different, + // so we remove bucket in options to use fallback branch's bucket number + result.remove(CoreOptions.BUCKET.key()); + + return result; + } + + @Override + public DataTableScan newScan() { + return new Scan(); + } + + private class Scan implements DataTableScan { + + private final DataTableScan mainScan; + private final DataTableScan fallbackScan; + + private Scan() { + this.mainScan = wrapped.newScan(); + this.fallbackScan = fallback.newScan(); + } + + @Override + public Scan withShard(int indexOfThisSubtask, int numberOfParallelSubtasks) { + mainScan.withShard(indexOfThisSubtask, numberOfParallelSubtasks); + fallbackScan.withShard(indexOfThisSubtask, numberOfParallelSubtasks); + return this; + } + + @Override + public Scan withFilter(Predicate predicate) { + mainScan.withFilter(predicate); + fallbackScan.withFilter(predicate); + return this; + } + + @Override + public Scan withLimit(int limit) { + mainScan.withLimit(limit); + fallbackScan.withLimit(limit); + return this; + } + + @Override + public Scan withPartitionFilter(Map partitionSpec) { + mainScan.withPartitionFilter(partitionSpec); + fallbackScan.withPartitionFilter(partitionSpec); + return this; + } + + @Override + public Scan withPartitionFilter(List partitions) { + mainScan.withPartitionFilter(partitions); + fallbackScan.withPartitionFilter(partitions); + return this; + } + + 
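+        // every withXxx call on this scan is forwarded to both the main scan and the fallback
+        // scan, so that both branches see the same pushed-down filters and settings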
@Override + public Scan withBucketFilter(Filter bucketFilter) { + mainScan.withBucketFilter(bucketFilter); + fallbackScan.withBucketFilter(bucketFilter); + return this; + } + + @Override + public Scan withLevelFilter(Filter levelFilter) { + mainScan.withLevelFilter(levelFilter); + fallbackScan.withLevelFilter(levelFilter); + return this; + } + + @Override + public Scan withMetricsRegistry(MetricRegistry metricRegistry) { + mainScan.withMetricsRegistry(metricRegistry); + fallbackScan.withMetricsRegistry(metricRegistry); + return this; + } + + @Override + public TableScan.Plan plan() { + List splits = new ArrayList<>(); + Set completePartitions = new HashSet<>(); + for (Split split : mainScan.plan().splits()) { + DataSplit dataSplit = (DataSplit) split; + splits.add(dataSplit); + completePartitions.add(dataSplit.partition()); + } + + List remainingPartitions = + fallbackScan.listPartitions().stream() + .filter(p -> !completePartitions.contains(p)) + .collect(Collectors.toList()); + if (!remainingPartitions.isEmpty()) { + fallbackScan.withPartitionFilter(remainingPartitions); + for (Split split : fallbackScan.plan().splits()) { + splits.add((DataSplit) split); + } + } + return new DataFilePlan(splits); + } + + @Override + public List listPartitions() { + Set partitions = new LinkedHashSet<>(mainScan.listPartitions()); + partitions.addAll(fallbackScan.listPartitions()); + return new ArrayList<>(partitions); + } + } + + @Override + public InnerTableRead newRead() { + return new Read(); + } + + private class Read implements InnerTableRead { + + private final InnerTableRead mainRead; + private final InnerTableRead fallbackRead; + + private Read() { + this.mainRead = wrapped.newRead(); + this.fallbackRead = fallback.newRead(); + } + + @Override + public InnerTableRead withFilter(Predicate predicate) { + mainRead.withFilter(predicate); + fallbackRead.withFilter(predicate); + return this; + } + + @Override + public InnerTableRead withProjection(int[][] projection) { + mainRead.withProjection(projection); + fallbackRead.withProjection(projection); + return this; + } + + @Override + public InnerTableRead forceKeepDelete() { + mainRead.forceKeepDelete(); + fallbackRead.forceKeepDelete(); + return this; + } + + @Override + public TableRead executeFilter() { + mainRead.executeFilter(); + fallbackRead.executeFilter(); + return this; + } + + @Override + public TableRead withIOManager(IOManager ioManager) { + mainRead.withIOManager(ioManager); + fallbackRead.withIOManager(ioManager); + return this; + } + + @Override + public RecordReader createReader(Split split) throws IOException { + DataSplit dataSplit = (DataSplit) split; + if (!dataSplit.dataFiles().isEmpty() + && dataSplit.dataFiles().get(0).minKey().getFieldCount() > 0) { + return fallbackRead.createReader(split); + } else { + return mainRead.createReader(split); + } + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java index e124fbb27cdf..58449c9d7656 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java @@ -25,6 +25,7 @@ import org.apache.paimon.options.Options; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.utils.StringUtils; import java.io.IOException; import java.io.UncheckedIOException; @@ -83,6 +84,34 @@ public static FileStoreTable 
create( TableSchema tableSchema, Options dynamicOptions, CatalogEnvironment catalogEnvironment) { + FileStoreTable table = + createWithoutFallbackBranch( + fileIO, tablePath, tableSchema, dynamicOptions, catalogEnvironment); + + Options options = new Options(table.options()); + String fallbackBranch = options.get(CoreOptions.SCAN_FALLBACK_BRANCH); + if (!StringUtils.isNullOrWhitespaceOnly(fallbackBranch)) { + Options branchOptions = new Options(); + branchOptions.set(CoreOptions.BRANCH, fallbackBranch); + FileStoreTable fallbackTable = + createWithoutFallbackBranch( + fileIO, + tablePath, + new SchemaManager(fileIO, tablePath, fallbackBranch).latest().get(), + branchOptions, + catalogEnvironment); + table = new FallbackReadFileStoreTable(table, fallbackTable); + } + + return table; + } + + private static FileStoreTable createWithoutFallbackBranch( + FileIO fileIO, + Path tablePath, + TableSchema tableSchema, + Options dynamicOptions, + CatalogEnvironment catalogEnvironment) { FileStoreTable table = tableSchema.primaryKeys().isEmpty() ? new AppendOnlyFileStoreTable( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index 708e25c1e621..62207f882a77 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -112,7 +112,7 @@ default void deleteTags(String tagNames) { @Experimental void rollbackTo(String tagName); - /** Create a empty branch. */ + /** Create an empty branch. */ @Experimental void createBranch(String branchName); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java index 426620b5189b..531c4945be22 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java @@ -85,6 +85,12 @@ public AbstractDataTableScan withPartitionFilter(Map partitionSp return this; } + @Override + public AbstractDataTableScan withPartitionFilter(List partitions) { + snapshotReader.withPartitionFilter(partitions); + return this; + } + @Override public AbstractDataTableScan withLevelFilter(Filter levelFilter) { snapshotReader.withLevelFilter(levelFilter); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java index 613b2efc26ef..00a4fc0cde18 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java @@ -18,10 +18,12 @@ package org.apache.paimon.table.source; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.metrics.MetricRegistry; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.utils.Filter; +import java.util.List; import java.util.Map; /** Inner {@link TableScan} contains filter push down. 
*/ @@ -37,6 +39,10 @@ default InnerTableScan withPartitionFilter(Map partitionSpec) { return this; } + default InnerTableScan withPartitionFilter(List partitions) { + return this; + } + default InnerTableScan withBucketFilter(Filter bucketFilter) { return this; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java index 8db3effd1a89..c2439de55035 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java @@ -56,6 +56,8 @@ public interface SnapshotReader { SnapshotReader withPartitionFilter(Predicate predicate); + SnapshotReader withPartitionFilter(List partitions); + SnapshotReader withMode(ScanMode scanMode); SnapshotReader withLevelFilter(Filter levelFilter); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index 1b58fea9167c..25c3ffa96dfc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -157,6 +157,12 @@ public SnapshotReader withPartitionFilter(Predicate predicate) { return this; } + @Override + public SnapshotReader withPartitionFilter(List partitions) { + scan.withPartitionFilter(partitions); + return this; + } + @Override public SnapshotReader withFilter(Predicate predicate) { List partitionKeys = tableSchema.partitionKeys(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java index 0b922d77b831..7192c36303db 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java @@ -253,6 +253,12 @@ public SnapshotReader withPartitionFilter(Predicate predicate) { return this; } + @Override + public SnapshotReader withPartitionFilter(List partitions) { + snapshotReader.withPartitionFilter(partitions); + return this; + } + @Override public SnapshotReader withMode(ScanMode scanMode) { snapshotReader.withMode(scanMode); @@ -358,6 +364,12 @@ public InnerTableScan withPartitionFilter(Map partitionSpec) { return this; } + @Override + public InnerTableScan withPartitionFilter(List partitions) { + batchScan.withPartitionFilter(partitions); + return this; + } + @Override public InnerTableScan withBucketFilter(Filter bucketFilter) { batchScan.withBucketFilter(bucketFilter); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java index cddfaf936620..80ca03d8ce95 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java @@ -32,13 +32,13 @@ import java.util.List; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; /** IT cases for table with branches using SQL. 
*/ public class BranchSqlITCase extends CatalogITCaseBase { @Test public void testAlterBranchTable() throws Exception { - sql( "CREATE TABLE T (" + " pt INT" @@ -313,6 +313,80 @@ public void testBranchFastForward() throws Exception { checkSnapshots(snapshotManager, 1, 2); } + @Test + public void testFallbackBranchBatchRead() throws Exception { + sql( + "CREATE TABLE t ( pt INT NOT NULL, k INT NOT NULL, v STRING ) PARTITIONED BY (pt) WITH ( 'bucket' = '-1' )"); + sql("INSERT INTO t VALUES (1, 10, 'apple'), (1, 20, 'banana')"); + + sql("CALL sys.create_branch('default.t', 'pk')"); + sql("ALTER TABLE `t$branch_pk` SET ( 'primary-key' = 'pt, k', 'bucket' = '2' )"); + sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'pk' )"); + + sql("INSERT INTO `t$branch_pk` VALUES (1, 20, 'cat'), (1, 30, 'dog')"); + assertThat(collectResult("SELECT v, k FROM t")) + .containsExactlyInAnyOrder("+I[apple, 10]", "+I[banana, 20]"); + assertThat(collectResult("SELECT v, k FROM `t$branch_pk`")) + .containsExactlyInAnyOrder("+I[cat, 20]", "+I[dog, 30]"); + + sql("INSERT INTO `t$branch_pk` VALUES (2, 10, 'tiger'), (2, 20, 'wolf')"); + assertThat(collectResult("SELECT v, k FROM t")) + .containsExactlyInAnyOrder( + "+I[apple, 10]", "+I[banana, 20]", "+I[tiger, 10]", "+I[wolf, 20]"); + assertThat(collectResult("SELECT v, k FROM `t$branch_pk`")) + .containsExactlyInAnyOrder( + "+I[cat, 20]", "+I[dog, 30]", "+I[tiger, 10]", "+I[wolf, 20]"); + assertThat(collectResult("SELECT v, k FROM t WHERE pt = 1")) + .containsExactlyInAnyOrder("+I[apple, 10]", "+I[banana, 20]"); + assertThat(collectResult("SELECT v, k FROM `t$branch_pk` WHERE pt = 1")) + .containsExactlyInAnyOrder("+I[cat, 20]", "+I[dog, 30]"); + assertThat(collectResult("SELECT v, k FROM t WHERE pt = 2")) + .containsExactlyInAnyOrder("+I[tiger, 10]", "+I[wolf, 20]"); + assertThat(collectResult("SELECT v, k FROM `t$branch_pk` WHERE pt = 2")) + .containsExactlyInAnyOrder("+I[tiger, 10]", "+I[wolf, 20]"); + + sql("INSERT INTO `t$branch_pk` VALUES (2, 10, 'lion')"); + assertThat(collectResult("SELECT v, k FROM t")) + .containsExactlyInAnyOrder( + "+I[apple, 10]", "+I[banana, 20]", "+I[lion, 10]", "+I[wolf, 20]"); + assertThat(collectResult("SELECT v, k FROM `t$branch_pk`")) + .containsExactlyInAnyOrder( + "+I[cat, 20]", "+I[dog, 30]", "+I[lion, 10]", "+I[wolf, 20]"); + + sql("INSERT OVERWRITE t PARTITION (pt = 1) VALUES (10, 'pear'), (20, 'mango')"); + assertThat(collectResult("SELECT v, k FROM t")) + .containsExactlyInAnyOrder( + "+I[pear, 10]", "+I[mango, 20]", "+I[lion, 10]", "+I[wolf, 20]"); + assertThat(collectResult("SELECT v, k FROM `t$branch_pk`")) + .containsExactlyInAnyOrder( + "+I[cat, 20]", "+I[dog, 30]", "+I[lion, 10]", "+I[wolf, 20]"); + + sql("ALTER TABLE t RESET ( 'scan.fallback-branch' )"); + assertThat(collectResult("SELECT v, k FROM t")) + .containsExactlyInAnyOrder("+I[pear, 10]", "+I[mango, 20]"); + assertThat(collectResult("SELECT v, k FROM `t$branch_pk`")) + .containsExactlyInAnyOrder( + "+I[cat, 20]", "+I[dog, 30]", "+I[lion, 10]", "+I[wolf, 20]"); + } + + @Test + public void testDifferentRowTypes() throws Exception { + sql( + "CREATE TABLE t ( pt INT NOT NULL, k INT NOT NULL, v STRING ) PARTITIONED BY (pt) WITH ( 'bucket' = '-1' )"); + sql("CALL sys.create_branch('default.t', 'pk')"); + sql("ALTER TABLE `t$branch_pk` SET ( 'primary-key' = 'pt, k', 'bucket' = '2' )"); + sql("ALTER TABLE `t$branch_pk` ADD (v2 INT)"); + sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'pk' )"); + + try { + sql("INSERT INTO t VALUES (1, 10, 'apple')"); + 
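+            // loading table t now builds a FallbackReadFileStoreTable, whose constructor rejects
+            // branch 'pk' because the extra column v2 makes the row types differ, so the INSERT
+            // above is expected to throw before this fail() is reached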
fail("Expecting exceptions");
+        } catch (Exception e) {
+            assertThat(e)
+                    .hasMessageContaining("Branch main and pk does not have the same row type");
+        }
+    }
+
     private List<String> collectResult(String sql) throws Exception {
         List<String> result = new ArrayList<>();
         try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {
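            // gather every result row as its string form, e.g. "+I[apple, 10]", for the assertions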