From 8bc0a5b3d758165f41b1a3a9c4b866f2b2d44260 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Sun, 13 Oct 2024 23:45:22 +0800 Subject: [PATCH 1/2] [core] Support RollbackAction and Procedure with timestammp --- .../paimon/flink/action/RollbackToAction.java | 16 ++++- .../flink/action/RollbackToActionFactory.java | 14 ++++- .../flink/procedure/RollbackToProcedure.java | 16 ++++- .../flink/action/RollbackToActionITCase.java | 58 +++++++++++++++++++ .../spark/procedure/RollbackProcedure.java | 24 ++++++-- .../procedure/RollbackProcedureTest.scala | 16 ++++- 6 files changed, 131 insertions(+), 13 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToAction.java index 431383d38215..eaf39bccc95d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToAction.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.action; import org.apache.paimon.table.DataTable; +import org.apache.paimon.table.FileStoreTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,14 +33,18 @@ public class RollbackToAction extends TableActionBase { private final String version; + private final Boolean isTimestamp; + public RollbackToAction( String warehouse, String databaseName, String tableName, String version, + Boolean isTimestamp, Map catalogConfig) { super(warehouse, databaseName, tableName, catalogConfig); this.version = version; + this.isTimestamp = isTimestamp; } @Override @@ -51,7 +56,16 @@ public void run() throws Exception { } if (version.chars().allMatch(Character::isDigit)) { - table.rollbackTo(Long.parseLong(version)); + if (isTimestamp != null && isTimestamp) { + FileStoreTable fileStoreTable = (FileStoreTable) table; + fileStoreTable.rollbackTo( 
+ fileStoreTable + .snapshotManager() + .earlierOrEqualTimeMills(Long.parseLong(version)) + .id()); + } else { + table.rollbackTo(Long.parseLong(version)); + } } else { table.rollbackTo(version); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToActionFactory.java index 077c608acab5..6b7364d35b83 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToActionFactory.java @@ -30,6 +30,8 @@ public class RollbackToActionFactory implements ActionFactory { private static final String VERSION = "version"; + private static final String IS_TIMESTAMP = "is_timestamp"; + @Override public String identifier() { return IDENTIFIER; @@ -41,12 +43,18 @@ public Optional create(MultipleParameterToolAdapter params) { checkRequiredArgument(params, VERSION); String version = params.get(VERSION); + Boolean isTimestamp = Boolean.parseBoolean(params.get(IS_TIMESTAMP)); Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); RollbackToAction action = new RollbackToAction( - tablePath.f0, tablePath.f1, tablePath.f2, version, catalogConfig); + tablePath.f0, + tablePath.f1, + tablePath.f2, + version, + isTimestamp, + catalogConfig); return Optional.of(action); } @@ -60,9 +68,9 @@ public void printHelp() { System.out.println("Syntax:"); System.out.println( " rollback_to --warehouse --database " - + "--table --version "); + + "--table --version --is_timestamp "); System.out.println( - " can be a long value representing a snapshot ID or a tag name."); + " can be a long value representing a snapshot ID or a tag name or a timestamp."); System.out.println(); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java index 9bca6505c99e..dfcbe111673e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java @@ -20,6 +20,7 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.utils.StringUtils; @@ -50,16 +51,25 @@ public class RollbackToProcedure extends ProcedureBase { @ArgumentHint( name = "snapshot_id", type = @DataTypeHint("BIGINT"), - isOptional = true) + isOptional = true), + @ArgumentHint(name = "timestamp", type = @DataTypeHint("BIGINT"), isOptional = true) }) public String[] call( - ProcedureContext procedureContext, String tableId, String tagName, Long snapshotId) + ProcedureContext procedureContext, + String tableId, + String tagName, + Long snapshotId, + Long timestamp) throws Catalog.TableNotExistException { Table table = catalog.getTable(Identifier.fromString(tableId)); if (!StringUtils.isNullOrWhitespaceOnly(tagName)) { table.rollbackTo(tagName); - } else { + } else if (snapshotId != null) { table.rollbackTo(snapshotId); + } else { + FileStoreTable fileStoreTable = (FileStoreTable) table; + fileStoreTable.rollbackTo( + fileStoreTable.snapshotManager().earlierOrEqualTimeMills(timestamp).id()); } return new String[] {"Success"}; } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java index 859f8deda4d1..489a61c5fcdb 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java +++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java @@ -49,6 +49,64 @@ public void setUp() { init(warehouse); } + @ParameterizedTest + @ValueSource(strings = {"action", "procedure_named", "procedure_indexed"}) + public void rollbackToTimestampTest(String invoker) throws Exception { + FileStoreTable table = + createFileStoreTable( + ROW_TYPE, + Collections.emptyList(), + Collections.singletonList("k"), + Collections.emptyList(), + Collections.emptyMap()); + StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); + write = writeBuilder.newWrite(); + commit = writeBuilder.newCommit(); + + writeData(rowData(1L, BinaryString.fromString("Hi"))); + writeData(rowData(2L, BinaryString.fromString("Hello"))); + writeData(rowData(2L, BinaryString.fromString("World"))); + long ts = System.currentTimeMillis(); + writeData(rowData(2L, BinaryString.fromString("Flink"))); + + switch (invoker) { + case "action": + createAction( + RollbackToAction.class, + "rollback_to", + "--warehouse", + warehouse, + "--database", + database, + "--table", + tableName, + "--version", + ts + "", + "--is_timestamp", + "true") + .run(); + break; + case "procedure_indexed": + executeSQL( + String.format( + "CALL sys.rollback_to('%s.%s', '', null, cast(%s as bigint))", + database, tableName, ts)); + break; + case "procedure_named": + executeSQL( + String.format( + "CALL sys.rollback_to(`table` => '%s.%s', `timestamp` => %s)", + database, tableName, ts)); + break; + default: + throw new UnsupportedOperationException(invoker); + } + + testBatchRead( + "SELECT * FROM `" + tableName + "`", + Arrays.asList(Row.of(1L, "Hi"), Row.of(2L, "World"))); + } + @ParameterizedTest @ValueSource(strings = {"action", "procedure_named", "procedure_indexed"}) public void rollbackToSnapshotTest(String invoker) throws Exception { diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java index 6d004e946607..4a49d5d1e29d 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java @@ -18,6 +18,8 @@ package org.apache.paimon.spark.procedure; +import org.apache.paimon.table.FileStoreTable; + import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.TableCatalog; @@ -26,16 +28,18 @@ import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import static org.apache.spark.sql.types.DataTypes.BooleanType; import static org.apache.spark.sql.types.DataTypes.StringType; -/** A procedure to rollback to a snapshot or a tag. */ +/** A procedure to rollback to a snapshot or a tag or a timestamp. */ public class RollbackProcedure extends BaseProcedure { private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[] { ProcedureParameter.required("table", StringType), - // snapshot id or tag name - ProcedureParameter.required("version", StringType) + // snapshot id or tag name or timestamp + ProcedureParameter.required("version", StringType), + ProcedureParameter.optional("isTimestamp", BooleanType) }; private static final StructType OUTPUT_TYPE = @@ -62,14 +66,24 @@ public StructType outputType() { public InternalRow[] call(InternalRow args) { Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); String version = args.getString(1); + Boolean isTimestamp = args.isNullAt(2) ? 
null : args.getBoolean(2); return modifyPaimonTable( tableIdent, table -> { + FileStoreTable fileStoreTable = (FileStoreTable) table; if (version.chars().allMatch(Character::isDigit)) { - table.rollbackTo(Long.parseLong(version)); + if (isTimestamp != null && isTimestamp) { + fileStoreTable.rollbackTo( + fileStoreTable + .snapshotManager() + .earlierOrEqualTimeMills(Long.parseLong(version)) + .id()); + } else { + fileStoreTable.rollbackTo(Long.parseLong(version)); + } } else { - table.rollbackTo(version); + fileStoreTable.rollbackTo(version); } InternalRow outputRow = newInternalRow(true); return new InternalRow[] {outputRow}; diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala index 945f70ce0e63..bc9ee91ef193 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala @@ -68,17 +68,31 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest { inputData.addData((2, "b")) stream.processAllAvailable() checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) + val ts = System.currentTimeMillis // snapshot-3 inputData.addData((2, "b2")) stream.processAllAvailable() checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) + + // snapshot-4 + inputData.addData((2, "b3")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Row(2, "b3") :: Nil) + assertThrows[RuntimeException] { spark.sql("CALL paimon.sys.rollback(table => 'test.T_exception', version => '2')") } // rollback to snapshot checkAnswer( - spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '2')"), + spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '3')"), + Row(true) :: Nil) + 
checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) + + // rollback to timestamp + checkAnswer( + spark.sql( + s"CALL paimon.sys.rollback(table => 'test.T', version => '$ts', isTimestamp => true)"), Row(true) :: Nil) checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) From 1a8873c4f7b0b823d21b5f4977160d4e5dbccf29 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Sun, 20 Oct 2024 16:05:30 +0800 Subject: [PATCH 2/2] address --- docs/content/flink/procedures.md | 3 +++ docs/content/spark/procedures.md | 2 ++ .../paimon/flink/action/RollbackToAction.java | 13 +++++++++---- .../paimon/flink/procedure/RollbackToProcedure.java | 8 ++++++-- .../paimon/spark/procedure/RollbackProcedure.java | 2 +- .../spark/procedure/RollbackProcedureTest.scala | 7 ++++--- 6 files changed, 25 insertions(+), 10 deletions(-) diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md index ce8c8043ae40..d4b68f96f309 100644 --- a/docs/content/flink/procedures.md +++ b/docs/content/flink/procedures.md @@ -325,12 +325,15 @@ All available procedures are listed below. CALL sys.rollback_to(`table` => 'identifier', snapshot_id => snapshotId)

-- rollback to a tag
CALL sys.rollback_to(`table` => 'identifier', tag => 'tagName') + -- rollback to a timestamp
+ CALL sys.rollback_to(`table` => 'identifier', `timestamp` => timestamp) To rollback to a specific version of target table. Argument:
  • identifier: the target table identifier. Cannot be empty.
  • snapshotId (Long): id of the snapshot that will roll back to.
  • tagName: name of the tag that will roll back to.
  • +
  • timestamp (Long): a timestamp in epoch milliseconds; the table is rolled back to the latest snapshot committed at or before it.
  • -- for Flink 1.18
    diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md index d0d8f2397ccb..ee629596e479 100644 --- a/docs/content/spark/procedures.md +++ b/docs/content/spark/procedures.md @@ -136,10 +136,12 @@ This section introduce all available spark procedures about paimon. To rollback to a specific version of target table. Argument:
  • table: the target table identifier. Cannot be empty.
  • version: id of the snapshot or name of tag that will roll back to.
  • +
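  • is_timestamp (Boolean, optional): whether version is a timestamp in epoch milliseconds. When true, rolls back to the latest snapshot committed at or before that timestamp.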
  • CALL sys.rollback(table => 'default.T', version => 'my_tag')

    CALL sys.rollback(table => 'default.T', version => 10) + CALL sys.rollback(table => 'default.T', version => 1729410996001, is_timestamp => true) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToAction.java index eaf39bccc95d..d0a013a7f83a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RollbackToAction.java @@ -18,8 +18,10 @@ package org.apache.paimon.flink.action; +import org.apache.paimon.Snapshot; import org.apache.paimon.table.DataTable; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,14 +57,17 @@ public void run() throws Exception { throw new IllegalArgumentException("Unknown table: " + identifier); } + FileStoreTable fileStoreTable = (FileStoreTable) table; if (version.chars().allMatch(Character::isDigit)) { if (isTimestamp != null && isTimestamp) { - FileStoreTable fileStoreTable = (FileStoreTable) table; - fileStoreTable.rollbackTo( + Snapshot snapshot = fileStoreTable .snapshotManager() - .earlierOrEqualTimeMills(Long.parseLong(version)) - .id()); + .earlierOrEqualTimeMills(Long.parseLong(version)); + Preconditions.checkNotNull( + snapshot, + String.format("count not find snapshot earlier than %s", version)); + fileStoreTable.rollbackTo(snapshot.id()); } else { table.rollbackTo(Long.parseLong(version)); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java index dfcbe111673e..48a2efdd43e5 100644 --- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java @@ -18,10 +18,12 @@ package org.apache.paimon.flink.procedure; +import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; +import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.StringUtils; import org.apache.flink.table.annotation.ArgumentHint; @@ -68,8 +70,10 @@ public String[] call( table.rollbackTo(snapshotId); } else { FileStoreTable fileStoreTable = (FileStoreTable) table; - fileStoreTable.rollbackTo( - fileStoreTable.snapshotManager().earlierOrEqualTimeMills(timestamp).id()); + Snapshot snapshot = fileStoreTable.snapshotManager().earlierOrEqualTimeMills(timestamp); + Preconditions.checkNotNull( + snapshot, String.format("count not find snapshot earlier than %s", timestamp)); + fileStoreTable.rollbackTo(snapshot.id()); } return new String[] {"Success"}; } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java index 4a49d5d1e29d..ffbc16833539 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java @@ -39,7 +39,7 @@ public class RollbackProcedure extends BaseProcedure { ProcedureParameter.required("table", StringType), // snapshot id or tag name or timestamp ProcedureParameter.required("version", StringType), - ProcedureParameter.optional("isTimestamp", BooleanType) + ProcedureParameter.optional("is_timestamp", BooleanType) }; private static 
final StructType OUTPUT_TYPE = diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala index bc9ee91ef193..fb0fac0c4baa 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala @@ -28,7 +28,7 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest { import testImplicits._ - test("Paimon Procedure: rollback to snapshot and tag") { + test("Paimon Procedure: rollback to snapshot and tag and timestamp") { failAfter(streamingTimeout) { withTempDir { checkpointDir => @@ -68,6 +68,7 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest { inputData.addData((2, "b")) stream.processAllAvailable() checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) + val ts = System.currentTimeMillis // snapshot-3 @@ -89,10 +90,10 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest { Row(true) :: Nil) checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) - // rollback to timestamp + // rollback with timestamp checkAnswer( spark.sql( - s"CALL paimon.sys.rollback(table => 'test.T', version => '$ts', isTimestamp => true)"), + s"CALL paimon.sys.rollback(table => 'test.T', version => '$ts', is_timestamp => true)"), Row(true) :: Nil) checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)