From df30308882c06c4f9ea6aeb30e7a0823efb7fbe1 Mon Sep 17 00:00:00 2001 From: Askwang <135721692+Askwang@users.noreply.github.com> Date: Wed, 6 Nov 2024 15:26:37 +0800 Subject: [PATCH 1/3] rollback ddl --- docs/content/spark/sql-ddl.md | 13 ++++ .../PaimonSqlExtensions.g4 | 12 +++- .../plans/logical/RollbackCommand.scala | 42 +++++++++++ .../spark/execution/PaimonStrategy.scala | 5 +- .../paimon/spark/execution/RollbackExec.scala | 70 +++++++++++++++++++ .../PaimonSparkSqlExtensionsParser.scala | 9 ++- .../PaimonSqlExtensionsAstBuilder.scala | 10 ++- .../paimon/spark/sql/PaimonDdlTest.scala | 61 ++++++++++++++++ 8 files changed, 218 insertions(+), 4 deletions(-) create mode 100644 paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/RollbackCommand.scala create mode 100644 paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/RollbackExec.scala create mode 100644 paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonDdlTest.scala diff --git a/docs/content/spark/sql-ddl.md b/docs/content/spark/sql-ddl.md index 3b4d1722bc09..b0fed5a8fc00 100644 --- a/docs/content/spark/sql-ddl.md +++ b/docs/content/spark/sql-ddl.md @@ -265,3 +265,16 @@ List all tags of a table. SHOW TAGS T; ``` +## Rollback DDL +Rollback to a specific version of target table. +```sql +-- rollback to snapshot +ALTER TABLE T ROLLBACK TO SNAPSHOT `2`; + +-- rollback to tag +ALTER TABLE T ROLLBACK TO TAG `TEST-TAG`; + +-- rollback to timestamp +ALTER TABLE T ROLLBACK TO TIMESTAMP `1730876906134`; +``` + diff --git a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 index 207d9732160f..9abf7d8bf85c 100644 --- a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 +++ b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 @@ -74,6 +74,7 @@ statement | ALTER TABLE multipartIdentifier createReplaceTagClause #createOrReplaceTag | ALTER TABLE multipartIdentifier DELETE TAG (IF EXISTS)? identifier #deleteTag | ALTER TABLE multipartIdentifier RENAME TAG identifier TO identifier #renameTag + | ALTER TABLE multipartIdentifier ROLLBACK TO kind identifier #rollback ; callArgument @@ -104,6 +105,12 @@ timeUnit | MINUTES ; +kind + : SNAPSHOT + | TAG + | TIMESTAMP + ; + expression : constant | stringMap @@ -152,7 +159,7 @@ quotedIdentifier nonReserved : ALTER | AS | CALL | CREATE | DAYS | DELETE | EXISTS | HOURS | IF | NOT | OF | OR | TABLE - | REPLACE | RETAIN | VERSION | TAG + | REPLACE | RETAIN | ROLLBACK | SNAPSHOT | TAG | TIMESTAMP | VERSION | TRUE | FALSE | MAP ; @@ -173,11 +180,14 @@ OR: 'OR'; RENAME: 'RENAME'; REPLACE: 'REPLACE'; RETAIN: 'RETAIN'; +ROLLBACK: 'ROLLBACK'; SHOW: 'SHOW'; +SNAPSHOT: 'SNAPSHOT'; TABLE: 'TABLE'; TAG: 'TAG'; TAGS: 'TAGS'; TO: 'TO'; +TIMESTAMP: 'TIMESTAMP'; VERSION: 'VERSION'; TRUE: 'TRUE'; diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/RollbackCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/RollbackCommand.scala new file mode 100644 index 000000000000..3e610079775f --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/RollbackCommand.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.catalyst.plans.logical + +import org.apache.paimon.spark.leafnode.PaimonLeafCommand + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.types.StringType + +case class RollbackCommand( + table: Seq[String], + kind: String, + version: String, + override val output: Seq[Attribute] = RollbackCommand.getOutputAttrs) + extends PaimonLeafCommand { + + override def simpleString(maxFields: Int): String = { + s"Rollback to $kind '$version' for table: $table" + } +} + +object RollbackCommand { + private def getOutputAttrs: Seq[Attribute] = { + Seq(AttributeReference("result", StringType, nullable = false)()) + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala index 0c3d3e6b6dc6..fd88f9e3798f 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala @@ -19,7 +19,7 @@ package org.apache.paimon.spark.execution import org.apache.paimon.spark.{SparkCatalog, SparkUtils} -import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, DeleteTagCommand, PaimonCallCommand, RenameTagCommand, ShowTagsCommand} +import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, DeleteTagCommand, PaimonCallCommand, RenameTagCommand, RollbackCommand, ShowTagsCommand} import org.apache.spark.sql.{SparkSession, Strategy} import org.apache.spark.sql.catalyst.InternalRow @@ -65,6 +65,9 @@ case class PaimonStrategy(spark: SparkSession) case RenameTagCommand(PaimonCatalogAndIdentifier(catalog, ident), sourceTag, targetTag) => RenameTagExec(catalog, ident, sourceTag, targetTag) :: Nil + case RollbackCommand(PaimonCatalogAndIdentifier(catalog, ident), kind, value, output) => + RollbackExec(catalog, ident, output, kind, value) :: Nil + case _ => Nil } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/RollbackExec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/RollbackExec.scala new file mode 100644 index 000000000000..3b5ec413d7da --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/RollbackExec.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.execution + +import org.apache.paimon.spark.SparkTable +import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec +import org.apache.paimon.table.FileStoreTable + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} +import org.apache.spark.unsafe.types.UTF8String + +import java.util.Locale + +case class RollbackExec( + catalog: TableCatalog, + ident: Identifier, + output: Seq[Attribute], + kind: String, + version: String) + extends PaimonLeafV2CommandExec { + + override protected def run(): Seq[InternalRow] = { + val table = catalog.loadTable(ident) + assert(table.isInstanceOf[SparkTable]) + + table.asInstanceOf[SparkTable].getTable match { + case paimonTable: FileStoreTable => + kind.toUpperCase(Locale.ROOT) match { + + case "SNAPSHOT" => + assert(version.chars.allMatch(Character.isDigit)) + paimonTable.rollbackTo(version.toLong) + + case "TAG" => + paimonTable.rollbackTo(version) + + case "TIMESTAMP" => + assert(version.chars.allMatch(Character.isDigit)) + val snapshot = paimonTable.snapshotManager.earlierOrEqualTimeMills(version.toLong) + assert(snapshot != null) + paimonTable.rollbackTo(snapshot.id); + + case _ => + throw new UnsupportedOperationException(s"Unsupported rollback kind '$kind'.") + } + case t => + throw new UnsupportedOperationException( + s"Can not delete tag for non-paimon FileStoreTable: $t") + } + Seq(InternalRow(UTF8String.fromString("success"))) + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala index 78a7f80eaf38..ed946b121627 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala @@ -101,7 +101,9 @@ class PaimonSparkSqlExtensionsParser(delegate: ParserInterface) .replaceAll("\\s+", " ") .replaceAll("/\\*.*?\\*/", " ") .trim() - normalized.startsWith("call") || isTagRefDdl(normalized) + normalized.startsWith("call") || + isTagRefDdl(normalized) || + isPaimonDdl(normalized) } private def isTagRefDdl(normalized: String): Boolean = { @@ -113,6 +115,11 @@ class PaimonSparkSqlExtensionsParser(delegate: ParserInterface) normalized.contains("delete tag"))) } + private def isPaimonDdl(normalized: String): Boolean = { + normalized.startsWith("alter table") && + normalized.contains("rollback to") + } + protected def parse[T](command: String)(toResult: PaimonSqlExtensionsParser => T): T = { val lexer = new PaimonSqlExtensionsLexer( new UpperCaseCharStream(CharStreams.fromString(command))) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala index b864894e7498..2969229b8c87 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.parser.extensions import org.apache.paimon.spark.catalyst.plans.logical -import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, DeleteTagCommand, PaimonCallArgument, PaimonCallStatement, PaimonNamedArgument, PaimonPositionalArgument, RenameTagCommand, ShowTagsCommand, TagOptions} +import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, DeleteTagCommand, PaimonCallArgument, PaimonCallStatement, PaimonNamedArgument, PaimonPositionalArgument, RenameTagCommand, RollbackCommand, ShowTagsCommand, TagOptions} import org.apache.paimon.utils.TimeUtils import org.antlr.v4.runtime._ @@ -153,6 +153,14 @@ class PaimonSqlExtensionsAstBuilder(delegate: ParserInterface) ctx.identifier(1).getText) } + /** Create a ROLLBACK logical command. */ + override def visitRollback(ctx: RollbackContext): AnyRef = withOrigin(ctx) { + RollbackCommand( + typedVisit[Seq[String]](ctx.multipartIdentifier), + ctx.kind().getText, + ctx.identifier().getText) + } + private def toBuffer[T](list: java.util.List[T]) = list.asScala private def toSeq[T](list: java.util.List[T]) = toBuffer(list) diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonDdlTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonDdlTest.scala new file mode 100644 index 000000000000..3e0a97d252b1 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonDdlTest.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +import org.apache.paimon.spark.PaimonSparkTestBase + +import org.apache.spark.sql.Row + +class PaimonDdlTest extends PaimonSparkTestBase { + + test("Paimon ddl: alter table t rollback syntax") { + spark.sql("""CREATE TABLE T (id INT, name STRING) + |USING PAIMON + |TBLPROPERTIES ('primary-key'='id')""".stripMargin) + + val table = loadTable("T") + val snapshotManager = table.snapshotManager() + + spark.sql("insert into T values(1, 'a')") + val timestamp = System.currentTimeMillis() + + spark.sql("insert into T values(2, 'b')") + spark.sql("insert into T values(3, 'c')") + spark.sql("insert into T values(4, 'd')") + assertResult(4)(snapshotManager.snapshotCount()) + + // rollback to snapshot + checkAnswer(spark.sql("alter table T rollback to snapshot `3`"), Row("success") :: Nil) + assertResult(3)(snapshotManager.latestSnapshotId()) + + // create a tag based on snapshot-2 + spark.sql("alter table T create tag `test-tag` as of version 2") + checkAnswer(spark.sql("show tags T"), Row("test-tag") :: Nil) + + // rollback to tag + checkAnswer(spark.sql("alter table T rollback to tag `test-tag`"), Row("success") :: Nil) + assertResult(2)(snapshotManager.latestSnapshotId()) + + // rollback to timestamp + checkAnswer( + spark.sql(s"alter table T rollback to timestamp `$timestamp`"), + Row("success") :: Nil) + assertResult(1)(snapshotManager.latestSnapshotId()) + } +} From cf275ee038f1f92f671ba12edf7e02ed2ef86a35 Mon Sep 17 00:00:00 2001 From: Askwang <135721692+Askwang@users.noreply.github.com> Date: Wed, 6 Nov 2024 16:11:53 +0800 Subject: [PATCH 2/3] fix --- .../scala/org/apache/paimon/spark/execution/RollbackExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/RollbackExec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/RollbackExec.scala index 3b5ec413d7da..4e9c23b518e8 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/RollbackExec.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/RollbackExec.scala @@ -63,7 +63,7 @@ case class RollbackExec( } case t => throw new UnsupportedOperationException( - s"Can not delete tag for non-paimon FileStoreTable: $t") + s"Can not rollback for non-paimon FileStoreTable: $t") } Seq(InternalRow(UTF8String.fromString("success"))) } From d9324acff8c55b0ea522a4c10130123b138a4f8a Mon Sep 17 00:00:00 2001 From: Askwang <135721692+Askwang@users.noreply.github.com> Date: Wed, 6 Nov 2024 21:17:09 +0800 Subject: [PATCH 3/3] rename --- .../PaimonSqlExtensions.g4 | 4 ++-- .../spark/catalyst/plans/logical/RollbackCommand.scala | 4 ++-- .../org/apache/paimon/spark/execution/RollbackExec.scala | 7 ++++--- .../parser/extensions/PaimonSqlExtensionsAstBuilder.scala | 2 +- 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 index 9abf7d8bf85c..e0a115251e05 100644 --- a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 +++ b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 @@ -74,7 +74,7 @@ statement | ALTER TABLE multipartIdentifier createReplaceTagClause #createOrReplaceTag | ALTER TABLE multipartIdentifier DELETE TAG (IF EXISTS)? identifier #deleteTag | ALTER TABLE multipartIdentifier RENAME TAG identifier TO identifier #renameTag - | ALTER TABLE multipartIdentifier ROLLBACK TO kind identifier #rollback + | ALTER TABLE multipartIdentifier ROLLBACK TO timeTravelKind identifier #rollback ; callArgument @@ -105,7 +105,7 @@ timeUnit | MINUTES ; -kind +timeTravelKind : SNAPSHOT | TAG | TIMESTAMP diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/RollbackCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/RollbackCommand.scala index 3e610079775f..d4687f24c96a 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/RollbackCommand.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/RollbackCommand.scala @@ -25,13 +25,13 @@ import org.apache.spark.sql.types.StringType case class RollbackCommand( table: Seq[String], - kind: String, + timeTravelKind: String, version: String, override val output: Seq[Attribute] = RollbackCommand.getOutputAttrs) extends PaimonLeafCommand { override def simpleString(maxFields: Int): String = { - s"Rollback to $kind '$version' for table: $table" + s"Rollback to $timeTravelKind '$version' for table: $table" } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/RollbackExec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/RollbackExec.scala index 4e9c23b518e8..4e0d2d139502 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/RollbackExec.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/RollbackExec.scala @@ -33,7 +33,7 @@ case class RollbackExec( catalog: TableCatalog, ident: Identifier, output: Seq[Attribute], - kind: String, + timeTravelKind: String, version: String) extends PaimonLeafV2CommandExec { @@ -43,7 +43,7 @@ case class RollbackExec( table.asInstanceOf[SparkTable].getTable match { case paimonTable: FileStoreTable => - kind.toUpperCase(Locale.ROOT) match { + timeTravelKind.toUpperCase(Locale.ROOT) match { case "SNAPSHOT" => assert(version.chars.allMatch(Character.isDigit)) @@ -59,7 +59,8 @@ case class RollbackExec( paimonTable.rollbackTo(snapshot.id); case _ => - throw new UnsupportedOperationException(s"Unsupported rollback kind '$kind'.") + throw new UnsupportedOperationException( + s"Unsupported rollback timeTravelKind '$timeTravelKind'.") } case t => throw new UnsupportedOperationException( diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala index 2969229b8c87..ed0ce4e6f712 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala @@ -157,7 +157,7 @@ class PaimonSqlExtensionsAstBuilder(delegate: ParserInterface) override def visitRollback(ctx: RollbackContext): AnyRef = withOrigin(ctx) { RollbackCommand( typedVisit[Seq[String]](ctx.multipartIdentifier), - ctx.kind().getText, + ctx.timeTravelKind().getText, ctx.identifier().getText) }