Skip to content

Commit

Permalink
add flink action branch
Browse files Browse the repository at this point in the history
  • Loading branch information
herefree committed Jun 27, 2024
1 parent 368cfd4 commit 60304fe
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 4 deletions.
26 changes: 26 additions & 0 deletions docs/content/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,32 @@ This section introduce all available spark procedures about paimon.
CALL sys.repair('test_db.T')
</td>
</tr>
<tr>
<td>create_branch</td>
<td>
To merge a branch to main branch. Arguments:
<li>table: the target table identifier. Cannot be empty.</li>
<li>branch: name of the branch to be merged.</li>
<li>tag: name of the new tag. Cannot be empty.</li>
<li>snapshot(Long): id of the snapshot which the new tag is based on.</li>
</td>
<td>
CALL sys.create_branch(table => 'test_db.T', branch => 'test_branch')<br/>
CALL sys.create_branch(table => 'test_db.T', branch => 'test_branch', tag => 'my_tag')<br/>
CALL sys.create_branch(table => 'test_db.T', branch => 'test_branch', snapshot => 10)
</td>
</tr>
<tr>
<td>delete_branch</td>
<td>
To merge a branch to main branch. Arguments:
<li>table: the target table identifier. Cannot be empty.</li>
<li>branch: name of the branch to be merged.</li>
</td>
<td>
CALL sys.delete_branch(table => 'test_db.T', branch => 'test_branch')
</td>
</tr>
<tr>
<td>merge_branch</td>
<td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.types.DataTypes.LongType;
import static org.apache.spark.sql.types.DataTypes.StringType;

/** Spark procedure to create a branch. */
Expand All @@ -35,7 +36,8 @@ public class CreateBranchProcedure extends BaseProcedure {
new ProcedureParameter[] {
ProcedureParameter.required("table", StringType),
ProcedureParameter.required("branch", StringType),
ProcedureParameter.required("tag", StringType)
ProcedureParameter.optional("tag", StringType),
ProcedureParameter.optional("snapshot", LongType)
};

private static final StructType OUTPUT_TYPE =
Expand All @@ -62,12 +64,20 @@ public StructType outputType() {
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
String branch = args.getString(1);
String tag = args.getString(2);
String tag = args.isNullAt(2) ? null : args.getString(2);
Long snapshot = args.isNullAt(3) ? null : args.getLong(3);

return modifyPaimonTable(
tableIdent,
table -> {
table.createBranch(branch, tag);
if (tag != null) {
table.createBranch(branch, tag);
} else if (snapshot != null) {
table.createBranch(branch, snapshot);
} else {
table.createBranch(branch);
}

InternalRow outputRow = newInternalRow(true);
return new InternalRow[] {outputRow};
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class CreateAndDeleteBranchProcedureTest extends PaimonSparkTestBase with Stream
spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"),
Row("test_tag") :: Nil)

// create branch
// create branch with tag
checkAnswer(
spark.sql(
"CALL paimon.sys.create_branch(table => 'test.T', branch => 'test_branch', tag => 'test_tag')"),
Expand All @@ -86,6 +86,20 @@ class CreateAndDeleteBranchProcedureTest extends PaimonSparkTestBase with Stream
val branchManager = table.branchManager()
assert(branchManager.branchExists("test_branch"))

// create empty branch
checkAnswer(
spark.sql(
"CALL paimon.sys.create_branch(table => 'test.T', branch => 'empty_branch')"),
Row(true) :: Nil)
assert(branchManager.branchExists("empty_branch"))

// create branch with snapshot
checkAnswer(
spark.sql(
"CALL paimon.sys.create_branch(table => 'test.T', branch => 'snapshot_branch', snapshot => 2)"),
Row(true) :: Nil)
assert(branchManager.branchExists("snapshot_branch"))

// delete branch
checkAnswer(
spark.sql(
Expand Down

0 comments on commit 60304fe

Please sign in to comment.