From c956b3570c733370b503c11a8b59bacb54e8d6aa Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Wed, 30 Oct 2024 20:45:01 +0800 Subject: [PATCH] [core] Introduce RollbackToTimestamp Procedure and Action --- docs/content/flink/procedures.md | 22 +++++ docs/content/spark/procedures.md | 11 +++ .../RollbackToTimestampProcedure.java | 58 +++++++++++ .../action/RollbackToTimestampAction.java | 62 ++++++++++++ .../RollbackToTimestampActionFactory.java | 71 +++++++++++++ .../RollbackToTimestampProcedure.java | 65 ++++++++++++ .../org.apache.paimon.factories.Factory | 2 + .../flink/action/RollbackToActionITCase.java | 55 +++++++++++ .../apache/paimon/spark/SparkProcedures.java | 2 + .../RollbackToTimestampProcedure.java | 99 +++++++++++++++++++ .../procedure/RollbackProcedureTest.scala | 58 +++++++++++ 11 files changed, 505 insertions(+) create mode 100644 paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RollbackToTimestampProcedure.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToTimestampAction.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToTimestampActionFactory.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToTimestampProcedure.java create mode 100644 paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToTimestampProcedure.java diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md index 2ac9fb6258cb..dd6a223f643d 100644 --- a/docs/content/flink/procedures.md +++ b/docs/content/flink/procedures.md @@ -356,6 +356,28 @@ All available procedures are listed below. CALL sys.rollback_to(`table` => 'default.T', snapshot_id => 10) + + rollback_to_timestamp + + -- for Flink 1.18
+ -- rollback to the snapshot which earlier or equal than timestamp.
+ CALL sys.rollback_to('identifier', timestamp)

+ -- for Flink 1.19 and later
+ -- rollback to the snapshot which earlier or equal than timestamp.
+ CALL sys.rollback_to(`table` => 'default.T', `timestamp` => timestamp)

