Skip to content

Commit

Permalink
Manage main branch for paimon
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed May 8, 2024
1 parent 4b86018 commit 8e4cb76
Show file tree
Hide file tree
Showing 12 changed files with 290 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,7 @@ public Optional<TableSchema> latest() {
}

public Optional<TableSchema> latest(String branchName) {
Path directoryPath =
branchName.equals(DEFAULT_MAIN_BRANCH)
? schemaDirectory()
: branchSchemaDirectory(branchName);
Path directoryPath = schemaDirectory(branchName);
try {
return listVersionedFiles(fileIO, directoryPath, SCHEMA_PREFIX)
.reduce(Math::max)
Expand Down Expand Up @@ -498,21 +495,24 @@ public static TableSchema fromPath(FileIO fileIO, Path path) {
}

private Path schemaDirectory() {
return new Path(tableRoot + "/schema");
return schemaDirectory(DEFAULT_MAIN_BRANCH);
}

@VisibleForTesting
public Path toSchemaPath(long id) {
return new Path(tableRoot + "/schema/" + SCHEMA_PREFIX + id);
return toSchemaPath(DEFAULT_MAIN_BRANCH, id);
}

public Path branchSchemaDirectory(String branchName) {
return new Path(getBranchPath(tableRoot, branchName) + "/schema");
public Path schemaDirectory(String branchName) {
return new Path(getBranchPath(fileIO, tableRoot, branchName) + "/schema");
}

public Path branchSchemaPath(String branchName, long schemaId) {
public Path toSchemaPath(String branchName, long schemaId) {
return new Path(
getBranchPath(tableRoot, branchName) + "/schema/" + SCHEMA_PREFIX + schemaId);
getBranchPath(fileIO, tableRoot, branchName)
+ "/schema/"
+ SCHEMA_PREFIX
+ schemaId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,21 @@ public void deleteBranch(String branchName) {
branchManager().deleteBranch(branchName);
}

@Override
public void cleanMainBranchFile() {
branchManager().cleanMainBranchFile();
}

@Override
public void replaceMainBranch(String branchName) {
branchManager().commitMainBranch(branchName);
}

@Override
public void mainBranch() {
branchManager().mainBranch();
}

@Override
public void rollbackTo(String tagName) {
TagManager tagManager = tagManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,30 @@ default void deleteBranch(String branchName) {
this.getClass().getSimpleName()));
}

@Override
default void mainBranch() {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support mainBranch.",
this.getClass().getSimpleName()));
}

@Override
default void cleanMainBranchFile() {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support cleanMainBranchFile.",
this.getClass().getSimpleName()));
}

@Override
default void replaceMainBranch(String branchName) {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support setMainBranch.",
this.getClass().getSimpleName()));
}

