From e2acdc2172f230f22d899c5c16873a1bf5da9041 Mon Sep 17 00:00:00 2001
From: askwang <135721692+askwang@users.noreply.github.com>
Date: Thu, 31 Oct 2024 10:38:07 +0800
Subject: [PATCH] [spark] Replace create existing tag semantic with replace_tag
(#4346)
---
docs/content/flink/procedures.md | 26 +++++
docs/content/spark/procedures.md | 13 +++
.../paimon/table/AbstractFileStoreTable.java | 13 +++
.../paimon/table/DelegatedFileStoreTable.java | 5 +
.../org/apache/paimon/table/FormatTable.java | 5 +
.../apache/paimon/table/ReadonlyTable.java | 8 ++
.../java/org/apache/paimon/table/Table.java | 4 +
.../org/apache/paimon/utils/TagManager.java | 43 ++++----
.../paimon/table/FileStoreTableTestBase.java | 5 +-
.../CreateOrReplaceTagBaseProcedure.java | 84 +++++++++++++++
.../flink/procedure/CreateTagProcedure.java | 59 +---------
.../flink/procedure/ReplaceTagProcedure.java | 39 +++++++
.../ProcedurePositionalArgumentsITCase.java | 30 ++++++
.../CreateOrReplaceTagActionFactory.java | 65 +++++++++++
.../flink/action/CreateTagActionFactory.java | 50 +++------
.../paimon/flink/action/ReplaceTagAction.java | 51 +++++++++
.../flink/action/ReplaceTagActionFactory.java | 64 +++++++++++
.../CreateOrReplaceTagBaseProcedure.java | 74 +++++++++++++
.../flink/procedure/CreateTagProcedure.java | 49 +--------
.../flink/procedure/ReplaceTagProcedure.java | 39 +++++++
.../org.apache.paimon.factories.Factory | 2 +
.../flink/action/ReplaceTagActionTest.java | 102 ++++++++++++++++++
.../paimon/flink/procedure/ProcedureTest.java | 3 +-
.../procedure/ReplaceTagProcedureITCase.java | 67 ++++++++++++
.../apache/paimon/spark/SparkProcedures.java | 2 +
.../CreateOrReplaceTagBaseProcedure.java | 87 +++++++++++++++
.../spark/procedure/CreateTagProcedure.java | 63 ++---------
.../spark/procedure/ReplaceTagProcedure.java | 52 +++++++++
.../CreateAndDeleteTagProcedureTest.scala | 16 ++-
.../procedure/ReplaceTagProcedureTest.scala | 59 ++++++++++
30 files changed, 961 insertions(+), 218 deletions(-)
create mode 100644 paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateOrReplaceTagBaseProcedure.java
create mode 100644 paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ReplaceTagProcedure.java
create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateOrReplaceTagActionFactory.java
create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReplaceTagAction.java
create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReplaceTagActionFactory.java
create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateOrReplaceTagBaseProcedure.java
create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceTagProcedure.java
create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ReplaceTagActionTest.java
create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ReplaceTagProcedureITCase.java
create mode 100644 paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateOrReplaceTagBaseProcedure.java
create mode 100644 paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ReplaceTagProcedure.java
create mode 100644 paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ReplaceTagProcedureTest.scala
diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 52460e27e709..e489f48dde03 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -222,6 +222,32 @@ All available procedures are listed below.
CALL sys.delete_tag(`table` => 'default.T', tag => 'my_tag')
+
delete_tag |
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index b33cf6922490..af0c3d71ec1c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -576,6 +576,19 @@ public void renameTag(String tagName, String targetTagName) {
tagManager().renameTag(tagName, targetTagName);
}
+ @Override
+ public void replaceTag(
+ String tagName, @Nullable Long fromSnapshotId, @Nullable Duration timeRetained) {
+ if (fromSnapshotId == null) {
+ Snapshot latestSnapshot = snapshotManager().latestSnapshot();
+ SnapshotNotExistException.checkNotNull(
+ latestSnapshot, "Cannot replace tag because latest snapshot doesn't exist.");
+ tagManager().replaceTag(latestSnapshot, tagName, timeRetained);
+ } else {
+ tagManager().replaceTag(findSnapshot(fromSnapshotId), tagName, timeRetained);
+ }
+ }
+
@Override
public void deleteTag(String tagName) {
tagManager()
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
index 5d6331aa414e..f6f3930baade 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
@@ -187,6 +187,11 @@ public void renameTag(String tagName, String targetTagName) {
wrapped.renameTag(tagName, targetTagName);
}
+ @Override
+ public void replaceTag(String tagName, Long fromSnapshotId, Duration timeRetained) {
+ wrapped.replaceTag(tagName, fromSnapshotId, timeRetained);
+ }
+
@Override
public void deleteTag(String tagName) {
wrapped.deleteTag(tagName);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
index 3224131d4afd..a53ba545c25e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
@@ -271,6 +271,11 @@ default void renameTag(String tagName, String targetTagName) {
throw new UnsupportedOperationException();
}
+ @Override
+ default void replaceTag(String tagName, Long fromSnapshotId, Duration timeRetained) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
default void deleteTag(String tagName) {
throw new UnsupportedOperationException();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
index 4ae593b5577f..fe5ebbfcd148 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
@@ -197,6 +197,14 @@ default void renameTag(String tagName, String targetTagName) {
this.getClass().getSimpleName()));
}
+ @Override
+ default void replaceTag(String tagName, Long fromSnapshotId, Duration timeRetained) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Readonly Table %s does not support replaceTag.",
+ this.getClass().getSimpleName()));
+ }
+
@Override
default void deleteTag(String tagName) {
throw new UnsupportedOperationException(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
index 11dc135a6253..db6848f5f1a8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
@@ -121,6 +121,10 @@ default String fullName() {
@Experimental
void renameTag(String tagName, String targetTagName);
+ /** Replace a tag with new snapshot id and new time retained. */
+ @Experimental
+ void replaceTag(String tagName, Long fromSnapshotId, Duration timeRetained);
+
/** Delete a tag by name. */
@Experimental
void deleteTag(String tagName);
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index c3a674bc5eaf..65963aafdf6b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -97,13 +97,26 @@ public List tagPaths(Predicate predicate) throws IOException {
/** Create a tag from given snapshot and save it in the storage. */
public void createTag(
- Snapshot snapshot,
- String tagName,
- @Nullable Duration timeRetained,
- List callbacks) {
+ Snapshot snapshot, String tagName, Duration timeRetained, List callbacks) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name '%s' is blank.", tagName);
+ checkArgument(!tagExists(tagName), "Tag name '%s' already exists.", tagName);
+ createOrReplaceTag(snapshot, tagName, timeRetained, callbacks);
+ }
+
+ /** Replace a tag from given snapshot and save it in the storage. */
+ public void replaceTag(Snapshot snapshot, String tagName, Duration timeRetained) {
+ checkArgument(
+ !StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name '%s' is blank.", tagName);
+ checkArgument(tagExists(tagName), "Tag name '%s' does not exist.", tagName);
+ createOrReplaceTag(snapshot, tagName, timeRetained, null);
+ }
+ public void createOrReplaceTag(
+ Snapshot snapshot,
+ String tagName,
+ @Nullable Duration timeRetained,
+ @Nullable List callbacks) {
// When timeRetained is not defined, please do not write the tagCreatorTime field,
// as this will cause older versions (<= 0.7) of readers to be unable to read this
// tag.
@@ -117,15 +130,7 @@ public void createTag(
Path tagPath = tagPath(tagName);
try {
- if (tagExists(tagName)) {
- Snapshot tagged = taggedSnapshot(tagName);
- Preconditions.checkArgument(
- tagged.id() == snapshot.id(), "Tag name '%s' already exists.", tagName);
- // update tag metadata into for the same snapshot of the same tag name.
- fileIO.overwriteFileUtf8(tagPath, content);
- } else {
- fileIO.writeFile(tagPath, content, false);
- }
+ fileIO.overwriteFileUtf8(tagPath, content);
} catch (IOException e) {
throw new RuntimeException(
String.format(
@@ -135,11 +140,13 @@ public void createTag(
e);
}
- try {
- callbacks.forEach(callback -> callback.notifyCreation(tagName));
- } finally {
- for (TagCallback tagCallback : callbacks) {
- IOUtils.closeQuietly(tagCallback);
+ if (callbacks != null) {
+ try {
+ callbacks.forEach(callback -> callback.notifyCreation(tagName));
+ } finally {
+ for (TagCallback tagCallback : callbacks) {
+ IOUtils.closeQuietly(tagCallback);
+ }
}
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index f8b15c155f63..f6343bfe437f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -1136,8 +1136,9 @@ public void testCreateSameTagName() throws Exception {
table.createTag("test-tag", 1);
// verify that tag file exist
assertThat(tagManager.tagExists("test-tag")).isTrue();
- // Create again
- table.createTag("test-tag", 1);
+ // Create again failed if tag existed
+ Assertions.assertThatThrownBy(() -> table.createTag("test-tag", 1))
+ .hasMessageContaining("Tag name 'test-tag' already exists.");
Assertions.assertThatThrownBy(() -> table.createTag("test-tag", 2))
.hasMessageContaining("Tag name 'test-tag' already exists.");
}
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateOrReplaceTagBaseProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateOrReplaceTagBaseProcedure.java
new file mode 100644
index 000000000000..2b7dadc05e9b
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateOrReplaceTagBaseProcedure.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.TimeUtils;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+
+/** A base procedure to create or replace a tag. */
+public abstract class CreateOrReplaceTagBaseProcedure extends ProcedureBase {
+
+ public String[] call(ProcedureContext procedureContext, String tableId, String tagName)
+ throws Catalog.TableNotExistException {
+ return innerCall(tableId, tagName, null, null);
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext, String tableId, String tagName, long snapshotId)
+ throws Catalog.TableNotExistException {
+ return innerCall(tableId, tagName, snapshotId, null);
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext,
+ String tableId,
+ String tagName,
+ long snapshotId,
+ String timeRetained)
+ throws Catalog.TableNotExistException {
+ return innerCall(tableId, tagName, snapshotId, timeRetained);
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext, String tableId, String tagName, String timeRetained)
+ throws Catalog.TableNotExistException {
+ return innerCall(tableId, tagName, null, timeRetained);
+ }
+
+ private String[] innerCall(
+ String tableId,
+ String tagName,
+ @Nullable Long snapshotId,
+ @Nullable String timeRetained)
+ throws Catalog.TableNotExistException {
+ Table table = catalog.getTable(Identifier.fromString(tableId));
+ createOrReplaceTag(table, tagName, snapshotId, toDuration(timeRetained));
+ return new String[] {"Success"};
+ }
+
+ abstract void createOrReplaceTag(
+ Table table, String tagName, Long snapshotId, Duration timeRetained);
+
+ @Nullable
+ private static Duration toDuration(@Nullable String s) {
+ if (s == null) {
+ return null;
+ }
+
+ return TimeUtils.parseDuration(s);
+ }
+}
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
index 1a7b03ef6512..b1af1c93942f 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
@@ -18,14 +18,7 @@
package org.apache.paimon.flink.procedure;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.Table;
-import org.apache.paimon.utils.TimeUtils;
-
-import org.apache.flink.table.procedure.ProcedureContext;
-
-import javax.annotation.Nullable;
import java.time.Duration;
@@ -36,59 +29,17 @@
* CALL sys.create_tag('tableId', 'tagName', snapshotId, 'timeRetained')
*
*/
-public class CreateTagProcedure extends ProcedureBase {
+public class CreateTagProcedure extends CreateOrReplaceTagBaseProcedure {
public static final String IDENTIFIER = "create_tag";
- public String[] call(
- ProcedureContext procedureContext, String tableId, String tagName, long snapshotId)
- throws Catalog.TableNotExistException {
- return innerCall(tableId, tagName, snapshotId, null);
- }
-
- public String[] call(ProcedureContext procedureContext, String tableId, String tagName)
- throws Catalog.TableNotExistException {
- return innerCall(tableId, tagName, null, null);
- }
-
- public String[] call(
- ProcedureContext procedureContext,
- String tableId,
- String tagName,
- long snapshotId,
- String timeRetained)
- throws Catalog.TableNotExistException {
- return innerCall(tableId, tagName, snapshotId, timeRetained);
- }
-
- public String[] call(
- ProcedureContext procedureContext, String tableId, String tagName, String timeRetained)
- throws Catalog.TableNotExistException {
- return innerCall(tableId, tagName, null, timeRetained);
- }
-
- private String[] innerCall(
- String tableId,
- String tagName,
- @Nullable Long snapshotId,
- @Nullable String timeRetained)
- throws Catalog.TableNotExistException {
- Table table = catalog.getTable(Identifier.fromString(tableId));
+ @Override
+ void createOrReplaceTag(Table table, String tagName, Long snapshotId, Duration timeRetained) {
if (snapshotId == null) {
- table.createTag(tagName, toDuration(timeRetained));
+ table.createTag(tagName, timeRetained);
} else {
- table.createTag(tagName, snapshotId, toDuration(timeRetained));
+ table.createTag(tagName, snapshotId, timeRetained);
}
- return new String[] {"Success"};
- }
-
- @Nullable
- private static Duration toDuration(@Nullable String s) {
- if (s == null) {
- return null;
- }
-
- return TimeUtils.parseDuration(s);
}
@Override
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ReplaceTagProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ReplaceTagProcedure.java
new file mode 100644
index 000000000000..6ed6ecc0e512
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ReplaceTagProcedure.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.table.Table;
+
+import java.time.Duration;
+
+/** A procedure to replace a tag. */
+public class ReplaceTagProcedure extends CreateOrReplaceTagBaseProcedure {
+
+ private static final String IDENTIFIER = "replace_tag";
+
+ @Override
+ void createOrReplaceTag(Table table, String tagName, Long snapshotId, Duration timeRetained) {
+ table.replaceTag(tagName, snapshotId, timeRetained);
+ }
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+}
diff --git a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
index 3eb1bf3c40e4..f2385e66d2a1 100644
--- a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
+++ b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
@@ -24,6 +24,7 @@
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
+import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
@@ -518,4 +519,33 @@ public void testExpireTags() throws Exception {
assertThat(sql("CALL sys.expire_tags('default.T')"))
.containsExactlyInAnyOrder(Row.of("tag-3"));
}
+
+ @Test
+ public void testReplaceTags() throws Exception {
+ sql(
+ "CREATE TABLE T ("
+ + " id INT,"
+ + " NAME STRING,"
+ + " PRIMARY KEY (id) NOT ENFORCED"
+ + ") WITH ('bucket' = '1'"
+ + ")");
+ sql("INSERT INTO T VALUES (1, 'a')");
+ sql("INSERT INTO T VALUES (2, 'b')");
+ assertThat(paimonTable("T").snapshotManager().snapshotCount()).isEqualTo(2L);
+
+ Assertions.assertThatThrownBy(() -> sql("CALL sys.replace_tag('default.T', 'test_tag')"))
+ .hasMessageContaining("Tag name 'test_tag' does not exist.");
+
+ sql("CALL sys.create_tag('default.T', 'test_tag')");
+ assertThat(sql("select tag_name,snapshot_id,time_retained from `T$tags`"))
+ .containsExactly(Row.of("test_tag", 2L, null));
+
+ sql("CALL sys.replace_tag('default.T', 'test_tag', 1)");
+ assertThat(sql("select tag_name,snapshot_id,time_retained from `T$tags`"))
+ .containsExactly(Row.of("test_tag", 1L, null));
+
+ sql("CALL sys.replace_tag('default.T', 'test_tag', 2, '1 d')");
+ assertThat(sql("select tag_name,snapshot_id,time_retained from `T$tags`"))
+ .containsExactly(Row.of("test_tag", 2L, "PT24H"));
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateOrReplaceTagActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateOrReplaceTagActionFactory.java
new file mode 100644
index 000000000000..fecb6895b682
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateOrReplaceTagActionFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.utils.TimeUtils;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Optional;
+
+/** Factory to create {@link ReplaceTagAction} or {@link ReplaceTagAction}. */
+public abstract class CreateOrReplaceTagActionFactory implements ActionFactory {
+
+ private static final String TAG_NAME = "tag_name";
+ private static final String SNAPSHOT = "snapshot";
+ private static final String TIME_RETAINED = "time_retained";
+
+ @Override
+ public Optional create(MultipleParameterToolAdapter params) {
+ checkRequiredArgument(params, TAG_NAME);
+
+ Tuple3 tablePath = getTablePath(params);
+ Map catalogConfig = optionalConfigMap(params, CATALOG_CONF);
+ String tagName = params.get(TAG_NAME);
+
+ Long snapshot = null;
+ if (params.has(SNAPSHOT)) {
+ snapshot = Long.parseLong(params.get(SNAPSHOT));
+ }
+
+ Duration timeRetained = null;
+ if (params.has(TIME_RETAINED)) {
+ timeRetained = TimeUtils.parseDuration(params.get(TIME_RETAINED));
+ }
+
+ return Optional.of(
+ createOrReplaceTagAction(
+ tablePath, catalogConfig, tagName, snapshot, timeRetained));
+ }
+
+ abstract Action createOrReplaceTagAction(
+ Tuple3 tablePath,
+ Map catalogConfig,
+ String tagName,
+ Long snapshot,
+ Duration timeRetained);
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java
index 7769fa1d792f..c525943122bc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java
@@ -18,56 +18,36 @@
package org.apache.paimon.flink.action;
-import org.apache.paimon.utils.TimeUtils;
-
import org.apache.flink.api.java.tuple.Tuple3;
import java.time.Duration;
import java.util.Map;
-import java.util.Optional;
/** Factory to create {@link CreateTagAction}. */
-public class CreateTagActionFactory implements ActionFactory {
+public class CreateTagActionFactory extends CreateOrReplaceTagActionFactory {
public static final String IDENTIFIER = "create_tag";
- private static final String TAG_NAME = "tag_name";
- private static final String SNAPSHOT = "snapshot";
- private static final String TIME_RETAINED = "time_retained";
-
@Override
public String identifier() {
return IDENTIFIER;
}
@Override
- public Optional create(MultipleParameterToolAdapter params) {
- checkRequiredArgument(params, TAG_NAME);
-
- Tuple3 tablePath = getTablePath(params);
- Map catalogConfig = optionalConfigMap(params, CATALOG_CONF);
- String tagName = params.get(TAG_NAME);
-
- Long snapshot = null;
- if (params.has(SNAPSHOT)) {
- snapshot = Long.parseLong(params.get(SNAPSHOT));
- }
-
- Duration timeRetained = null;
- if (params.has(TIME_RETAINED)) {
- timeRetained = TimeUtils.parseDuration(params.get(TIME_RETAINED));
- }
-
- CreateTagAction action =
- new CreateTagAction(
- tablePath.f0,
- tablePath.f1,
- tablePath.f2,
- catalogConfig,
- tagName,
- snapshot,
- timeRetained);
- return Optional.of(action);
+ Action createOrReplaceTagAction(
+ Tuple3 tablePath,
+ Map catalogConfig,
+ String tagName,
+ Long snapshot,
+ Duration timeRetained) {
+ return new CreateTagAction(
+ tablePath.f0,
+ tablePath.f1,
+ tablePath.f2,
+ catalogConfig,
+ tagName,
+ snapshot,
+ timeRetained);
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReplaceTagAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReplaceTagAction.java
new file mode 100644
index 000000000000..09a85fe8a25a
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReplaceTagAction.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Map;
+
+/** Replace tag action for Flink. */
+public class ReplaceTagAction extends TableActionBase {
+
+ private final String tagName;
+ private final @Nullable Long snapshotId;
+ private final @Nullable Duration timeRetained;
+
+ public ReplaceTagAction(
+ String warehouse,
+ String databaseName,
+ String tableName,
+ Map catalogConfig,
+ String tagName,
+ @Nullable Long snapshotId,
+ @Nullable Duration timeRetained) {
+ super(warehouse, databaseName, tableName, catalogConfig);
+ this.tagName = tagName;
+ this.timeRetained = timeRetained;
+ this.snapshotId = snapshotId;
+ }
+
+ @Override
+ public void run() throws Exception {
+ table.replaceTag(tagName, snapshotId, timeRetained);
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReplaceTagActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReplaceTagActionFactory.java
new file mode 100644
index 000000000000..a734e9cfbdc5
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ReplaceTagActionFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import java.time.Duration;
+import java.util.Map;
+
+/** Factory to create {@link ReplaceTagAction}. */
+public class ReplaceTagActionFactory extends CreateOrReplaceTagActionFactory {
+
+ public static final String IDENTIFIER = "replace_tag";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ Action createOrReplaceTagAction(
+ Tuple3 tablePath,
+ Map catalogConfig,
+ String tagName,
+ Long snapshot,
+ Duration timeRetained) {
+ return new ReplaceTagAction(
+ tablePath.f0,
+ tablePath.f1,
+ tablePath.f2,
+ catalogConfig,
+ tagName,
+ snapshot,
+ timeRetained);
+ }
+
+ @Override
+ public void printHelp() {
+ System.out.println("Action \"replace_tag\" to replace an existing tag with new tag info.");
+ System.out.println();
+
+ System.out.println("Syntax:");
+ System.out.println(
+ " replace_tag --warehouse --database "
+ + "--table --tag_name [--snapshot ] [--time_retained ]");
+ System.out.println();
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateOrReplaceTagBaseProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateOrReplaceTagBaseProcedure.java
new file mode 100644
index 000000000000..dba9d46636e6
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateOrReplaceTagBaseProcedure.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.TimeUtils;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+
+/** A base procedure to create or replace a tag. */
+public abstract class CreateOrReplaceTagBaseProcedure extends ProcedureBase {
+
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+ @ArgumentHint(name = "tag", type = @DataTypeHint("STRING")),
+ @ArgumentHint(
+ name = "snapshot_id",
+ type = @DataTypeHint("BIGINT"),
+ isOptional = true),
+ @ArgumentHint(
+ name = "time_retained",
+ type = @DataTypeHint("STRING"),
+ isOptional = true)
+ })
+ public String[] call(
+ ProcedureContext procedureContext,
+ String tableId,
+ String tagName,
+ @Nullable Long snapshotId,
+ @Nullable String timeRetained)
+ throws Catalog.TableNotExistException {
+ Table table = catalog.getTable(Identifier.fromString(tableId));
+ createOrReplaceTag(table, tagName, snapshotId, toDuration(timeRetained));
+ return new String[] {"Success"};
+ }
+
+ abstract void createOrReplaceTag(
+ Table table, String tagName, Long snapshotId, Duration timeRetained);
+
+ @Nullable
+ private static Duration toDuration(@Nullable String s) {
+ if (s == null) {
+ return null;
+ }
+
+ return TimeUtils.parseDuration(s);
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
index 3fb51c8d935c..b1af1c93942f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
@@ -18,17 +18,7 @@
package org.apache.paimon.flink.procedure;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.Table;
-import org.apache.paimon.utils.TimeUtils;
-
-import org.apache.flink.table.annotation.ArgumentHint;
-import org.apache.flink.table.annotation.DataTypeHint;
-import org.apache.flink.table.annotation.ProcedureHint;
-import org.apache.flink.table.procedure.ProcedureContext;
-
-import javax.annotation.Nullable;
import java.time.Duration;
@@ -39,46 +29,17 @@
* CALL sys.create_tag('tableId', 'tagName', snapshotId, 'timeRetained')
*
*/
-public class CreateTagProcedure extends ProcedureBase {
+public class CreateTagProcedure extends CreateOrReplaceTagBaseProcedure {
public static final String IDENTIFIER = "create_tag";
- @ProcedureHint(
- argument = {
- @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
- @ArgumentHint(name = "tag", type = @DataTypeHint("STRING")),
- @ArgumentHint(
- name = "snapshot_id",
- type = @DataTypeHint("BIGINT"),
- isOptional = true),
- @ArgumentHint(
- name = "time_retained",
- type = @DataTypeHint("STRING"),
- isOptional = true)
- })
- public String[] call(
- ProcedureContext procedureContext,
- String tableId,
- String tagName,
- @Nullable Long snapshotId,
- @Nullable String timeRetained)
- throws Catalog.TableNotExistException {
- Table table = catalog.getTable(Identifier.fromString(tableId));
+ @Override
+ void createOrReplaceTag(Table table, String tagName, Long snapshotId, Duration timeRetained) {
if (snapshotId == null) {
- table.createTag(tagName, toDuration(timeRetained));
+ table.createTag(tagName, timeRetained);
} else {
- table.createTag(tagName, snapshotId, toDuration(timeRetained));
+ table.createTag(tagName, snapshotId, timeRetained);
}
- return new String[] {"Success"};
- }
-
- @Nullable
- private static Duration toDuration(@Nullable String s) {
- if (s == null) {
- return null;
- }
-
- return TimeUtils.parseDuration(s);
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceTagProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceTagProcedure.java
new file mode 100644
index 000000000000..6ed6ecc0e512
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceTagProcedure.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.table.Table;
+
+import java.time.Duration;
+
+/** A procedure to replace a tag. */
+public class ReplaceTagProcedure extends CreateOrReplaceTagBaseProcedure {
+
+ private static final String IDENTIFIER = "replace_tag";
+
+ @Override
+ void createOrReplaceTag(Table table, String tagName, Long snapshotId, Duration timeRetained) {
+ table.replaceTag(tagName, snapshotId, timeRetained);
+ }
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 2cf57201d6ae..2f40278d659e 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -26,6 +26,7 @@ org.apache.paimon.flink.action.CreateTagFromTimestampActionFactory
org.apache.paimon.flink.action.CreateTagFromWatermarkActionFactory
org.apache.paimon.flink.action.DeleteTagActionFactory
org.apache.paimon.flink.action.ExpireTagsActionFactory
+org.apache.paimon.flink.action.ReplaceTagActionFactory
org.apache.paimon.flink.action.ResetConsumerActionFactory
org.apache.paimon.flink.action.MigrateTableActionFactory
org.apache.paimon.flink.action.MigrateFileActionFactory
@@ -51,6 +52,7 @@ org.apache.paimon.flink.procedure.CreateTagFromTimestampProcedure
org.apache.paimon.flink.procedure.CreateTagFromWatermarkProcedure
org.apache.paimon.flink.procedure.DeleteTagProcedure
org.apache.paimon.flink.procedure.ExpireTagsProcedure
+org.apache.paimon.flink.procedure.ReplaceTagProcedure
org.apache.paimon.flink.procedure.CreateBranchProcedure
org.apache.paimon.flink.procedure.DeleteBranchProcedure
org.apache.paimon.flink.procedure.DropPartitionProcedure
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ReplaceTagActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ReplaceTagActionTest.java
new file mode 100644
index 000000000000..00b43b9e11c9
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ReplaceTagActionTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.TagManager;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.bEnv;
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for {@link ReplaceTagAction}. */
+public class ReplaceTagActionTest extends ActionITCaseBase {
+
+ @BeforeEach
+ public void setUp() {
+ init(warehouse);
+ }
+
+ @Test
+ public void testReplaceTag() throws Exception {
+ bEnv.executeSql(
+ "CREATE TABLE T (id INT, name STRING,"
+ + " PRIMARY KEY (id) NOT ENFORCED)"
+ + " WITH ('bucket'='1')");
+
+ FileStoreTable table = getFileStoreTable("T");
+ TagManager tagManager = table.tagManager();
+
+ bEnv.executeSql("INSERT INTO T VALUES (1, 'a')").await();
+ bEnv.executeSql("INSERT INTO T VALUES (2, 'b')").await();
+ assertThat(table.snapshotManager().snapshotCount()).isEqualTo(2);
+
+ Assertions.assertThatThrownBy(
+ () ->
+ bEnv.executeSql(
+ "CALL sys.replace_tag(`table` => 'default.T', tag => 'test_tag')"))
+ .hasMessageContaining("Tag name 'test_tag' does not exist.");
+
+ bEnv.executeSql("CALL sys.create_tag(`table` => 'default.T', tag => 'test_tag')");
+ assertThat(tagManager.tagExists("test_tag")).isEqualTo(true);
+ assertThat(tagManager.tag("test_tag").trimToSnapshot().id()).isEqualTo(2);
+ assertThat(tagManager.tag("test_tag").getTagTimeRetained()).isEqualTo(null);
+
+ // replace tag with new time_retained
+ createAction(
+ ReplaceTagAction.class,
+ "replace_tag",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ "T",
+ "--tag_name",
+ "test_tag",
+ "--time_retained",
+ "1 d")
+ .run();
+ assertThat(tagManager.tag("test_tag").getTagTimeRetained().toHours()).isEqualTo(24);
+
+ // replace tag with new snapshot and time_retained
+ createAction(
+ ReplaceTagAction.class,
+ "replace_tag",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ "T",
+ "--tag_name",
+ "test_tag",
+ "--snapshot",
+ "1",
+ "--time_retained",
+ "2 d")
+ .run();
+ assertThat(tagManager.tag("test_tag").trimToSnapshot().id()).isEqualTo(1);
+ assertThat(tagManager.tag("test_tag").getTagTimeRetained().toHours()).isEqualTo(48);
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ProcedureTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ProcedureTest.java
index c24d4105a557..74e3aeeac53b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ProcedureTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ProcedureTest.java
@@ -98,7 +98,8 @@ public void testProcedureHasNamedArgument() {
}
private Method getMethodFromName(Class> clazz, String methodName) {
- Method[] methods = clazz.getDeclaredMethods();
+ // get all methods of current and parent class
+ Method[] methods = clazz.getMethods();
for (Method method : methods) {
if (method.getName().equals(methodName)) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ReplaceTagProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ReplaceTagProcedureITCase.java
new file mode 100644
index 000000000000..8a4eb791a6ad
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ReplaceTagProcedureITCase.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.flink.CatalogITCaseBase;
+
+import org.apache.flink.types.Row;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT Case for {@link ReplaceTagProcedure}. */
+public class ReplaceTagProcedureITCase extends CatalogITCaseBase {
+
+ @Test
+ public void testExpireTagsByTagCreateTimeAndTagTimeRetained() throws Exception {
+ sql(
+ "CREATE TABLE T (id INT, name STRING,"
+ + " PRIMARY KEY (id) NOT ENFORCED)"
+ + " WITH ('bucket'='1')");
+
+ sql("INSERT INTO T VALUES (1, 'a')");
+ sql("INSERT INTO T VALUES (2, 'b')");
+ assertThat(paimonTable("T").snapshotManager().snapshotCount()).isEqualTo(2);
+
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "CALL sys.replace_tag(`table` => 'default.T', tag => 'test_tag')"))
+ .hasMessageContaining("Tag name 'test_tag' does not exist.");
+
+ sql("CALL sys.create_tag(`table` => 'default.T', tag => 'test_tag')");
+ assertThat(sql("select tag_name,snapshot_id,time_retained from `T$tags`"))
+ .containsExactly(Row.of("test_tag", 2L, null));
+
+ // replace tag with new time_retained
+ sql(
+ "CALL sys.replace_tag(`table` => 'default.T', tag => 'test_tag',"
+ + " time_retained => '1 d')");
+ assertThat(sql("select tag_name,snapshot_id,time_retained from `T$tags`"))
+ .containsExactly(Row.of("test_tag", 2L, "PT24H"));
+
+ // replace tag with new snapshot and time_retained
+ sql(
+ "CALL sys.replace_tag(`table` => 'default.T', tag => 'test_tag',"
+ + " snapshot => 1, time_retained => '2 d')");
+ assertThat(sql("select tag_name,snapshot_id,time_retained from `T$tags`"))
+ .containsExactly(Row.of("test_tag", 2L, "PT48H"));
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index dee0c38d46fb..36c6ff897355 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -38,6 +38,7 @@
import org.apache.paimon.spark.procedure.RemoveOrphanFilesProcedure;
import org.apache.paimon.spark.procedure.RenameTagProcedure;
import org.apache.paimon.spark.procedure.RepairProcedure;
+import org.apache.paimon.spark.procedure.ReplaceTagProcedure;
import org.apache.paimon.spark.procedure.ResetConsumerProcedure;
import org.apache.paimon.spark.procedure.RollbackProcedure;
@@ -64,6 +65,7 @@ private static Map> initProcedureBuilders() {
ImmutableMap.builder();
procedureBuilders.put("rollback", RollbackProcedure::builder);
procedureBuilders.put("create_tag", CreateTagProcedure::builder);
+ procedureBuilders.put("replace_tag", ReplaceTagProcedure::builder);
procedureBuilders.put("rename_tag", RenameTagProcedure::builder);
procedureBuilders.put(
"create_tag_from_timestamp", CreateTagFromTimestampProcedure::builder);
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateOrReplaceTagBaseProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateOrReplaceTagBaseProcedure.java
new file mode 100644
index 000000000000..ed264140b797
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateOrReplaceTagBaseProcedure.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.TimeUtils;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.time.Duration;
+
+import static org.apache.spark.sql.types.DataTypes.LongType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/** A base procedure to create or replace a tag. */
+public abstract class CreateOrReplaceTagBaseProcedure extends BaseProcedure {
+
+ private static final ProcedureParameter[] PARAMETERS =
+ new ProcedureParameter[] {
+ ProcedureParameter.required("table", StringType),
+ ProcedureParameter.required("tag", StringType),
+ ProcedureParameter.optional("snapshot", LongType),
+ ProcedureParameter.optional("time_retained", StringType)
+ };
+
+ private static final StructType OUTPUT_TYPE =
+ new StructType(
+ new StructField[] {
+ new StructField("result", DataTypes.BooleanType, true, Metadata.empty())
+ });
+
+ protected CreateOrReplaceTagBaseProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) {
+ Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+ String tag = args.getString(1);
+ Long snapshot = args.isNullAt(2) ? null : args.getLong(2);
+ Duration timeRetained =
+ args.isNullAt(3) ? null : TimeUtils.parseDuration(args.getString(3));
+
+ return modifyPaimonTable(
+ tableIdent,
+ table -> {
+ createOrReplaceTag(table, tag, snapshot, timeRetained);
+ InternalRow outputRow = newInternalRow(true);
+ return new InternalRow[] {outputRow};
+ });
+ }
+
+ abstract void createOrReplaceTag(
+ Table table, String tagName, Long snapshotId, Duration timeRetained);
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java
index b3f863c5e305..157743f9892e 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java
@@ -18,71 +18,26 @@
package org.apache.paimon.spark.procedure;
-import org.apache.paimon.utils.TimeUtils;
+import org.apache.paimon.table.Table;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.Metadata;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
import java.time.Duration;
-import static org.apache.spark.sql.types.DataTypes.LongType;
-import static org.apache.spark.sql.types.DataTypes.StringType;
-
/** A procedure to create a tag. */
-public class CreateTagProcedure extends BaseProcedure {
-
- private static final ProcedureParameter[] PARAMETERS =
- new ProcedureParameter[] {
- ProcedureParameter.required("table", StringType),
- ProcedureParameter.required("tag", StringType),
- ProcedureParameter.optional("snapshot", LongType),
- ProcedureParameter.optional("time_retained", StringType)
- };
-
- private static final StructType OUTPUT_TYPE =
- new StructType(
- new StructField[] {
- new StructField("result", DataTypes.BooleanType, true, Metadata.empty())
- });
+public class CreateTagProcedure extends CreateOrReplaceTagBaseProcedure {
- protected CreateTagProcedure(TableCatalog tableCatalog) {
+ private CreateTagProcedure(TableCatalog tableCatalog) {
super(tableCatalog);
}
@Override
- public ProcedureParameter[] parameters() {
- return PARAMETERS;
- }
-
- @Override
- public StructType outputType() {
- return OUTPUT_TYPE;
- }
-
- @Override
- public InternalRow[] call(InternalRow args) {
- Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
- String tag = args.getString(1);
- Long snapshot = args.isNullAt(2) ? null : args.getLong(2);
- Duration timeRetained =
- args.isNullAt(3) ? null : TimeUtils.parseDuration(args.getString(3));
-
- return modifyPaimonTable(
- tableIdent,
- table -> {
- if (snapshot == null) {
- table.createTag(tag, timeRetained);
- } else {
- table.createTag(tag, snapshot, timeRetained);
- }
- InternalRow outputRow = newInternalRow(true);
- return new InternalRow[] {outputRow};
- });
+ void createOrReplaceTag(Table table, String tagName, Long snapshotId, Duration timeRetained) {
+ if (snapshotId == null) {
+ table.createTag(tagName, timeRetained);
+ } else {
+ table.createTag(tagName, snapshotId, timeRetained);
+ }
}
public static ProcedureBuilder builder() {
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ReplaceTagProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ReplaceTagProcedure.java
new file mode 100644
index 000000000000..205fca5ee69e
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ReplaceTagProcedure.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.table.Table;
+
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+
+import java.time.Duration;
+
+/** A procedure to replace a tag. */
+public class ReplaceTagProcedure extends CreateOrReplaceTagBaseProcedure {
+
+ private ReplaceTagProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ @Override
+ void createOrReplaceTag(Table table, String tagName, Long snapshotId, Duration timeRetained) {
+ table.replaceTag(tagName, snapshotId, timeRetained);
+ }
+
+ public static ProcedureBuilder builder() {
+ return new BaseProcedure.Builder() {
+ @Override
+ public ReplaceTagProcedure doBuild() {
+ return new ReplaceTagProcedure(tableCatalog());
+ }
+ };
+ }
+
+ @Override
+ public String description() {
+ return "ReplaceTagProcedure";
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
index 6400cb88c02f..4a4c7ae215df 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
@@ -172,19 +172,15 @@ class CreateAndDeleteTagProcedureTest extends PaimonSparkTestBase with StreamTes
"table => 'test.T', tag => 'test_tag', snapshot => 1)"),
Row(true) :: Nil)
checkAnswer(
- spark.sql(
- "SELECT count(time_retained) FROM paimon.test.`T$tags` where tag_name = 'test_tag'"),
- Row(0) :: Nil)
+ spark.sql("SELECT count(*) FROM paimon.test.`T$tags` where tag_name = 'test_tag'"),
+ Row(1) :: Nil)
- checkAnswer(
+ // throw exception "Tag test_tag already exists"
+ assertThrows[IllegalArgumentException] {
spark.sql(
"CALL paimon.sys.create_tag(" +
- "table => 'test.T', tag => 'test_tag', time_retained => '5 d', snapshot => 1)"),
- Row(true) :: Nil)
- checkAnswer(
- spark.sql(
- "SELECT count(time_retained) FROM paimon.test.`T$tags` where tag_name = 'test_tag'"),
- Row(1) :: Nil)
+ "table => 'test.T', tag => 'test_tag', time_retained => '5 d', snapshot => 1)")
+ }
} finally {
stream.stop()
}
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ReplaceTagProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ReplaceTagProcedureTest.scala
new file mode 100644
index 000000000000..5a9280887031
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ReplaceTagProcedureTest.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+
+class ReplaceTagProcedureTest extends PaimonSparkTestBase {
+ test("Paimon Procedure: replace tag to update tag meta") {
+ spark.sql(s"""
+ |CREATE TABLE T (a INT, b STRING)
+ |TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
+ |""".stripMargin)
+ spark.sql("insert into T values(1, 'a')")
+ spark.sql("insert into T values(2, 'b')")
+ assertResult(2)(loadTable("T").snapshotManager().snapshotCount())
+
+ // throw exception "Tag test_tag does not exist"
+ assertThrows[IllegalArgumentException] {
+ spark.sql("CALL paimon.sys.replace_tag(table => 'test.T', tag => 'test_tag')")
+ }
+
+ spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_tag')")
+ checkAnswer(
+ spark.sql("select tag_name,snapshot_id,time_retained from `T$tags`"),
+ Row("test_tag", 2, null) :: Nil)
+
+ // replace tag with new time_retained
+ spark.sql(
+ "CALL paimon.sys.replace_tag(table => 'test.T', tag => 'test_tag', time_retained => '1 d')")
+ checkAnswer(
+ spark.sql("select tag_name,snapshot_id,time_retained from `T$tags`"),
+ Row("test_tag", 2, "PT24H") :: Nil)
+
+ // replace tag with new snapshot and time_retained
+ spark.sql(
+ "CALL paimon.sys.replace_tag(table => 'test.T', tag => 'test_tag', snapshot => 1, time_retained => '2 d')")
+ checkAnswer(
+ spark.sql("select tag_name,snapshot_id,time_retained from `T$tags`"),
+ Row("test_tag", 1, "PT48H") :: Nil)
+ }
+}
|