Skip to content

Commit

Permalink
Manage main branch for paimon
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed May 28, 2024
1 parent 483a69c commit e683082
Show file tree
Hide file tree
Showing 9 changed files with 249 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,29 @@ public void deleteBranch(String branchName) {
wrapped.deleteBranch(branchName);
}

/**
* Replace main branch.
*
* @param branchName
*/
@Override
public void replaceMainBranch(String branchName) {
privilegeChecker.assertCanInsert(identifier);
wrapped.replaceMainBranch(branchName);
}

@Override
public void cleanMainBranchFile() {
privilegeChecker.assertCanInsert(identifier);
wrapped.cleanMainBranchFile();
}

@Override
public void mainBranch() {
privilegeChecker.assertCanInsert(identifier);
wrapped.mainBranch();
}

@Override
public void replaceBranch(String fromBranch) {
privilegeChecker.assertCanInsert(identifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,20 @@ public void replaceBranch(String fromBranch) {
branchManager().replaceBranch(fromBranch);
}

public void cleanMainBranchFile() {
branchManager().cleanMainBranchFile();
}

@Override
public void replaceMainBranch(String branchName) {
branchManager().commitMainBranch(branchName);
}

@Override
public void mainBranch() {
branchManager().mainBranch();
}

@Override
public void rollbackTo(String tagName) {
TagManager tagManager = tagManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,29 @@ default void replaceBranch(String fromBranch) {
this.getClass().getSimpleName()));
}

default void mainBranch() {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support mainBranch.",
this.getClass().getSimpleName()));
}

@Override
default void cleanMainBranchFile() {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support cleanMainBranchFile.",
this.getClass().getSimpleName()));
}

@Override
default void replaceMainBranch(String branchName) {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support setMainBranch.",
this.getClass().getSimpleName()));
}

@Override
default ExpireSnapshots newExpireSnapshots() {
throw new UnsupportedOperationException(
Expand Down
10 changes: 10 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,16 @@ public interface Table extends Serializable {
@Experimental
void replaceBranch(String fromBranch);

/** Replace main branch. */
@Experimental
void replaceMainBranch(String branchName);

@Experimental
void cleanMainBranchFile();

@Experimental
void mainBranch();

/** Manually expire snapshots, parameters can be controlled independently of table options. */
@Experimental
ExpireSnapshots newExpireSnapshots();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,13 @@ public BranchManager(
}

/** Commit specify branch to main. */
public void commitMainBranch(String branchName) throws IOException {
public void commitMainBranch(String branchName) {
Path mainBranchFile = new Path(tablePath, MAIN_BRANCH_FILE);
fileIO.overwriteFileUtf8(mainBranchFile, branchName);
try {
fileIO.overwriteFileUtf8(mainBranchFile, branchName);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/** Return the root Directory of branch. */
Expand Down Expand Up @@ -124,12 +128,35 @@ public String defaultMainBranch() {
throw new RuntimeException(e);
}
}
/** Get main branch. */
public String mainBranch() {
Path path = new Path(tablePath, MAIN_BRANCH_FILE);
try {
if (fileIO.exists(path)) {
return fileIO.readFileUtf8(path);
} else {
return DEFAULT_MAIN_BRANCH;
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/** Return the path of a branch. */
public Path branchPath(String branchName) {
return new Path(getBranchPath(fileIO, tablePath, branchName));
}

/** Clean the main branch file and use default. */
public void cleanMainBranchFile() {
Path mainBranchFile = new Path(tablePath, MAIN_BRANCH_FILE);
try {
fileIO.delete(mainBranchFile, false);
} catch (IOException e) {
throw new RuntimeException("Exception occurs when clean main branch file.", e);
}
}

/** Create empty branch. */
public void createBranch(String branchName) {
checkArgument(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.Table;

import org.apache.flink.table.procedure.ProcedureContext;

/**
* Clean main branch procedure. Usage:
*
* <pre><code>
* CALL sys.clean_main_branch('tableId')
* </code></pre>
*/
public class CleanMainBranchProcedure extends ProcedureBase {

public static final String IDENTIFIER = "clean_main_branch";

@Override
public String identifier() {
return IDENTIFIER;
}

public String[] call(ProcedureContext procedureContext, String tableId)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
table.cleanMainBranchFile();

return new String[] {"Success"};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.Table;

import org.apache.flink.table.procedure.ProcedureContext;

/**
* Replace main branch procedure. Usage:
*
* <pre><code>
* CALL sys.replace_main_branch('tableId', 'branchName')
* </code></pre>
*/
public class ReplaceMainBranchProcedure extends ProcedureBase {

public static final String IDENTIFIER = "replace_main_branch";

@Override
public String identifier() {
return IDENTIFIER;
}

public String[] call(ProcedureContext procedureContext, String tableId, String branchName)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
table.replaceMainBranch(branchName);

return new String[] {"Success"};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,5 @@ org.apache.paimon.flink.procedure.privilege.GrantPrivilegeToUserProcedure
org.apache.paimon.flink.procedure.privilege.RevokePrivilegeFromUserProcedure
org.apache.paimon.flink.procedure.RepairProcedure
org.apache.paimon.flink.procedure.ReplaceBranchProcedure
org.apache.paimon.flink.procedure.ReplaceMainBranchProcedure
org.apache.paimon.flink.procedure.CleanMainBranchProcedure
Original file line number Diff line number Diff line change
Expand Up @@ -255,4 +255,52 @@ void testReplaceBranch() throws Exception {
String.format("CALL sys.create_tag('%s.%s', 'tag3', 3)", database, tableName));
assertThat(tagManager.tagExists("tag3")).isTrue();
}

@Test
void testReplaceMainBranchAndCleanMainBranch() throws Exception {

init(warehouse);
RowType rowType =
RowType.of(
new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
new String[] {"k", "v"});
FileStoreTable table =
createFileStoreTable(
rowType,
Collections.emptyList(),
Collections.singletonList("k"),
Collections.emptyList(),
Collections.emptyMap());

StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser);
write = writeBuilder.newWrite();
commit = writeBuilder.newCommit();

// 3 snapshots
writeData(rowData(1L, BinaryString.fromString("Hi")));
writeData(rowData(2L, BinaryString.fromString("Hello")));
writeData(rowData(3L, BinaryString.fromString("Paimon")));

// Create tag2
TagManager tagManager = new TagManager(table.fileIO(), table.location());
callProcedure(
String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName));
assertThat(tagManager.tagExists("tag2")).isTrue();

BranchManager branchManager = table.branchManager();
callProcedure(
String.format(
"CALL sys.create_branch('%s.%s', 'branch_name', 'tag2')",
database, tableName));
assertThat(branchManager.branchExists("branch_name")).isTrue();

callProcedure(
String.format(
"CALL sys.replace_main_branch('%s.%s', 'branch_name')",
database, tableName));
assertThat(branchManager.mainBranch()).isEqualTo("branch_name");

callProcedure(String.format("CALL sys.clean_main_branch('%s.%s')", database, tableName));
assertThat(branchManager.mainBranch()).isEqualTo("main");
}
}

0 comments on commit e683082

Please sign in to comment.