Skip to content

Commit

Permalink
[Core] Support create and delete branch (apache#2703)
Browse files Browse the repository at this point in the history
* [Core]Support create and delete branch
  • Loading branch information
TaoZex authored Jan 17, 2024
1 parent 7df96bb commit 996eda6
Show file tree
Hide file tree
Showing 14 changed files with 372 additions and 0 deletions.
11 changes: 11 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,17 @@ default void overwriteFileUtf8(Path path, String content) throws IOException {
}
}

/**
* Read file to UTF_8 decoding, then write content to one file atomically, initially writes to
* temp hidden file and only renames to the target file once temp file is closed.
*
* @return false if targetPath file exists
*/
default boolean copyFileUtf8(Path sourcePath, Path targetPath) throws IOException {
String content = readFileUtf8(sourcePath);
return writeFileUtf8(targetPath, content);
}

/** Read file from {@link #overwriteFileUtf8} file. */
default Optional<String> readOverwrittenFileUtf8(Path path) throws IOException {
int retryNumber = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@

import static org.apache.paimon.catalog.AbstractCatalog.DB_SUFFIX;
import static org.apache.paimon.catalog.Identifier.UNKNOWN_DATABASE;
import static org.apache.paimon.utils.BranchManager.getBranchPath;
import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
import static org.apache.paimon.utils.Preconditions.checkState;

Expand Down Expand Up @@ -462,6 +463,14 @@ public TableSchema schema(long id) {
}
}

