Skip to content

Commit

Permalink
[core] Support reading partition from fallback branch when not found …
Browse files Browse the repository at this point in the history
…in current branch (#3816)
  • Loading branch information
tsreaper authored Jul 31, 2024
1 parent 6d66fc1 commit e917af0
Show file tree
Hide file tree
Showing 14 changed files with 823 additions and 73 deletions.
83 changes: 83 additions & 0 deletions docs/content/maintenance/manage-branches.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,86 @@ Run the following command:
{{< /tab >}}
{{< /tabs >}}
### Batch Reading from Fallback Branch
You can set the table option `scan.fallback-branch`
so that when a batch job reads from the current branch, if a partition does not exist,
the reader will try to read this partition from the fallback branch.
This feature is currently not supported for streaming read jobs; they will only produce results from the current branch.
What's the use case of this feature? Say you have created a Paimon table partitioned by date.
You have a long-running streaming job which inserts records into Paimon, so that today's data can be queried in time.
You also have a batch job which runs every night to insert yesterday's corrected records into Paimon,
so that the accuracy of the data can be guaranteed.
When you query this Paimon table, you would like to first read from the results of the batch job.
But if a partition (for example, today's partition) does not exist in its results,
then you would like to read from the results of the streaming job.
In this case, you can create a branch for the streaming job, and set `scan.fallback-branch` to this streaming branch.
Let's look at an example.
{{< tabs "read-fallback-branch" >}}
{{< tab "Flink" >}}
```sql
-- create Paimon table
CREATE TABLE T (
dt STRING NOT NULL,
name STRING NOT NULL,
amount BIGINT
) PARTITIONED BY (dt);
-- create a branch for streaming job
CALL sys.create_branch('default.T', 'test');
-- set primary key and bucket number for the branch
ALTER TABLE `T$branch_test` SET (
'primary-key' = 'dt,name',
'bucket' = '2',
'changelog-producer' = 'lookup'
);
-- set fallback branch
ALTER TABLE T SET (
'scan.fallback-branch' = 'test'
);
-- write records into the streaming branch
INSERT INTO `T$branch_test` VALUES ('20240725', 'apple', 4), ('20240725', 'peach', 10), ('20240726', 'cherry', 3), ('20240726', 'pear', 6);
-- write records into the default branch
INSERT INTO T VALUES ('20240725', 'apple', 5), ('20240725', 'banana', 7);
SELECT * FROM T;
/*
+------------------+------------------+--------+
| dt | name | amount |
+------------------+------------------+--------+
| 20240725 | apple | 5 |
| 20240725 | banana | 7 |
| 20240726 | cherry | 3 |
| 20240726 | pear | 6 |
+------------------+------------------+--------+
*/
-- reset fallback branch
ALTER TABLE T RESET ( 'scan.fallback-branch' );
-- now it only reads from default branch
SELECT * FROM T;
/*
+------------------+------------------+--------+
| dt | name | amount |
+------------------+------------------+--------+
| 20240725 | apple | 5 |
| 20240725 | banana | 7 |
+------------------+------------------+--------+
*/
```
{{< /tab >}}
{{< /tabs >}}
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,12 @@
<td>Long</td>
<td>End condition "watermark" for bounded streaming mode. Stream reading will end when a larger watermark snapshot is encountered.</td>
</tr>
<tr>
<td><h5>scan.fallback-branch</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>When a batch job queries from a table, if a partition does not exist in the current branch, the reader will try to get this partition from this fallback branch.</td>
</tr>
<tr>
<td><h5>scan.file-creation-time-millis</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1275,6 +1275,14 @@ public class CoreOptions implements Serializable {
"The maximum number of concurrent deleting files. "
+ "By default is the number of processors available to the Java virtual machine.");

/**
 * Table option {@code scan.fallback-branch}: the branch a batch read falls back to for
 * partitions that do not exist in the current branch. String-typed, with no default value.
 */
public static final ConfigOption<String> SCAN_FALLBACK_BRANCH =
key("scan.fallback-branch")
.stringType()
.noDefaultValue()
.withDescription(
"When a batch job queries from a table, if a partition does not exist in the current branch, "
+ "the reader will try to get this partition from this fallback branch.");

private final Options options;

public CoreOptions(Map<String, String> options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,15 @@

package org.apache.paimon.privilege;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.FileStore;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.DelegatedFileStoreTable;
import org.apache.paimon.table.ExpireSnapshots;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.query.LocalTableQuery;
import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.sink.WriteSelector;
Expand All @@ -40,55 +35,32 @@
import org.apache.paimon.table.source.StreamDataTableScan;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;

import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;

/** {@link FileStoreTable} with privilege checks. */
public class PrivilegedFileStoreTable implements FileStoreTable {
public class PrivilegedFileStoreTable extends DelegatedFileStoreTable {

private final FileStoreTable wrapped;
private final PrivilegeChecker privilegeChecker;
private final Identifier identifier;

public PrivilegedFileStoreTable(
FileStoreTable wrapped, PrivilegeChecker privilegeChecker, Identifier identifier) {
this.wrapped = wrapped;
super(wrapped);
this.privilegeChecker = privilegeChecker;
this.identifier = identifier;
}

@Override
public String name() {
return wrapped.name();
}

@Override
public String fullName() {
return wrapped.fullName();
}

/**
 * Calls {@code privilegeChecker.assertCanSelect(identifier)} first, then returns the snapshot
 * reader created by the wrapped table.
 */
@Override
public SnapshotReader newSnapshotReader() {
privilegeChecker.assertCanSelect(identifier);
return wrapped.newSnapshotReader();
}

@Override
public CoreOptions coreOptions() {
return wrapped.coreOptions();
}

@Override
public SnapshotManager snapshotManager() {
return wrapped.snapshotManager();
}

@Override
public TagManager tagManager() {
privilegeChecker.assertCanInsert(identifier);
Expand All @@ -102,36 +74,11 @@ public BranchManager branchManager() {
return wrapped.branchManager();
}

@Override
public Path location() {
return wrapped.location();
}

@Override
public FileIO fileIO() {
return wrapped.fileIO();
}

@Override
public TableSchema schema() {
return wrapped.schema();
}

/**
 * Returns the wrapped table's store wrapped in a {@link PrivilegedFileStore} that is given this
 * table's privilege checker and identifier.
 */
@Override
public FileStore<?> store() {
return new PrivilegedFileStore<>(wrapped.store(), privilegeChecker, identifier);
}

@Override
public BucketMode bucketMode() {
return wrapped.bucketMode();
}

@Override
public CatalogEnvironment catalogEnvironment() {
return wrapped.catalogEnvironment();
}

@Override
public Optional<Statistics> statistics() {
privilegeChecker.assertCanSelect(identifier);
Expand All @@ -144,11 +91,6 @@ public FileStoreTable copy(Map<String, String> dynamicOptions) {
wrapped.copy(dynamicOptions), privilegeChecker, identifier);
}

@Override
public OptionalLong latestSnapshotId() {
return wrapped.latestSnapshotId();
}

@Override
public FileStoreTable copy(TableSchema newTableSchema) {
return new PrivilegedFileStoreTable(
Expand Down Expand Up @@ -299,16 +241,6 @@ public LocalTableQuery newLocalTableQuery() {
return wrapped.newLocalTableQuery();
}

@Override
public boolean supportStreamingReadOverwrite() {
return wrapped.supportStreamingReadOverwrite();
}

@Override
public RowKeyExtractor createRowKeyExtractor() {
return wrapped.createRowKeyExtractor();
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Loading

0 comments on commit e917af0

Please sign in to comment.