From 29c1347889e31c9a4789e8f4a3656ece55edac8c Mon Sep 17 00:00:00 2001 From: askwang <135721692+askwang@users.noreply.github.com> Date: Tue, 5 Nov 2024 19:35:59 +0800 Subject: [PATCH] [spark] Add replace tag ddl syntax (#4457) --- docs/content/spark/sql-ddl.md | 14 ++++- .../PaimonSqlExtensions.g4 | 13 +++- ....scala => CreateOrReplaceTagCommand.scala} | 4 +- ...xec.scala => CreateOrReplaceTagExec.scala} | 29 ++++++--- .../spark/execution/PaimonStrategy.scala | 8 ++- .../PaimonSparkSqlExtensionsParser.scala | 10 +-- .../PaimonSqlExtensionsAstBuilder.scala | 63 +++++++++++-------- .../spark/sql/PaimonTagDdlTestBase.scala | 29 +++++++++ 8 files changed, 125 insertions(+), 45 deletions(-) rename paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/{CreateTagCommand.scala => CreateOrReplaceTagCommand.scala} (93%) rename paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/{CreateTagExec.scala => CreateOrReplaceTagExec.scala} (72%) diff --git a/docs/content/spark/sql-ddl.md b/docs/content/spark/sql-ddl.md index 43b101730f03..3b4d1722bc09 100644 --- a/docs/content/spark/sql-ddl.md +++ b/docs/content/spark/sql-ddl.md @@ -211,8 +211,12 @@ CREATE TABLE my_table_all_as PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' = ``` ## Tag DDL -### Create Tag -Create a tag based on snapshot or retention. +### Create or Replace Tag +Create or replace a tag with the following options. +- Create a tag with or without a snapshot id and time retention. +- Creating a tag that already exists does not fail when the `IF NOT EXISTS` syntax is used. +- Update a tag using the `REPLACE TAG` or `CREATE OR REPLACE TAG` syntax. + ```sql -- create a tag based on the latest snapshot and no retention. ALTER TABLE T CREATE TAG `TAG-1`; @@ -228,6 +232,12 @@ ALTER TABLE T CREATE TAG `TAG-3` AS OF VERSION 1; -- create a tag based on snapshot-2 and retain it for 12 hour. 
ALTER TABLE T CREATE TAG `TAG-4` AS OF VERSION 2 RETAIN 12 HOURS; + +-- replace an existing tag with a new snapshot id and a new retention +ALTER TABLE T REPLACE TAG `TAG-4` AS OF VERSION 2 RETAIN 24 HOURS; + +-- create or replace a tag: create the tag if it does not exist, replace it if it exists. +ALTER TABLE T CREATE OR REPLACE TAG `TAG-5` AS OF VERSION 2 RETAIN 24 HOURS; ``` ### Delete Tag diff --git a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 index e835b00cdc7d..207d9732160f 100644 --- a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 +++ b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 @@ -71,7 +71,7 @@ singleStatement statement : CALL multipartIdentifier '(' (callArgument (',' callArgument)*)? ')' #call | SHOW TAGS multipartIdentifier #showTags - | ALTER TABLE multipartIdentifier CREATE TAG (IF NOT EXISTS)? identifier tagOptions #createTag + | ALTER TABLE multipartIdentifier createReplaceTagClause #createOrReplaceTag | ALTER TABLE multipartIdentifier DELETE TAG (IF EXISTS)? identifier #deleteTag | ALTER TABLE multipartIdentifier RENAME TAG identifier TO identifier #renameTag ; @@ -81,6 +81,11 @@ callArgument | identifier '=>' expression #namedArgument ; +createReplaceTagClause + : CREATE TAG (IF NOT EXISTS)? identifier tagOptions + | (CREATE OR)? REPLACE TAG identifier tagOptions + ; + tagOptions : (AS OF VERSION snapshotId)? (timeRetain)? 
; @@ -146,8 +151,8 @@ quotedIdentifier ; nonReserved - : ALTER | AS | CALL | CREATE | DAYS | DELETE | EXISTS | HOURS | IF | NOT | OF | TABLE - | RETAIN | VERSION | TAG + : ALTER | AS | CALL | CREATE | DAYS | DELETE | EXISTS | HOURS | IF | NOT | OF | OR | TABLE + | REPLACE | RETAIN | VERSION | TAG | TRUE | FALSE | MAP ; @@ -164,7 +169,9 @@ IF : 'IF'; MINUTES: 'MINUTES'; NOT: 'NOT'; OF: 'OF'; +OR: 'OR'; RENAME: 'RENAME'; +REPLACE: 'REPLACE'; RETAIN: 'RETAIN'; SHOW: 'SHOW'; TABLE: 'TABLE'; diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CreateTagCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CreateOrReplaceTagCommand.scala similarity index 93% rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CreateTagCommand.scala rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CreateOrReplaceTagCommand.scala index 226311663bf5..0830fc9ed3d6 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CreateTagCommand.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CreateOrReplaceTagCommand.scala @@ -22,10 +22,12 @@ import org.apache.paimon.spark.leafnode.PaimonLeafCommand import org.apache.spark.sql.catalyst.expressions.Attribute -case class CreateTagCommand( +case class CreateOrReplaceTagCommand( table: Seq[String], tagName: String, tagOptions: TagOptions, + create: Boolean, + replace: Boolean, ifNotExists: Boolean) extends PaimonLeafCommand { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CreateTagExec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CreateOrReplaceTagExec.scala similarity index 72% rename from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CreateTagExec.scala rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CreateOrReplaceTagExec.scala index 57593c3a684e..0506ed42f1f4 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CreateTagExec.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CreateOrReplaceTagExec.scala @@ -27,11 +27,13 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} -case class CreateTagExec( +case class CreateOrReplaceTagExec( catalog: TableCatalog, ident: Identifier, tagName: String, tagOptions: TagOptions, + create: Boolean, + replace: Boolean, ifNotExists: Boolean) extends PaimonLeafV2CommandExec { @@ -42,14 +44,27 @@ case class CreateTagExec( table.asInstanceOf[SparkTable].getTable match { case paimonTable: FileStoreTable => val tagIsExists = paimonTable.tagManager().tagExists(tagName) - if (tagIsExists && ifNotExists) { - return Nil - } val timeRetained = tagOptions.timeRetained.orNull - if (tagOptions.snapshotId.isEmpty) { - paimonTable.createTag(tagName, timeRetained) + val snapshotId = tagOptions.snapshotId + + if (create && replace && !tagIsExists) { + if (snapshotId.isEmpty) { + paimonTable.createTag(tagName, timeRetained) + } else { + paimonTable.createTag(tagName, snapshotId.get, timeRetained) + } + } else if (replace) { + paimonTable.replaceTag(tagName, snapshotId.get, timeRetained) } else { - paimonTable.createTag(tagName, tagOptions.snapshotId.get, timeRetained) + if (tagIsExists && ifNotExists) { + return Nil + } + + if (snapshotId.isEmpty) { + paimonTable.createTag(tagName, timeRetained) + } else { + paimonTable.createTag(tagName, snapshotId.get, timeRetained) + } } case t => throw new UnsupportedOperationException( diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala index d715ef2f5e4e..0c3d3e6b6dc6 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala @@ -19,7 +19,7 @@ package org.apache.paimon.spark.execution import org.apache.paimon.spark.{SparkCatalog, SparkUtils} -import org.apache.paimon.spark.catalyst.plans.logical.{CreateTagCommand, DeleteTagCommand, PaimonCallCommand, RenameTagCommand, ShowTagsCommand} +import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, DeleteTagCommand, PaimonCallCommand, RenameTagCommand, ShowTagsCommand} import org.apache.spark.sql.{SparkSession, Strategy} import org.apache.spark.sql.catalyst.InternalRow @@ -50,12 +50,14 @@ case class PaimonStrategy(spark: SparkSession) case t @ ShowTagsCommand(PaimonCatalogAndIdentifier(catalog, ident)) => ShowTagsExec(catalog, ident, t.output) :: Nil - case CreateTagCommand( + case CreateOrReplaceTagCommand( PaimonCatalogAndIdentifier(table, ident), tagName, tagOptions, + create, + replace, ifNotExists) => - CreateTagExec(table, ident, tagName, tagOptions, ifNotExists) :: Nil + CreateOrReplaceTagExec(table, ident, tagName, tagOptions, create, replace, ifNotExists) :: Nil case DeleteTagCommand(PaimonCatalogAndIdentifier(catalog, ident), tagStr, ifExists) => DeleteTagExec(catalog, ident, tagStr, ifExists) :: Nil diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala index f7e8a8506375..78a7f80eaf38 100644 --- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala @@ -105,10 +105,12 @@ class PaimonSparkSqlExtensionsParser(delegate: ParserInterface) } private def isTagRefDdl(normalized: String): Boolean = { - normalized.startsWith("show tags") || (normalized - .startsWith("alter table") && (normalized.contains("create tag") || - normalized.contains("rename tag") || - normalized.contains("delete tag"))) + normalized.startsWith("show tags") || + (normalized.startsWith("alter table") && + (normalized.contains("create tag") || + normalized.contains("replace tag") || + normalized.contains("rename tag") || + normalized.contains("delete tag"))) } protected def parse[T](command: String)(toResult: PaimonSqlExtensionsParser => T): T = { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala index 06d57d597e25..b864894e7498 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.parser.extensions import org.apache.paimon.spark.catalyst.plans.logical -import org.apache.paimon.spark.catalyst.plans.logical.{CreateTagCommand, DeleteTagCommand, PaimonCallArgument, PaimonCallStatement, PaimonNamedArgument, PaimonPositionalArgument, RenameTagCommand, ShowTagsCommand, TagOptions} +import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, DeleteTagCommand, 
PaimonCallArgument, PaimonCallStatement, PaimonNamedArgument, PaimonPositionalArgument, RenameTagCommand, ShowTagsCommand, TagOptions} import org.apache.paimon.utils.TimeUtils import org.antlr.v4.runtime._ @@ -98,31 +98,44 @@ class PaimonSqlExtensionsAstBuilder(delegate: ParserInterface) ShowTagsCommand(typedVisit[Seq[String]](ctx.multipartIdentifier)) } - /** Create a CREATE TAG logical command. */ - override def visitCreateTag(ctx: CreateTagContext): CreateTagCommand = withOrigin(ctx) { - val tagName = ctx.identifier().getText - val tagOptionsContext = Option(ctx.tagOptions()) - val snapshotId = - tagOptionsContext.flatMap(tagOptions => Option(tagOptions.snapshotId())).map(_.getText.toLong) - val timeRetainCtx = tagOptionsContext.flatMap(tagOptions => Option(tagOptions.timeRetain())) - val timeRetained = if (timeRetainCtx.nonEmpty) { - val (number, timeUnit) = - timeRetainCtx.map(retain => (retain.number().getText.toLong, retain.timeUnit().getText)).get - Option(TimeUtils.parseDuration(number, timeUnit)) - } else { - None + /** Create a CREATE OR REPLACE TAG logical command. 
*/ + override def visitCreateOrReplaceTag(ctx: CreateOrReplaceTagContext): CreateOrReplaceTagCommand = + withOrigin(ctx) { + val createTagClause = ctx.createReplaceTagClause() + + val tagName = createTagClause.identifier().getText + val tagOptionsContext = Option(createTagClause.tagOptions()) + val snapshotId = + tagOptionsContext + .flatMap(tagOptions => Option(tagOptions.snapshotId())) + .map(_.getText.toLong) + val timeRetainCtx = tagOptionsContext.flatMap(tagOptions => Option(tagOptions.timeRetain())) + val timeRetained = if (timeRetainCtx.nonEmpty) { + val (number, timeUnit) = + timeRetainCtx + .map(retain => (retain.number().getText.toLong, retain.timeUnit().getText)) + .get + Option(TimeUtils.parseDuration(number, timeUnit)) + } else { + None + } + val tagOptions = TagOptions( + snapshotId, + timeRetained + ) + + val create = createTagClause.CREATE() != null + val replace = createTagClause.REPLACE() != null + val ifNotExists = createTagClause.EXISTS() != null + + CreateOrReplaceTagCommand( + typedVisit[Seq[String]](ctx.multipartIdentifier), + tagName, + tagOptions, + create, + replace, + ifNotExists) } - val tagOptions = TagOptions( - snapshotId, - timeRetained - ) - val ifNotExists = ctx.EXISTS() != null - CreateTagCommand( - typedVisit[Seq[String]](ctx.multipartIdentifier), - tagName, - tagOptions, - ifNotExists) - } /** Create a DELETE TAG logical command. 
*/ override def visitDeleteTag(ctx: DeleteTagContext): DeleteTagCommand = withOrigin(ctx) { diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonTagDdlTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonTagDdlTestBase.scala index 7c551f0c9379..5ad687b4da0f 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonTagDdlTestBase.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonTagDdlTestBase.scala @@ -71,6 +71,35 @@ abstract class PaimonTagDdlTestBase extends PaimonSparkTestBase { Row("tag-1", 3, null)) } + test("Tag ddl: alter table t create or replace tag syntax") { + spark.sql("""CREATE TABLE T (id INT, name STRING) + |USING PAIMON + |TBLPROPERTIES ('primary-key'='id')""".stripMargin) + + spark.sql("insert into T values(1, 'a')") + spark.sql("insert into T values(2, 'b')") + assertResult(2)(loadTable("T").snapshotManager().snapshotCount()) + + // test 'replace' syntax + spark.sql("alter table T create tag `tag-1` as of version 1") + spark.sql("alter table T replace tag `tag-1` as of version 2 RETAIN 1 HOURS") + checkAnswer( + spark.sql("select tag_name,snapshot_id,time_retained from `T$tags`"), + Row("tag-1", 2, "PT1H") :: Nil) + + // test 'create or replace' syntax + // tag-2 not exist, create it + spark.sql("alter table T create or replace tag `tag-2` as of version 1") + checkAnswer( + spark.sql("select tag_name,snapshot_id,time_retained from `T$tags` where tag_name = 'tag-2'"), + Row("tag-2", 1, null) :: Nil) + // tag-2 exists, replace it + spark.sql("alter table T create or replace tag `tag-2` as of version 2 RETAIN 1 HOURS") + checkAnswer( + spark.sql("select tag_name,snapshot_id,time_retained from `T$tags` where tag_name = 'tag-2'"), + Row("tag-2", 2, "PT1H") :: Nil) + } + test("Tag ddl: alter table t delete tag syntax") { spark.sql("""CREATE TABLE T (id INT, name STRING) |USING PAIMON