From 28cbc716d8aafae7277c869ce699f132d7ec0f82 Mon Sep 17 00:00:00 2001
From: xuyu <11161569@vivo.com>
Date: Thu, 31 Oct 2024 10:10:15 +0800
Subject: [PATCH] address
---
docs/content/flink/procedures.md | 2 +-
.../procedure/RollbackToTimestampProcedure.java | 9 +++++----
.../procedure/RollbackToTimestampProcedure.java | 7 ++++---
.../procedure/RollbackToTimestampProcedure.java | 14 ++++++++++----
.../spark/procedure/RollbackProcedureTest.scala | 2 +-
5 files changed, 21 insertions(+), 13 deletions(-)
diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index dd6a223f643d..5d82990d8e1d 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -375,7 +375,7 @@ All available procedures are listed below.
-- for Flink 1.18
CALL sys.rollback_to_timestamp('default.T', 10)
-- for Flink 1.19 and later
- CALL sys.rollback_to(`table` => 'default.T', snapshot_id => 1730292023000)
+ CALL sys.rollback_to_timestamp(`table` => 'default.T', timestamp => 1730292023000)
- * -- rollback to a snapshot
+ * -- rollback to the snapshot which earlier or equal than timestamp.
* CALL sys.rollback_to_timestamp('tableId', timestamp)
*
*/
@@ -41,14 +41,15 @@ public class RollbackToTimestampProcedure extends ProcedureBase {
public String[] call(ProcedureContext procedureContext, String tableId, long timestamp)
throws Catalog.TableNotExistException {
+ Preconditions.checkNotNull(tableId, "table can not be empty");
Table table = catalog.getTable(Identifier.fromString(tableId));
FileStoreTable fileStoreTable = (FileStoreTable) table;
Snapshot snapshot = fileStoreTable.snapshotManager().earlierOrEqualTimeMills(timestamp);
Preconditions.checkNotNull(
snapshot, String.format("count not find snapshot earlier than %s", timestamp));
- fileStoreTable.rollbackTo(snapshot.id());
-
- return new String[] {"Success"};
+ long snapshotId = snapshot.id();
+ fileStoreTable.rollbackTo(snapshotId);
+ return new String[] {String.format("Success roll back to snapshot: %s .", snapshotId)};
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToTimestampProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToTimestampProcedure.java
index fe2467810540..f84dab8eab89 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToTimestampProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToTimestampProcedure.java
@@ -34,7 +34,7 @@
* Rollback to timestamp procedure. Usage:
*
*
- * -- rollback to a timestamp
+ * -- rollback to the snapshot which earlier or equal than timestamp.
* CALL sys.rollback_to_timestamp(`table` => 'tableId', timestamp => timestamp)
*
*/
@@ -54,8 +54,9 @@ public String[] call(ProcedureContext procedureContext, String tableId, Long tim
Snapshot snapshot = fileStoreTable.snapshotManager().earlierOrEqualTimeMills(timestamp);
Preconditions.checkNotNull(
snapshot, String.format("count not find snapshot earlier than %s", timestamp));
- fileStoreTable.rollbackTo(snapshot.id());
- return new String[] {"Success"};
+ long snapshotId = snapshot.id();
+ fileStoreTable.rollbackTo(snapshotId);
+ return new String[] {String.format("Success roll back to snapshot: %s .", snapshotId)};
}
@Override
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToTimestampProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToTimestampProcedure.java
index 90fa10062741..a01f08b3fc7d 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToTimestampProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToTimestampProcedure.java
@@ -25,10 +25,10 @@
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
-import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
import static org.apache.spark.sql.types.DataTypes.LongType;
import static org.apache.spark.sql.types.DataTypes.StringType;
@@ -46,7 +46,7 @@ public class RollbackToTimestampProcedure extends BaseProcedure {
private static final StructType OUTPUT_TYPE =
new StructType(
new StructField[] {
- new StructField("result", DataTypes.BooleanType, true, Metadata.empty())
+ new StructField("result", StringType, true, Metadata.empty())
});
private RollbackToTimestampProcedure(TableCatalog tableCatalog) {
@@ -77,8 +77,14 @@ public InternalRow[] call(InternalRow args) {
Preconditions.checkNotNull(
snapshot,
String.format("count not find snapshot earlier than %s", timestamp));
- fileStoreTable.rollbackTo(snapshot.id());
- InternalRow outputRow = newInternalRow(true);
+ long snapshotId = snapshot.id();
+ fileStoreTable.rollbackTo(snapshotId);
+ InternalRow outputRow =
+ newInternalRow(
+ UTF8String.fromString(
+ String.format(
+ "Success roll back to snapshot: %s .",
+ snapshotId)));
return new InternalRow[] {outputRow};
});
}
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala
index 387a81dc66e0..457c5ba513ec 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala
@@ -141,7 +141,7 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest {
checkAnswer(
spark.sql(
s"CALL paimon.sys.rollback_to_timestamp(table => 'test.T', timestamp => $timestamp)"),
- Row(true) :: Nil)
+ Row("Success roll back to snapshot: 2 .") :: Nil)
checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
} finally {