public static TableSchema fromPath(FileIO fileIO, Path path) {
try {
return JsonSerdeUtil.fromJson(fileIO.readFileUtf8(path), TableSchema.class);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

private Path schemaDirectory() {
return new Path(tableRoot + "/schema");
}
Expand All @@ -471,6 +480,11 @@ public Path toSchemaPath(long id) {
return new Path(tableRoot + "/schema/" + SCHEMA_PREFIX + id);
}

public Path branchSchemaPath(String branchName, long schemaId) {
return new Path(
getBranchPath(tableRoot, branchName) + "/schema/" + SCHEMA_PREFIX + schemaId);
}

/**
* Delete schema with specific id.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
import org.apache.paimon.tag.TagPreview;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;

Expand Down Expand Up @@ -379,6 +380,16 @@ public void deleteTag(String tagName) {
tagManager().deleteTag(tagName, store().newTagDeletion(), snapshotManager());
}

@Override
public void createBranch(String branchName, String tagName) {
branchManager().createBranch(branchName, tagName);
}

@Override
public void deleteBranch(String branchName) {
branchManager().deleteBranch(branchName);
}

@Override
public void rollbackTo(String tagName) {
TagManager tagManager = tagManager();
Expand Down Expand Up @@ -409,6 +420,11 @@ public TagManager tagManager() {
return new TagManager(fileIO, path);
}

@Override
public BranchManager branchManager() {
return new BranchManager(fileIO, path, snapshotManager(), tagManager(), schemaManager());
}

private RollbackHelper rollbackHelper() {
return new RollbackHelper(
snapshotManager(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;

Expand All @@ -36,6 +37,8 @@ public interface DataTable extends InnerTable {

TagManager tagManager();

BranchManager branchManager();

Path location();

FileIO fileIO();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,20 @@ default void rollbackTo(String tagName) {
"Readonly Table %s does not support rollbackTo tag.",
this.getClass().getSimpleName()));
}

@Override
default void createBranch(String branchName, String tagName) {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support createBranch.",
this.getClass().getSimpleName()));
}

@Override
default void deleteBranch(String branchName) {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support deleteBranch.",
this.getClass().getSimpleName()));
}
}
8 changes: 8 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ public interface Table extends Serializable {
@Experimental
void rollbackTo(String tagName);

/** Create a branch from given tag. */
@Experimental
void createBranch(String branchName, String tagName);

/** Delete a branch by branchName. */
@Experimental
void deleteBranch(String branchName);

// =============== Read & Write Operations ==================

/** Returns a new read builder. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.utils.SnapshotManager;
Expand Down Expand Up @@ -160,6 +161,11 @@ public TagManager tagManager() {
return dataTable.tagManager();
}

@Override
public BranchManager branchManager() {
return dataTable.branchManager();
}

@Override
public InnerTableRead newRead() {
return new AuditLogRead(dataTable.newRead());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.IteratorRecordReader;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
Expand Down Expand Up @@ -117,6 +118,11 @@ public TagManager tagManager() {
return wrapped.tagManager();
}

@Override
public BranchManager branchManager() {
return wrapped.branchManager();
}

@Override
public String name() {
return "__internal_buckets_" + wrapped.location().getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.IteratorRecordReader;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
Expand Down Expand Up @@ -104,6 +105,11 @@ public TagManager tagManager() {
return wrapped.tagManager();
}

@Override
public BranchManager branchManager() {
return wrapped.branchManager();
}

@Override
public String name() {
return "__internal_file_monitor_" + wrapped.location().getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.paimon.table.source.InnerTableScanImpl;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;

Expand Down Expand Up @@ -136,6 +137,11 @@ public TagManager tagManager() {
return dataTable.tagManager();
}

@Override
public BranchManager branchManager() {
return dataTable.branchManager();
}

@Override
public InnerTableRead newRead() {
return dataTable.newRead();
Expand Down
136 changes: 136 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.utils;

import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.SchemaManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Manager for {@code Branch}. */
public class BranchManager {

private static final Logger LOG = LoggerFactory.getLogger(BranchManager.class);

public static final String BRANCH_PREFIX = "branch-";

private final FileIO fileIO;
private final Path tablePath;
private final SnapshotManager snapshotManager;
private final TagManager tagManager;
private final SchemaManager schemaManager;

public BranchManager(
FileIO fileIO,
Path path,
SnapshotManager snapshotManager,
TagManager tagManager,
SchemaManager schemaManager) {
this.fileIO = fileIO;
this.tablePath = path;
this.snapshotManager = snapshotManager;
this.tagManager = tagManager;
this.schemaManager = schemaManager;
}

/** Return the root Directory of branch. */
public Path branchDirectory() {
return new Path(tablePath + "/branch");
}

/** Return the path string of a branch. */
public static String getBranchPath(Path tablePath, String branchName) {
return tablePath.toString() + "/branch/" + BRANCH_PREFIX + branchName;
}

/** Return the path of a branch. */
public Path branchPath(String branchName) {
return new Path(getBranchPath(tablePath, branchName));
}

public void createBranch(String branchName, String tagName) {
checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is blank.", branchName);
checkArgument(!branchExists(branchName), "Branch name '%s' already exists.", branchName);
checkArgument(tagManager.tagExists(tagName), "Tag name '%s' not exists.", tagName);
checkArgument(
!branchName.chars().allMatch(Character::isDigit),
"Branch name cannot be pure numeric string but is '%s'.",
branchName);

Snapshot snapshot = tagManager.taggedSnapshot(tagName);

try {
// Copy the corresponding tag, snapshot and schema files into the branch directory
fileIO.copyFileUtf8(
tagManager.tagPath(tagName), tagManager.branchTagPath(branchName, tagName));
fileIO.copyFileUtf8(
snapshotManager.snapshotPath(snapshot.id()),
snapshotManager.branchSnapshotPath(branchName, snapshot.id()));
fileIO.copyFileUtf8(
schemaManager.toSchemaPath(snapshot.schemaId()),
schemaManager.branchSchemaPath(branchName, snapshot.schemaId()));
} catch (IOException e) {
throw new RuntimeException(
String.format(
"Exception occurs when create branch '%s' (directory in %s).",
branchName, getBranchPath(tablePath, branchName)),
e);
}
}

public void deleteBranch(String branchName) {
checkArgument(branchExists(branchName), "Branch name '%s' doesn't exist.", branchName);
try {
// Delete branch directory
fileIO.delete(branchPath(branchName), true);
} catch (IOException e) {
LOG.info(
String.format(
"Deleting the branch failed due to an exception in deleting the directory %s. Please try again.",
getBranchPath(tablePath, branchName)),
e);
}
}

/** Check if path exists. */
public boolean fileExists(Path path) {
try {
if (fileIO.exists(path)) {
return true;
}
return false;
} catch (IOException e) {
throw new RuntimeException(
String.format("Failed to determine if path '%s' exists.", path), e);
}
}

/** Check if a branch exists. */
public boolean branchExists(String branchName) {
Path branchPath = branchPath(branchName);
return fileExists(branchPath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.BranchManager.getBranchPath;
import static org.apache.paimon.utils.FileUtils.listVersionedFiles;

/** Manager for {@link Snapshot}, providing utility methods related to paths and snapshot hints. */
Expand Down Expand Up @@ -80,6 +81,11 @@ public Path snapshotPath(long snapshotId) {
return new Path(tablePath + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId);
}

public Path branchSnapshotPath(String branchName, long snapshotId) {
return new Path(
getBranchPath(tablePath, branchName) + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId);
}

public Snapshot snapshot(long snapshotId) {
return Snapshot.fromPath(fileIO, snapshotPath(snapshotId));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.BranchManager.getBranchPath;
import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
import static org.apache.paimon.utils.Preconditions.checkArgument;

Expand Down Expand Up @@ -66,6 +67,11 @@ public Path tagPath(String tagName) {
return new Path(tablePath + "/tag/" + TAG_PREFIX + tagName);
}

/** Return the path of a tag in branch. */
public Path branchTagPath(String branchName, String tagName) {
return new Path(getBranchPath(tablePath, branchName) + "/tag/" + TAG_PREFIX + tagName);
}

/** Create a tag from given snapshot and save it in the storage. */
public void createTag(Snapshot snapshot, String tagName, List<TagCallback> callbacks) {
checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is blank.", tagName);
Expand Down
Loading

0 comments on commit 996eda6

Please sign in to comment.