From d7423faf1399d26149257f12b32e1621e85d280b Mon Sep 17 00:00:00 2001 From: Askwang <135721692+Askwang@users.noreply.github.com> Date: Thu, 17 Oct 2024 23:44:35 +0800 Subject: [PATCH] replace-tag-procedure --- docs/content/spark/procedures.md | 13 +++ .../paimon/table/AbstractFileStoreTable.java | 19 ++++ .../paimon/table/DelegatedFileStoreTable.java | 10 +++ .../org/apache/paimon/table/FormatTable.java | 10 +++ .../apache/paimon/table/ReadonlyTable.java | 16 ++++ .../java/org/apache/paimon/table/Table.java | 6 ++ .../org/apache/paimon/utils/TagManager.java | 30 +++++-- .../apache/paimon/spark/SparkProcedures.java | 2 + .../CreateOrReplaceTagBaseProcedure.java | 87 +++++++++++++++++++ .../spark/procedure/CreateTagProcedure.java | 61 ++----------- .../procedure/ReplaceTagTagProcedure.java | 56 ++++++++++++ .../CreateAndDeleteTagProcedureTest.scala | 16 ++-- .../procedure/ReplaceTagProcedureTest.scala | 59 +++++++++++++ 13 files changed, 313 insertions(+), 72 deletions(-) create mode 100644 paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateOrReplaceTagBaseProcedure.java create mode 100644 paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ReplaceTagTagProcedure.java create mode 100644 paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ReplaceTagProcedureTest.scala diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md index 8c5226921692..98366e22c3ba 100644 --- a/docs/content/spark/procedures.md +++ b/docs/content/spark/procedures.md @@ -121,6 +121,19 @@ This section introduce all available spark procedures about paimon. CALL sys.rename_tag(table => 'default.T', tag_name => 'tag1', target_tag_name => 'tag2') + + replace_tag + + Replace an existing tag with new tag info. Arguments: +
  • table: the target table identifier. Cannot be empty.
  • +
  • tag: name of the existed tag. Cannot be empty.
  • +
  • snapshot(Long): id of the snapshot which the tag is based on.
  • +
  • time_retained: The maximum time retained for the existing tag.
  • + + + CALL sys.replace_tag(table => 'default.T', tag_name => 'tag1', snapshot => 10, time_retained => '1 d') + + delete_tag diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index b33cf6922490..9f006d6aae18 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -576,6 +576,25 @@ public void renameTag(String tagName, String targetTagName) { tagManager().renameTag(tagName, targetTagName); } + @Override + public void replaceTag(String tagName, @Nullable Duration timeRetained) { + Snapshot latestSnapshot = snapshotManager().latestSnapshot(); + SnapshotNotExistException.checkNotNull( + latestSnapshot, "Cannot create tag because latest snapshot doesn't exist."); + tagManager() + .replaceTag(latestSnapshot, tagName, timeRetained, store().createTagCallbacks()); + } + + @Override + public void replaceTag(String tagName, long fromSnapshotId, @Nullable Duration timeRetained) { + tagManager() + .replaceTag( + findSnapshot(fromSnapshotId), + tagName, + timeRetained, + store().createTagCallbacks()); + } + @Override public void deleteTag(String tagName) { tagManager() diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java index 5d6331aa414e..b87292180197 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java @@ -187,6 +187,16 @@ public void renameTag(String tagName, String targetTagName) { wrapped.renameTag(tagName, targetTagName); } + @Override + public void replaceTag(String tagName, Duration timeRetained) { + wrapped.replaceTag(tagName, timeRetained); + } + + @Override + public void replaceTag(String tagName, long fromSnapshotId, Duration timeRetained) { + wrapped.replaceTag(tagName, fromSnapshotId, timeRetained); + } + @Override public void deleteTag(String tagName) { wrapped.deleteTag(tagName); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java index 3224131d4afd..3ae758e73aa2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java @@ -271,6 +271,16 @@ default void renameTag(String tagName, String targetTagName) { throw new UnsupportedOperationException(); } + @Override + default void replaceTag(String tagName, Duration timeRetained) { + throw new UnsupportedOperationException(); + } + + @Override + default void replaceTag(String tagName, long fromSnapshotId, Duration timeRetained) { + throw new UnsupportedOperationException(); + } + @Override default void deleteTag(String tagName) { throw new UnsupportedOperationException(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java index 4ae593b5577f..88964ff0f1d5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java @@ -197,6 +197,22 @@ default void renameTag(String tagName, String targetTagName) { this.getClass().getSimpleName())); } + @Override + default void replaceTag(String tagName, Duration timeRetained) { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support replaceTag.", + this.getClass().getSimpleName())); + } + + @Override + default void replaceTag(String tagName, long fromSnapshotId, Duration timeRetained) { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support replaceTag.", + this.getClass().getSimpleName())); + } + @Override default void deleteTag(String tagName) { throw new UnsupportedOperationException( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index 613dfca3158a..638f70916afe 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -120,6 +120,12 @@ default String fullName() { @Experimental void renameTag(String tagName, String targetTagName); + @Experimental + void replaceTag(String tagName, Duration timeRetained); + + @Experimental + void replaceTag(String tagName, long fromSnapshotId, Duration timeRetained); + /** Delete a tag by name. */ @Experimental void deleteTag(String tagName); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index c3a674bc5eaf..75942909f94e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -103,7 +103,27 @@ public void createTag( List callbacks) { checkArgument( !StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name '%s' is blank.", tagName); + checkArgument(!tagExists(tagName), "Tag %s already exists.", tagName); + createOrReplaceTag(snapshot, tagName, timeRetained, callbacks); + } + + /** Replace a tag from given snapshot and save it in the storage. */ + public void replaceTag( + Snapshot snapshot, + String tagName, + @Nullable Duration timeRetained, + List callbacks) { + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name '%s' is blank.", tagName); + checkArgument(tagExists(tagName), "Tag %s does not exist.", tagName); + createOrReplaceTag(snapshot, tagName, timeRetained, callbacks); + } + public void createOrReplaceTag( + Snapshot snapshot, + String tagName, + @Nullable Duration timeRetained, + List callbacks) { // When timeRetained is not defined, please do not write the tagCreatorTime field, // as this will cause older versions (<= 0.7) of readers to be unable to read this // tag. @@ -117,15 +137,7 @@ public void createTag( Path tagPath = tagPath(tagName); try { - if (tagExists(tagName)) { - Snapshot tagged = taggedSnapshot(tagName); - Preconditions.checkArgument( - tagged.id() == snapshot.id(), "Tag name '%s' already exists.", tagName); - // update tag metadata into for the same snapshot of the same tag name. - fileIO.overwriteFileUtf8(tagPath, content); - } else { - fileIO.writeFile(tagPath, content, false); - } + fileIO.overwriteFileUtf8(tagPath, content); } catch (IOException e) { throw new RuntimeException( String.format( diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java index d2a7180413b7..b6d409b56275 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java @@ -37,6 +37,7 @@ import org.apache.paimon.spark.procedure.RemoveOrphanFilesProcedure; import org.apache.paimon.spark.procedure.RenameTagProcedure; import org.apache.paimon.spark.procedure.RepairProcedure; +import org.apache.paimon.spark.procedure.ReplaceTagTagProcedure; import org.apache.paimon.spark.procedure.ResetConsumerProcedure; import org.apache.paimon.spark.procedure.RollbackProcedure; @@ -63,6 +64,7 @@ private static Map> initProcedureBuilders() { ImmutableMap.builder(); procedureBuilders.put("rollback", RollbackProcedure::builder); procedureBuilders.put("create_tag", CreateTagProcedure::builder); + procedureBuilders.put("replace_tag", ReplaceTagTagProcedure::builder); procedureBuilders.put("rename_tag", RenameTagProcedure::builder); procedureBuilders.put( "create_tag_from_timestamp", CreateTagFromTimestampProcedure::builder); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateOrReplaceTagBaseProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateOrReplaceTagBaseProcedure.java new file mode 100644 index 000000000000..ed264140b797 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateOrReplaceTagBaseProcedure.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.procedure; + +import org.apache.paimon.table.Table; +import org.apache.paimon.utils.TimeUtils; + +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +import java.time.Duration; + +import static org.apache.spark.sql.types.DataTypes.LongType; +import static org.apache.spark.sql.types.DataTypes.StringType; + +/** A base procedure to create or replace a tag. */ +public abstract class CreateOrReplaceTagBaseProcedure extends BaseProcedure { + + private static final ProcedureParameter[] PARAMETERS = + new ProcedureParameter[] { + ProcedureParameter.required("table", StringType), + ProcedureParameter.required("tag", StringType), + ProcedureParameter.optional("snapshot", LongType), + ProcedureParameter.optional("time_retained", StringType) + }; + + private static final StructType OUTPUT_TYPE = + new StructType( + new StructField[] { + new StructField("result", DataTypes.BooleanType, true, Metadata.empty()) + }); + + protected CreateOrReplaceTagBaseProcedure(TableCatalog tableCatalog) { + super(tableCatalog); + } + + @Override + public ProcedureParameter[] parameters() { + return PARAMETERS; + } + + @Override + public StructType outputType() { + return OUTPUT_TYPE; + } + + @Override + public InternalRow[] call(InternalRow args) { + Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); + String tag = args.getString(1); + Long snapshot = args.isNullAt(2) ? null : args.getLong(2); + Duration timeRetained = + args.isNullAt(3) ? null : TimeUtils.parseDuration(args.getString(3)); + + return modifyPaimonTable( + tableIdent, + table -> { + createOrReplaceTag(table, tag, snapshot, timeRetained); + InternalRow outputRow = newInternalRow(true); + return new InternalRow[] {outputRow}; + }); + } + + abstract void createOrReplaceTag( + Table table, String tagName, Long snapshotId, Duration timeRetained); +} diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java index b3f863c5e305..457613edc1c2 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java @@ -18,71 +18,26 @@ package org.apache.paimon.spark.procedure; -import org.apache.paimon.utils.TimeUtils; +import org.apache.paimon.table.Table; -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.TableCatalog; -import org.apache.spark.sql.types.DataTypes; -import org.apache.spark.sql.types.Metadata; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; import java.time.Duration; -import static org.apache.spark.sql.types.DataTypes.LongType; -import static org.apache.spark.sql.types.DataTypes.StringType; - /** A procedure to create a tag. */ -public class CreateTagProcedure extends BaseProcedure { - - private static final ProcedureParameter[] PARAMETERS = - new ProcedureParameter[] { - ProcedureParameter.required("table", StringType), - ProcedureParameter.required("tag", StringType), - ProcedureParameter.optional("snapshot", LongType), - ProcedureParameter.optional("time_retained", StringType) - }; - - private static final StructType OUTPUT_TYPE = - new StructType( - new StructField[] { - new StructField("result", DataTypes.BooleanType, true, Metadata.empty()) - }); +public class CreateTagProcedure extends CreateOrReplaceTagBaseProcedure { protected CreateTagProcedure(TableCatalog tableCatalog) { super(tableCatalog); } @Override - public ProcedureParameter[] parameters() { - return PARAMETERS; - } - - @Override - public StructType outputType() { - return OUTPUT_TYPE; - } - - @Override - public InternalRow[] call(InternalRow args) { - Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); - String tag = args.getString(1); - Long snapshot = args.isNullAt(2) ? null : args.getLong(2); - Duration timeRetained = - args.isNullAt(3) ? null : TimeUtils.parseDuration(args.getString(3)); - - return modifyPaimonTable( - tableIdent, - table -> { - if (snapshot == null) { - table.createTag(tag, timeRetained); - } else { - table.createTag(tag, snapshot, timeRetained); - } - InternalRow outputRow = newInternalRow(true); - return new InternalRow[] {outputRow}; - }); + void createOrReplaceTag(Table table, String tagName, Long snapshotId, Duration timeRetained) { + if (snapshotId == null) { + table.createTag(tagName, timeRetained); + } else { + table.createTag(tagName, snapshotId, timeRetained); + } } public static ProcedureBuilder builder() { diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ReplaceTagTagProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ReplaceTagTagProcedure.java new file mode 100644 index 000000000000..d2b5d29a1bb1 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ReplaceTagTagProcedure.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.procedure; + +import org.apache.paimon.table.Table; + +import org.apache.spark.sql.connector.catalog.TableCatalog; + +import java.time.Duration; + +/** A procedure to replace a tag. */ +public class ReplaceTagTagProcedure extends CreateOrReplaceTagBaseProcedure { + + protected ReplaceTagTagProcedure(TableCatalog tableCatalog) { + super(tableCatalog); + } + + @Override + void createOrReplaceTag(Table table, String tagName, Long snapshotId, Duration timeRetained) { + if (snapshotId == null) { + table.replaceTag(tagName, timeRetained); + } else { + table.replaceTag(tagName, snapshotId, timeRetained); + } + } + + public static ProcedureBuilder builder() { + return new BaseProcedure.Builder() { + @Override + public ReplaceTagTagProcedure doBuild() { + return new ReplaceTagTagProcedure(tableCatalog()); + } + }; + } + + @Override + public String description() { + return "ReplaceTagProcedure"; + } +} diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala index 3621d44b8395..6ac2f986761e 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala @@ -172,19 +172,15 @@ class CreateAndDeleteTagProcedureTest extends PaimonSparkTestBase with StreamTes "table => 'test.T', tag => 'test_tag', snapshot => 1)"), Row(true) :: Nil) checkAnswer( - spark.sql( - "SELECT count(time_retained) FROM paimon.test.`T$tags` where tag_name = 'test_tag'"), - Row(0) :: Nil) + spark.sql("SELECT count(*) FROM paimon.test.`T$tags` where tag_name = 'test_tag'"), + Row(1) :: Nil) - checkAnswer( + // throw exception "Tag test_tag already exists" + assertThrows[IllegalArgumentException] { spark.sql( "CALL paimon.sys.create_tag(" + - "table => 'test.T', tag => 'test_tag', time_retained => '5 d', snapshot => 1)"), - Row(true) :: Nil) - checkAnswer( - spark.sql( - "SELECT count(time_retained) FROM paimon.test.`T$tags` where tag_name = 'test_tag'"), - Row(1) :: Nil) + "table => 'test.T', tag => 'test_tag', time_retained => '5 d', snapshot => 1)") + } } finally { stream.stop() } diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ReplaceTagProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ReplaceTagProcedureTest.scala new file mode 100644 index 000000000000..5a9280887031 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ReplaceTagProcedureTest.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.procedure + +import org.apache.paimon.spark.PaimonSparkTestBase + +import org.apache.spark.sql.Row + +class ReplaceTagProcedureTest extends PaimonSparkTestBase { + test("Paimon Procedure: replace tag to update tag meta") { + spark.sql(s""" + |CREATE TABLE T (a INT, b STRING) + |TBLPROPERTIES ('primary-key'='a', 'bucket'='3') + |""".stripMargin) + spark.sql("insert into T values(1, 'a')") + spark.sql("insert into T values(2, 'b')") + assertResult(2)(loadTable("T").snapshotManager().snapshotCount()) + + // throw exception "Tag test_tag does not exist" + assertThrows[IllegalArgumentException] { + spark.sql("CALL paimon.sys.replace_tag(table => 'test.T', tag => 'test_tag')") + } + + spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_tag')") + checkAnswer( + spark.sql("select tag_name,snapshot_id,time_retained from `T$tags`"), + Row("test_tag", 2, null) :: Nil) + + // replace tag with new time_retained + spark.sql( + "CALL paimon.sys.replace_tag(table => 'test.T', tag => 'test_tag', time_retained => '1 d')") + checkAnswer( + spark.sql("select tag_name,snapshot_id,time_retained from `T$tags`"), + Row("test_tag", 2, "PT24H") :: Nil) + + // replace tag with new snapshot and time_retained + spark.sql( + "CALL paimon.sys.replace_tag(table => 'test.T', tag => 'test_tag', snapshot => 1, time_retained => '2 d')") + checkAnswer( + spark.sql("select tag_name,snapshot_id,time_retained from `T$tags`"), + Row("test_tag", 1, "PT48H") :: Nil) + } +}