Skip to content

Commit

Permalink
[flink] Add flink actions for branch (#3622)
Browse files Browse the repository at this point in the history
  • Loading branch information
herefree authored Jun 28, 2024
1 parent e55884a commit 5bd7ee9
Show file tree
Hide file tree
Showing 9 changed files with 549 additions and 3 deletions.
50 changes: 50 additions & 0 deletions docs/content/maintenance/manage-branches.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,24 @@ CALL sys.create_branch('default.T', 'branch1');
```
{{< /tab >}}

{{< tab "Flink Action Jar" >}}

Run the following command:

```bash
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-{{< version >}}.jar \
create_branch \
--warehouse <warehouse-path> \
--database <database-name> \
--table <table-name> \
--branch_name <branch-name> \
[--tag_name <tag-name>] \
[--snapshot <snapshot_id>] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
```
{{< /tab >}}
{{< /tabs >}}
## Delete Branches
Expand All @@ -71,6 +89,22 @@ CALL sys.delete_branch('default.T', 'branch1');
```
{{< /tab >}}
{{< tab "Flink Action Jar" >}}
Run the following command:
```bash
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-{{< version >}}.jar \
delete_branch \
--warehouse <warehouse-path> \
--database <database-name> \
--table <table-name> \
--branch_name <branch-name> \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
```
{{< /tab >}}
{{< /tabs >}}
## Read / Write With Branch
Expand Down Expand Up @@ -107,4 +141,20 @@ CALL sys.merge_branch('default.T', 'branch1');
{{< /tab >}}
{{< tab "Flink Action Jar" >}}
Run the following command:
```bash
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-{{< version >}}.jar \
merge_branch \
--warehouse <warehouse-path> \
--database <database-name> \
--table <table-name> \
--branch_name <branch-name> \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
```
{{< /tab >}}
{{< /tabs >}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import org.apache.commons.lang3.StringUtils;

import java.util.Map;

/** Create branch action for Flink. */
public class CreateBranchAction extends TableActionBase {
private final String branchName;
private final String tagName;

private final Long snapshotId;

public CreateBranchAction(
String warehouse,
String databaseName,
String tableName,
Map<String, String> catalogConfig,
String branchName,
String tagName,
Long snapshotId) {
super(warehouse, databaseName, tableName, catalogConfig);
this.branchName = branchName;
this.tagName = tagName;
this.snapshotId = snapshotId;
}

@Override
public void run() throws Exception {
if (!StringUtils.isBlank(tagName)) {
table.createBranch(branchName, tagName);
} else if (snapshotId != null) {
table.createBranch(branchName, snapshotId);
} else {
table.createBranch(branchName);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import org.apache.flink.api.java.tuple.Tuple3;

import java.util.Map;
import java.util.Optional;

/** Factory to create {@link CreateBranchAction}. */
public class CreateBranchActionFactory implements ActionFactory {

public static final String IDENTIFIER = "create_branch";

private static final String TAG_NAME = "tag_name";
private static final String BRANCH_NAME = "branch_name";
private static final String SNAPSHOT = "snapshot";

@Override
public String identifier() {
return IDENTIFIER;
}

@Override
public Optional<Action> create(MultipleParameterToolAdapter params) {
checkRequiredArgument(params, BRANCH_NAME);

Tuple3<String, String, String> tablePath = getTablePath(params);
Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);

Long snapshot = null;
if (params.has(SNAPSHOT)) {
snapshot = Long.parseLong(params.get(SNAPSHOT));
}

String tagName = null;
if (params.has(TAG_NAME)) {
tagName = params.get(TAG_NAME);
}

String branchName = params.get(BRANCH_NAME);

CreateBranchAction action =
new CreateBranchAction(
tablePath.f0,
tablePath.f1,
tablePath.f2,
catalogConfig,
branchName,
tagName,
snapshot);
return Optional.of(action);
}

@Override
public void printHelp() {
System.out.println("Action \"create_branch\" create a branch from given tag.");
System.out.println();

System.out.println("Syntax:");
System.out.println(
" create_branch --warehouse <warehouse_path> --database <database_name> "
+ "--table <table_name> --branch_name <branch_name> [--tag_name <tag_name>] [--snapshot <snapshot_id>]");
System.out.println();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import java.util.Map;

/** Delete branch action for Flink. */
public class DeleteBranchAction extends TableActionBase {

private final String branchName;

public DeleteBranchAction(
String warehouse,
String databaseName,
String tableName,
Map<String, String> catalogConfig,
String branchName) {
super(warehouse, databaseName, tableName, catalogConfig);
this.branchName = branchName;
}

@Override
public void run() throws Exception {
table.deleteBranch(branchName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import org.apache.flink.api.java.tuple.Tuple3;

import java.util.Map;
import java.util.Optional;

/** Factory to create {@link DeleteBranchAction}. */
public class DeleteBranchActionFactory implements ActionFactory {

public static final String IDENTIFIER = "delete_branch";

private static final String BRANCH_NAME = "branch_name";

@Override
public String identifier() {
return IDENTIFIER;
}

@Override
public Optional<Action> create(MultipleParameterToolAdapter params) {
checkRequiredArgument(params, BRANCH_NAME);

Tuple3<String, String, String> tablePath = getTablePath(params);
Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
String branchName = params.get(BRANCH_NAME);

DeleteBranchAction action =
new DeleteBranchAction(
tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig, branchName);
return Optional.of(action);
}

@Override
public void printHelp() {
System.out.println("Action \"delete_branch\" delete a branch by name.");
System.out.println();

System.out.println("Syntax:");
System.out.println(
" delete_branch --warehouse <warehouse_path> --database <database_name> "
+ "--table <table_name> --branch_name <branch_name>");
System.out.println();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import java.util.Map;

/** Merge branch action for Flink. */
public class MergeBranchAction extends TableActionBase {
private final String branchName;

public MergeBranchAction(
String warehouse,
String databaseName,
String tableName,
Map<String, String> catalogConfig,
String branchName) {
super(warehouse, databaseName, tableName, catalogConfig);
this.branchName = branchName;
}

@Override
public void run() throws Exception {
table.mergeBranch(branchName);
}
}
Loading

0 comments on commit 5bd7ee9

Please sign in to comment.