Skip to content

Commit

Permalink
[fix] Optimize BranchManager#branches
Browse files Browse the repository at this point in the history
  • Loading branch information
tsreaper committed Aug 5, 2024
1 parent 726fb52 commit 1b69e9c
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 161 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.paimon.table.system;

import org.apache.paimon.branch.TableBranch;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
Expand All @@ -28,6 +28,8 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.ReadonlyTable;
Expand All @@ -42,21 +44,33 @@
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.IteratorRecordReader;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.utils.SerializationUtils;

import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.SortedMap;
import java.util.stream.Collectors;

import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
import static org.apache.paimon.utils.BranchManager.BRANCH_PREFIX;
import static org.apache.paimon.utils.BranchManager.branchPath;
import static org.apache.paimon.utils.FileUtils.listVersionedDirectories;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** A {@link Table} for showing branches of table. */
public class BranchesTable implements ReadonlyTable {
Expand Down Expand Up @@ -189,25 +203,78 @@ public RecordReader<InternalRow> createReader(Split split) {
if (!(split instanceof BranchesSplit)) {
throw new IllegalArgumentException("Unsupported split: " + split.getClass());
}

Path location = ((BranchesSplit) split).location;
FileStoreTable table = FileStoreTableFactory.create(fileIO, location);
List<TableBranch> branches = table.branchManager().branches();
Iterator<InternalRow> rows = Iterators.transform(branches.iterator(), this::toRow);
Iterator<InternalRow> rows;
try {
rows = branches(table).iterator();
} catch (IOException e) {
throw new UncheckedIOException(e);
}

if (projection != null) {
rows =
Iterators.transform(
rows, row -> ProjectedRow.from(projection).replaceRow(row));
}

return new IteratorRecordReader<>(rows);
}

private InternalRow toRow(TableBranch branch) {
return GenericRow.of(
BinaryString.fromString(branch.getBranchName()),
BinaryString.fromString(branch.getCreatedFromTag()),
branch.getCreatedFromSnapshot(),
Timestamp.fromLocalDateTime(
DateTimeUtils.toLocalDateTime(branch.getCreateTime())));
private List<InternalRow> branches(FileStoreTable table) throws IOException {
BranchManager branchManager = table.branchManager();
SchemaManager schemaManager = new SchemaManager(fileIO, table.location());

List<Pair<Path, Long>> paths =
listVersionedDirectories(fileIO, branchManager.branchDirectory(), BRANCH_PREFIX)
.map(status -> Pair.of(status.getPath(), status.getModificationTime()))
.collect(Collectors.toList());
List<InternalRow> result = new ArrayList<>();

for (Pair<Path, Long> path : paths) {
String branchName = path.getLeft().getName().substring(BRANCH_PREFIX.length());
String basedTag = null;
Long basedSnapshotId = null;
long creationTime = path.getRight();

Optional<TableSchema> tableSchema =
schemaManager.copyWithBranch(branchName).latest();
if (tableSchema.isPresent()) {
FileStoreTable branchTable =
FileStoreTableFactory.create(
fileIO, new Path(branchPath(table.location(), branchName)));
SortedMap<Snapshot, List<String>> snapshotTags =
branchTable.tagManager().tags();
Long earliestSnapshotId = branchTable.snapshotManager().earliestSnapshotId();
if (snapshotTags.isEmpty()) {
// create based on snapshotId
basedSnapshotId = earliestSnapshotId;
} else {
Snapshot snapshot = snapshotTags.firstKey();
if (Objects.equals(earliestSnapshotId, snapshot.id())) {
// create based on tag
List<String> tags = snapshotTags.get(snapshot);
checkArgument(tags.size() == 1);
basedTag = tags.get(0);
basedSnapshotId = snapshot.id();
} else {
// create based on snapshotId
basedSnapshotId = earliestSnapshotId;
}
}
}

result.add(
GenericRow.of(
BinaryString.fromString(branchName),
BinaryString.fromString(basedTag),
basedSnapshotId,
Timestamp.fromLocalDateTime(
DateTimeUtils.toLocalDateTime(creationTime))));
}

return result;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,17 @@
package org.apache.paimon.utils;

import org.apache.paimon.Snapshot;
import org.apache.paimon.branch.TableBranch;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.SortedMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -315,51 +307,11 @@ public boolean branchExists(String branchName) {
}

/** Get all branches for the table. */
public List<TableBranch> branches() {
public List<String> branches() {
try {
List<Pair<Path, Long>> paths =
listVersionedDirectories(fileIO, branchDirectory(), BRANCH_PREFIX)
.map(status -> Pair.of(status.getPath(), status.getModificationTime()))
.collect(Collectors.toList());
PriorityQueue<TableBranch> pq =
new PriorityQueue<>(Comparator.comparingLong(TableBranch::getCreateTime));
for (Pair<Path, Long> path : paths) {
String branchName = path.getLeft().getName().substring(BRANCH_PREFIX.length());
Optional<TableSchema> tableSchema =
schemaManager.copyWithBranch(branchName).latest();
if (!tableSchema.isPresent()) {
// Support empty branch.
pq.add(new TableBranch(branchName, path.getValue()));
continue;
}
FileStoreTable branchTable =
FileStoreTableFactory.create(
fileIO, new Path(branchPath(tablePath, branchName)));
SortedMap<Snapshot, List<String>> snapshotTags = branchTable.tagManager().tags();
Long earliestSnapshotId = branchTable.snapshotManager().earliestSnapshotId();
if (snapshotTags.isEmpty()) {
// Create based on snapshotId.
pq.add(new TableBranch(branchName, earliestSnapshotId, path.getValue()));
} else {
Snapshot snapshot = snapshotTags.firstKey();
if (earliestSnapshotId == snapshot.id()) {
List<String> tags = snapshotTags.get(snapshot);
checkArgument(tags.size() == 1);
pq.add(
new TableBranch(
branchName, tags.get(0), snapshot.id(), path.getValue()));
} else {
// Create based on snapshotId.
pq.add(new TableBranch(branchName, earliestSnapshotId, path.getValue()));
}
}
}

List<TableBranch> branches = new ArrayList<>(pq.size());
while (!pq.isEmpty()) {
branches.add(pq.poll());
}
return branches;
return listVersionedDirectories(fileIO, branchDirectory(), BRANCH_PREFIX)
.map(status -> status.getPath().getName().substring(BRANCH_PREFIX.length()))
.collect(Collectors.toList());
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.paimon.flink;

import org.apache.paimon.branch.TableBranch;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.SnapshotManager;

Expand Down Expand Up @@ -137,8 +135,7 @@ public void testCreateBranchFromTag() throws Exception {
}

@Test
public void testCreateBranchFromSnapshot() throws Catalog.TableNotExistException {

public void testCreateBranchFromSnapshot() throws Exception {
sql(
"CREATE TABLE T ("
+ " pt INT"
Expand All @@ -158,12 +155,8 @@ public void testCreateBranchFromSnapshot() throws Catalog.TableNotExistException
sql("CALL sys.create_branch('default.T', 'test', 1)");
sql("CALL sys.create_branch('default.T', 'test2', 2)");

FileStoreTable table = paimonTable("T");

assertThat(
table.branchManager().branches().stream()
.map(TableBranch::getCreatedFromSnapshot))
.containsExactlyInAnyOrder(1L, 2L);
assertThat(collectResult("SELECT created_from_snapshot FROM `T$branches`"))
.containsExactlyInAnyOrder("+I[1]", "+I[2]");

assertThat(paimonTable("T$branch_test").snapshotManager().snapshotExists(1))
.isEqualTo(true);
Expand Down Expand Up @@ -223,25 +216,17 @@ public void testDeleteBranchTable() throws Exception {
sql("CALL sys.create_branch('default.T', 'test', 1)");
sql("CALL sys.create_branch('default.T', 'test2', 2)");

FileStoreTable table = paimonTable("T");

assertThat(
table.branchManager().branches().stream()
.map(TableBranch::getCreatedFromSnapshot))
.containsExactlyInAnyOrder(1L, 2L);

assertThat(table.branchManager().branches().stream().map(TableBranch::getBranchName))
.containsExactlyInAnyOrder("test", "test2");
assertThat(collectResult("SELECT branch_name, created_from_snapshot FROM `T$branches`"))
.containsExactlyInAnyOrder("+I[test, 1]", "+I[test2, 2]");

sql("CALL sys.delete_branch('default.T', 'test')");

assertThat(table.branchManager().branches().stream().map(TableBranch::getBranchName))
.containsExactlyInAnyOrder("test2");
assertThat(collectResult("SELECT branch_name, created_from_snapshot FROM `T$branches`"))
.containsExactlyInAnyOrder("+I[test2, 2]");
}

@Test
public void testBranchManagerGetBranchSnapshotsList()
throws Catalog.TableNotExistException, IOException {
public void testBranchManagerGetBranchSnapshotsList() throws Exception {
sql(
"CREATE TABLE T ("
+ " pt INT"
Expand All @@ -263,10 +248,8 @@ public void testBranchManagerGetBranchSnapshotsList()
sql("CALL sys.create_branch('default.T', 'test2', 2)");
sql("CALL sys.create_branch('default.T', 'test3', 3)");

assertThat(
table.branchManager().branches().stream()
.map(TableBranch::getCreatedFromSnapshot))
.containsExactlyInAnyOrder(1L, 2L, 3L);
assertThat(collectResult("SELECT created_from_snapshot FROM `T$branches`"))
.containsExactlyInAnyOrder("+I[1]", "+I[2]", "+I[3]");
}

@Test
Expand Down Expand Up @@ -370,7 +353,7 @@ public void testFallbackBranchBatchRead() throws Exception {
}

@Test
public void testDifferentRowTypes() throws Exception {
public void testDifferentRowTypes() {
sql(
"CREATE TABLE t ( pt INT NOT NULL, k INT NOT NULL, v STRING ) PARTITIONED BY (pt) WITH ( 'bucket' = '-1' )");
sql("CALL sys.create_branch('default.t', 'pk')");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.branch.TableBranch;
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
Expand Down Expand Up @@ -331,10 +330,7 @@ private boolean partitionExistsInOtherBranches(
getTable(
new Identifier(
identifier.getDatabaseName(), identifier.getTableName()));
List<String> branchNames =
mainTable.branchManager().branches().stream()
.map(TableBranch::getBranchName)
.collect(Collectors.toList());
List<String> branchNames = new ArrayList<>(mainTable.branchManager().branches());
branchNames.add(DEFAULT_MAIN_BRANCH);

for (String branchName : branchNames) {
Expand Down

0 comments on commit 1b69e9c

Please sign in to comment.