Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Support create a empty branch and create a branch based on snapshotId #2938

Merged
merged 2 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,20 @@ public class TableBranch {

private final long createTime;

public TableBranch(String branchName, Long createdFromSnapshot, long createTime) {
this.branchName = branchName;
this.createdFromTag = null;
this.createdFromSnapshot = createdFromSnapshot;
this.createTime = createTime;
}

public TableBranch(String branchName, long createTime) {
this.branchName = branchName;
this.createdFromTag = null;
this.createdFromSnapshot = null;
this.createTime = createTime;
}

public TableBranch(
String branchName, String createdFromTag, Long createdFromSnapshot, long createTime) {
this.branchName = branchName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,16 @@ public void deleteTag(String tagName) {
store().createTagCallbacks());
}

@Override
public void createBranch(String branchName) {
branchManager().createBranch(branchName);
}

@Override
public void createBranch(String branchName, long snapshotId) {
branchManager().createBranch(branchName, snapshotId);
}

@Override
public void createBranch(String branchName, String tagName) {
branchManager().createBranch(branchName, tagName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,22 @@ default void rollbackTo(String tagName) {
this.getClass().getSimpleName()));
}

@Override
default void createBranch(String branchName) {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support create empty branch.",
this.getClass().getSimpleName()));
}

@Override
default void createBranch(String branchName, long snapshotId) {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support createBranch with snapshotId.",
this.getClass().getSimpleName()));
}

