Skip to content

Commit

Permalink
address
Browse files Browse the repository at this point in the history
  • Loading branch information
xuyu committed Oct 31, 2024
1 parent 4e26508 commit 28cbc71
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 13 deletions.
2 changes: 1 addition & 1 deletion docs/content/flink/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ All available procedures are listed below.
-- for Flink 1.18<br/>
CALL sys.rollback_to_timestamp('default.T', 10)
-- for Flink 1.19 and later<br/>
CALL sys.rollback_to(`table` => 'default.T', snapshot_id => 1730292023000)
CALL sys.rollback_to_timestamp(`table` => 'default.T', timestamp => 1730292023000)
</td>
</tr>
<tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Rollback to timestamp procedure. Usage:
*
* <pre><code>
* -- rollback to a snapshot
* -- rollback to the snapshot which earlier or equal than timestamp.
* CALL sys.rollback_to_timestamp('tableId', timestamp)
* </code></pre>
*/
Expand All @@ -41,14 +41,15 @@ public class RollbackToTimestampProcedure extends ProcedureBase {

public String[] call(ProcedureContext procedureContext, String tableId, long timestamp)
throws Catalog.TableNotExistException {
Preconditions.checkNotNull(tableId, "table can not be empty");
Table table = catalog.getTable(Identifier.fromString(tableId));
FileStoreTable fileStoreTable = (FileStoreTable) table;
Snapshot snapshot = fileStoreTable.snapshotManager().earlierOrEqualTimeMills(timestamp);
Preconditions.checkNotNull(
snapshot, String.format("count not find snapshot earlier than %s", timestamp));
fileStoreTable.rollbackTo(snapshot.id());

return new String[] {"Success"};
long snapshotId = snapshot.id();
fileStoreTable.rollbackTo(snapshotId);
return new String[] {String.format("Success roll back to snapshot: %s .", snapshotId)};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
* Rollback to timestamp procedure. Usage:
*
* <pre><code>
* -- rollback to a timestamp
* -- rollback to the snapshot which earlier or equal than timestamp.
* CALL sys.rollback_to_timestamp(`table` => 'tableId', timestamp => timestamp)
* </code></pre>
*/
Expand All @@ -54,8 +54,9 @@ public String[] call(ProcedureContext procedureContext, String tableId, Long tim
Snapshot snapshot = fileStoreTable.snapshotManager().earlierOrEqualTimeMills(timestamp);
Preconditions.checkNotNull(
snapshot, String.format("count not find snapshot earlier than %s", timestamp));
fileStoreTable.rollbackTo(snapshot.id());
return new String[] {"Success"};
long snapshotId = snapshot.id();
fileStoreTable.rollbackTo(snapshotId);
return new String[] {String.format("Success roll back to snapshot: %s .", snapshotId)};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;

import static org.apache.spark.sql.types.DataTypes.LongType;
import static org.apache.spark.sql.types.DataTypes.StringType;
Expand All @@ -46,7 +46,7 @@ public class RollbackToTimestampProcedure extends BaseProcedure {
private static final StructType OUTPUT_TYPE =
new StructType(
new StructField[] {
new StructField("result", DataTypes.BooleanType, true, Metadata.empty())
new StructField("result", StringType, true, Metadata.empty())
});

private RollbackToTimestampProcedure(TableCatalog tableCatalog) {
Expand Down Expand Up @@ -77,8 +77,14 @@ public InternalRow[] call(InternalRow args) {
Preconditions.checkNotNull(
snapshot,
String.format("count not find snapshot earlier than %s", timestamp));
fileStoreTable.rollbackTo(snapshot.id());
InternalRow outputRow = newInternalRow(true);
long snapshotId = snapshot.id();
fileStoreTable.rollbackTo(snapshotId);
InternalRow outputRow =
newInternalRow(
UTF8String.fromString(
String.format(
"Success roll back to snapshot: %s .",
snapshotId)));
return new InternalRow[] {outputRow};
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest {
checkAnswer(
spark.sql(
s"CALL paimon.sys.rollback_to_timestamp(table => 'test.T', timestamp => $timestamp)"),
Row(true) :: Nil)
Row("Success roll back to snapshot: 2 .") :: Nil)
checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)

} finally {
Expand Down

0 comments on commit 28cbc71

Please sign in to comment.