diff --git a/docs/content/spark/sql-ddl.md b/docs/content/spark/sql-ddl.md index 35b2e53514c1..b9ba8c57448a 100644 --- a/docs/content/spark/sql-ddl.md +++ b/docs/content/spark/sql-ddl.md @@ -210,8 +210,48 @@ CREATE TABLE my_table_all ( CREATE TABLE my_table_all_as PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' = 'dt,hh') AS SELECT * FROM my_table_all; ``` -## Show Tags -The SHOW TAGS statement is used to list all tags of a table. +## Tag DDL +### Create Tag +Create a tag based on snapshot or retention. +```sql +-- create a tag based on the latest snapshot and no retention. +ALTER TABLE T CREATE TAG `TAG-1`; + +-- create a tag based on the latest snapshot and no retention if it doesn't exist. +ALTER TABLE T CREATE TAG IF NOT EXISTS `TAG-1`; + +-- create a tag based on the latest snapshot and retain it for 7 days. +ALTER TABLE T CREATE TAG `TAG-2` RETAIN 7 DAYS; + +-- create a tag based on snapshot 1 and no retention. +ALTER TABLE T CREATE TAG `TAG-3` AS OF VERSION 1; + +-- create a tag based on snapshot 2 and retain it for 12 hours. +ALTER TABLE T CREATE TAG `TAG-4` AS OF VERSION 2 RETAIN 12 HOURS; +``` + +### Delete Tag +Delete a tag or multiple tags of a table. +```sql +-- delete a tag. +ALTER TABLE T DELETE TAG `TAG-1`; + +-- delete a tag if it exists. +ALTER TABLE T DELETE TAG IF EXISTS `TAG-1`; + +-- delete multiple tags, delimiter is ','. +ALTER TABLE T DELETE TAG `TAG-1,TAG-2`; +``` + +### Rename Tag +Rename an existing tag with a new tag name. +```sql +ALTER TABLE T RENAME TAG `TAG-1` TO `TAG-2`; ``` -SHOW TAGS my_table; + +### Show Tags +List all tags of a table. 
+```sql +SHOW TAGS T; ``` + diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/TimeUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/TimeUtils.java index 3b8e2ec81275..a101098f68c6 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/TimeUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/TimeUtils.java @@ -104,6 +104,26 @@ public static Duration parseDuration(String text) { } } + /** + * Parse the given number and unitLabel to a java {@link Duration}. The usage is in format + * "(digital number, time unit label)", e.g. "(1, DAYS)". + * + * @param number a digital number + * @param unitLabel time unit label + */ + public static Duration parseDuration(Long number, String unitLabel) { + ChronoUnit unit = LABEL_TO_UNIT_MAP.get(unitLabel.toLowerCase(Locale.US)); + if (unit != null) { + return Duration.of(number, unit); + } else { + throw new IllegalArgumentException( + "Time interval unit label '" + + unitLabel + + "' does not match any of the recognized units: " + + TimeUnit.getAllUnits()); + } + } + private static Map initMap() { Map labelToUnit = new HashMap<>(); for (TimeUnit timeUnit : TimeUnit.values()) { diff --git a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/ShowTagsTest.scala b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/TagDdlTest.scala similarity index 94% rename from paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/ShowTagsTest.scala rename to paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/TagDdlTest.scala index 36b42e6b677e..2d199491dc0a 100644 --- a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/ShowTagsTest.scala +++ b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/TagDdlTest.scala @@ -18,4 +18,4 @@ package org.apache.paimon.spark.sql -class ShowTagsTest extends PaimonShowTagsTestBase {} +class TagDdlTest extends PaimonTagDdlTestBase diff 
--git a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/ShowTagsTest.scala b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/TagDdlTest.scala similarity index 94% rename from paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/ShowTagsTest.scala rename to paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/TagDdlTest.scala index 36b42e6b677e..2d199491dc0a 100644 --- a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/ShowTagsTest.scala +++ b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/TagDdlTest.scala @@ -18,4 +18,4 @@ package org.apache.paimon.spark.sql -class ShowTagsTest extends PaimonShowTagsTestBase {} +class TagDdlTest extends PaimonTagDdlTestBase diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/ShowTagsTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/TagDdlTest.scala similarity index 94% rename from paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/ShowTagsTest.scala rename to paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/TagDdlTest.scala index 36b42e6b677e..2d199491dc0a 100644 --- a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/ShowTagsTest.scala +++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/TagDdlTest.scala @@ -18,4 +18,4 @@ package org.apache.paimon.spark.sql -class ShowTagsTest extends PaimonShowTagsTestBase {} +class TagDdlTest extends PaimonTagDdlTestBase diff --git a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/ShowTagsTest.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/TagDdlTest.scala similarity index 94% rename from paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/ShowTagsTest.scala rename to 
paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/TagDdlTest.scala index 36b42e6b677e..92309d54167b 100644 --- a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/ShowTagsTest.scala +++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/TagDdlTest.scala @@ -18,4 +18,4 @@ package org.apache.paimon.spark.sql -class ShowTagsTest extends PaimonShowTagsTestBase {} +class TagDdlTest extends PaimonTagDdlTestBase {} diff --git a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 index d29aaa423899..e835b00cdc7d 100644 --- a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 +++ b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 @@ -71,13 +71,34 @@ singleStatement statement : CALL multipartIdentifier '(' (callArgument (',' callArgument)*)? ')' #call | SHOW TAGS multipartIdentifier #showTags - ; + | ALTER TABLE multipartIdentifier CREATE TAG (IF NOT EXISTS)? identifier tagOptions #createTag + | ALTER TABLE multipartIdentifier DELETE TAG (IF EXISTS)? identifier #deleteTag + | ALTER TABLE multipartIdentifier RENAME TAG identifier TO identifier #renameTag + ; callArgument : expression #positionalArgument | identifier '=>' expression #namedArgument ; +tagOptions + : (AS OF VERSION snapshotId)? (timeRetain)? 
+ ; + +snapshotId + : number + ; + +timeRetain + : RETAIN number timeUnit + ; + +timeUnit + : DAYS + | HOURS + | MINUTES + ; + expression : constant | stringMap @@ -125,14 +146,32 @@ quotedIdentifier ; nonReserved - : CALL + : ALTER | AS | CALL | CREATE | DAYS | DELETE | EXISTS | HOURS | IF | NOT | OF | TABLE + | RETAIN | VERSION | TAG | TRUE | FALSE | MAP ; +ALTER: 'ALTER'; +AS: 'AS'; CALL: 'CALL'; +CREATE: 'CREATE'; +DAYS: 'DAYS'; +DELETE: 'DELETE'; +EXISTS: 'EXISTS'; +HOURS: 'HOURS'; +IF : 'IF'; +MINUTES: 'MINUTES'; +NOT: 'NOT'; +OF: 'OF'; +RENAME: 'RENAME'; +RETAIN: 'RETAIN'; SHOW: 'SHOW'; +TABLE: 'TABLE'; +TAG: 'TAG'; TAGS: 'TAGS'; +TO: 'TO'; +VERSION: 'VERSION'; TRUE: 'TRUE'; FALSE: 'FALSE'; diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CreateTagCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CreateTagCommand.scala new file mode 100644 index 000000000000..226311663bf5 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CreateTagCommand.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.catalyst.plans.logical + +import org.apache.paimon.spark.leafnode.PaimonLeafCommand + +import org.apache.spark.sql.catalyst.expressions.Attribute + +case class CreateTagCommand( + table: Seq[String], + tagName: String, + tagOptions: TagOptions, + ifNotExists: Boolean) + extends PaimonLeafCommand { + + override def output: Seq[Attribute] = Nil + + override def simpleString(maxFields: Int): String = { + s"Create tag: $tagName for table: $table" + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/DeleteTagCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/DeleteTagCommand.scala new file mode 100644 index 000000000000..072ed6b09f39 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/DeleteTagCommand.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.catalyst.plans.logical + +import org.apache.paimon.spark.leafnode.PaimonLeafCommand + +import org.apache.spark.sql.catalyst.expressions.Attribute + +case class DeleteTagCommand(table: Seq[String], tagStr: String, ifExists: Boolean) + extends PaimonLeafCommand { + + override def output: Seq[Attribute] = Nil + + override def simpleString(maxFields: Int): String = { + s"Delete tag: $tagStr for table: $table" + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/RenameTagCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/RenameTagCommand.scala new file mode 100644 index 000000000000..df68c40382e7 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/RenameTagCommand.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.catalyst.plans.logical + +import org.apache.paimon.spark.leafnode.PaimonLeafCommand + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} + +case class RenameTagCommand(table: Seq[String], sourceTag: String, targetTag: String) + extends PaimonLeafCommand { + + override def output: Seq[Attribute] = Nil + + override def simpleString(maxFields: Int): String = { + s"Rename tag from $sourceTag to $targetTag for table: $table" + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/TagOptions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/TagOptions.scala new file mode 100644 index 000000000000..242e9dac15a6 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/TagOptions.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.catalyst.plans.logical + +import java.time.Duration + +case class TagOptions(snapshotId: Option[Long], timeRetained: Option[Duration]) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CreateTagExec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CreateTagExec.scala new file mode 100644 index 000000000000..57593c3a684e --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CreateTagExec.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.execution + +import org.apache.paimon.spark.SparkTable +import org.apache.paimon.spark.catalyst.plans.logical.TagOptions +import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec +import org.apache.paimon.table.FileStoreTable + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} + +case class CreateTagExec( + catalog: TableCatalog, + ident: Identifier, + tagName: String, + tagOptions: TagOptions, + ifNotExists: Boolean) + extends PaimonLeafV2CommandExec { + + override protected def run(): Seq[InternalRow] = { + val table = catalog.loadTable(ident) + assert(table.isInstanceOf[SparkTable]) + + table.asInstanceOf[SparkTable].getTable match { + case paimonTable: FileStoreTable => + val tagIsExists = paimonTable.tagManager().tagExists(tagName) + if (tagIsExists && ifNotExists) { + return Nil + } + val timeRetained = tagOptions.timeRetained.orNull + if (tagOptions.snapshotId.isEmpty) { + paimonTable.createTag(tagName, timeRetained) + } else { + paimonTable.createTag(tagName, tagOptions.snapshotId.get, timeRetained) + } + case t => + throw new UnsupportedOperationException( + s"Can not create tag for non-paimon FileStoreTable: $t") + } + Nil + } + + override def output: Seq[Attribute] = Nil +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/DeleteTagExec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/DeleteTagExec.scala new file mode 100644 index 000000000000..d27839bdc6b6 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/DeleteTagExec.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.execution + +import org.apache.paimon.spark.SparkTable +import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec +import org.apache.paimon.table.FileStoreTable + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} + +case class DeleteTagExec( + catalog: TableCatalog, + ident: Identifier, + tagStr: String, + ifExists: Boolean) + extends PaimonLeafV2CommandExec { + + private val DELIMITER = "," + + override protected def run(): Seq[InternalRow] = { + val table = catalog.loadTable(ident) + assert(table.isInstanceOf[SparkTable]) + + table.asInstanceOf[SparkTable].getTable match { + case paimonTable: FileStoreTable => + val tagNames = tagStr.split(DELIMITER).map(_.trim) + for (tagName <- tagNames) { + val tagIsExists = paimonTable.tagManager().tagExists(tagName) + if (tagIsExists || !ifExists) { + paimonTable.deleteTag(tagName) + } + } + case t => + throw new UnsupportedOperationException( + s"Can not delete tag for non-paimon FileStoreTable: $t") + } + Nil + } + + override def output: Seq[Attribute] = Nil +} diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala index 21b809d1e21b..d715ef2f5e4e 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala @@ -19,7 +19,7 @@ package org.apache.paimon.spark.execution import org.apache.paimon.spark.{SparkCatalog, SparkUtils} -import org.apache.paimon.spark.catalyst.plans.logical.{PaimonCallCommand, ShowTagsCommand} +import org.apache.paimon.spark.catalyst.plans.logical.{CreateTagCommand, DeleteTagCommand, PaimonCallCommand, RenameTagCommand, ShowTagsCommand} import org.apache.spark.sql.{SparkSession, Strategy} import org.apache.spark.sql.catalyst.InternalRow @@ -49,6 +49,20 @@ case class PaimonStrategy(spark: SparkSession) case t @ ShowTagsCommand(PaimonCatalogAndIdentifier(catalog, ident)) => ShowTagsExec(catalog, ident, t.output) :: Nil + + case CreateTagCommand( + PaimonCatalogAndIdentifier(table, ident), + tagName, + tagOptions, + ifNotExists) => + CreateTagExec(table, ident, tagName, tagOptions, ifNotExists) :: Nil + + case DeleteTagCommand(PaimonCatalogAndIdentifier(catalog, ident), tagStr, ifExists) => + DeleteTagExec(catalog, ident, tagStr, ifExists) :: Nil + + case RenameTagCommand(PaimonCatalogAndIdentifier(catalog, ident), sourceTag, targetTag) => + RenameTagExec(catalog, ident, sourceTag, targetTag) :: Nil + case _ => Nil } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/RenameTagExec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/RenameTagExec.scala new file mode 100644 index 000000000000..655cf70b9cc1 --- /dev/null +++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/RenameTagExec.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.execution + +import org.apache.paimon.spark.SparkTable +import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec +import org.apache.paimon.table.FileStoreTable + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} + +case class RenameTagExec( + catalog: TableCatalog, + ident: Identifier, + sourceTag: String, + targetTag: String) + extends PaimonLeafV2CommandExec { + + override protected def run(): Seq[InternalRow] = { + val table = catalog.loadTable(ident) + assert(table.isInstanceOf[SparkTable]) + + table.asInstanceOf[SparkTable].getTable match { + case paimonTable: FileStoreTable => + paimonTable.renameTag(sourceTag, targetTag) + case t => + throw new UnsupportedOperationException( + s"Can not rename tag for non-paimon FileStoreTable: $t") + } + Nil + } + + override def output: Seq[Attribute] = Nil +} diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala index 47991228acdf..f7e8a8506375 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala @@ -101,8 +101,14 @@ class PaimonSparkSqlExtensionsParser(delegate: ParserInterface) .replaceAll("\\s+", " ") .replaceAll("/\\*.*?\\*/", " ") .trim() - normalized.startsWith("call") || - normalized.contains("show tags") + normalized.startsWith("call") || isTagRefDdl(normalized) + } + + private def isTagRefDdl(normalized: String): Boolean = { + normalized.startsWith("show tags") || (normalized + .startsWith("alter table") && (normalized.contains("create tag") || + normalized.contains("rename tag") || + normalized.contains("delete tag"))) } protected def parse[T](command: String)(toResult: PaimonSqlExtensionsParser => T): T = { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala index 1fe92a1b38bf..06d57d597e25 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala @@ -19,7 +19,8 @@ package org.apache.spark.sql.catalyst.parser.extensions import org.apache.paimon.spark.catalyst.plans.logical -import 
org.apache.paimon.spark.catalyst.plans.logical.{PaimonCallArgument, PaimonCallStatement, PaimonNamedArgument, PaimonPositionalArgument, ShowTagsCommand} +import org.apache.paimon.spark.catalyst.plans.logical.{CreateTagCommand, DeleteTagCommand, PaimonCallArgument, PaimonCallStatement, PaimonNamedArgument, PaimonPositionalArgument, RenameTagCommand, ShowTagsCommand, TagOptions} +import org.apache.paimon.utils.TimeUtils import org.antlr.v4.runtime._ import org.antlr.v4.runtime.misc.Interval @@ -92,10 +93,53 @@ class PaimonSqlExtensionsAstBuilder(delegate: ParserInterface) ctx.parts.asScala.map(_.getText).toSeq } - override def visitShowTags(ctx: ShowTagsContext): AnyRef = withOrigin(ctx) { + /** Create a SHOW TAGS logical command. */ + override def visitShowTags(ctx: ShowTagsContext): ShowTagsCommand = withOrigin(ctx) { ShowTagsCommand(typedVisit[Seq[String]](ctx.multipartIdentifier)) } + /** Create a CREATE TAG logical command. */ + override def visitCreateTag(ctx: CreateTagContext): CreateTagCommand = withOrigin(ctx) { + val tagName = ctx.identifier().getText + val tagOptionsContext = Option(ctx.tagOptions()) + val snapshotId = + tagOptionsContext.flatMap(tagOptions => Option(tagOptions.snapshotId())).map(_.getText.toLong) + val timeRetainCtx = tagOptionsContext.flatMap(tagOptions => Option(tagOptions.timeRetain())) + val timeRetained = if (timeRetainCtx.nonEmpty) { + val (number, timeUnit) = + timeRetainCtx.map(retain => (retain.number().getText.toLong, retain.timeUnit().getText)).get + Option(TimeUtils.parseDuration(number, timeUnit)) + } else { + None + } + val tagOptions = TagOptions( + snapshotId, + timeRetained + ) + val ifNotExists = ctx.EXISTS() != null + CreateTagCommand( + typedVisit[Seq[String]](ctx.multipartIdentifier), + tagName, + tagOptions, + ifNotExists) + } + + /** Create a DELETE TAG logical command. 
*/ + override def visitDeleteTag(ctx: DeleteTagContext): DeleteTagCommand = withOrigin(ctx) { + DeleteTagCommand( + typedVisit[Seq[String]](ctx.multipartIdentifier), + ctx.identifier().getText, + ctx.EXISTS() != null) + } + + /** Create a RENAME TAG logical command. */ + override def visitRenameTag(ctx: RenameTagContext): RenameTagCommand = withOrigin(ctx) { + RenameTagCommand( + typedVisit[Seq[String]](ctx.multipartIdentifier), + ctx.identifier(0).getText, + ctx.identifier(1).getText) + } + private def toBuffer[T](list: java.util.List[T]) = list.asScala private def toSeq[T](list: java.util.List[T]) = toBuffer(list) diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonShowTagsTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonShowTagsTestBase.scala index 4d8f0b9b4b82..08d9dceff3fa 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonShowTagsTestBase.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonShowTagsTestBase.scala @@ -22,22 +22,101 @@ import org.apache.paimon.spark.PaimonSparkTestBase import org.apache.spark.sql.Row -abstract class PaimonShowTagsTestBase extends PaimonSparkTestBase { +abstract class PaimonTagDdlTestBase extends PaimonSparkTestBase { + test("Tag ddl: show tags syntax") { + spark.sql("""CREATE TABLE T (id INT, name STRING) + |USING PAIMON + |TBLPROPERTIES ('primary-key'='id')""".stripMargin) + + spark.sql("insert into T values(1, 'a')") + + spark.sql("alter table T create tag `2024-10-12`") + spark.sql("alter table T create tag `2024-10-11`") + spark.sql("alter table T create tag `2024-10-13`") + + checkAnswer( + spark.sql("show tags T"), + Row("2024-10-11") :: Row("2024-10-12") :: Row("2024-10-13") :: Nil) + } - test("Paimon DDL: show tags for table") { + test("Tag ddl: alter table t create tag syntax") { spark.sql("""CREATE TABLE T (id INT, name STRING) |USING PAIMON 
|TBLPROPERTIES ('primary-key'='id')""".stripMargin) spark.sql("insert into T values(1, 'a')") spark.sql("insert into T values(2, 'b')") + spark.sql("insert into T values(3, 'c')") + val table = loadTable("T") + assertResult(3)(table.snapshotManager().snapshotCount()) + + spark.sql("alter table T create tag `tag-1`") + spark.sql("alter table T create tag `tag-2` RETAIN 2 DAYS") + spark.sql("alter table T create tag `tag-3` as of version 1") + spark.sql("alter table T create tag `tag-4` as of version 2 RETAIN 3 HOURS") + assertResult(4)(spark.sql("show tags T").count()) - spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => '2024-10-12')") - spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => '2024-10-11')") - spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => '2024-10-13')") + checkAnswer( + spark.sql("select tag_name,snapshot_id,time_retained from `T$tags`"), + Row("tag-1", 3, null) :: Row("tag-2", 3, "PT48H") :: Row("tag-3", 1, null) :: Row( + "tag-4", + 2, + "PT3H") :: Nil + ) + + // update the tag info if tag exists + spark.sql("alter table T create tag `tag-1` RETAIN 1 HOURS") + checkAnswer( + spark.sql("select tag_name,snapshot_id,time_retained from `T$tags` where tag_name='tag-1'"), + Row("tag-1", 3, "PT1H")) + // not update tag with 'if not exists' syntax, although tag already exists + spark.sql("alter table T create tag if not exists `tag-1` RETAIN 10 HOURS") + checkAnswer( + spark.sql("select tag_name,snapshot_id,time_retained from `T$tags` where tag_name='tag-1'"), + Row("tag-1", 3, "PT1H")) + } + + test("Tag ddl: alter table t delete tag syntax") { + spark.sql("""CREATE TABLE T (id INT, name STRING) + |USING PAIMON + |TBLPROPERTIES ('primary-key'='id')""".stripMargin) + + spark.sql("insert into T values(1, 'a')") + assertResult(1)(loadTable("T").snapshotManager().snapshotCount()) + + spark.sql("alter table T create tag `2024-10-12`") + spark.sql("alter table T create tag `2024-10-15`") + spark.sql("alter table T create 
tag `2024-10-13`") + spark.sql("alter table T create tag `2024-10-14`") checkAnswer( spark.sql("show tags T"), - Row("2024-10-11") :: Row("2024-10-12") :: Row("2024-10-13") :: Nil) + Row("2024-10-12") :: Row("2024-10-13") :: Row("2024-10-14") :: Row("2024-10-15") :: Nil) + + spark.sql("alter table T delete tag `2024-10-12`") + checkAnswer( + spark.sql("show tags T"), + Row("2024-10-13") :: Row("2024-10-14") :: Row("2024-10-15") :: Nil) + + spark.sql("alter table T delete tag `2024-10-13, 2024-10-14`") + checkAnswer(spark.sql("show tags T"), Row("2024-10-15") :: Nil) + + spark.sql("alter table T delete tag if EXISTS `2024-10-18`") + checkAnswer(spark.sql("show tags T"), Row("2024-10-15") :: Nil) + } + + test("Tag ddl: alter table t rename tag syntax") { + spark.sql("""CREATE TABLE T (id INT, name STRING) + |USING PAIMON + |TBLPROPERTIES ('primary-key'='id')""".stripMargin) + + spark.sql("insert into T values(1, 'a')") + assertResult(1)(loadTable("T").snapshotManager().snapshotCount()) + + spark.sql("alter table T create tag `tag-1`") + checkAnswer(spark.sql("show tags T"), Row("tag-1")) + + spark.sql("alter table T rename tag `tag-1` to `tag-2`") + checkAnswer(spark.sql("show tags T"), Row("tag-2")) } }