diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md index 9f6e061c1df2..1557fd6a7ee5 100644 --- a/docs/content/spark/procedures.md +++ b/docs/content/spark/procedures.md @@ -109,6 +109,18 @@ This section introduce all available spark procedures about paimon. CALL sys.create_tag_from_timestamp(`table` => 'default.T', `tag` => 'my_tag', `timestamp` => 1724404318750, time_retained => '1 d') +
+ * CALL sys.rename_tag('tableId', 'tagName', 'targetTagName')
+ *
+ */
+public class RenameTagProcedure extends ProcedureBase {
+ private static final String IDENTIFIER = "rename_tag";
+
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+ @ArgumentHint(name = "tagName", type = @DataTypeHint("STRING")),
+ @ArgumentHint(name = "targetTagName", type = @DataTypeHint("STRING"))
+ })
+ public String[] call(
+ ProcedureContext procedureContext, String tableId, String tagName, String targetTagName)
+ throws Catalog.TableNotExistException {
+ Table table = catalog.getTable(Identifier.fromString(tableId));
+ table.renameTag(tagName, targetTagName);
+ String ret = String.format("Rename [%s] to [%s] successfully.", tagName, targetTagName);
+ return new String[] {ret};
+ }
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 9eb450d33508..3ae35ade54bb 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -36,6 +36,7 @@ org.apache.paimon.flink.action.MarkPartitionDoneActionFactory
org.apache.paimon.flink.action.CreateBranchActionFactory
org.apache.paimon.flink.action.DeleteBranchActionFactory
org.apache.paimon.flink.action.FastForwardActionFactory
+org.apache.paimon.flink.action.RenameTagActionFactory
org.apache.paimon.flink.action.RepairActionFactory
org.apache.paimon.flink.action.RewriteFileIndexActionFactory
org.apache.paimon.flink.action.ExpireSnapshotsActionFactory
@@ -67,6 +68,7 @@ org.apache.paimon.flink.procedure.privilege.DropPrivilegedUserProcedure
org.apache.paimon.flink.procedure.privilege.GrantPrivilegeToUserProcedure
org.apache.paimon.flink.procedure.privilege.RevokePrivilegeFromUserProcedure
org.apache.paimon.flink.procedure.RepairProcedure
+org.apache.paimon.flink.procedure.RenameTagProcedure
org.apache.paimon.flink.procedure.FastForwardProcedure
org.apache.paimon.flink.procedure.MarkPartitionDoneProcedure
org.apache.paimon.flink.procedure.CloneProcedure
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
index aa98de38715b..7310a68df7a2 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
@@ -240,6 +240,101 @@ public void testCreateAndDeleteTag(String invoker) throws Exception {
assertThat(tagManager.tagExists("tag3")).isFalse();
}
+ @ParameterizedTest(name = "{0}")
+ @ValueSource(strings = {"action", "procedure_indexed", "procedure_named"})
+ public void testRenameTag(String invoker) throws Exception {
+ init(warehouse);
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
+ new String[] {"k", "v"});
+ FileStoreTable table =
+ createFileStoreTable(
+ rowType,
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ Collections.emptyList(),
+ Collections.emptyMap());
+
+ StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser);
+ write = writeBuilder.newWrite();
+ commit = writeBuilder.newCommit();
+
+ // 3 snapshots
+ writeData(rowData(1L, BinaryString.fromString("Hi")));
+ writeData(rowData(2L, BinaryString.fromString("Hello")));
+ writeData(rowData(3L, BinaryString.fromString("Paimon")));
+
+ TagManager tagManager = new TagManager(table.fileIO(), table.location());
+ switch (invoker) {
+ case "action":
+ createAction(
+ CreateTagAction.class,
+ "create_tag",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ tableName,
+ "--tag_name",
+ "tag2")
+ .run();
+ break;
+ case "procedure_indexed":
+ executeSQL(
+ String.format(
+ "CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName));
+ break;
+ case "procedure_named":
+ executeSQL(
+ String.format(
+ "CALL sys.create_tag(`table` => '%s.%s', tag => 'tag2', snapshot_id => cast(2 as bigint))",
+ database, tableName));
+ break;
+ default:
+ throw new UnsupportedOperationException(invoker);
+ }
+ assertThat(tagManager.tagExists("tag2")).isTrue();
+
+ switch (invoker) {
+ case "action":
+ createAction(
+ RenameTagAction.class,
+ "rename_tag",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ tableName,
+ "--tag_name",
+ "tag2",
+ "--target_tag_name",
+ "tag3")
+ .run();
+ break;
+ case "procedure_indexed":
+ executeSQL(
+ String.format(
+ "CALL sys.rename_tag('%s.%s', 'tag2', 'tag3')",
+ database, tableName));
+ break;
+ case "procedure_named":
+ executeSQL(
+ String.format(
+ "CALL sys.rename_tag(`table` => '%s.%s', tagName => 'tag2', targetTagName => 'tag3')",
+ database, tableName));
+ break;
+ default:
+ throw new UnsupportedOperationException(invoker);
+ }
+
+ assertThat(tagManager.tagExists("tag2")).isFalse();
+ assertThat(tagManager.tagExists("tag3")).isTrue();
+ }
+
@ParameterizedTest(name = "{0}")
@ValueSource(strings = {"action", "procedure_indexed", "procedure_named"})
public void testCreateLatestTag(String invoker) throws Exception {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagsProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagsProcedureITCase.java
index aef952b6c561..77e2b74c5d50 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagsProcedureITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagsProcedureITCase.java
@@ -58,6 +58,30 @@ public void testCreateTags() {
.containsExactlyInAnyOrder("+I[k, 2024-01-01]");
}
+ @Test
+ public void testRenameTag() {
+ sql(
+ "CREATE TABLE T ("
+ + " k STRING,"
+ + " dt STRING,"
+ + " PRIMARY KEY (k, dt) NOT ENFORCED"
+ + ") PARTITIONED BY (dt) WITH ("
+ + " 'bucket' = '1'"
+ + ")");
+ sql("insert into T values('k', '2024-01-01')");
+ sql("insert into T values('k2', '2024-01-02')");
+
+ sql("CALL sys.create_tag('default.T', 'tag1')");
+
+ assertThat(sql("select tag_name from `T$tags`").stream().map(Row::toString))
+ .containsExactlyInAnyOrder("+I[tag1]");
+
+ sql("CALL sys.rename_tag('default.T', 'tag1', 'tag2')");
+
+ assertThat(sql("select tag_name from `T$tags`").stream().map(Row::toString))
+ .containsExactlyInAnyOrder("+I[tag2]");
+ }
+
@Test
public void testThrowSnapshotNotExistException() {
sql(
diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala
index c4b861b29752..3f59e897ec6c 100644
--- a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala
+++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala
@@ -75,8 +75,16 @@ class CreateAndDeleteTagProcedureTest extends PaimonSparkTestBase with StreamTes
checkAnswer(
spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"),
Row("test_tag") :: Nil)
+ // test rename_tag
checkAnswer(
- spark.sql("CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_tag')"),
+ spark.sql(
+ "CALL paimon.sys.rename_tag(table => 'test.T', tag => 'test_tag', target_tag => 'test_tag_1')"),
+ Row(true) :: Nil)
+ checkAnswer(
+ spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"),
+ Row("test_tag_1") :: Nil)
+ checkAnswer(
+ spark.sql("CALL paimon.sys.delete_tag(table => 'test.T', tag => 'test_tag_1')"),
Row(true) :: Nil)
checkAnswer(spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"), Nil)
checkAnswer(
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index b8e43c19c8a6..1d5bd9df2f86 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -34,6 +34,7 @@
import org.apache.paimon.spark.procedure.Procedure;
import org.apache.paimon.spark.procedure.ProcedureBuilder;
import org.apache.paimon.spark.procedure.RemoveOrphanFilesProcedure;
+import org.apache.paimon.spark.procedure.RenameTagProcedure;
import org.apache.paimon.spark.procedure.RepairProcedure;
import org.apache.paimon.spark.procedure.ResetConsumerProcedure;
import org.apache.paimon.spark.procedure.RollbackProcedure;
@@ -61,6 +62,7 @@ private static Map
+ * CALL sys.rename_tag(tag => 'tag0', target_tag => 'tag1')
+ *
+ */
+public class RenameTagProcedure extends BaseProcedure {
+
+ private static final ProcedureParameter[] PARAMETERS =
+ new ProcedureParameter[] {
+ ProcedureParameter.required("table", StringType),
+ ProcedureParameter.required("tag", StringType),
+ ProcedureParameter.required("target_tag", StringType)
+ };
+
+ private static final StructType OUTPUT_TYPE =
+ new StructType(
+ new StructField[] {
+ new StructField("result", DataTypes.BooleanType, true, Metadata.empty())
+ });
+
+ protected RenameTagProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) {
+ Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+ String tag = args.getString(1);
+ String targetTag = args.getString(2);
+
+ return modifyPaimonTable(
+ tableIdent,
+ table -> {
+ table.renameTag(tag, targetTag);
+ InternalRow outputRow = newInternalRow(true);
+ return new InternalRow[] {outputRow};
+ });
+ }
+
+ public static ProcedureBuilder builder() {
+ return new BaseProcedure.Builder