Skip to content

Commit

Permalink
[core][branch] Add branches system table (#2759)
Browse files Browse the repository at this point in the history
  • Loading branch information
FangYongs authored Feb 19, 2024
1 parent 9bd6020 commit d1f2acb
Show file tree
Hide file tree
Showing 8 changed files with 402 additions and 40 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.branch;

/** {@link TableBranch} has branch relevant information for table. */
public class TableBranch {
private final String branchName;
private final String createdFromTag;
private final Long createdFromSnapshot;

private final long createTime;

public TableBranch(
String branchName, String createdFromTag, Long createdFromSnapshot, long createTime) {
this.branchName = branchName;
this.createdFromTag = createdFromTag;
this.createdFromSnapshot = createdFromSnapshot;
this.createTime = createTime;
}

public String getBranchName() {
return branchName;
}

public String getCreatedFromTag() {
return createdFromTag;
}

public Long getCreatedFromSnapshot() {
return createdFromSnapshot;
}

public long getCreateTime() {
return createTime;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.table.system;

import org.apache.paimon.branch.TableBranch;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.ReadonlyTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadOnceTableScan;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.IteratorRecordReader;
import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.utils.SerializationUtils;

import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;

/** A {@link Table} for showing branches of table. */
public class BranchesTable implements ReadonlyTable {

private static final long serialVersionUID = 1L;

public static final String BRANCHES = "branches";

public static final RowType TABLE_TYPE =
new RowType(
Arrays.asList(
new DataField(
0, "branch_name", SerializationUtils.newStringType(false)),
new DataField(
1, "created_from_tag", SerializationUtils.newStringType(false)),
new DataField(2, "created_from_snapshot", new BigIntType(false)),
new DataField(3, "create_time", new TimestampType(false, 3))));

private final FileIO fileIO;
private final Path location;

public BranchesTable(FileIO fileIO, Path location) {
this.fileIO = fileIO;
this.location = location;
}

@Override
public String name() {
return location.getName() + SYSTEM_TABLE_SPLITTER + BRANCHES;
}

@Override
public RowType rowType() {
return TABLE_TYPE;
}

@Override
public List<String> primaryKeys() {
return Arrays.asList("branch_name", "tag_name");
}

@Override
public InnerTableScan newScan() {
return new BranchesScan();
}

@Override
public InnerTableRead newRead() {
return new BranchesRead(fileIO);
}

@Override
public Table copy(Map<String, String> dynamicOptions) {
return new BranchesTable(fileIO, location);
}

private class BranchesScan extends ReadOnceTableScan {

@Override
public InnerTableScan withFilter(Predicate predicate) {
// TODO
return this;
}

@Override
public Plan innerPlan() {
return () -> Collections.singletonList(new BranchesSplit(fileIO, location));
}
}

private static class BranchesSplit implements Split {
private static final long serialVersionUID = 1L;

private final FileIO fileIO;
private final Path location;

private BranchesSplit(FileIO fileIO, Path location) {
this.fileIO = fileIO;
this.location = location;
}

@Override
public long rowCount() {
FileStoreTable table = FileStoreTableFactory.create(fileIO, location);
return table.branchManager().branchCount();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
BranchesSplit that = (BranchesSplit) o;
return Objects.equals(location, that.location);
}

@Override
public int hashCode() {
return Objects.hash(location);
}
}

private static class BranchesRead implements InnerTableRead {

private final FileIO fileIO;
private int[][] projection;

public BranchesRead(FileIO fileIO) {
this.fileIO = fileIO;
}

@Override
public InnerTableRead withFilter(Predicate predicate) {
// TODO
return this;
}

@Override
public InnerTableRead withProjection(int[][] projection) {
this.projection = projection;
return this;
}

@Override
public TableRead withIOManager(IOManager ioManager) {
return this;
}

@Override
public RecordReader<InternalRow> createReader(Split split) {
if (!(split instanceof BranchesSplit)) {
throw new IllegalArgumentException("Unsupported split: " + split.getClass());
}
Path location = ((BranchesSplit) split).location;
FileStoreTable table = FileStoreTableFactory.create(fileIO, location);
List<TableBranch> branches = table.branchManager().branches();
Iterator<InternalRow> rows = Iterators.transform(branches.iterator(), this::toRow);
if (projection != null) {
rows =
Iterators.transform(
rows, row -> ProjectedRow.from(projection).replaceRow(row));
}
return new IteratorRecordReader<>(rows);
}

private InternalRow toRow(TableBranch branch) {
return GenericRow.of(
BinaryString.fromString(branch.getBranchName()),
BinaryString.fromString(branch.getCreatedFromTag()),
branch.getCreatedFromSnapshot(),
Timestamp.fromLocalDateTime(
DateTimeUtils.toLocalDateTime(branch.getCreateTime())));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import static org.apache.paimon.table.system.AggregationFieldsTable.AGGREGATION;
import static org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS;
import static org.apache.paimon.table.system.AuditLogTable.AUDIT_LOG;
import static org.apache.paimon.table.system.BranchesTable.BRANCHES;
import static org.apache.paimon.table.system.CatalogOptionsTable.CATALOG_OPTIONS;
import static org.apache.paimon.table.system.ConsumersTable.CONSUMERS;
import static org.apache.paimon.table.system.FilesTable.FILES;
Expand Down Expand Up @@ -73,6 +74,8 @@ public static Table load(String type, FileIO fileIO, FileStoreTable dataTable) {
return new FilesTable(dataTable);
case TAGS:
return new TagsTable(fileIO, location);
case BRANCHES:
return new BranchesTable(fileIO, location);
case CONSUMERS:
return new ConsumersTable(fileIO, location);
case READ_OPTIMIZED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,12 @@ public RecordReader<InternalRow> createReader(Split split) {
table.branchManager()
.branches()
.forEach(
(branch, tag) ->
branch ->
tagBranches
.computeIfAbsent(tag, key -> new ArrayList<>())
.add(branch));
.computeIfAbsent(
branch.getCreatedFromTag(),
key -> new ArrayList<>())
.add(branch.getBranchName()));

Iterator<InternalRow> rows =
Iterators.transform(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,20 @@
package org.apache.paimon.utils;

import org.apache.paimon.Snapshot;
import org.apache.paimon.branch.TableBranch;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
Expand Down Expand Up @@ -146,23 +148,40 @@ public boolean branchExists(String branchName) {
return fileExists(branchPath);
}

/** Get branch->tag pair. */
public Map<String, String> branches() {
Map<String, String> branchTags = new HashMap<>();
/** Get branch count for the table. */
public long branchCount() {
try {
return listVersionedFileStatus(fileIO, branchDirectory(), BRANCH_PREFIX).count();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/** Get all branches for the table. */
public List<TableBranch> branches() {
try {
List<Path> paths =
List<Pair<Path, Long>> paths =
listVersionedFileStatus(fileIO, branchDirectory(), BRANCH_PREFIX)
.map(FileStatus::getPath)
.map(status -> Pair.of(status.getPath(), status.getModificationTime()))
.collect(Collectors.toList());
for (Path path : paths) {
String branchName = path.getName().substring(BRANCH_PREFIX.length());
branchTags.put(branchName, tagManager.branchTags(branchName).get(0));
List<TableBranch> branches = new ArrayList<>();
for (Pair<Path, Long> path : paths) {
String branchName = path.getLeft().getName().substring(BRANCH_PREFIX.length());
FileStoreTable branchTable =
FileStoreTableFactory.create(
fileIO, new Path(getBranchPath(tablePath, branchName)));
SortedMap<Snapshot, List<String>> snapshotTags = branchTable.tagManager().tags();
checkArgument(!snapshotTags.isEmpty());
Snapshot snapshot = snapshotTags.firstKey();
List<String> tags = snapshotTags.get(snapshot);
checkArgument(tags.size() == 1);
branches.add(
new TableBranch(branchName, tags.get(0), snapshot.id(), path.getValue()));
}

return branches;
} catch (IOException e) {
throw new RuntimeException(e);
}

return branchTags;
}
}
Loading

0 comments on commit d1f2acb

Please sign in to comment.