Skip to content

Commit

Permalink
rollback ddl
Browse files Browse the repository at this point in the history
  • Loading branch information
askwang committed Nov 6, 2024
1 parent 29c1347 commit df30308
Show file tree
Hide file tree
Showing 8 changed files with 218 additions and 4 deletions.
13 changes: 13 additions & 0 deletions docs/content/spark/sql-ddl.md
Original file line number Diff line number Diff line change
Expand Up @@ -265,3 +265,16 @@ List all tags of a table.
SHOW TAGS T;
```

## Rollback DDL
Rollback to a specific version of target table.
```sql
-- rollback to snapshot
ALTER TABLE T ROLLBACK TO SNAPSHOT `2`;

-- rollback to tag
ALTER TABLE T ROLLBACK TO TAG `TEST-TAG`;

-- rollback to timestamp
ALTER TABLE T ROLLBACK TO TIMESTAMP `1730876906134`;
```

Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ statement
| ALTER TABLE multipartIdentifier createReplaceTagClause #createOrReplaceTag
| ALTER TABLE multipartIdentifier DELETE TAG (IF EXISTS)? identifier #deleteTag
| ALTER TABLE multipartIdentifier RENAME TAG identifier TO identifier #renameTag
| ALTER TABLE multipartIdentifier ROLLBACK TO kind identifier #rollback
;

callArgument
Expand Down Expand Up @@ -104,6 +105,12 @@ timeUnit
| MINUTES
;

kind
: SNAPSHOT
| TAG
| TIMESTAMP
;

expression
: constant
| stringMap
Expand Down Expand Up @@ -152,7 +159,7 @@ quotedIdentifier

nonReserved
: ALTER | AS | CALL | CREATE | DAYS | DELETE | EXISTS | HOURS | IF | NOT | OF | OR | TABLE
| REPLACE | RETAIN | VERSION | TAG
| REPLACE | RETAIN | ROLLBACK | SNAPSHOT | TAG | TIMESTAMP | VERSION
| TRUE | FALSE
| MAP
;
Expand All @@ -173,11 +180,14 @@ OR: 'OR';
RENAME: 'RENAME';
REPLACE: 'REPLACE';
RETAIN: 'RETAIN';
ROLLBACK: 'ROLLBACK';
SHOW: 'SHOW';
SNAPSHOT: 'SNAPSHOT';
TABLE: 'TABLE';
TAG: 'TAG';
TAGS: 'TAGS';
TO: 'TO';
TIMESTAMP: 'TIMESTAMP';
VERSION: 'VERSION';

TRUE: 'TRUE';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.catalyst.plans.logical

import org.apache.paimon.spark.leafnode.PaimonLeafCommand

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.types.StringType

case class RollbackCommand(
table: Seq[String],
kind: String,
version: String,
override val output: Seq[Attribute] = RollbackCommand.getOutputAttrs)
extends PaimonLeafCommand {

override def simpleString(maxFields: Int): String = {
s"Rollback to $kind '$version' for table: $table"
}
}

object RollbackCommand {
private def getOutputAttrs: Seq[Attribute] = {
Seq(AttributeReference("result", StringType, nullable = false)())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.paimon.spark.execution

import org.apache.paimon.spark.{SparkCatalog, SparkUtils}
import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, DeleteTagCommand, PaimonCallCommand, RenameTagCommand, ShowTagsCommand}
import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, DeleteTagCommand, PaimonCallCommand, RenameTagCommand, RollbackCommand, ShowTagsCommand}

import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.InternalRow
Expand Down Expand Up @@ -65,6 +65,9 @@ case class PaimonStrategy(spark: SparkSession)
case RenameTagCommand(PaimonCatalogAndIdentifier(catalog, ident), sourceTag, targetTag) =>
RenameTagExec(catalog, ident, sourceTag, targetTag) :: Nil

case RollbackCommand(PaimonCatalogAndIdentifier(catalog, ident), kind, value, output) =>
RollbackExec(catalog, ident, output, kind, value) :: Nil

case _ => Nil
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.execution

import org.apache.paimon.spark.SparkTable
import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec
import org.apache.paimon.table.FileStoreTable

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.unsafe.types.UTF8String

import java.util.Locale

case class RollbackExec(
catalog: TableCatalog,
ident: Identifier,
output: Seq[Attribute],
kind: String,
version: String)
extends PaimonLeafV2CommandExec {

override protected def run(): Seq[InternalRow] = {
val table = catalog.loadTable(ident)
assert(table.isInstanceOf[SparkTable])

table.asInstanceOf[SparkTable].getTable match {
case paimonTable: FileStoreTable =>
kind.toUpperCase(Locale.ROOT) match {

case "SNAPSHOT" =>
assert(version.chars.allMatch(Character.isDigit))
paimonTable.rollbackTo(version.toLong)

case "TAG" =>
paimonTable.rollbackTo(version)

case "TIMESTAMP" =>
assert(version.chars.allMatch(Character.isDigit))
val snapshot = paimonTable.snapshotManager.earlierOrEqualTimeMills(version.toLong)
assert(snapshot != null)
paimonTable.rollbackTo(snapshot.id);

case _ =>
throw new UnsupportedOperationException(s"Unsupported rollback kind '$kind'.")
}
case t =>
throw new UnsupportedOperationException(
s"Can not delete tag for non-paimon FileStoreTable: $t")
}
Seq(InternalRow(UTF8String.fromString("success")))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ class PaimonSparkSqlExtensionsParser(delegate: ParserInterface)
.replaceAll("\\s+", " ")
.replaceAll("/\\*.*?\\*/", " ")
.trim()
normalized.startsWith("call") || isTagRefDdl(normalized)
normalized.startsWith("call") ||
isTagRefDdl(normalized) ||
isPaimonDdl(normalized)
}

private def isTagRefDdl(normalized: String): Boolean = {
Expand All @@ -113,6 +115,11 @@ class PaimonSparkSqlExtensionsParser(delegate: ParserInterface)
normalized.contains("delete tag")))
}

private def isPaimonDdl(normalized: String): Boolean = {
normalized.startsWith("alter table") &&
normalized.contains("rollback to")
}

protected def parse[T](command: String)(toResult: PaimonSqlExtensionsParser => T): T = {
val lexer = new PaimonSqlExtensionsLexer(
new UpperCaseCharStream(CharStreams.fromString(command)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.spark.sql.catalyst.parser.extensions

import org.apache.paimon.spark.catalyst.plans.logical
import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, DeleteTagCommand, PaimonCallArgument, PaimonCallStatement, PaimonNamedArgument, PaimonPositionalArgument, RenameTagCommand, ShowTagsCommand, TagOptions}
import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, DeleteTagCommand, PaimonCallArgument, PaimonCallStatement, PaimonNamedArgument, PaimonPositionalArgument, RenameTagCommand, RollbackCommand, ShowTagsCommand, TagOptions}
import org.apache.paimon.utils.TimeUtils

import org.antlr.v4.runtime._
Expand Down Expand Up @@ -153,6 +153,14 @@ class PaimonSqlExtensionsAstBuilder(delegate: ParserInterface)
ctx.identifier(1).getText)
}

/** Create a ROLLBACK logical command. */
override def visitRollback(ctx: RollbackContext): AnyRef = withOrigin(ctx) {
RollbackCommand(
typedVisit[Seq[String]](ctx.multipartIdentifier),
ctx.kind().getText,
ctx.identifier().getText)
}

private def toBuffer[T](list: java.util.List[T]) = list.asScala

private def toSeq[T](list: java.util.List[T]) = toBuffer(list)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql

import org.apache.paimon.spark.PaimonSparkTestBase

import org.apache.spark.sql.Row

class PaimonDdlTest extends PaimonSparkTestBase {

test("Paimon ddl: alter table t rollback syntax") {
spark.sql("""CREATE TABLE T (id INT, name STRING)
|USING PAIMON
|TBLPROPERTIES ('primary-key'='id')""".stripMargin)

val table = loadTable("T")
val snapshotManager = table.snapshotManager()

spark.sql("insert into T values(1, 'a')")
val timestamp = System.currentTimeMillis()

spark.sql("insert into T values(2, 'b')")
spark.sql("insert into T values(3, 'c')")
spark.sql("insert into T values(4, 'd')")
assertResult(4)(snapshotManager.snapshotCount())

// rollback to snapshot
checkAnswer(spark.sql("alter table T rollback to snapshot `3`"), Row("success") :: Nil)
assertResult(3)(snapshotManager.latestSnapshotId())

// create a tag based on snapshot-2
spark.sql("alter table T create tag `test-tag` as of version 2")
checkAnswer(spark.sql("show tags T"), Row("test-tag") :: Nil)

// rollback to tag
checkAnswer(spark.sql("alter table T rollback to tag `test-tag`"), Row("success") :: Nil)
assertResult(2)(snapshotManager.latestSnapshotId())

// rollback to timestamp
checkAnswer(
spark.sql(s"alter table T rollback to timestamp `$timestamp`"),
Row("success") :: Nil)
assertResult(1)(snapshotManager.latestSnapshotId())
}
}

0 comments on commit df30308

Please sign in to comment.