From d1f2acb06d654028368ee93663d45900386a44a4 Mon Sep 17 00:00:00 2001
From: Fang Yong
Date: Mon, 19 Feb 2024 14:08:26 +0800
Subject: [PATCH] [core][branch] Add branches system table (#2759)

---
 .../org/apache/paimon/branch/TableBranch.java |  52 +++++
 .../paimon/table/system/BranchesTable.java    | 216 ++++++++++++++++++
 .../table/system/SystemTableLoader.java       |   3 +
 .../apache/paimon/table/system/TagsTable.java |   8 +-
 .../apache/paimon/utils/BranchManager.java    |  45 ++--
 .../org/apache/paimon/utils/TagManager.java   |  23 --
 .../table/system/BranchesTableTest.java       |  93 ++++++++
 .../paimon/table/system/TagsTableTest.java    |   2 +-
 8 files changed, 402 insertions(+), 40 deletions(-)
 create mode 100644 paimon-core/src/main/java/org/apache/paimon/branch/TableBranch.java
 create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java
 create mode 100644 paimon-core/src/test/java/org/apache/paimon/table/system/BranchesTableTest.java

diff --git a/paimon-core/src/main/java/org/apache/paimon/branch/TableBranch.java b/paimon-core/src/main/java/org/apache/paimon/branch/TableBranch.java
new file mode 100644
index 000000000000..4b24c866faf5
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/branch/TableBranch.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.branch;
+
+/** Metadata of a table branch: its name, source tag, source snapshot id and create time. */
+public class TableBranch {
+    private final String branchName;
+    private final String createdFromTag;
+    private final Long createdFromSnapshot;
+
+    private final long createTime;
+
+    public TableBranch(
+            String branchName, String createdFromTag, Long createdFromSnapshot, long createTime) {
+        this.branchName = branchName;
+        this.createdFromTag = createdFromTag;
+        this.createdFromSnapshot = createdFromSnapshot;
+        this.createTime = createTime;
+    }
+
+    public String getBranchName() {
+        return branchName;
+    }
+
+    public String getCreatedFromTag() {
+        return createdFromTag;
+    }
+
+    public Long getCreatedFromSnapshot() {
+        return createdFromSnapshot;
+    }
+
+    public long getCreateTime() {
+        return createTime;
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java
new file mode 100644
index 000000000000..da568aaff06a
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.branch.TableBranch;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.ReadonlyTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.utils.DateTimeUtils;
+import org.apache.paimon.utils.IteratorRecordReader;
+import org.apache.paimon.utils.ProjectedRow;
+import org.apache.paimon.utils.SerializationUtils;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
+
+/** A {@link Table} for showing branches of table. */
+public class BranchesTable implements ReadonlyTable {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final String BRANCHES = "branches";
+
+    public static final RowType TABLE_TYPE =
+            new RowType(
+                    Arrays.asList(
+                            new DataField(
+                                    0, "branch_name", SerializationUtils.newStringType(false)),
+                            new DataField(
+                                    1, "created_from_tag", SerializationUtils.newStringType(false)),
+                            new DataField(2, "created_from_snapshot", new BigIntType(false)),
+                            new DataField(3, "create_time", new TimestampType(false, 3))));
+
+    private final FileIO fileIO;
+    private final Path location;
+
+    public BranchesTable(FileIO fileIO, Path location) {
+        this.fileIO = fileIO;
+        this.location = location;
+    }
+
+    @Override
+    public String name() {
+        return location.getName() + SYSTEM_TABLE_SPLITTER + BRANCHES;
+    }
+
+    @Override
+    public RowType rowType() {
+        return TABLE_TYPE;
+    }
+
+    @Override
+    public List<String> primaryKeys() {
+        return Collections.singletonList("branch_name");
+    }
+
+    @Override
+    public InnerTableScan newScan() {
+        return new BranchesScan();
+    }
+
+    @Override
+    public InnerTableRead newRead() {
+        return new BranchesRead(fileIO);
+    }
+
+    @Override
+    public Table copy(Map<String, String> dynamicOptions) {
+        return new BranchesTable(fileIO, location);
+    }
+
+    private class BranchesScan extends ReadOnceTableScan {
+
+        @Override
+        public InnerTableScan withFilter(Predicate predicate) {
+            // TODO: filter push-down is not implemented yet; the predicate is ignored
+            return this;
+        }
+
+        @Override
+        public Plan innerPlan() {
+            return () -> Collections.singletonList(new BranchesSplit(fileIO, location));
+        }
+    }
+
+    private static class BranchesSplit implements Split {
+        private static final long serialVersionUID = 1L;
+
+        private final FileIO fileIO;
+        private final Path location;
+
+        private BranchesSplit(FileIO fileIO, Path location) {
+            this.fileIO = fileIO;
+            this.location = location;
+        }
+
+        @Override
+        public long rowCount() {
+            FileStoreTable table = FileStoreTableFactory.create(fileIO, location);
+            return table.branchManager().branchCount();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            BranchesSplit that = (BranchesSplit) o;
+            return Objects.equals(location, that.location);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(location);
+        }
+    }
+
+    private static class BranchesRead implements InnerTableRead {
+
+        private final FileIO fileIO;
+        private int[][] projection;
+
+        public BranchesRead(FileIO fileIO) {
+            this.fileIO = fileIO;
+        }
+
+        @Override
+        public InnerTableRead withFilter(Predicate predicate) {
+            // TODO: filter push-down is not implemented yet; the predicate is ignored
+            return this;
+        }
+
+        @Override
+        public InnerTableRead withProjection(int[][] projection) {
+            this.projection = projection;
+            return this;
+        }
+
+        @Override
+        public TableRead withIOManager(IOManager ioManager) {
+            return this;
+        }
+
+        @Override
+        public RecordReader<InternalRow> createReader(Split split) {
+            if (!(split instanceof BranchesSplit)) {
+                throw new IllegalArgumentException("Unsupported split: " + split.getClass());
+            }
+            Path location = ((BranchesSplit) split).location;
+            FileStoreTable table = FileStoreTableFactory.create(fileIO, location);
+            List<TableBranch> branches = table.branchManager().branches();
+            Iterator<InternalRow> rows = Iterators.transform(branches.iterator(), this::toRow);
+            if (projection != null) {
+                rows =
+                        Iterators.transform(
+                                rows, row -> ProjectedRow.from(projection).replaceRow(row));
+            }
+            return new IteratorRecordReader<>(rows);
+        }
+
+        private InternalRow toRow(TableBranch branch) {
+            return GenericRow.of(
+                    BinaryString.fromString(branch.getBranchName()),
+                    BinaryString.fromString(branch.getCreatedFromTag()),
+                    branch.getCreatedFromSnapshot(),
+                    Timestamp.fromLocalDateTime(
+                            DateTimeUtils.toLocalDateTime(branch.getCreateTime())));
+        }
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
index 71be3a582656..32aa9fa3596a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
@@ -36,6 +36,7 @@
 import static org.apache.paimon.table.system.AggregationFieldsTable.AGGREGATION;
 import static org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS;
 import static org.apache.paimon.table.system.AuditLogTable.AUDIT_LOG;
+import static org.apache.paimon.table.system.BranchesTable.BRANCHES;
 import static org.apache.paimon.table.system.CatalogOptionsTable.CATALOG_OPTIONS;
 import static org.apache.paimon.table.system.ConsumersTable.CONSUMERS;
 import static org.apache.paimon.table.system.FilesTable.FILES;
@@ -73,6 +74,8 @@ public static Table load(String type, FileIO fileIO, FileStoreTable dataTable) {
                 return new FilesTable(dataTable);
             case TAGS:
                 return new TagsTable(fileIO, location);
+            case BRANCHES:
+                return new BranchesTable(fileIO, location);
             case CONSUMERS:
                 return new ConsumersTable(fileIO, location);
             case READ_OPTIMIZED:
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
index 28786dc4a0fc..8027da2f6d67 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
@@ -213,10 +213,12 @@ public RecordReader<InternalRow> createReader(Split split) {
             table.branchManager()
                     .branches()
                     .forEach(
-                            (branch, tag) ->
+                            branch ->
                                     tagBranches
-                                            .computeIfAbsent(tag, key -> new ArrayList<>())
-                                            .add(branch));
+                                            .computeIfAbsent(
+                                                    branch.getCreatedFromTag(),
+                                                    key -> new ArrayList<>())
+                                            .add(branch.getBranchName()));
 
             Iterator<InternalRow> rows =
                     Iterators.transform(
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
index 2bf167fe2d54..6564bd4e56dc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
@@ -19,18 +19,20 @@
 package org.apache.paimon.utils;
 
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.branch.TableBranch;
 import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.HashMap;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
+import java.util.SortedMap;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
@@ -146,23 +148,40 @@ public boolean branchExists(String branchName) {
         return fileExists(branchPath);
     }
 
-    /** Get branch->tag pair. */
-    public Map<String, String> branches() {
-        Map<String, String> branchTags = new HashMap<>();
+    /** Get branch count for the table. */
+    public long branchCount() {
+        try {
+            return listVersionedFileStatus(fileIO, branchDirectory(), BRANCH_PREFIX).count();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
 
+    /** Get all branches for the table. */
+    public List<TableBranch> branches() {
         try {
-            List<Path> paths =
+            List<Pair<Path, Long>> paths =
                     listVersionedFileStatus(fileIO, branchDirectory(), BRANCH_PREFIX)
-                            .map(FileStatus::getPath)
+                            .map(status -> Pair.of(status.getPath(), status.getModificationTime()))
                             .collect(Collectors.toList());
-            for (Path path : paths) {
-                String branchName = path.getName().substring(BRANCH_PREFIX.length());
-                branchTags.put(branchName, tagManager.branchTags(branchName).get(0));
+            List<TableBranch> branches = new ArrayList<>();
+            for (Pair<Path, Long> path : paths) {
+                String branchName = path.getLeft().getName().substring(BRANCH_PREFIX.length());
+                FileStoreTable branchTable =
+                        FileStoreTableFactory.create(
+                                fileIO, new Path(getBranchPath(tablePath, branchName)));
+                SortedMap<Snapshot, List<String>> snapshotTags = branchTable.tagManager().tags();
+                checkArgument(!snapshotTags.isEmpty());
+                Snapshot snapshot = snapshotTags.firstKey();
+                List<String> tags = snapshotTags.get(snapshot);
+                checkArgument(tags.size() == 1);
+                branches.add(
+                        new TableBranch(branchName, tags.get(0), snapshot.id(), path.getValue()));
             }
+
+            return branches;
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
-
-        return branchTags;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index 3f9599431540..a29a3e151c76 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -69,34 +69,11 @@ public Path tagPath(String tagName) {
         return new Path(tablePath + "/tag/" + TAG_PREFIX + tagName);
     }
 
-    /** Return the path of tag directory in branch. */
-    public Path branchTagDirectory(String branchName) {
-        return new Path(getBranchPath(tablePath, branchName) + "/tag");
-    }
-
     /** Return the path of a tag in branch. */
     public Path branchTagPath(String branchName, String tagName) {
         return new Path(getBranchPath(tablePath, branchName) + "/tag/" + TAG_PREFIX + tagName);
     }
 
-    public List<String> branchTags(String branchName) {
-        try {
-            List<Path> tagPaths =
-                    listVersionedFileStatus(
-                                    fileIO,
-                                    new Path(getBranchPath(tablePath, branchName) + "/tag/"),
-                                    TAG_PREFIX)
-                            .map(FileStatus::getPath)
-                            .collect(Collectors.toList());
-            checkArgument(tagPaths.size() > 0, "There should be at least one tag in the branch.");
-            return tagPaths.stream()
-                    .map(p -> p.getName().substring(TAG_PREFIX.length()))
-                    .collect(Collectors.toList());
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
     /** Create a tag from given snapshot and save it in the storage. */
     public void createTag(Snapshot snapshot, String tagName, List<TagCallback> callbacks) {
         checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is blank.", tagName);
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/BranchesTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/BranchesTableTest.java
new file mode 100644
index 000000000000..f1fbbe0177c1
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/BranchesTableTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link BranchesTable}. */
+class BranchesTableTest extends TableTestBase {
+    private static final String tableName = "MyTable";
+    private FileStoreTable table;
+    private BranchesTable branchesTable;
+
+    @BeforeEach
+    void before() throws Exception {
+        Identifier identifier = identifier(tableName);
+        Schema schema =
+                Schema.newBuilder()
+                        .column("product_id", DataTypes.INT())
+                        .column("price", DataTypes.INT())
+                        .column("sales", DataTypes.INT())
+                        .primaryKey("product_id")
+                        .option("tag.automatic-creation", "watermark")
+                        .option("tag.creation-period", "daily")
+                        .option("tag.num-retained-max", "3")
+                        .build();
+        catalog.createTable(identifier, schema, true);
+        table = (FileStoreTable) catalog.getTable(identifier);
+        TableCommitImpl commit = table.newCommit(commitUser).ignoreEmptyCommit(false);
+        commit.commit(
+                new ManifestCommittable(
+                        0,
+                        Timestamp.fromLocalDateTime(LocalDateTime.parse("2023-07-18T12:00:01"))
+                                .getMillisecond()));
+        commit.commit(
+                new ManifestCommittable(
+                        1,
+                        Timestamp.fromLocalDateTime(LocalDateTime.parse("2023-07-19T12:00:01"))
+                                .getMillisecond()));
+        branchesTable = (BranchesTable) catalog.getTable(identifier(tableName + "$branches"));
+    }
+
+    @Test
+    void testEmptyBranches() throws Exception {
+        assertThat(read(branchesTable)).isEmpty();
+    }
+
+    @Test
+    void testBranches() throws Exception {
+        table.createBranch("my_branch1", "2023-07-17");
+        table.createBranch("my_branch2", "2023-07-18");
+        table.createBranch("my_branch3", "2023-07-18");
+        List<InternalRow> branches = read(branchesTable);
+        assertThat(branches.size()).isEqualTo(3);
+        assertThat(
+                        branches.stream()
+                                .map(v -> v.getString(0).toString())
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder("my_branch1", "my_branch2", "my_branch3");
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
index 51cfcd702f8c..1f2b88047a8e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java
@@ -108,7 +108,7 @@ void testTagBranchesTable() throws Exception {
                             if (tag.equals("2023-07-17")) {
                                 return Collections.singletonList("2023-07-17-branch1");
                             } else if (tag.equals("2023-07-18")) {
-                                return Arrays.asList("2023-07-18-branch1", "2023-07-18-branch2");
+                                return Arrays.asList("2023-07-18-branch2", "2023-07-18-branch1");
                             } else {
                                 return new ArrayList<>();
                             }