[core] Support reading partition from fallback branch when not found in current branch #3816

Merged on Jul 31, 2024 (6 commits)

Conversation

tsreaper
Contributor

@tsreaper tsreaper commented Jul 25, 2024

Purpose

This PR adds support for reading partitions from a fallback branch.

You can set the table option scan.fallback-branch so that when a batch job reads from the current branch and a partition does not exist there, the reader tries to read that partition from the fallback branch. This feature is not yet supported for streaming read jobs, which only produce results from the current branch.

What's the use case for this feature? Say you have created a Paimon table partitioned by date. You have a long-running streaming job that inserts records into Paimon, so that today's data can be queried promptly. You also have a batch job that runs every night and inserts corrected records for yesterday into Paimon, so that the accuracy of the data is guaranteed.

When you query this Paimon table, you would like to read the results of the batch job first. But if a partition (for example, today's partition) does not exist in those results, you would like to read the results of the streaming job instead. In this case, you can create a branch for the streaming job and set scan.fallback-branch to this streaming branch.

Let's look at an example.

-- create Paimon table
CREATE TABLE T (
    dt STRING NOT NULL,
    name STRING NOT NULL,
    amount BIGINT
) PARTITIONED BY (dt);

-- create a branch for streaming job
CALL sys.create_branch('default.T', 'test');

-- set primary key and bucket number for the branch
ALTER TABLE `T$branch_test` SET (
    'primary-key' = 'dt,name',
    'bucket' = '2'
);

-- set fallback branch
ALTER TABLE T SET (
    'scan.fallback-branch' = 'test'
);

-- set changelog producer for the streaming branch, in case a streaming job would like to read from it in the future
ALTER TABLE `T$branch_test` SET (
    'changelog-producer' = 'lookup'
);

-- write records into the streaming branch
INSERT INTO `T$branch_test` VALUES ('20240725', 'apple', 4), ('20240725', 'peach', 10), ('20240726', 'cherry', 3), ('20240726', 'pear', 6);

-- write records into the default branch
INSERT INTO T VALUES ('20240725', 'apple', 5), ('20240725', 'banana', 7);

SELECT * FROM T;
/*
+------------------+------------------+--------+
|               dt |             name | amount |
+------------------+------------------+--------+
|         20240725 |            apple |      5 |
|         20240725 |           banana |      7 |
|         20240726 |           cherry |      3 |
|         20240726 |             pear |      6 |
+------------------+------------------+--------+
*/

-- reset fallback branch
ALTER TABLE T RESET ( 'scan.fallback-branch' );

-- now it only reads from default branch
SELECT * FROM T;
/*
+------------------+------------------+--------+
|               dt |             name | amount |
+------------------+------------------+--------+
|         20240725 |            apple |      5 |
|         20240725 |           banana |      7 |
+------------------+------------------+--------+
*/
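The partition-level selection in the example above can be modelled with a small, self-contained Java sketch. This is not Paimon's implementation: `FallbackReadSketch`, `Row` and `read` are hypothetical names, the branches are plain in-memory lists, and a partition is assumed to "exist" when it holds at least one row. It only illustrates that the choice is made per partition rather than per row: partition `20240725` exists in the default branch, so the streaming branch's `apple, 4` and `peach, 10` rows are ignored, while `20240726` is taken entirely from the fallback branch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory model of partition-level fallback reads; not Paimon code.
final class FallbackReadSketch {

    // One record of table T: partition column dt, plus name and amount.
    static final class Row {
        final String dt;
        final String name;
        final long amount;

        Row(String dt, String name, long amount) {
            this.dt = dt;
            this.name = name;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return "(" + dt + ", " + name + ", " + amount + ")";
        }
    }

    // Returns every row of the main branch, followed by the rows of each
    // fallback partition that does not exist in the main branch.
    static List<Row> read(List<Row> mainBranch, List<Row> fallbackBranch) {
        Set<String> mainPartitions = new HashSet<>();
        for (Row row : mainBranch) {
            mainPartitions.add(row.dt);
        }
        List<Row> result = new ArrayList<>(mainBranch);
        for (Row row : fallbackBranch) {
            if (!mainPartitions.contains(row.dt)) {
                result.add(row);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Row> main = Arrays.asList(
                new Row("20240725", "apple", 5),
                new Row("20240725", "banana", 7));
        List<Row> fallback = Arrays.asList(
                new Row("20240725", "apple", 4),
                new Row("20240725", "peach", 10),
                new Row("20240726", "cherry", 3),
                new Row("20240726", "pear", 6));
        // Prints the same four rows as the first SELECT in the example.
        for (Row row : read(main, fallback)) {
            System.out.println(row);
        }
    }
}
```

Passing an empty list as the fallback branch corresponds to the state after `RESET`: only the two rows of the default branch are returned.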

Tests

IT cases.

API and Format

No changes in format.

Documentation

Yes. The documentation is also updated.

Contributor

@JingGe JingGe left a comment

Thanks for driving it. Just left some comments, PTAL

You can set the table option `scan.fallback-branch`
so that when a batch job reads from the current branch, if a partition does not exist,
the reader will try to read this partition from the fallback branch.
For streaming read jobs, this feature is currently not supported, and will only produce results from the current branch.
Contributor

What would the expected behaviour be once this feature is properly implemented for streaming reads?

Contributor Author

We haven't decided yet. We need to talk with the users about it.

+------------------+------------------+--------+
| dt | name | amount |
+------------------+------------------+--------+
| 20240725 | apple | 5 |
Contributor

It would be better to first run the query without setting scan.fallback-branch, which returns the two rows of 20240725, then set 'scan.fallback-branch' = 'test' and run the query again, which returns 4 rows including the data from the fallback branch. WDYT?


-- set fallback branch
ALTER TABLE T SET (
'scan.fallback-branch' = 'test'
Contributor

How do you reset it? With 'scan.fallback-branch' = null?

Contributor Author

ALTER TABLE T RESET ('scan.fallback-branch'), see Paimon docs.

branchOptions,
catalogEnvironment);

Preconditions.checkArgument(!(table instanceof FallbackReadFileStoreTable));
Contributor

Does it make sense to move the two Preconditions checks to the constructor of FallbackReadFileStoreTable ?
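A minimal sketch of what this suggestion could look like, assuming the two checks guard against wrapping a table that is already a fallback-read wrapper. `SketchTable`, `PlainTable` and `FallbackReadTable` are hypothetical stand-ins, not Paimon's `FileStoreTable` or `FallbackReadFileStoreTable`, and `checkArgument` is a local helper that only mimics the call shown in the diff.

```java
// Hypothetical stand-ins for the table types; not the real Paimon classes.
interface SketchTable {
    String name();
}

final class PlainTable implements SketchTable {
    private final String name;

    PlainTable(String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }
}

final class FallbackReadTable implements SketchTable {
    private final SketchTable main;
    private final SketchTable fallback;

    FallbackReadTable(SketchTable main, SketchTable fallback) {
        // Both checks live in the constructor, so every call site gets them.
        checkArgument(!(main instanceof FallbackReadTable),
                "main table must not already be a fallback-read wrapper");
        checkArgument(!(fallback instanceof FallbackReadTable),
                "fallback table must not already be a fallback-read wrapper");
        this.main = main;
        this.fallback = fallback;
    }

    @Override
    public String name() {
        return main.name();
    }

    SketchTable fallback() {
        return fallback;
    }

    // Local helper in the spirit of Preconditions.checkArgument.
    private static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }
}
```

With the checks in the constructor, an attempt to nest one wrapper inside another fails at construction time regardless of which factory method created it.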

Contributor

@JingGe JingGe left a comment

Thanks for driving it. Just left some comments, PTAL

tEnv.executeSql("INSERT INTO `t$branch_pk` VALUES (2, 10, 'lion')").await();
assertThat(collectResult(tEnv, "SELECT v, k FROM t"))
.containsExactlyInAnyOrder(
"+I[apple, 10]", "+I[banana, 20]", "+I[lion, 10]", "+I[wolf, 20]");
Contributor

Nit: add a test for the case where scan.fallback-branch is not set, i.e. only data from the main branch is returned.

-- create a branch for streaming job
-- you can even specify primary keys and bucket number, even if the original branch has no primary key
-- in this example, we create a new branch `test`, uses `dt` and `name` as primary keys, has a bucket number of 2, and will copy the table options from the original branch
CALL sys.create_branch('default.T', 'test', 'dt, name', 2, true);
Contributor

As development continues, branches may diverge significantly and require many option changes. Could we consider directly supporting the modification of all options of an empty table?

);

-- set changelog producer for the streaming branch, in case a streaming job would like to read from it in the future
ALTER TABLE `T$branch_test` SET (
Contributor

Put the two ALTER statements together.

if (!StringUtils.isNullOrWhitespaceOnly(fallbackBranch)) {
Options branchOptions = new Options();
branchOptions.set(CoreOptions.BRANCH, fallbackBranch);
branchOptions.set(CoreOptions.SCAN_FALLBACK_BRANCH, "");
Contributor

You can introduce a private method createNoWrappedTable.
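One way to read this suggestion, sketched under stated assumptions: the factory builds the plain table through a private helper and calls it twice, once for the current branch and once with the derived branch options. `TableFactorySketch` and the string "tables" are hypothetical; the key `scan.fallback-branch` comes from this PR, while the key `branch` and the default branch name `main` are assumptions made for the illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical factory sketch; tables are represented as descriptive strings.
final class TableFactorySketch {

    static final String BRANCH = "branch"; // assumed option key
    static final String SCAN_FALLBACK_BRANCH = "scan.fallback-branch";

    static String create(Map<String, String> options) {
        String mainTable = createNoWrappedTable(options);
        String fallbackBranch = options.get(SCAN_FALLBACK_BRANCH);
        if (fallbackBranch == null || fallbackBranch.trim().isEmpty()) {
            return mainTable;
        }
        // Mirror the diff: point at the fallback branch and clear its own
        // fallback option so the fallback table is never wrapped again.
        Map<String, String> branchOptions = new HashMap<>();
        branchOptions.put(BRANCH, fallbackBranch);
        branchOptions.put(SCAN_FALLBACK_BRANCH, "");
        String fallbackTable = createNoWrappedTable(branchOptions);
        return "fallback-read(" + mainTable + ", " + fallbackTable + ")";
    }

    // The private helper: builds a plain table without any fallback wrapper.
    private static String createNoWrappedTable(Map<String, String> options) {
        return "table[branch=" + options.getOrDefault(BRANCH, "main") + "]";
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put(SCAN_FALLBACK_BRANCH, "test");
        System.out.println(create(options));
    }
}
```

Keeping the unwrapped construction in one helper means the wrapping decision is made in exactly one place.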

Contributor

@JingsongLi JingsongLi left a comment

+1

@JingsongLi JingsongLi merged commit e917af0 into apache:master Jul 31, 2024
8 of 10 checks passed