Skip to content

Commit

Permalink
[core][branch] Add procedure in flink for create/delete branch (#2725)
Browse files Browse the repository at this point in the history
* [core][branch] Add procedure in flink for create/delete branch
  • Loading branch information
FangYongs authored Jan 19, 2024
1 parent 0050167 commit 394ee4f
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.Table;

import org.apache.flink.table.procedure.ProcedureContext;

/**
* Create branch procedure for given tag. Usage:
*
* <pre><code>
* CALL sys.create_branch('tableId', 'branchName', 'tagName')
* </code></pre>
*/
public class CreateBranchProcedure extends ProcedureBase {

public static final String IDENTIFIER = "create_branch";

@Override
public String identifier() {
return IDENTIFIER;
}

public String[] call(
ProcedureContext procedureContext, String tableId, String branchName, String tagName)
throws Catalog.TableNotExistException {
return innerCall(tableId, branchName, tagName);
}

private String[] innerCall(String tableId, String branchName, String tagName)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
table.createBranch(branchName, tagName);
return new String[] {"Success"};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.Table;

import org.apache.flink.table.procedure.ProcedureContext;

/**
* Delete branch procedure. Usage:
*
* <pre><code>
* CALL sys.delete_branch('tableId', 'branchName')
* </code></pre>
*/
public class DeleteBranchProcedure extends ProcedureBase {

public static final String IDENTIFIER = "delete_branch";

@Override
public String identifier() {
return IDENTIFIER;
}

public String[] call(ProcedureContext procedureContext, String tableId, String branchName)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
table.deleteBranch(branchName);

return new String[] {"Success"};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ org.apache.paimon.flink.procedure.CompactDatabaseProcedure
org.apache.paimon.flink.procedure.CompactProcedure
org.apache.paimon.flink.procedure.CreateTagProcedure
org.apache.paimon.flink.procedure.DeleteTagProcedure
org.apache.paimon.flink.procedure.CreateBranchProcedure
org.apache.paimon.flink.procedure.DeleteBranchProcedure
org.apache.paimon.flink.procedure.DropPartitionProcedure
org.apache.paimon.flink.procedure.MergeIntoProcedure
org.apache.paimon.flink.procedure.ResetConsumerProcedure
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.TagManager;

import org.junit.jupiter.api.Test;

import java.util.Collections;

import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
import static org.assertj.core.api.Assertions.assertThat;

/** IT cases for branch management actions. */
class BranchActionITCase extends ActionITCaseBase {
@Test
void testCreateAndDeleteBranch() throws Exception {

init(warehouse);

RowType rowType =
RowType.of(
new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
new String[] {"k", "v"});
FileStoreTable table =
createFileStoreTable(
rowType,
Collections.emptyList(),
Collections.singletonList("k"),
Collections.emptyMap());

StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser);
write = writeBuilder.newWrite();
commit = writeBuilder.newCommit();

// 3 snapshots
writeData(rowData(1L, BinaryString.fromString("Hi")));
writeData(rowData(2L, BinaryString.fromString("Hello")));
writeData(rowData(3L, BinaryString.fromString("Paimon")));

TagManager tagManager = new TagManager(table.fileIO(), table.location());
callProcedure(
String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName));
assertThat(tagManager.tagExists("tag2")).isTrue();

BranchManager branchManager = table.branchManager();
callProcedure(
String.format(
"CALL sys.create_branch('%s.%s', 'branch_name', 'tag2')",
database, tableName));
assertThat(branchManager.branchExists("branch_name")).isTrue();

callProcedure(
String.format(
"CALL sys.delete_branch('%s.%s', 'branch_name')", database, tableName));
assertThat(branchManager.branchExists("branch_name")).isFalse();
}
}

0 comments on commit 394ee4f

Please sign in to comment.