@Override
default ExpireSnapshots newExpireSnapshots() {
throw new UnsupportedOperationException(
Expand Down
8 changes: 8 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ public interface Table extends Serializable {
@Experimental
void deleteBranch(String branchName);

/** Replace main branch. */
@Experimental
void replaceMainBranch(String branchName);

void cleanMainBranchFile();

void mainBranch();

/** Manually expire snapshots, parameters can be controlled independently of table options. */
@Experimental
ExpireSnapshots newExpireSnapshots();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class BranchManager {

public static final String BRANCH_PREFIX = "branch-";
public static final String DEFAULT_MAIN_BRANCH = "main";
public static final String MAIN_BRANCH_FILE = "MAIN-BRANCH";

private final FileIO fileIO;
private final Path tablePath;
Expand All @@ -75,13 +76,69 @@ public Path branchDirectory() {
}

/** Return the path string of a branch. */
public static String getBranchPath(Path tablePath, String branchName) {
public static String getBranchPath(FileIO fileIO, Path tablePath, String branchName) {
if (branchName.equals(DEFAULT_MAIN_BRANCH)) {
Path path = new Path(tablePath, MAIN_BRANCH_FILE);
try {
if (fileIO.exists(path)) {
String data = fileIO.readFileUtf8(path);
if (StringUtils.isBlank(data)) {
return tablePath.toString();
} else {
return tablePath.toString() + "/branch/" + BRANCH_PREFIX + data;
}
} else {
return tablePath.toString();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return tablePath.toString() + "/branch/" + BRANCH_PREFIX + branchName;
}

/** Get main branch. */
public String mainBranch() {
Path path = new Path(tablePath, MAIN_BRANCH_FILE);
try {
if (fileIO.exists(path)) {
return fileIO.readFileUtf8(path);
} else {
return DEFAULT_MAIN_BRANCH;
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/** Return the path of a branch. */
public Path branchPath(String branchName) {
return new Path(getBranchPath(tablePath, branchName));
return new Path(getBranchPath(fileIO, tablePath, branchName));
}

/** Replace main by specify branch. */
public void commitMainBranch(String branchName) {
Path mainBranchFile = new Path(tablePath, MAIN_BRANCH_FILE);
try {
fileIO.delete(mainBranchFile, false);
fileIO.overwriteFileUtf8(mainBranchFile, branchName);
} catch (IOException e) {
throw new RuntimeException(
String.format(
"Exception occurs when set main branch '%s' (directory in %s).",
branchName, tablePath.toString()),
e);
}
}

/** Clean the main branch file and use default. */
public void cleanMainBranchFile() {
Path mainBranchFile = new Path(tablePath, MAIN_BRANCH_FILE);
try {
fileIO.delete(mainBranchFile, false);
} catch (IOException e) {
throw new RuntimeException("Exception occurs when clean main branch file.", e);
}
}

/** Create empty branch. */
Expand All @@ -101,12 +158,12 @@ public void createBranch(String branchName) {
TableSchema latestSchema = schemaManager.latest().get();
fileIO.copyFileUtf8(
schemaManager.toSchemaPath(latestSchema.id()),
schemaManager.branchSchemaPath(branchName, latestSchema.id()));
schemaManager.toSchemaPath(branchName, latestSchema.id()));
} catch (IOException e) {
throw new RuntimeException(
String.format(
"Exception occurs when create branch '%s' (directory in %s).",
branchName, getBranchPath(tablePath, branchName)),
branchName, getBranchPath(fileIO, tablePath, branchName)),
e);
}
}
Expand All @@ -133,12 +190,12 @@ public void createBranch(String branchName, long snapshotId) {
snapshotManager.branchSnapshotPath(branchName, snapshot.id()));
fileIO.copyFileUtf8(
schemaManager.toSchemaPath(snapshot.schemaId()),
schemaManager.branchSchemaPath(branchName, snapshot.schemaId()));
schemaManager.toSchemaPath(branchName, snapshot.schemaId()));
} catch (IOException e) {
throw new RuntimeException(
String.format(
"Exception occurs when create branch '%s' (directory in %s).",
branchName, getBranchPath(tablePath, branchName)),
branchName, getBranchPath(fileIO, tablePath, branchName)),
e);
}
}
Expand All @@ -162,18 +219,18 @@ public void createBranch(String branchName, String tagName) {
try {
// Copy the corresponding tag, snapshot and schema files into the branch directory
fileIO.copyFileUtf8(
tagManager.tagPath(tagName), tagManager.branchTagPath(branchName, tagName));
tagManager.tagPath(tagName), tagManager.tagPath(branchName, tagName));
fileIO.copyFileUtf8(
snapshotManager.snapshotPath(snapshot.id()),
snapshotManager.branchSnapshotPath(branchName, snapshot.id()));
fileIO.copyFileUtf8(
schemaManager.toSchemaPath(snapshot.schemaId()),
schemaManager.branchSchemaPath(branchName, snapshot.schemaId()));
schemaManager.toSchemaPath(branchName, snapshot.schemaId()));
} catch (IOException e) {
throw new RuntimeException(
String.format(
"Exception occurs when create branch '%s' (directory in %s).",
branchName, getBranchPath(tablePath, branchName)),
branchName, getBranchPath(fileIO, tablePath, branchName)),
e);
}
}
Expand All @@ -187,7 +244,7 @@ public void deleteBranch(String branchName) {
LOG.info(
String.format(
"Deleting the branch failed due to an exception in deleting the directory %s. Please try again.",
getBranchPath(tablePath, branchName)),
getBranchPath(fileIO, tablePath, branchName)),
e);
}
}
Expand Down Expand Up @@ -239,8 +296,7 @@ public List<TableBranch> branches() {
}
FileStoreTable branchTable =
FileStoreTableFactory.create(
fileIO, new Path(getBranchPath(tablePath, branchName)));

