Skip to content

Commit

Permalink
[core][branch] Add branches in tags table (apache#2754)
Browse files Browse the repository at this point in the history
* [core][branch] Add branches in tags table
  • Loading branch information
FangYongs authored Feb 7, 2024
1 parent 14cce88 commit e8aa707
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 17 deletions.
12 changes: 6 additions & 6 deletions docs/content/how-to/system-tables.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,12 +198,12 @@ and some historical information of the snapshots. You can also get all tag names
SELECT * FROM MyTable$tags;

/*
+----------+-------------+-----------+-------------------------+--------------+
| tag_name | snapshot_id | schema_id | commit_time | record_count |
+----------+-------------+-----------+-------------------------+--------------+
| tag1 | 1 | 0 | 2023-06-28 14:55:29.344 | 3 |
| tag3 | 3 | 0 | 2023-06-28 14:58:24.691 | 7 |
+----------+-------------+-----------+-------------------------+--------------+
+----------+-------------+-----------+-------------------------+--------------+--------------+
| tag_name | snapshot_id | schema_id | commit_time | record_count | branches |
+----------+-------------+-----------+-------------------------+--------------+--------------+
| tag1 | 1 | 0 | 2023-06-28 14:55:29.344 | 3 | [] |
| tag3 | 3 | 0 | 2023-06-28 14:58:24.691 | 7 | [branch-1] |
+----------+-------------+-----------+-------------------------+--------------+--------------+
2 rows in set
*/
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.table.system;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
Expand All @@ -26,8 +27,11 @@
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.ReadonlyTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerTableRead;
Expand All @@ -47,8 +51,10 @@

import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
Expand All @@ -72,7 +78,8 @@ public class TagsTable implements ReadonlyTable {
new DataField(1, "snapshot_id", new BigIntType(false)),
new DataField(2, "schema_id", new BigIntType(false)),
new DataField(3, "commit_time", new TimestampType(false, 3)),
new DataField(4, "record_count", new BigIntType(true))));
new DataField(4, "record_count", new BigIntType(true)),
new DataField(5, "branches", SerializationUtils.newStringType(true))));

private final FileIO fileIO;
private final Path location;
Expand Down Expand Up @@ -192,16 +199,28 @@ public RecordReader<InternalRow> createReader(Split split) {
throw new IllegalArgumentException("Unsupported split: " + split.getClass());
}
Path location = ((TagsSplit) split).location;
SortedMap<Snapshot, List<String>> tags = new TagManager(fileIO, location).tags();
Options options = new Options();
options.set(CoreOptions.PATH, location.toUri().toString());
FileStoreTable table = FileStoreTableFactory.create(fileIO, options);
SortedMap<Snapshot, List<String>> tags = table.tagManager().tags();
Map<String, Snapshot> nameToSnapshot = new LinkedHashMap<>();
for (Map.Entry<Snapshot, List<String>> tag : tags.entrySet()) {
for (String tagName : tag.getValue()) {
nameToSnapshot.put(tagName, tag.getKey());
}
}
Map<String, List<String>> tagBranches = new HashMap<>();
table.branchManager()
.branches()
.forEach(
(branch, tag) ->
tagBranches
.computeIfAbsent(tag, key -> new ArrayList<>())
.add(branch));