+ + + To rollback to the snapshot which earlier or equal than timestamp. Argument: +
  • identifier: the target table identifier. Cannot be empty.
  • +
  • timestamp (Long): Roll back to the snapshot which earlier or equal than timestamp.
  • + + + -- for Flink 1.18
    + CALL sys.rollback_to_timestamp('default.T', 10) + -- for Flink 1.19 and later
    + CALL sys.rollback_to(`table` => 'default.T', snapshot_id => 1730292023000) + + expire_snapshots diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md index 8c5226921692..616d0bd49d02 100644 --- a/docs/content/spark/procedures.md +++ b/docs/content/spark/procedures.md @@ -153,6 +153,17 @@ This section introduce all available spark procedures about paimon. CALL sys.rollback(table => 'default.T', version => 10) + + rollback_to_timestamp + + To rollback to the snapshot which earlier or equal than timestamp. Argument: +
  • table: the target table identifier. Cannot be empty.
  • +
  • timestamp: roll back to the snapshot which earlier or equal than timestamp.
  • + + + CALL sys.rollback_to_timestamp(table => 'default.T', timestamp => 1730292023000)

    + + migrate_database diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RollbackToTimestampProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RollbackToTimestampProcedure.java new file mode 100644 index 000000000000..0b46561db30e --- /dev/null +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RollbackToTimestampProcedure.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.utils.Preconditions; + +import org.apache.flink.table.procedure.ProcedureContext; + +/** + * Rollback procedure. Usage: + * + *
    
    + *  -- rollback to a snapshot
    + *  CALL sys.rollback_to_timestamp('tableId', timestamp)
    + * 
    + */ +public class RollbackToTimestampProcedure extends ProcedureBase { + + public static final String IDENTIFIER = "rollback_to_timestamp"; + + public String[] call(ProcedureContext procedureContext, String tableId, long timestamp) + throws Catalog.TableNotExistException { + Table table = catalog.getTable(Identifier.fromString(tableId)); + FileStoreTable fileStoreTable = (FileStoreTable) table; + Snapshot snapshot = fileStoreTable.snapshotManager().earlierOrEqualTimeMills(timestamp); + Preconditions.checkNotNull( + snapshot, String.format("count not find snapshot earlier than %s", timestamp)); + fileStoreTable.rollbackTo(snapshot.id()); + + return new String[] {"Success"}; + } + + @Override + public String identifier() { + return IDENTIFIER; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToTimestampAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToTimestampAction.java new file mode 100644 index 000000000000..ca706101cc1d --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToTimestampAction.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.table.DataTable; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** Rollback to specific timestamp action for Flink. */ +public class RollbackToTimestampAction extends TableActionBase { + + private static final Logger LOG = LoggerFactory.getLogger(RollbackToTimestampAction.class); + + private final Long timestamp; + + public RollbackToTimestampAction( + String warehouse, + String databaseName, + String tableName, + Long timestamp, + Map catalogConfig) { + super(warehouse, databaseName, tableName, catalogConfig); + this.timestamp = timestamp; + } + + @Override + public void run() throws Exception { + LOG.debug("Run rollback-to-timestamp action with timestamp '{}'.", timestamp); + + if (!(table instanceof DataTable)) { + throw new IllegalArgumentException("Unknown table: " + identifier); + } + + FileStoreTable fileStoreTable = (FileStoreTable) table; + Snapshot snapshot = fileStoreTable.snapshotManager().earlierOrEqualTimeMills(timestamp); + Preconditions.checkNotNull( + snapshot, String.format("count not find snapshot earlier than %s", timestamp)); + fileStoreTable.rollbackTo(snapshot.id()); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToTimestampActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToTimestampActionFactory.java new file mode 100644 index 000000000000..c694ac0041b5 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToTimestampActionFactory.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import org.apache.flink.api.java.tuple.Tuple3; + +import java.util.Map; +import java.util.Optional; + +/** Factory to create {@link RollbackToTimestampAction}. */ +public class RollbackToTimestampActionFactory implements ActionFactory { + + public static final String IDENTIFIER = "rollback_to_timestamp"; + + private static final String TIMESTAMP = "timestamp"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public Optional create(MultipleParameterToolAdapter params) { + Tuple3 tablePath = getTablePath(params); + + checkRequiredArgument(params, TIMESTAMP); + String timestamp = params.get(TIMESTAMP); + + Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); + + RollbackToTimestampAction action = + new RollbackToTimestampAction( + tablePath.f0, + tablePath.f1, + tablePath.f2, + Long.parseLong(timestamp), + catalogConfig); + + return Optional.of(action); + } + + @Override + public void printHelp() { + System.out.println( + "Action \"rollback_to_timestamp\" roll back a table to a specific timestamp."); + System.out.println(); + + System.out.println("Syntax:"); + System.out.println( + " rollback_to --warehouse --database " + + "--table --timestamp "); + System.out.println(" can be a long value representing a timestamp."); + System.out.println(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToTimestampProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToTimestampProcedure.java new file mode 100644 index 000000000000..fe2467810540 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToTimestampProcedure.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.utils.Preconditions; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; +import org.apache.flink.table.procedure.ProcedureContext; + +/** + * Rollback to timestamp procedure. Usage: + * + *
    
    + *  -- rollback to a timestamp
    + *  CALL sys.rollback_to_timestamp(`table` => 'tableId', timestamp => timestamp)
    + * 
    + */ +public class RollbackToTimestampProcedure extends ProcedureBase { + + public static final String IDENTIFIER = "rollback_to_timestamp"; + + @ProcedureHint( + argument = { + @ArgumentHint(name = "table", type = @DataTypeHint("STRING")), + @ArgumentHint(name = "timestamp", type = @DataTypeHint("BIGINT")) + }) + public String[] call(ProcedureContext procedureContext, String tableId, Long timestamp) + throws Catalog.TableNotExistException { + Table table = catalog.getTable(Identifier.fromString(tableId)); + FileStoreTable fileStoreTable = (FileStoreTable) table; + Snapshot snapshot = fileStoreTable.snapshotManager().earlierOrEqualTimeMills(timestamp); + Preconditions.checkNotNull( + snapshot, String.format("count not find snapshot earlier than %s", timestamp)); + fileStoreTable.rollbackTo(snapshot.id()); + return new String[] {"Success"}; + } + + @Override + public String identifier() { + return IDENTIFIER; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 2cf57201d6ae..57aab9e5b14c 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -21,6 +21,7 @@ org.apache.paimon.flink.action.DropPartitionActionFactory org.apache.paimon.flink.action.DeleteActionFactory org.apache.paimon.flink.action.MergeIntoActionFactory org.apache.paimon.flink.action.RollbackToActionFactory +org.apache.paimon.flink.action.RollbackToTimestampActionFactory org.apache.paimon.flink.action.CreateTagActionFactory org.apache.paimon.flink.action.CreateTagFromTimestampActionFactory org.apache.paimon.flink.action.CreateTagFromWatermarkActionFactory @@ -57,6 +58,7 @@ org.apache.paimon.flink.procedure.DropPartitionProcedure org.apache.paimon.flink.procedure.MergeIntoProcedure org.apache.paimon.flink.procedure.ResetConsumerProcedure org.apache.paimon.flink.procedure.RollbackToProcedure +org.apache.paimon.flink.procedure.RollbackToTimestampProcedure org.apache.paimon.flink.procedure.MigrateTableProcedure org.apache.paimon.flink.procedure.MigrateDatabaseProcedure org.apache.paimon.flink.procedure.MigrateFileProcedure diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java index 859f8deda4d1..7dc9fec643cf 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java @@ -160,4 +160,59 @@ public void rollbackToTagTest(String invoker) throws Exception { "SELECT * FROM `" + tableName + "`", Arrays.asList(Row.of(1L, "Hi"), Row.of(2L, "Apache"))); } + + @ParameterizedTest + @ValueSource(strings = {"action", "procedure_named", "procedure_indexed"}) + public void rollbackToTimestampTest(String invoker) throws Exception { + FileStoreTable table = + createFileStoreTable( + ROW_TYPE, + Collections.emptyList(), + Collections.singletonList("k"), + Collections.emptyList(), + Collections.emptyMap()); + StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); + write = writeBuilder.newWrite(); + commit = writeBuilder.newCommit(); + + writeData(rowData(1L, BinaryString.fromString("Hi"))); + writeData(rowData(2L, BinaryString.fromString("Apache"))); + long timestamp = System.currentTimeMillis(); + writeData(rowData(2L, BinaryString.fromString("Paimon"))); + + switch (invoker) { + case "action": + createAction( + RollbackToTimestampAction.class, + "rollback_to_timestamp", + "--warehouse", + warehouse, + "--database", + database, + "--table", + tableName, + "--timestamp", + timestamp + "") + .run(); + break; + case "procedure_indexed": + executeSQL( + String.format( + "CALL sys.rollback_to_timestamp('%s.%s', %s)", + database, tableName, timestamp)); + break; + case "procedure_named": + executeSQL( + String.format( + "CALL sys.rollback_to_timestamp(`table` => '%s.%s', `timestamp` => %s)", + database, tableName, timestamp)); + break; + default: + throw new UnsupportedOperationException(invoker); + } + + testBatchRead( + "SELECT * FROM `" + tableName + "`", + Arrays.asList(Row.of(1L, "Hi"), Row.of(2L, "Apache"))); + } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java index dee0c38d46fb..7b7ff30acb7b 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java @@ -40,6 +40,7 @@ import org.apache.paimon.spark.procedure.RepairProcedure; import org.apache.paimon.spark.procedure.ResetConsumerProcedure; import org.apache.paimon.spark.procedure.RollbackProcedure; +import org.apache.paimon.spark.procedure.RollbackToTimestampProcedure; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; @@ -63,6 +64,7 @@ private static Map> initProcedureBuilders() { ImmutableMap.Builder> procedureBuilders = ImmutableMap.builder(); procedureBuilders.put("rollback", RollbackProcedure::builder); + procedureBuilders.put("rollback_to_timestamp", RollbackToTimestampProcedure::builder); procedureBuilders.put("create_tag", CreateTagProcedure::builder); procedureBuilders.put("rename_tag", RenameTagProcedure::builder); procedureBuilders.put( diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToTimestampProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToTimestampProcedure.java new file mode 100644 index 000000000000..90fa10062741 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToTimestampProcedure.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.procedure; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.Preconditions; + +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +import static org.apache.spark.sql.types.DataTypes.LongType; +import static org.apache.spark.sql.types.DataTypes.StringType; + +/** A procedure to rollback to a timestamp. */ +public class RollbackToTimestampProcedure extends BaseProcedure { + + private static final ProcedureParameter[] PARAMETERS = + new ProcedureParameter[] { + ProcedureParameter.required("table", StringType), + // timestamp value + ProcedureParameter.required("timestamp", LongType) + }; + + private static final StructType OUTPUT_TYPE = + new StructType( + new StructField[] { + new StructField("result", DataTypes.BooleanType, true, Metadata.empty()) + }); + + private RollbackToTimestampProcedure(TableCatalog tableCatalog) { + super(tableCatalog); + } + + @Override + public ProcedureParameter[] parameters() { + return PARAMETERS; + } + + @Override + public StructType outputType() { + return OUTPUT_TYPE; + } + + @Override + public InternalRow[] call(InternalRow args) { + Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); + Long timestamp = args.getLong(1); + + return modifyPaimonTable( + tableIdent, + table -> { + FileStoreTable fileStoreTable = (FileStoreTable) table; + Snapshot snapshot = + fileStoreTable.snapshotManager().earlierOrEqualTimeMills(timestamp); + Preconditions.checkNotNull( + snapshot, + String.format("count not find snapshot earlier than %s", timestamp)); + fileStoreTable.rollbackTo(snapshot.id()); + InternalRow outputRow = newInternalRow(true); + return new InternalRow[] {outputRow}; + }); + } + + public static ProcedureBuilder builder() { + return new BaseProcedure.Builder() { + @Override + public RollbackToTimestampProcedure doBuild() { + return new RollbackToTimestampProcedure(tableCatalog()); + } + }; + } + + @Override + public String description() { + return "RollbackToTimestampProcedure"; + } +} diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala index 945f70ce0e63..387a81dc66e0 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala @@ -93,4 +93,62 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest { } } } + + test("Paimon Procedure: rollback to timestamp") { + failAfter(streamingTimeout) { + withTempDir { + checkpointDir => + // define a change-log table and test `forEachBatch` api + spark.sql(s""" + |CREATE TABLE T (a INT, b STRING) + |TBLPROPERTIES ('primary-key'='a', 'bucket'='3') + |""".stripMargin) + val location = loadTable("T").location().toString + + val inputData = MemoryStream[(Int, String)] + val stream = inputData + .toDS() + .toDF("a", "b") + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .foreachBatch { + (batch: Dataset[Row], _: Long) => + batch.write.format("paimon").mode("append").save(location) + } + .start() + + val query = () => spark.sql("SELECT * FROM T ORDER BY a") + + try { + // snapshot-1 + inputData.addData((1, "a")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Nil) + + // snapshot-2 + inputData.addData((2, "b")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) + + val timestamp = System.currentTimeMillis() + + // snapshot-3 + inputData.addData((2, "b2")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) + + // rollback to timestamp + checkAnswer( + spark.sql( + s"CALL paimon.sys.rollback_to_timestamp(table => 'test.T', timestamp => $timestamp)"), + Row(true) :: Nil) + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) + + } finally { + stream.stop() + } + } + } + } + }