Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Support reading partition from fallback branch when not found in current branch #3816

Merged
merged 6 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions docs/content/maintenance/manage-branches.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,86 @@ Run the following command:
{{< /tab >}}

{{< /tabs >}}

### Batch Reading from Fallback Branch

You can set the table option `scan.fallback-branch`
so that when a batch job reads from the current branch, if a partition does not exist,
the reader will try to read this partition from the fallback branch.
For streaming read jobs, this feature is currently not supported, and will only produce results from the current branch.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would be the expected behaviour if the feature for the streaming read had correctly implemented?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We haven't decided yet. We need to talk with the users about it.


What's the use case of this feature? Say you have created a Paimon table partitioned by date.
You have a long-running streaming job which inserts records into Paimon, so that today's data can be queried in time.
You also have a batch job which runs at every night to insert corrected records of yesterday into Paimon,
so that the preciseness of the data can be promised.

When you query from this Paimon table, you would like to first read from the results of batch job.
But if a partition (for example, today's partition) does not exist in its result,
then you would like to read from the results of streaming job.
In this case, you can create a branch for streaming job, and set `scan.fallback-branch` to this streaming branch.

Let's look at an example.

{{< tabs "read-fallback-branch" >}}

{{< tab "Flink" >}}

```sql
-- create Paimon table
CREATE TABLE T (
dt STRING NOT NULL,
name STRING NOT NULL,
amount BIGINT
) PARTITIONED BY (dt);

-- create a branch for streaming job
CALL sys.create_branch('default.T', 'test');

-- set primary key and bucket number for the branch
ALTER TABLE `T$branch_test` SET (
'primary-key' = 'dt,name',
'bucket' = '2',
'changelog-producer' = 'lookup'
);

-- set fallback branch
ALTER TABLE T SET (
'scan.fallback-branch' = 'test'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How to reset it? 'scan.fallback-branch' = null ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ALTER TABLE T RESET ('scan.fallback-branch'), see Paimon docs.

);

-- write records into the streaming branch
INSERT INTO `T$branch_test` VALUES ('20240725', 'apple', 4), ('20240725', 'peach', 10), ('20240726', 'cherry', 3), ('20240726', 'pear', 6);

-- write records into the default branch
INSERT INTO T VALUES ('20240725', 'apple', 5), ('20240725', 'banana', 7);

SELECT * FROM T;
/*
+------------------+------------------+--------+
| dt | name | amount |
+------------------+------------------+--------+
| 20240725 | apple | 5 |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better first run the query without setting scan.fallback-branch and return the two rows of 20240725, then set 'scan.fallback-branch' = 'test' and run the query again to return the result of 4 rows with the data from the fallback branch. WDYT?

| 20240725 | banana | 7 |
| 20240726 | cherry | 3 |
| 20240726 | pear | 6 |
+------------------+------------------+--------+
*/

-- reset fallback branch
ALTER TABLE T RESET ( 'scan.fallback-branch' );

-- now it only reads from default branch
SELECT * FROM T;
/*
+------------------+------------------+--------+
| dt | name | amount |
+------------------+------------------+--------+
| 20240725 | apple | 5 |
| 20240725 | banana | 7 |
+------------------+------------------+--------+
*/
```

{{< /tab >}}

{{< /tabs >}}
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,12 @@
<td>Long</td>
<td>End condition "watermark" for bounded streaming mode. Stream reading will end when a larger watermark snapshot is encountered.</td>
</tr>
<tr>
<td><h5>scan.fallback-branch</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>When a batch job queries from a table, if a partition does not exist in the current branch, the reader will try to get this partition from this fallback branch.</td>
</tr>
<tr>
<td><h5>scan.file-creation-time-millis</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1275,6 +1275,14 @@ public class CoreOptions implements Serializable {
"The maximum number of concurrent deleting files. "
+ "By default is the number of processors available to the Java virtual machine.");

public static final ConfigOption<String> SCAN_FALLBACK_BRANCH =
key("scan.fallback-branch")
.stringType()
.noDefaultValue()
.withDescription(
"When a batch job queries from a table, if a partition does not exist in the current branch, "
+ "the reader will try to get this partition from this fallback branch.");

private final Options options;

public CoreOptions(Map<String, String> options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,15 @@

package org.apache.paimon.privilege;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.FileStore;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.DelegatedFileStoreTable;
import org.apache.paimon.table.ExpireSnapshots;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.query.LocalTableQuery;
import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.sink.WriteSelector;
Expand All @@ -40,55 +35,32 @@
import org.apache.paimon.table.source.StreamDataTableScan;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;

import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;

/** {@link FileStoreTable} with privilege checks. */
public class PrivilegedFileStoreTable implements FileStoreTable {
public class PrivilegedFileStoreTable extends DelegatedFileStoreTable {

private final FileStoreTable wrapped;
private final PrivilegeChecker privilegeChecker;
private final Identifier identifier;

public PrivilegedFileStoreTable(
FileStoreTable wrapped, PrivilegeChecker privilegeChecker, Identifier identifier) {
this.wrapped = wrapped;
super(wrapped);
this.privilegeChecker = privilegeChecker;
this.identifier = identifier;
}

@Override
public String name() {
return wrapped.name();
}

@Override
public String fullName() {
return wrapped.fullName();
}

@Override
public SnapshotReader newSnapshotReader() {
privilegeChecker.assertCanSelect(identifier);
return wrapped.newSnapshotReader();
}

@Override
public CoreOptions coreOptions() {
return wrapped.coreOptions();
}

@Override
public SnapshotManager snapshotManager() {
return wrapped.snapshotManager();
}

@Override
public TagManager tagManager() {
privilegeChecker.assertCanInsert(identifier);
Expand All @@ -102,36 +74,11 @@ public BranchManager branchManager() {
return wrapped.branchManager();
}

@Override
public Path location() {
return wrapped.location();
}

@Override
public FileIO fileIO() {
return wrapped.fileIO();
}

@Override
public TableSchema schema() {
return wrapped.schema();
}

@Override
public FileStore<?> store() {
return new PrivilegedFileStore<>(wrapped.store(), privilegeChecker, identifier);
}

@Override
public BucketMode bucketMode() {
return wrapped.bucketMode();
}

@Override
public CatalogEnvironment catalogEnvironment() {
return wrapped.catalogEnvironment();
}

@Override
public Optional<Statistics> statistics() {
privilegeChecker.assertCanSelect(identifier);
Expand All @@ -144,11 +91,6 @@ public FileStoreTable copy(Map<String, String> dynamicOptions) {
wrapped.copy(dynamicOptions), privilegeChecker, identifier);
}

@Override
public OptionalLong latestSnapshotId() {
return wrapped.latestSnapshotId();
}

@Override
public FileStoreTable copy(TableSchema newTableSchema) {
return new PrivilegedFileStoreTable(
Expand Down Expand Up @@ -299,16 +241,6 @@ public LocalTableQuery newLocalTableQuery() {
return wrapped.newLocalTableQuery();
}

@Override
public boolean supportStreamingReadOverwrite() {
return wrapped.supportStreamingReadOverwrite();
}

@Override
public RowKeyExtractor createRowKeyExtractor() {
return wrapped.createRowKeyExtractor();
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Loading
Loading