Iterator<InternalRow> rows =
Iterators.transform(nameToSnapshot.entrySet().iterator(), this::toRow);
Iterators.transform(
nameToSnapshot.entrySet().iterator(), tag -> toRow(tag, tagBranches));
if (projection != null) {
rows =
Iterators.transform(
Expand All @@ -210,15 +229,18 @@ public RecordReader<InternalRow> createReader(Split split) {
return new IteratorRecordReader<>(rows);
}

private InternalRow toRow(Map.Entry<String, Snapshot> tag) {
private InternalRow toRow(
Map.Entry<String, Snapshot> tag, Map<String, List<String>> tagBranches) {
Snapshot snapshot = tag.getValue();
List<String> branches = tagBranches.get(tag.getKey());
return GenericRow.of(
BinaryString.fromString(tag.getKey()),
snapshot.id(),
snapshot.schemaId(),
Timestamp.fromLocalDateTime(
DateTimeUtils.toLocalDateTime(snapshot.timeMillis())),
snapshot.totalRecordCount());
snapshot.totalRecordCount(),
BinaryString.fromString(branches == null ? "[]" : branches.toString()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,20 @@

import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.SchemaManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Manager for {@code Branch}. */
Expand Down Expand Up @@ -133,4 +139,24 @@ public boolean branchExists(String branchName) {
Path branchPath = branchPath(branchName);
return fileExists(branchPath);
}

/**
 * Builds a mapping from branch name to a tag of that branch.
 *
 * <p>Every entry under {@code branchDirectory()} whose name starts with {@code BRANCH_PREFIX}
 * is treated as a branch; the prefix is stripped to obtain the branch name. Each branch is
 * mapped to the first element of {@code tagManager.branchTags(branchName)}.
 *
 * @return a mutable map of branch name to tag name; iteration order is that of a {@link
 *     HashMap} and therefore unspecified
 * @throws RuntimeException wrapping the {@link IOException} if the branch directory cannot be
 *     listed
 */
public Map<String, String> branches() {
    List<String> branchNames;
    try {
        branchNames =
                listVersionedFileStatus(fileIO, branchDirectory(), BRANCH_PREFIX)
                        .map(FileStatus::getPath)
                        .map(Path::getName)
                        .map(fileName -> fileName.substring(BRANCH_PREFIX.length()))
                        .collect(Collectors.toList());
    } catch (IOException e) {
        throw new RuntimeException(e);
    }

    Map<String, String> branchToTag = new HashMap<>();
    for (String name : branchNames) {
        // Only the first tag reported for the branch is recorded.
        branchToTag.put(name, tagManager.branchTags(name).get(0));
    }
    return branchToTag;
}
}
18 changes: 18 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,24 @@ public Path branchTagPath(String branchName, String tagName) {
return new Path(getBranchPath(tablePath, branchName) + "/tag/" + TAG_PREFIX + tagName);
}

/**
 * Lists the names of the tags stored under the given branch.
 *
 * <p>Tag files are looked up in the {@code tag/} directory below the branch path and are
 * recognised by {@code TAG_PREFIX}; the prefix is stripped from each file name to obtain the
 * tag name. No ordering is imposed beyond what the file listing yields.
 *
 * @param branchName name of the branch whose tags are listed
 * @return the tag names of the branch, never empty
 * @throws RuntimeException wrapping the {@link IOException} if the tag directory cannot be
 *     listed; the {@code checkArgument} precondition also fails when the branch has no tag
 */
public List<String> branchTags(String branchName) {
    Path branchTagDirectory = new Path(getBranchPath(tablePath, branchName) + "/tag/");
    try {
        // Resolve file names to tag names in one pass instead of collecting paths first.
        List<String> tagNames =
                listVersionedFileStatus(fileIO, branchTagDirectory, TAG_PREFIX)
                        .map(FileStatus::getPath)
                        .map(Path::getName)
                        .map(fileName -> fileName.substring(TAG_PREFIX.length()))
                        .collect(Collectors.toList());
        // Name the offending branch so a failure can be traced to a concrete directory.
        checkArgument(
                !tagNames.isEmpty(),
                "There should be at least one tag in the branch '%s'.",
                branchName);
        return tagNames;
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

/** Create a tag from given snapshot and save it in the storage. */
public void createTag(Snapshot snapshot, String tagName, List<TagCallback> callbacks) {
checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is blank.", tagName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.types.DataTypes;
Expand All @@ -38,20 +39,23 @@

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import static org.assertj.core.api.Assertions.assertThat;

/** Unit tests for {@link TagsTable}. */
public class TagsTableTest extends TableTestBase {
class TagsTableTest extends TableTestBase {

private static final String tableName = "MyTable";
private TagsTable tagsTable;
private TagManager tagManager;

@BeforeEach
public void before() throws Exception {
void before() throws Exception {
Identifier identifier = identifier(tableName);
Schema schema =
Schema.newBuilder()
Expand Down Expand Up @@ -82,13 +86,39 @@ public void before() throws Exception {
}

@Test
public void testTagsTable() throws Exception {
List<InternalRow> expectRow = getExceptedResult();
void testTagsTable() throws Exception {
List<InternalRow> expectRow =
getExceptedResult(
key -> {
return new ArrayList<>();
});
List<InternalRow> result = read(tagsTable);
assertThat(result).containsExactlyElementsOf(expectRow);
}

private List<InternalRow> getExceptedResult() {
@Test
void testTagBranchesTable() throws Exception {
    // One branch is created from tag 2023-07-17 and two from tag 2023-07-18.
    Table table = catalog.getTable(identifier(tableName));
    table.createBranch("2023-07-17-branch1", "2023-07-17");
    table.createBranch("2023-07-18-branch1", "2023-07-18");
    table.createBranch("2023-07-18-branch2", "2023-07-18");

    // Expected content of the "branches" column for each tag name.
    Function<String, List<String>> branchesOfTag =
            tag -> {
                switch (tag) {
                    case "2023-07-17":
                        return Collections.singletonList("2023-07-17-branch1");
                    case "2023-07-18":
                        return Arrays.asList("2023-07-18-branch1", "2023-07-18-branch2");
                    default:
                        return new ArrayList<>();
                }
            };

    List<InternalRow> expected = getExceptedResult(branchesOfTag);
    List<InternalRow> actual = read(tagsTable);
    assertThat(actual).containsExactlyElementsOf(expected);
}

private List<InternalRow> getExceptedResult(
Function<String, List<String>> tagBranchesFunction) {
List<InternalRow> internalRows = new ArrayList<>();
for (Map.Entry<Snapshot, List<String>> tag : tagManager.tags().entrySet()) {
Snapshot snapshot = tag.getKey();
Expand All @@ -100,7 +130,9 @@ private List<InternalRow> getExceptedResult() {
snapshot.schemaId(),
Timestamp.fromLocalDateTime(
DateTimeUtils.toLocalDateTime(snapshot.timeMillis())),
snapshot.totalRecordCount()));
snapshot.totalRecordCount(),
BinaryString.fromString(
tagBranchesFunction.apply(tagName).toString())));
}
}
return internalRows;
Expand Down

0 comments on commit e8aa707

Please sign in to comment.