Skip to content

Commit

Permalink
[procedure] Support multiple tags and branches delete (apache#3684)
Browse files Browse the repository at this point in the history
  • Loading branch information
xuzifu666 authored Jul 5, 2024
1 parent 4a89dd9 commit 4088bc7
Show file tree
Hide file tree
Showing 11 changed files with 167 additions and 19 deletions.
4 changes: 2 additions & 2 deletions docs/content/flink/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ All available procedures are listed below.
<td>
To delete a tag. Arguments:
<li>identifier: the target table identifier. Cannot be empty.</li>
<li>tagName: name of the tag to be deleted.</li>
<li>tagName: name of the tag to be deleted.If you specify multiple tags, delimiter is ','.</li>
</td>
<td>
CALL sys.delete_tag('default.T', 'my_tag')
Expand Down Expand Up @@ -331,7 +331,7 @@ All available procedures are listed below.
<td>
To delete a branch. Arguments:
<li>identifier: the target table identifier. Cannot be empty.</li>
<li>branchName: name of the branch to be deleted.</li>
<li>branchName: name of the branch to be deleted.If you specify multiple branches, delimiter is ','.</li>
</td>
<td>
CALL sys.delete_branch('default.T', 'branch1')
Expand Down
2 changes: 1 addition & 1 deletion docs/content/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ This section introduce all available spark procedures about paimon.
<td>
To merge a branch to main branch. Arguments:
<li>table: the target table identifier. Cannot be empty.</li>
<li>branch: name of the branch to be merged.</li>
<li>branch: name of the branch to be merged.If you specify multiple branches, delimiter is ','.</li>
</td>
<td>
CALL sys.delete_branch(table => 'test_db.T', branch => 'test_branch')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,23 @@
/** Delete branch action for Flink. */
public class DeleteBranchAction extends TableActionBase {

private final String branchName;
private final String branchNames;

public DeleteBranchAction(
String warehouse,
String databaseName,
String tableName,
Map<String, String> catalogConfig,
String branchName) {
String branchNames) {
super(warehouse, databaseName, tableName, catalogConfig);
this.branchName = branchName;
this.branchNames = branchNames;
}

@Override
public void run() throws Exception {
table.deleteBranch(branchName);
String[] branches = branchNames.split(",");
for (String branch : branches) {
table.deleteBranch(branch);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,23 @@
/** Delete tag action for Flink. */
public class DeleteTagAction extends TableActionBase {

private final String tagName;
private final String tagNameStr;

public DeleteTagAction(
String warehouse,
String databaseName,
String tableName,
Map<String, String> catalogConfig,
String tagName) {
String tagNameStr) {
super(warehouse, databaseName, tableName, catalogConfig);
this.tagName = tagName;
this.tagNameStr = tagNameStr;
}

@Override
public void run() throws Exception {
table.deleteTag(tagName);
String[] tagNames = tagNameStr.split(",");
for (String tagName : tagNames) {
table.deleteTag(tagName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,13 @@ public String identifier() {
return IDENTIFIER;
}

public String[] call(ProcedureContext procedureContext, String tableId, String branchName)
public String[] call(ProcedureContext procedureContext, String tableId, String branchStr)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
table.deleteBranch(branchName);
String[] branchs = branchStr.split(",");
for (String branch : branchs) {
Table table = catalog.getTable(Identifier.fromString(tableId));
table.deleteBranch(branch);
}

return new String[] {"Success"};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,13 @@ public class DeleteTagProcedure extends ProcedureBase {

public static final String IDENTIFIER = "delete_tag";

public String[] call(ProcedureContext procedureContext, String tableId, String tagName)
public String[] call(ProcedureContext procedureContext, String tableId, String tagNameStr)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
table.deleteTag(tagName);
String[] tagNames = tagNameStr.split(",");
for (String tagName : tagNames) {
table.deleteTag(tagName);
}

return new String[] {"Success"};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,27 @@ void testCreateAndDeleteBranchWithSnapshotId() throws Exception {
database, tableName));
assertThat(branchManager.branchExists("branch_name_with_snapshotId")).isFalse();

// create branch1 and branch3
callProcedure(
String.format(
"CALL sys.create_branch('%s.%s', 'branch_name_with_snapshotId_1', 1)",
database, tableName));
assertThat(branchManager.branchExists("branch_name_with_snapshotId_1")).isTrue();

callProcedure(
String.format(
"CALL sys.create_branch('%s.%s', 'branch_name_with_snapshotId_3', 3)",
database, tableName));
assertThat(branchManager.branchExists("branch_name_with_snapshotId_3")).isTrue();

// delete branch1 and branch3 batch
callProcedure(
String.format(
"CALL sys.delete_branch('%s.%s', 'branch_name_with_snapshotId_1,branch_name_with_snapshotId_3')",
database, tableName));
assertThat(branchManager.branchExists("branch_name_with_snapshotId_1")).isFalse();
assertThat(branchManager.branchExists("branch_name_with_snapshotId_3")).isFalse();

createAction(
CreateBranchAction.class,
"create_branch",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,69 @@ public void testCreateAndDeleteTag() throws Exception {
String.format("CALL sys.delete_tag('%s.%s', 'tag2')", database, tableName));
}
assertThat(tagManager.tagExists("tag2")).isFalse();

// create tag1
if (ThreadLocalRandom.current().nextBoolean()) {
createAction(
CreateTagAction.class,
"create_tag",
"--warehouse",
warehouse,
"--database",
database,
"--table",
tableName,
"--tag_name",
"tag1",
"--snapshot",
"1")
.run();
} else {
callProcedure(
String.format("CALL sys.create_tag('%s.%s', 'tag1', 1)", database, tableName));
}

// create tag3
if (ThreadLocalRandom.current().nextBoolean()) {
createAction(
CreateTagAction.class,
"create_tag",
"--warehouse",
warehouse,
"--database",
database,
"--table",
tableName,
"--tag_name",
"tag3",
"--snapshot",
"3")
.run();
} else {
callProcedure(
String.format("CALL sys.create_tag('%s.%s', 'tag3', 3)", database, tableName));
}

if (ThreadLocalRandom.current().nextBoolean()) {
createAction(
DeleteTagAction.class,
"delete_tag",
"--warehouse",
warehouse,
"--database",
database,
"--table",
tableName,
"--tag_name",
"tag1,tag3")
.run();
} else {
callProcedure(
String.format(
"CALL sys.delete_tag('%s.%s', 'tag1,tag3')", database, tableName));
}
assertThat(tagManager.tagExists("tag1")).isFalse();
assertThat(tagManager.tagExists("tag3")).isFalse();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,15 @@ public StructType outputType() {
@Override
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
String branch = args.getString(1);
String branchStr = args.getString(1);
String[] branches = branchStr.split(",");

return modifyPaimonTable(
tableIdent,
table -> {
table.deleteBranch(branch);
for (String branch : branches) {
table.deleteBranch(branch);
}
InternalRow outputRow = newInternalRow(true);
return new InternalRow[] {outputRow};
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class CreateAndDeleteBranchProcedureTest extends PaimonSparkTestBase with Stream
stream.processAllAvailable()
checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)

// create tag
// create tags
checkAnswer(
spark.sql(
"CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_tag', snapshot => 2)"),
Expand Down Expand Up @@ -106,6 +106,29 @@ class CreateAndDeleteBranchProcedureTest extends PaimonSparkTestBase with Stream
"CALL paimon.sys.delete_branch(table => 'test.T', branch => 'test_branch')"),
Row(true) :: Nil)
assert(!branchManager.branchExists("test_branch"))

// create branch with snapshot2
checkAnswer(
spark.sql(
"CALL paimon.sys.create_branch(table => 'test.T', branch => 'snapshot_branch_2', snapshot => 2)"),
Row(true) :: Nil)
assert(branchManager.branchExists("snapshot_branch_2"))

// create branch with snapshot3
checkAnswer(
spark.sql(
"CALL paimon.sys.create_branch(table => 'test.T', branch => 'snapshot_branch_3', snapshot => 3)"),
Row(true) :: Nil)
assert(branchManager.branchExists("snapshot_branch_3"))

// delete branch:snapshot_branch_2 and snapshot_branch_3
checkAnswer(
spark.sql(
"CALL paimon.sys.delete_branch(table => 'test.T', branch => 'snapshot_branch_2,snapshot_branch_3')"),
Row(true) :: Nil)
assert(!branchManager.branchExists("snapshot_branch_2"))
assert(!branchManager.branchExists("snapshot_branch_3"))

} finally {
stream.stop()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,32 @@ class CreateAndDeleteTagProcedureTest extends PaimonSparkTestBase with StreamTes
"CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_latestSnapshot_tag')"),
Row(true) :: Nil)
checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil)

// create test_tag_1 and test_tag_2
checkAnswer(
spark.sql(
"CALL paimon.sys.create_tag(" +
"table => 'test.T', tag => 'test_tag_1', snapshot => 1)"),
Row(true) :: Nil)

checkAnswer(
spark.sql(
"CALL paimon.sys.create_tag(" +
"table => 'test.T', tag => 'test_tag_2', snapshot => 2)"),
Row(true) :: Nil)

checkAnswer(
spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"),
Row("test_tag_1") :: Row("test_tag_2") :: Nil)

// delete test_tag_1 and test_tag_2
checkAnswer(
spark.sql(
"CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_tag_1,test_tag_2')"),
Row(true) :: Nil)

checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil)

} finally {
stream.stop()
}
Expand Down

0 comments on commit 4088bc7

Please sign in to comment.