fileIO, new Path(getBranchPath(fileIO, tablePath, branchName)));
SortedMap<Snapshot, List<String>> snapshotTags = branchTable.tagManager().tags();
Long earliestSnapshotId = branchTable.snapshotManager().earliestSnapshotId();
if (snapshotTags.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,15 @@ public Path snapshotPath(long snapshotId) {
}

public Path branchSnapshotDirectory(String branchName) {
return new Path(getBranchPath(tablePath, branchName) + "/snapshot");
return new Path(getBranchPath(fileIO, tablePath, branchName) + "/snapshot");
}

public Path branchSnapshotPath(String branchName, long snapshotId) {
return new Path(
getBranchPath(tablePath, branchName) + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId);
getBranchPath(fileIO, tablePath, branchName)
+ "/snapshot/"
+ SNAPSHOT_PREFIX
+ snapshotId);
}

public Path snapshotPathByBranch(String branchName, long snapshotId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.BranchManager.getBranchPath;
import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
import static org.apache.paimon.utils.Preconditions.checkArgument;
Expand All @@ -66,7 +67,11 @@ public TagManager(FileIO fileIO, Path tablePath) {

/** Return the root Directory of tags. */
public Path tagDirectory() {
return new Path(tablePath + "/tag");
return tagDirectory(DEFAULT_MAIN_BRANCH);
}

public Path tagDirectory(String branchName) {
return new Path(getBranchPath(fileIO, tablePath, branchName) + "/tag");
}

/** Return the path of a tag. */
Expand All @@ -75,8 +80,9 @@ public Path tagPath(String tagName) {
}

/** Return the path of a tag in branch. */
public Path branchTagPath(String branchName, String tagName) {
return new Path(getBranchPath(tablePath, branchName) + "/tag/" + TAG_PREFIX + tagName);
public Path tagPath(String branchName, String tagName) {
return new Path(
getBranchPath(fileIO, tablePath, branchName) + "/tag/" + TAG_PREFIX + tagName);
}

/** Create a tag from given snapshot and save it in the storage. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,7 @@ public void testCreateBranch() throws Exception {
// verify test-tag in test-branch is equal to snapshot 2
Snapshot branchTag =
Snapshot.fromPath(
new TraceableFileIO(), tagManager.branchTagPath("test-branch", "test-tag"));
new TraceableFileIO(), tagManager.tagPath("test-branch", "test-tag"));
assertThat(branchTag.equals(snapshot2)).isTrue();

// verify snapshot in test-branch is equal to snapshot 2
Expand All @@ -1034,7 +1034,7 @@ public void testCreateBranch() throws Exception {
SchemaManager schemaManager = new SchemaManager(new TraceableFileIO(), tablePath);
TableSchema branchSchema =
SchemaManager.fromPath(
new TraceableFileIO(), schemaManager.branchSchemaPath("test-branch", 0));
new TraceableFileIO(), schemaManager.toSchemaPath("test-branch", 0));
TableSchema schema0 = schemaManager.schema(0);
assertThat(branchSchema.equals(schema0)).isTrue();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.Table;

import org.apache.flink.table.procedure.ProcedureContext;

/**
* Clean main branch procedure. Usage:
*
* <pre><code>
* CALL sys.clean_main_branch('tableId')
* </code></pre>
*/
public class CleanMainBranchProcedure extends ProcedureBase {

public static final String IDENTIFIER = "clean_main_branch";

@Override
public String identifier() {
return IDENTIFIER;
}

public String[] call(ProcedureContext procedureContext, String tableId)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
table.cleanMainBranchFile();

return new String[] {"Success"};
}
}
Loading

0 comments on commit 8e4cb76

Please sign in to comment.