From 5bd7ee99741998248f65c78bd66db684155d6868 Mon Sep 17 00:00:00 2001 From: herefree <841043203@qq.com> Date: Fri, 28 Jun 2024 17:25:57 +0800 Subject: [PATCH] [flink] Add flink actions for branch (#3622) --- docs/content/maintenance/manage-branches.md | 50 ++++++ .../flink/action/CreateBranchAction.java | 56 +++++++ .../action/CreateBranchActionFactory.java | 82 ++++++++++ .../flink/action/DeleteBranchAction.java | 42 +++++ .../action/DeleteBranchActionFactory.java | 63 ++++++++ .../flink/action/MergeBranchAction.java | 41 +++++ .../action/MergeBranchActionFactory.java | 63 ++++++++ .../org.apache.paimon.factories.Factory | 3 + .../flink/action/BranchActionITCase.java | 152 +++++++++++++++++- 9 files changed, 549 insertions(+), 3 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchAction.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchActionFactory.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteBranchAction.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteBranchActionFactory.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeBranchAction.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeBranchActionFactory.java diff --git a/docs/content/maintenance/manage-branches.md b/docs/content/maintenance/manage-branches.md index 264ea506c932..2e9dead9be33 100644 --- a/docs/content/maintenance/manage-branches.md +++ b/docs/content/maintenance/manage-branches.md @@ -54,6 +54,24 @@ CALL sys.create_branch('default.T', 'branch1'); ``` {{< /tab >}} +{{< tab "Flink Action Jar" >}} + +Run the following command: + +```bash +/bin/flink run \ + /path/to/paimon-flink-action-{{< version >}}.jar \ + create_branch \ + --warehouse \ + --database \ + --table \ + --branch_name \ + [--tag_name ] \ + [--snapshot ] \ + [--catalog_conf [--catalog_conf ...]] +``` +{{< /tab >}} + {{< /tabs >}} ## Delete Branches @@ -71,6 +89,22 @@ CALL sys.delete_branch('default.T', 'branch1'); ``` {{< /tab >}} +{{< tab "Flink Action Jar" >}} + +Run the following command: + +```bash +/bin/flink run \ + /path/to/paimon-flink-action-{{< version >}}.jar \ + delete_branch \ + --warehouse \ + --database \ + --table \ + --branch_name \ + [--catalog_conf [--catalog_conf ...]] +``` +{{< /tab >}} + {{< /tabs >}} ## Read / Write With Branch @@ -107,4 +141,20 @@ CALL sys.merge_branch('default.T', 'branch1'); {{< /tab >}} +{{< tab "Flink Action Jar" >}} + +Run the following command: + +```bash +/bin/flink run \ + /path/to/paimon-flink-action-{{< version >}}.jar \ + merge_branch \ + --warehouse \ + --database \ + --table \ + --branch_name \ + [--catalog_conf [--catalog_conf ...]] +``` +{{< /tab >}} + {{< /tabs >}} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchAction.java new file mode 100644 index 000000000000..504f493ef6f9 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchAction.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import org.apache.commons.lang3.StringUtils; + +import java.util.Map; + +/** Create branch action for Flink. */ +public class CreateBranchAction extends TableActionBase { + private final String branchName; + private final String tagName; + + private final Long snapshotId; + + public CreateBranchAction( + String warehouse, + String databaseName, + String tableName, + Map catalogConfig, + String branchName, + String tagName, + Long snapshotId) { + super(warehouse, databaseName, tableName, catalogConfig); + this.branchName = branchName; + this.tagName = tagName; + this.snapshotId = snapshotId; + } + + @Override + public void run() throws Exception { + if (!StringUtils.isBlank(tagName)) { + table.createBranch(branchName, tagName); + } else if (snapshotId != null) { + table.createBranch(branchName, snapshotId); + } else { + table.createBranch(branchName); + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchActionFactory.java new file mode 100644 index 000000000000..2a093e16d8aa --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateBranchActionFactory.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import org.apache.flink.api.java.tuple.Tuple3; + +import java.util.Map; +import java.util.Optional; + +/** Factory to create {@link CreateBranchAction}. */ +public class CreateBranchActionFactory implements ActionFactory { + + public static final String IDENTIFIER = "create_branch"; + + private static final String TAG_NAME = "tag_name"; + private static final String BRANCH_NAME = "branch_name"; + private static final String SNAPSHOT = "snapshot"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public Optional create(MultipleParameterToolAdapter params) { + checkRequiredArgument(params, BRANCH_NAME); + + Tuple3 tablePath = getTablePath(params); + Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); + + Long snapshot = null; + if (params.has(SNAPSHOT)) { + snapshot = Long.parseLong(params.get(SNAPSHOT)); + } + + String tagName = null; + if (params.has(TAG_NAME)) { + tagName = params.get(TAG_NAME); + } + + String branchName = params.get(BRANCH_NAME); + + CreateBranchAction action = + new CreateBranchAction( + tablePath.f0, + tablePath.f1, + tablePath.f2, + catalogConfig, + branchName, + tagName, + snapshot); + return Optional.of(action); + } + + @Override + public void printHelp() { + System.out.println("Action \"create_branch\" create a branch from given tag."); + System.out.println(); + + System.out.println("Syntax:"); + System.out.println( + " create_branch --warehouse --database " + + "--table --branch_name [--tag_name ] [--snapshot ]"); + System.out.println(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteBranchAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteBranchAction.java new file mode 100644 index 000000000000..b167f7b969b6 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteBranchAction.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import java.util.Map; + +/** Delete branch action for Flink. */ +public class DeleteBranchAction extends TableActionBase { + + private final String branchName; + + public DeleteBranchAction( + String warehouse, + String databaseName, + String tableName, + Map catalogConfig, + String branchName) { + super(warehouse, databaseName, tableName, catalogConfig); + this.branchName = branchName; + } + + @Override + public void run() throws Exception { + table.deleteBranch(branchName); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteBranchActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteBranchActionFactory.java new file mode 100644 index 000000000000..33f1c7990683 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteBranchActionFactory.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import org.apache.flink.api.java.tuple.Tuple3; + +import java.util.Map; +import java.util.Optional; + +/** Factory to create {@link DeleteBranchAction}. */ +public class DeleteBranchActionFactory implements ActionFactory { + + public static final String IDENTIFIER = "delete_branch"; + + private static final String BRANCH_NAME = "branch_name"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public Optional create(MultipleParameterToolAdapter params) { + checkRequiredArgument(params, BRANCH_NAME); + + Tuple3 tablePath = getTablePath(params); + Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); + String branchName = params.get(BRANCH_NAME); + + DeleteBranchAction action = + new DeleteBranchAction( + tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig, branchName); + return Optional.of(action); + } + + @Override + public void printHelp() { + System.out.println("Action \"delete_branch\" delete a branch by name."); + System.out.println(); + + System.out.println("Syntax:"); + System.out.println( + " delete_branch --warehouse --database " + + "--table --branch_name "); + System.out.println(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeBranchAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeBranchAction.java new file mode 100644 index 000000000000..405f80f8fa9d --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeBranchAction.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import java.util.Map; + +/** Merge branch action for Flink. */ +public class MergeBranchAction extends TableActionBase { + private final String branchName; + + public MergeBranchAction( + String warehouse, + String databaseName, + String tableName, + Map catalogConfig, + String branchName) { + super(warehouse, databaseName, tableName, catalogConfig); + this.branchName = branchName; + } + + @Override + public void run() throws Exception { + table.mergeBranch(branchName); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeBranchActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeBranchActionFactory.java new file mode 100644 index 000000000000..0916079a728f --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeBranchActionFactory.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import org.apache.flink.api.java.tuple.Tuple3; + +import java.util.Map; +import java.util.Optional; + +/** Factory to create {@link MergeBranchAction}. */ +public class MergeBranchActionFactory implements ActionFactory { + + public static final String IDENTIFIER = "merge_branch"; + + private static final String BRANCH_NAME = "branch_name"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public Optional create(MultipleParameterToolAdapter params) { + checkRequiredArgument(params, BRANCH_NAME); + + Tuple3 tablePath = getTablePath(params); + Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); + String branchName = params.get(BRANCH_NAME); + + MergeBranchAction action = + new MergeBranchAction( + tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig, branchName); + return Optional.of(action); + } + + @Override + public void printHelp() { + System.out.println("Action \"merge_branch\" merge a branch by name."); + System.out.println(); + + System.out.println("Syntax:"); + System.out.println( + " merge_branch --warehouse --database " + + "--table --branch_name "); + System.out.println(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index f61df27a6acb..6d6eb7aa2d7a 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -30,6 +30,9 @@ org.apache.paimon.flink.action.RemoveOrphanFilesActionFactory org.apache.paimon.flink.action.QueryServiceActionFactory org.apache.paimon.flink.action.ExpirePartitionsActionFactory org.apache.paimon.flink.action.MarkPartitionDoneActionFactory +org.apache.paimon.flink.action.CreateBranchActionFactory +org.apache.paimon.flink.action.DeleteBranchActionFactory +org.apache.paimon.flink.action.MergeBranchActionFactory ### procedure factories org.apache.paimon.flink.procedure.CompactDatabaseProcedure diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java index f3d8d391b169..abbddfe39797 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java @@ -87,6 +87,36 @@ void testCreateAndDeleteBranch() throws Exception { String.format( "CALL sys.delete_branch('%s.%s', 'branch_name')", database, tableName)); assertThat(branchManager.branchExists("branch_name")).isFalse(); + + createAction( + CreateBranchAction.class, + "create_branch", + "--warehouse", + warehouse, + "--database", + database, + "--table", + tableName, + "--branch_name", + "branch_name", + "--tag_name", + "tag2") + .run(); + assertThat(branchManager.branchExists("branch_name")).isTrue(); + + createAction( + DeleteBranchAction.class, + "delete_branch", + "--warehouse", + warehouse, + "--database", + database, + "--table", + tableName, + "--branch_name", + "branch_name") + .run(); + assertThat(branchManager.branchExists("branch_name")).isFalse(); } @Test @@ -128,6 +158,36 @@ void testCreateAndDeleteBranchWithSnapshotId() throws Exception { "CALL sys.delete_branch('%s.%s', 'branch_name_with_snapshotId')", database, tableName)); assertThat(branchManager.branchExists("branch_name_with_snapshotId")).isFalse(); + + createAction( + CreateBranchAction.class, + "create_branch", + "--warehouse", + warehouse, + "--database", + database, + "--table", + tableName, + "--branch_name", + "branch_name_with_snapshotId", + "--snapshot", + "2") + .run(); + assertThat(branchManager.branchExists("branch_name_with_snapshotId")).isTrue(); + + createAction( + DeleteBranchAction.class, + "delete_branch", + "--warehouse", + warehouse, + "--database", + database, + "--table", + tableName, + "--branch_name", + "branch_name_with_snapshotId") + .run(); + assertThat(branchManager.branchExists("branch_name_with_snapshotId")).isFalse(); } @Test @@ -168,6 +228,34 @@ void testCreateAndDeleteEmptyBranch() throws Exception { "CALL sys.delete_branch('%s.%s', 'empty_branch_name')", database, tableName)); assertThat(branchManager.branchExists("empty_branch_name")).isFalse(); + + createAction( + CreateBranchAction.class, + "create_branch", + "--warehouse", + warehouse, + "--database", + database, + "--table", + tableName, + "--branch_name", + "empty_branch_name") + .run(); + assertThat(branchManager.branchExists("empty_branch_name")).isTrue(); + + createAction( + DeleteBranchAction.class, + "delete_branch", + "--warehouse", + warehouse, + "--database", + database, + "--table", + tableName, + "--branch_name", + "empty_branch_name") + .run(); + assertThat(branchManager.branchExists("empty_branch_name")).isFalse(); } @Test @@ -199,6 +287,10 @@ void testMergeBranch() throws Exception { callProcedure( String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName)); assertThat(tagManager.tagExists("tag2")).isTrue(); + // Create tag3 + callProcedure( + String.format("CALL sys.create_tag('%s.%s', 'tag3', 3)", database, tableName)); + assertThat(tagManager.tagExists("tag3")).isTrue(); // Create merge_branch_name branch BranchManager branchManager = table.branchManager(); @@ -207,8 +299,24 @@ void testMergeBranch() throws Exception { "CALL sys.create_branch('%s.%s', 'merge_branch_name', 'tag2')", database, tableName)); assertThat(branchManager.branchExists("merge_branch_name")).isTrue(); - - // Merge branch + // Create merge_branch_name_action branch + createAction( + CreateBranchAction.class, + "create_branch", + "--warehouse", + warehouse, + "--database", + database, + "--table", + tableName, + "--branch_name", + "merge_branch_name_action", + "--tag_name", + "tag3") + .run(); + assertThat(branchManager.branchExists("merge_branch_name_action")).isTrue(); + + // Merge branch merge_branch_name callProcedure( String.format( "CALL sys.merge_branch('%s.%s', 'merge_branch_name')", @@ -218,6 +326,23 @@ void testMergeBranch() throws Exception { SnapshotManager snapshotManager = table.snapshotManager(); assertThat(snapshotManager.snapshotExists(3)).isFalse(); + // Merge branch merge_branch_name_action + createAction( + MergeBranchAction.class, + "merge_branch", + "--warehouse", + warehouse, + "--database", + database, + "--table", + tableName, + "--branch_name", + "merge_branch_name_action") + .run(); + + // Check snapshot + assertThat(snapshotManager.snapshotExists(3)).isTrue(); + // Renew write write = writeBuilder.newWrite(); commit = writeBuilder.newCommit(); @@ -234,6 +359,7 @@ void testMergeBranch() throws Exception { Arrays.asList( "+I[1, Hi]", "+I[2, Hello]", + "+I[3, Paimon]", "+I[4, new.data_4]", "+I[5, new.data_5]", "+I[6, new.data_6]", @@ -246,7 +372,7 @@ void testMergeBranch() throws Exception { "+I[13, new.data_13]"); Assert.assertEquals(expected, sortedActual); - // Merge branch again + // Merge branch merge_branch_name again callProcedure( String.format( "CALL sys.merge_branch('%s.%s', 'merge_branch_name')", @@ -257,6 +383,26 @@ void testMergeBranch() throws Exception { sortedActual = new ArrayList<>(result); expected = Arrays.asList("+I[1, Hi]", "+I[2, Hello]"); Assert.assertEquals(expected, sortedActual); + + // Merge branch merge_branch_name_action again + createAction( + MergeBranchAction.class, + "merge_branch", + "--warehouse", + warehouse, + "--database", + database, + "--table", + tableName, + "--branch_name", + "merge_branch_name_action") + .run(); + + // Check main branch data + result = readTableData(table); + sortedActual = new ArrayList<>(result); + expected = Arrays.asList("+I[1, Hi]", "+I[2, Hello]", "+I[3, Paimon]"); + Assert.assertEquals(expected, sortedActual); } List readTableData(FileStoreTable table) throws Exception {