Skip to content

Commit

Permalink
[core] Revert branchs in TagsTable (#3350)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed May 30, 2024
1 parent 0b63cb8 commit 3881d6a
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,15 @@

package org.apache.paimon.table.system;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.ReadonlyTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerTableRead;
Expand All @@ -52,10 +48,8 @@

import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
Expand All @@ -80,10 +74,9 @@ public class TagsTable implements ReadonlyTable {
new DataField(2, "schema_id", new BigIntType(false)),
new DataField(3, "commit_time", new TimestampType(false, 3)),
new DataField(4, "record_count", new BigIntType(true)),
new DataField(5, "branches", SerializationUtils.newStringType(true)),
new DataField(6, "create_time", new TimestampType(true, 3)),
new DataField(5, "create_time", new TimestampType(true, 3)),
new DataField(
7, "time_retained", SerializationUtils.newStringType(true))));
6, "time_retained", SerializationUtils.newStringType(true))));

private final FileIO fileIO;
private final Path location;
Expand Down Expand Up @@ -140,6 +133,7 @@ public Plan innerPlan() {
}

private static class TagsSplit implements Split {

private static final long serialVersionUID = 1L;

private final long rowCount;
Expand Down Expand Up @@ -205,28 +199,14 @@ public RecordReader<InternalRow> createReader(Split split) {
throw new IllegalArgumentException("Unsupported split: " + split.getClass());
}
Path location = ((TagsSplit) split).location;
Options options = new Options();
options.set(CoreOptions.PATH, location.toUri().toString());
FileStoreTable table = FileStoreTableFactory.create(fileIO, options);
List<Pair<Tag, String>> tags = table.tagManager().tagObjects();
List<Pair<Tag, String>> tags = new TagManager(fileIO, location).tagObjects();
Map<String, Tag> nameToSnapshot = new LinkedHashMap<>();
for (Pair<Tag, String> tag : tags) {
nameToSnapshot.put(tag.getValue(), tag.getKey());
}
Map<String, List<String>> tagBranches = new HashMap<>();
table.branchManager()
.branches()
.forEach(
branch ->
tagBranches
.computeIfAbsent(
branch.getCreatedFromTag(),
key -> new ArrayList<>())
.add(branch.getBranchName()));

Iterator<InternalRow> rows =
Iterators.transform(
nameToSnapshot.entrySet().iterator(), tag -> toRow(tag, tagBranches));
Iterators.transform(nameToSnapshot.entrySet().iterator(), this::toRow);
if (projection != null) {
rows =
Iterators.transform(
Expand All @@ -235,17 +215,14 @@ public RecordReader<InternalRow> createReader(Split split) {
return new IteratorRecordReader<>(rows);
}

private InternalRow toRow(
Map.Entry<String, Tag> snapshot, Map<String, List<String>> tagBranches) {
private InternalRow toRow(Map.Entry<String, Tag> snapshot) {
Tag tag = snapshot.getValue();
List<String> branches = tagBranches.get(snapshot.getKey());
return GenericRow.of(
BinaryString.fromString(snapshot.getKey()),
tag.id(),
tag.schemaId(),
Timestamp.fromLocalDateTime(DateTimeUtils.toLocalDateTime(tag.timeMillis())),
tag.totalRecordCount(),
BinaryString.fromString(branches == null ? "[]" : branches.toString()),
Optional.ofNullable(tag.getTagCreateTime())
.map(Timestamp::fromLocalDateTime)
.orElse(null),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.tag.Tag;
Expand All @@ -40,10 +39,7 @@

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -87,38 +83,12 @@ void before() throws Exception {

@Test
void testTagsTable() throws Exception {
List<InternalRow> expectRow =
getExceptedResult(
key -> {
return new ArrayList<>();
});
List<InternalRow> expectRow = getExceptedResult();
List<InternalRow> result = read(tagsTable);
assertThat(result).containsExactlyElementsOf(expectRow);
}

@Test
void testTagBranchesTable() throws Exception {
Table table = catalog.getTable(identifier(tableName));
table.createBranch("2023-07-17-branch1", "2023-07-17");
table.createBranch("2023-07-18-branch1", "2023-07-18");
table.createBranch("2023-07-18-branch2", "2023-07-18");
List<InternalRow> expectRow =
getExceptedResult(
tag -> {
if (tag.equals("2023-07-17")) {
return Collections.singletonList("2023-07-17-branch1");
} else if (tag.equals("2023-07-18")) {
return Arrays.asList("2023-07-18-branch1", "2023-07-18-branch2");
} else {
return new ArrayList<>();
}
});
List<InternalRow> result = read(tagsTable);
assertThat(result).containsExactlyElementsOf(expectRow);
}

private List<InternalRow> getExceptedResult(
Function<String, List<String>> tagBranchesFunction) {
private List<InternalRow> getExceptedResult() {
List<InternalRow> internalRows = new ArrayList<>();
for (Pair<Tag, String> snapshot : tagManager.tagObjects()) {
Tag tag = snapshot.getKey();
Expand All @@ -131,7 +101,6 @@ private List<InternalRow> getExceptedResult(
Timestamp.fromLocalDateTime(
DateTimeUtils.toLocalDateTime(tag.timeMillis())),
tag.totalRecordCount(),
BinaryString.fromString(tagBranchesFunction.apply(tagName).toString()),
tag.getTagCreateTime() == null
? null
: Timestamp.fromLocalDateTime(tag.getTagCreateTime()),
Expand Down

0 comments on commit 3881d6a

Please sign in to comment.