From 394ee4f8c46272a267df57b7384e665850aa1d16 Mon Sep 17 00:00:00 2001 From: Fang Yong Date: Fri, 19 Jan 2024 10:55:13 +0800 Subject: [PATCH] [core][branch] Add procedure in flink for create/delete branch (#2725) * [core][branch] Add procedure in flink for create/delete branch --- .../procedure/CreateBranchProcedure.java | 55 +++++++++++++ .../procedure/DeleteBranchProcedure.java | 50 ++++++++++++ .../org.apache.paimon.factories.Factory | 2 + .../flink/action/BranchActionITCase.java | 81 +++++++++++++++++++ 4 files changed, 188 insertions(+) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java new file mode 100644 index 000000000000..b870f088bf82 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.Table; + +import org.apache.flink.table.procedure.ProcedureContext; + +/** + * Create branch procedure for given tag. Usage: + * + *

+ *  CALL sys.create_branch('tableId', 'branchName', 'tagName')
+ * 
+ */ +public class CreateBranchProcedure extends ProcedureBase { + + public static final String IDENTIFIER = "create_branch"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + public String[] call( + ProcedureContext procedureContext, String tableId, String branchName, String tagName) + throws Catalog.TableNotExistException { + return innerCall(tableId, branchName, tagName); + } + + private String[] innerCall(String tableId, String branchName, String tagName) + throws Catalog.TableNotExistException { + Table table = catalog.getTable(Identifier.fromString(tableId)); + table.createBranch(branchName, tagName); + return new String[] {"Success"}; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java new file mode 100644 index 000000000000..c4d04b5a1a41 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.Table; + +import org.apache.flink.table.procedure.ProcedureContext; + +/** + * Delete branch procedure. Usage: + * + *

+ *  CALL sys.delete_branch('tableId', 'branchName')
+ * 
+ */ +public class DeleteBranchProcedure extends ProcedureBase { + + public static final String IDENTIFIER = "delete_branch"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + public String[] call(ProcedureContext procedureContext, String tableId, String branchName) + throws Catalog.TableNotExistException { + Table table = catalog.getTable(Identifier.fromString(tableId)); + table.deleteBranch(branchName); + + return new String[] {"Success"}; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 51c0f1ad1d4e..634bd2768a98 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -32,6 +32,8 @@ org.apache.paimon.flink.procedure.CompactDatabaseProcedure org.apache.paimon.flink.procedure.CompactProcedure org.apache.paimon.flink.procedure.CreateTagProcedure org.apache.paimon.flink.procedure.DeleteTagProcedure +org.apache.paimon.flink.procedure.CreateBranchProcedure +org.apache.paimon.flink.procedure.DeleteBranchProcedure org.apache.paimon.flink.procedure.DropPartitionProcedure org.apache.paimon.flink.procedure.MergeIntoProcedure org.apache.paimon.flink.procedure.ResetConsumerProcedure diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java new file mode 100644 index 000000000000..8d445ab95b07 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.StreamWriteBuilder; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BranchManager; +import org.apache.paimon.utils.TagManager; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init; +import static org.assertj.core.api.Assertions.assertThat; + +/** IT cases for branch management actions. */ +class BranchActionITCase extends ActionITCaseBase { + @Test + void testCreateAndDeleteBranch() throws Exception { + + init(warehouse); + + RowType rowType = + RowType.of( + new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()}, + new String[] {"k", "v"}); + FileStoreTable table = + createFileStoreTable( + rowType, + Collections.emptyList(), + Collections.singletonList("k"), + Collections.emptyMap()); + + StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); + write = writeBuilder.newWrite(); + commit = writeBuilder.newCommit(); + + // 3 snapshots + writeData(rowData(1L, BinaryString.fromString("Hi"))); + writeData(rowData(2L, BinaryString.fromString("Hello"))); + writeData(rowData(3L, BinaryString.fromString("Paimon"))); + + TagManager tagManager = new TagManager(table.fileIO(), table.location()); + callProcedure( + String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName)); + assertThat(tagManager.tagExists("tag2")).isTrue(); + + BranchManager branchManager = table.branchManager(); + callProcedure( + String.format( + "CALL sys.create_branch('%s.%s', 'branch_name', 'tag2')", + database, tableName)); + assertThat(branchManager.branchExists("branch_name")).isTrue(); + + callProcedure( + String.format( + "CALL sys.delete_branch('%s.%s', 'branch_name')", database, tableName)); + assertThat(branchManager.branchExists("branch_name")).isFalse(); + } +}