@Override
default void createBranch(String branchName, String tagName) {
throw new UnsupportedOperationException(
Expand Down
8 changes: 8 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,14 @@ public interface Table extends Serializable {
@Experimental
void rollbackTo(String tagName);

/** Create a empty branch. */
@Experimental
void createBranch(String branchName);

/** Create a branch from given snapshot. */
@Experimental
void createBranch(String branchName, long snapshotId);

/** Create a branch from given tag. */
@Experimental
void createBranch(String branchName, String tagName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;

Expand All @@ -33,6 +34,7 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.SortedMap;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -82,6 +84,65 @@ public Path branchPath(String branchName) {
return new Path(getBranchPath(tablePath, branchName));
}

/** Create empty branch. */
public void createBranch(String branchName) {
checkArgument(
!branchName.equals(DEFAULT_MAIN_BRANCH),
String.format(
"Branch name '%s' is the default branch and cannot be used.",
DEFAULT_MAIN_BRANCH));
checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is blank.", branchName);
checkArgument(!branchExists(branchName), "Branch name '%s' already exists.", branchName);
checkArgument(
!branchName.chars().allMatch(Character::isDigit),
"Branch name cannot be pure numeric string but is '%s'.",
branchName);
try {
TableSchema latestSchema = schemaManager.latest().get();
fileIO.copyFileUtf8(
schemaManager.toSchemaPath(latestSchema.id()),
schemaManager.branchSchemaPath(branchName, latestSchema.id()));
} catch (IOException e) {
throw new RuntimeException(
String.format(
"Exception occurs when create branch '%s' (directory in %s).",
branchName, getBranchPath(tablePath, branchName)),
e);
}
}

public void createBranch(String branchName, long snapshotId) {
checkArgument(
!branchName.equals(DEFAULT_MAIN_BRANCH),
String.format(
"Branch name '%s' is the default branch and cannot be used.",
DEFAULT_MAIN_BRANCH));
checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is blank.", branchName);
checkArgument(!branchExists(branchName), "Branch name '%s' already exists.", branchName);
checkArgument(
!branchName.chars().allMatch(Character::isDigit),
"Branch name cannot be pure numeric string but is '%s'.",
branchName);

Snapshot snapshot = snapshotManager.snapshot(snapshotId);

try {
// Copy the corresponding snapshot and schema files into the branch directory
fileIO.copyFileUtf8(
snapshotManager.snapshotPath(snapshotId),
snapshotManager.branchSnapshotPath(branchName, snapshot.id()));
fileIO.copyFileUtf8(
schemaManager.toSchemaPath(snapshot.schemaId()),
schemaManager.branchSchemaPath(branchName, snapshot.schemaId()));
} catch (IOException e) {
throw new RuntimeException(
String.format(
"Exception occurs when create branch '%s' (directory in %s).",
branchName, getBranchPath(tablePath, branchName)),
e);
}
}

public void createBranch(String branchName, String tagName) {
checkArgument(
!branchName.equals(DEFAULT_MAIN_BRANCH),
Expand Down Expand Up @@ -170,15 +231,34 @@ public List<TableBranch> branches() {
new PriorityQueue<>(Comparator.comparingLong(TableBranch::getCreateTime));
for (Pair<Path, Long> path : paths) {
String branchName = path.getLeft().getName().substring(BRANCH_PREFIX.length());
Optional<TableSchema> tableSchema = schemaManager.latest(branchName);
if (!tableSchema.isPresent()) {
// Support empty branch.
pq.add(new TableBranch(branchName, path.getValue()));
continue;
}
FileStoreTable branchTable =
FileStoreTableFactory.create(
fileIO, new Path(getBranchPath(tablePath, branchName)));

SortedMap<Snapshot, List<String>> snapshotTags = branchTable.tagManager().tags();
checkArgument(!snapshotTags.isEmpty());
Snapshot snapshot = snapshotTags.firstKey();
List<String> tags = snapshotTags.get(snapshot);
checkArgument(tags.size() == 1);
pq.add(new TableBranch(branchName, tags.get(0), snapshot.id(), path.getValue()));
Long earliestSnapshotId = branchTable.snapshotManager().earliestSnapshotId();
if (snapshotTags.isEmpty()) {
// Create based on snapshotId.
pq.add(new TableBranch(branchName, earliestSnapshotId, path.getValue()));
} else {
Snapshot snapshot = snapshotTags.firstKey();
if (earliestSnapshotId == snapshot.id()) {
List<String> tags = snapshotTags.get(snapshot);
checkArgument(tags.size() == 1);
pq.add(
new TableBranch(
branchName, tags.get(0), snapshot.id(), path.getValue()));
} else {
// Create based on snapshotId.
pq.add(new TableBranch(branchName, earliestSnapshotId, path.getValue()));
}
}
}

List<TableBranch> branches = new ArrayList<>(pq.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.Table;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.procedure.ProcedureContext;

/**
Expand All @@ -43,13 +44,30 @@ public String identifier() {
public String[] call(
ProcedureContext procedureContext, String tableId, String branchName, String tagName)
throws Catalog.TableNotExistException {
return innerCall(tableId, branchName, tagName);
return innerCall(tableId, branchName, tagName, 0);
}

private String[] innerCall(String tableId, String branchName, String tagName)
public String[] call(ProcedureContext procedureContext, String tableId, String branchName)
throws Catalog.TableNotExistException {
schnappi17 marked this conversation as resolved.
Show resolved Hide resolved
return innerCall(tableId, branchName, null, 0);
}

public String[] call(
ProcedureContext procedureContext, String tableId, String branchName, long snapshotId)
throws Catalog.TableNotExistException {
return innerCall(tableId, branchName, null, snapshotId);
}

private String[] innerCall(String tableId, String branchName, String tagName, long snapshotId)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
table.createBranch(branchName, tagName);
if (!StringUtils.isBlank(tagName)) {
table.createBranch(branchName, tagName);
} else if (snapshotId > 0) {
table.createBranch(branchName, snapshotId);
} else {
table.createBranch(branchName);
}
return new String[] {"Success"};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,4 @@ org.apache.paimon.flink.procedure.MigrateDatabaseProcedure
org.apache.paimon.flink.procedure.MigrateFileProcedure
org.apache.paimon.flink.procedure.RemoveOrphanFilesProcedure
org.apache.paimon.flink.procedure.QueryServiceProcedure
org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure
org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure
schnappi17 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,84 @@ void testCreateAndDeleteBranch() throws Exception {
"CALL sys.delete_branch('%s.%s', 'branch_name')", database, tableName));
assertThat(branchManager.branchExists("branch_name")).isFalse();
}

@Test
void testCreateAndDeleteBranchWithSnapshotId() throws Exception {

init(warehouse);

RowType rowType =
RowType.of(
new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
new String[] {"k", "v"});
FileStoreTable table =
createFileStoreTable(
rowType,
Collections.emptyList(),
Collections.singletonList("k"),
Collections.emptyMap());

StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser);
write = writeBuilder.newWrite();
commit = writeBuilder.newCommit();

// 3 snapshots
writeData(rowData(1L, BinaryString.fromString("Hi")));
writeData(rowData(2L, BinaryString.fromString("Hello")));
writeData(rowData(3L, BinaryString.fromString("Paimon")));

BranchManager branchManager = table.branchManager();

callProcedure(
String.format(
"CALL sys.create_branch('%s.%s', 'branch_name_with_snapshotId', 2)",
database, tableName));
assertThat(branchManager.branchExists("branch_name_with_snapshotId")).isTrue();
branchManager.branches();

callProcedure(
String.format(
"CALL sys.delete_branch('%s.%s', 'branch_name_with_snapshotId')",
database, tableName));
assertThat(branchManager.branchExists("branch_name_with_snapshotId")).isFalse();
}

@Test
void testCreateAndDeleteEmptyBranch() throws Exception {

init(warehouse);

RowType rowType =
RowType.of(
new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
new String[] {"k", "v"});
FileStoreTable table =
createFileStoreTable(
rowType,
Collections.emptyList(),
Collections.singletonList("k"),
Collections.emptyMap());

StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser);
write = writeBuilder.newWrite();
commit = writeBuilder.newCommit();

// 3 snapshots
writeData(rowData(1L, BinaryString.fromString("Hi")));
writeData(rowData(2L, BinaryString.fromString("Hello")));
writeData(rowData(3L, BinaryString.fromString("Paimon")));

BranchManager branchManager = table.branchManager();
callProcedure(
String.format(
"CALL sys.create_branch('%s.%s', 'empty_branch_name')",
database, tableName));
assertThat(branchManager.branchExists("empty_branch_name")).isTrue();

callProcedure(
String.format(
"CALL sys.delete_branch('%s.%s', 'empty_branch_name')",
database, tableName));
assertThat(branchManager.branchExists("empty_branch_name")).isFalse();
}
}
Loading