Skip to content

Commit

Permalink
[core] Introduce deleteTags and deleteBranches to Table
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Jul 5, 2024
1 parent 4088bc7 commit db8bcd7
Show file tree
Hide file tree
Showing 9 changed files with 25 additions and 33 deletions.
4 changes: 2 additions & 2 deletions docs/content/flink/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ All available procedures are listed below.
<td>
To delete a tag. Arguments:
<li>identifier: the target table identifier. Cannot be empty.</li>
<li>tagName: name of the tag to be deleted.If you specify multiple tags, delimiter is ','.</li>
<li>tagName: name of the tag to be deleted. If you specify multiple tags, delimiter is ','.</li>
</td>
<td>
CALL sys.delete_tag('default.T', 'my_tag')
Expand Down Expand Up @@ -331,7 +331,7 @@ All available procedures are listed below.
<td>
To delete a branch. Arguments:
<li>identifier: the target table identifier. Cannot be empty.</li>
<li>branchName: name of the branch to be deleted.If you specify multiple branches, delimiter is ','.</li>
<li>branchName: name of the branch to be deleted. If you specify multiple branches, delimiter is ','.</li>
</td>
<td>
CALL sys.delete_branch('default.T', 'branch1')
Expand Down
2 changes: 1 addition & 1 deletion docs/content/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ This section introduce all available spark procedures about paimon.
<td>
To merge a branch to main branch. Arguments:
<li>table: the target table identifier. Cannot be empty.</li>
<li>branch: name of the branch to be merged.If you specify multiple branches, delimiter is ','.</li>
<li>branch: name of the branch to be merged. If you specify multiple branches, delimiter is ','.</li>
</td>
<td>
CALL sys.delete_branch(table => 'test_db.T', branch => 'test_branch')
Expand Down
16 changes: 16 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ default String fullName() {
@Experimental
void deleteTag(String tagName);

/** Delete tags, tags are separated by commas. */
@Experimental
default void deleteTags(String tagNames) {
for (String tagName : tagNames.split(",")) {
deleteTag(tagName);
}
}

/** Rollback table's state to a specific tag. */
@Experimental
void rollbackTo(String tagName);
Expand All @@ -115,6 +123,14 @@ default String fullName() {
@Experimental
void deleteBranch(String branchName);

/** Delete branches, branches are separated by commas. */
@Experimental
default void deleteBranches(String branchNames) {
for (String branch : branchNames.split(",")) {
deleteBranch(branch);
}
}

/** Merge a branch to main branch. */
@Experimental
void fastForward(String branchName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ public DeleteBranchAction(

@Override
public void run() throws Exception {
String[] branches = branchNames.split(",");
for (String branch : branches) {
table.deleteBranch(branch);
}
table.deleteBranches(branchNames);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ public DeleteTagAction(

@Override
public void run() throws Exception {
String[] tagNames = tagNameStr.split(",");
for (String tagName : tagNames) {
table.deleteTag(tagName);
}
table.deleteTags(tagNameStr);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.Table;

import org.apache.flink.table.procedure.ProcedureContext;

Expand All @@ -42,12 +41,7 @@ public String identifier() {

public String[] call(ProcedureContext procedureContext, String tableId, String branchStr)
throws Catalog.TableNotExistException {
String[] branchs = branchStr.split(",");
for (String branch : branchs) {
Table table = catalog.getTable(Identifier.fromString(tableId));
table.deleteBranch(branch);
}

catalog.getTable(Identifier.fromString(tableId)).deleteBranches(branchStr);
return new String[] {"Success"};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,7 @@ public class DeleteTagProcedure extends ProcedureBase {
public String[] call(ProcedureContext procedureContext, String tableId, String tagNameStr)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
String[] tagNames = tagNameStr.split(",");
for (String tagName : tagNames) {
table.deleteTag(tagName);
}

table.deleteTags(tagNameStr);
return new String[] {"Success"};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,10 @@ public StructType outputType() {
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
String branchStr = args.getString(1);
String[] branches = branchStr.split(",");

return modifyPaimonTable(
tableIdent,
table -> {
for (String branch : branches) {
table.deleteBranch(branch);
}
table.deleteBranches(branchStr);
InternalRow outputRow = newInternalRow(true);
return new InternalRow[] {outputRow};
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,10 @@ public StructType outputType() {
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
String tagStr = args.getString(1);
String[] tags = tagStr.split(",");

return modifyPaimonTable(
tableIdent,
table -> {
for (String tag : tags) {
table.deleteTag(tag);
}
table.deleteTags(tagStr);
InternalRow outputRow = newInternalRow(true);
return new InternalRow[] {outputRow};
});
Expand Down

0 comments on commit db8bcd7

Please sign in to comment.