From c95c3e6f9132dc89fd39cf0427d9b5fba544476c Mon Sep 17 00:00:00 2001 From: "liming.1018" Date: Thu, 14 Nov 2024 10:57:02 +0800 Subject: [PATCH] [core] supports using dynamic parameters to query the system table of a specified branch. (#4527) --- .../table/system/AggregationFieldsTable.java | 18 +++---- .../paimon/table/system/BranchesTable.java | 13 +++-- .../paimon/table/system/ConsumersTable.java | 18 +++---- .../paimon/table/system/OptionsTable.java | 18 +++---- .../paimon/table/system/SchemasTable.java | 18 +++---- .../paimon/table/system/SnapshotsTable.java | 17 ++---- .../apache/paimon/table/system/TagsTable.java | 18 +++---- .../apache/paimon/flink/BranchSqlITCase.java | 52 +++++++++++++++++++ 8 files changed, 97 insertions(+), 75 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java index a88bde9e5d72..10a046ca70b5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java @@ -80,17 +80,13 @@ public class AggregationFieldsTable implements ReadonlyTable { private final Path location; private final String branch; - public AggregationFieldsTable(FileStoreTable dataTable) { - this( - dataTable.fileIO(), - dataTable.location(), - CoreOptions.branch(dataTable.schema().options())); - } + private final FileStoreTable dataTable; - public AggregationFieldsTable(FileIO fileIO, Path location, String branchName) { - this.fileIO = fileIO; - this.location = location; - this.branch = branchName; + public AggregationFieldsTable(FileStoreTable dataTable) { + this.fileIO = dataTable.fileIO(); + this.location = dataTable.location(); + this.branch = CoreOptions.branch(dataTable.schema().options()); + this.dataTable = dataTable; } @Override @@ -120,7 +116,7 @@ public InnerTableRead newRead() { @Override public Table copy(Map dynamicOptions) { - return new AggregationFieldsTable(fileIO, location, branch); + return new AggregationFieldsTable(dataTable.copy(dynamicOptions)); } private class SchemasScan extends ReadOnceTableScan { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java index f523f20e9d20..384a2eee92c8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java @@ -81,13 +81,12 @@ public class BranchesTable implements ReadonlyTable { private final FileIO fileIO; private final Path location; - public BranchesTable(FileStoreTable dataTable) { - this(dataTable.fileIO(), dataTable.location()); - } + private final FileStoreTable dataTable; - public BranchesTable(FileIO fileIO, Path location) { - this.fileIO = fileIO; - this.location = location; + public BranchesTable(FileStoreTable dataTable) { + this.fileIO = dataTable.fileIO(); + this.location = dataTable.location(); + this.dataTable = dataTable; } @Override @@ -117,7 +116,7 @@ public InnerTableRead newRead() { @Override public Table copy(Map dynamicOptions) { - return new BranchesTable(fileIO, location); + return new BranchesTable(dataTable.copy(dynamicOptions)); } private class BranchesScan extends ReadOnceTableScan { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java index 9f7d12961e2f..7e4816b13510 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java @@ -74,17 +74,13 @@ public class ConsumersTable implements ReadonlyTable { private final Path location; private final String branch; - public ConsumersTable(FileStoreTable dataTable) { - this( - dataTable.fileIO(), - dataTable.location(), - CoreOptions.branch(dataTable.schema().options())); - } + private final FileStoreTable dataTable; - public ConsumersTable(FileIO fileIO, Path location, String branchName) { - this.fileIO = fileIO; - this.location = location; - this.branch = branchName; + public ConsumersTable(FileStoreTable dataTable) { + this.fileIO = dataTable.fileIO(); + this.location = dataTable.location(); + this.branch = CoreOptions.branch(dataTable.schema().options()); + this.dataTable = dataTable; } @Override @@ -114,7 +110,7 @@ public InnerTableRead newRead() { @Override public Table copy(Map dynamicOptions) { - return new ConsumersTable(fileIO, location, branch); + return new ConsumersTable(dataTable.copy(dynamicOptions)); } private class ConsumersScan extends ReadOnceTableScan { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java index b4a3b82a2f5f..c7dec03343d0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java @@ -72,17 +72,13 @@ public class OptionsTable implements ReadonlyTable { private final Path location; private final String branch; - public OptionsTable(FileStoreTable dataTable) { - this( - dataTable.fileIO(), - dataTable.location(), - CoreOptions.branch(dataTable.schema().options())); - } + private final FileStoreTable dataTable; - public OptionsTable(FileIO fileIO, Path location, String branchName) { - this.fileIO = fileIO; - this.location = location; - this.branch = branchName; + public OptionsTable(FileStoreTable dataTable) { + this.fileIO = dataTable.fileIO(); + this.location = dataTable.location(); + this.branch = CoreOptions.branch(dataTable.schema().options()); + this.dataTable = dataTable; } @Override @@ -112,7 +108,7 @@ public InnerTableRead newRead() { @Override public Table copy(Map dynamicOptions) { - return new OptionsTable(fileIO, location, branch); + return new OptionsTable(dataTable.copy(dynamicOptions)); } private class OptionsScan extends ReadOnceTableScan { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java index 86e2598c609c..d0df75b34f51 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java @@ -102,17 +102,13 @@ public class SchemasTable implements ReadonlyTable { private final Path location; private final String branch; - public SchemasTable(FileStoreTable dataTable) { - this( - dataTable.fileIO(), - dataTable.location(), - CoreOptions.branch(dataTable.schema().options())); - } + private final FileStoreTable dataTable; - public SchemasTable(FileIO fileIO, Path location, String branchName) { - this.fileIO = fileIO; - this.location = location; - this.branch = branchName; + public SchemasTable(FileStoreTable dataTable) { + this.fileIO = dataTable.fileIO(); + this.location = dataTable.location(); + this.branch = CoreOptions.branch(dataTable.schema().options()); + this.dataTable = dataTable; } @Override @@ -142,7 +138,7 @@ public InnerTableRead newRead() { @Override public Table copy(Map dynamicOptions) { - return new SchemasTable(fileIO, location, branch); + return new SchemasTable(dataTable.copy(dynamicOptions)); } private class SchemasScan extends ReadOnceTableScan { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java index a95843219440..10e5b691acc3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java @@ -116,19 +116,10 @@ public class SnapshotsTable implements ReadonlyTable { private final FileStoreTable dataTable; public SnapshotsTable(FileStoreTable dataTable) { - this( - dataTable.fileIO(), - dataTable.location(), - dataTable, - CoreOptions.branch(dataTable.schema().options())); - } - - public SnapshotsTable( - FileIO fileIO, Path location, FileStoreTable dataTable, String branchName) { - this.fileIO = fileIO; - this.location = location; + this.fileIO = dataTable.fileIO(); + this.location = dataTable.location(); + this.branch = CoreOptions.branch(dataTable.schema().options()); this.dataTable = dataTable; - this.branch = branchName; } @Override @@ -158,7 +149,7 @@ public InnerTableRead newRead() { @Override public Table copy(Map dynamicOptions) { - return new SnapshotsTable(fileIO, location, dataTable.copy(dynamicOptions), branch); + return new SnapshotsTable(dataTable.copy(dynamicOptions)); } private class SnapshotsScan extends ReadOnceTableScan { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java index 4d1b4e22ab18..9aafdb5983fd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java @@ -95,17 +95,13 @@ public class TagsTable implements ReadonlyTable { private final Path location; private final String branch; - public TagsTable(FileStoreTable dataTable) { - this( - dataTable.fileIO(), - dataTable.location(), - CoreOptions.branch(dataTable.schema().options())); - } + private final FileStoreTable dataTable; - public TagsTable(FileIO fileIO, Path location, String branchName) { - this.fileIO = fileIO; - this.location = location; - this.branch = branchName; + public TagsTable(FileStoreTable dataTable) { + this.fileIO = dataTable.fileIO(); + this.location = dataTable.location(); + this.branch = CoreOptions.branch(dataTable.schema().options()); + this.dataTable = dataTable; } @Override @@ -135,7 +131,7 @@ public InnerTableRead newRead() { @Override public Table copy(Map dynamicOptions) { - return new TagsTable(fileIO, location, branch); + return new TagsTable(dataTable.copy(dynamicOptions)); } private class TagsScan extends ReadOnceTableScan { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java index 1d33a9e8a6f2..c25d99cb4459 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java @@ -347,6 +347,11 @@ public void testBranchOptionsTable() throws Exception { "+I[bucket, 2]", "+I[snapshot.time-retained, 1 h]", "+I[scan.infer-parallelism, false]"); + assertThat(collectResult("SELECT * FROM t$options /*+ OPTIONS('branch'='test') */")) + .containsExactlyInAnyOrder( + "+I[bucket, 2]", + "+I[snapshot.time-retained, 1 h]", + "+I[scan.infer-parallelism, false]"); } @Test @@ -360,6 +365,10 @@ public void testBranchSchemasTable() throws Exception { sql("ALTER TABLE t$branch_b1 SET ('snapshot.time-retained' = '5 h')"); assertThat(collectResult("SELECT schema_id FROM t$branch_b1$schemas order by schema_id")) .containsExactlyInAnyOrder("+I[0]", "+I[1]"); + assertThat( + collectResult( + "SELECT schema_id FROM t$schemas /*+ OPTIONS('branch'='b1') */ order by schema_id")) + .containsExactlyInAnyOrder("+I[0]", "+I[1]"); } @Test @@ -373,6 +382,8 @@ public void testBranchAuditLogTable() throws Exception { sql("INSERT INTO t$branch_b1 VALUES (3, 4)"); assertThat(collectResult("SELECT * FROM t$branch_b1$audit_log")) .containsExactlyInAnyOrder("+I[+I, 3, 4]"); + assertThat(collectResult("SELECT * FROM t$audit_log /*+ OPTIONS('branch'='b1') */")) + .containsExactlyInAnyOrder("+I[+I, 3, 4]"); } @Test @@ -385,6 +396,8 @@ public void testBranchReadOptimizedTable() throws Exception { sql("INSERT INTO t$branch_b1 VALUES (3, 4)"); assertThat(collectResult("SELECT * FROM t$branch_b1$ro")) .containsExactlyInAnyOrder("+I[3, 4]"); + assertThat(collectResult("SELECT * FROM t$ro /*+ OPTIONS('branch'='b1') */")) + .containsExactlyInAnyOrder("+I[3, 4]"); } @Test @@ -400,6 +413,10 @@ public void testBranchFilesTable() throws Exception { .containsExactlyInAnyOrder("+I[{a=1, b=2}]"); assertThat(collectResult("SELECT min_value_stats FROM t$branch_b1$files")) .containsExactlyInAnyOrder("+I[{a=3, b=4}]", "+I[{a=5, b=6}]"); + assertThat( + collectResult( + "SELECT min_value_stats FROM t$files /*+ OPTIONS('branch'='b1') */")) + .containsExactlyInAnyOrder("+I[{a=3, b=4}]", "+I[{a=5, b=6}]"); } @Test @@ -416,6 +433,10 @@ public void testBranchTagsTable() throws Exception { .containsExactlyInAnyOrder("+I[tag1, 1, 1]"); assertThat(collectResult("SELECT tag_name,snapshot_id,record_count FROM t$branch_b1$tags")) .containsExactlyInAnyOrder("+I[tag1, 1, 1]", "+I[tag2, 2, 2]"); + assertThat( + collectResult( + "SELECT tag_name,snapshot_id,record_count FROM t$tags /*+ OPTIONS('branch'='b1') */")) + .containsExactlyInAnyOrder("+I[tag1, 1, 1]", "+I[tag2, 2, 2]"); } @Test @@ -435,6 +456,8 @@ public void testBranchConsumersTable() throws Exception { assertThat(collectResult("SELECT * FROM t$consumers")).isEmpty(); assertThat(collectResult("SELECT * FROM t$branch_b1$consumers")) .containsExactlyInAnyOrder("+I[id1, 2]"); + assertThat(collectResult("SELECT * FROM t$consumers /*+ OPTIONS('branch'='b1') */")) + .containsExactlyInAnyOrder("+I[id1, 2]"); } @Test @@ -458,6 +481,31 @@ public void testBranchManifestsTable() { .isTrue(); assertThat((long) row.getField(2)).isGreaterThan(0L); }); + List dynamicOptionRes = + sql( + "SELECT schema_id, file_name, file_size FROM t$manifests /*+ OPTIONS('branch'='b1') */"); + assertThat(dynamicOptionRes).containsExactlyInAnyOrderElementsOf(res); + } + + @Test + public void testBranchSnapshotsTable() throws Exception { + sql("CREATE TABLE t (a INT, b INT)"); + sql("INSERT INTO t VALUES (1, 2)"); + + sql("CALL sys.create_branch('default.t', 'b1')"); + sql("INSERT INTO t$branch_b1 VALUES (3, 4)"); + sql("INSERT INTO t$branch_b1 VALUES (5, 6)"); + + assertThat(collectResult("SELECT snapshot_id, schema_id, commit_kind FROM t$snapshots")) + .containsExactlyInAnyOrder("+I[1, 0, APPEND]"); + assertThat( + collectResult( + "SELECT snapshot_id, schema_id, commit_kind FROM t$branch_b1$snapshots")) + .containsExactlyInAnyOrder("+I[1, 0, APPEND]", "+I[2, 0, APPEND]"); + assertThat( + collectResult( + "SELECT snapshot_id, schema_id, commit_kind FROM t$snapshots /*+ OPTIONS('branch'='b1') */")) + .containsExactlyInAnyOrder("+I[1, 0, APPEND]", "+I[2, 0, APPEND]"); } @Test @@ -479,6 +527,10 @@ public void testBranchPartitionsTable() throws Exception { collectResult( "SELECT `partition`, record_count, file_count FROM t$branch_b1$partitions")) .containsExactlyInAnyOrder("+I[[1], 2, 2]", "+I[[2], 3, 2]"); + assertThat( + collectResult( + "SELECT `partition`, record_count, file_count FROM t$partitions /*+ OPTIONS('branch'='b1') */")) + .containsExactlyInAnyOrder("+I[[1], 2, 2]", "+I[[2], 3, 2]"); } @Test