Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Support RollbackAction and Procedure with timestamp #4333

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/content/flink/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -325,12 +325,15 @@ All available procedures are listed below.
CALL sys.rollback_to(`table` => 'identifier', snapshot_id => snapshotId)<br/><br/>
-- rollback to a tag<br/>
CALL sys.rollback_to(`table` => 'identifier', tag => 'tagName')
-- rollback to a timestamp<br/>
CALL sys.rollback_to(`table` => 'identifier', timestamp => 'timestamp')
</td>
<td>
To rollback to a specific version of target table. Argument:
<li>identifier: the target table identifier. Cannot be empty.</li>
<li>snapshotId (Long): id of the snapshot that will roll back to.</li>
<li>tagName: name of the tag that will roll back to.</li>
<li>timestamp: earlier or equal to the timestamp that will roll back to.</li>
</td>
<td>
-- for Flink 1.18<br/>
Expand Down
2 changes: 2 additions & 0 deletions docs/content/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,12 @@ This section introduce all available spark procedures about paimon.
To rollback to a specific version of target table. Argument:
<li>table: the target table identifier. Cannot be empty.</li>
<li>version: id of the snapshot or name of tag that will roll back to.</li>
<li>is_timestamp: whether version is a timestamp.</li>
</td>
<td>
CALL sys.rollback(table => 'default.T', version => 'my_tag')<br/><br/>
CALL sys.rollback(table => 'default.T', version => 10)
CALL sys.rollback(table => 'default.T', version => 1729410996001, is_timestamp = true)
</td>
</tr>
<tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@

package org.apache.paimon.flink.action;

import org.apache.paimon.Snapshot;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -32,14 +35,18 @@ public class RollbackToAction extends TableActionBase {

private final String version;

private final Boolean isTimestamp;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use long timestamp uniformly in flink action && flink procedure && spark procedure?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Firstly I also want to change it like you suggested, but consider older version spark user their procedure sql would not be compatible, so keep it can use in older version first. @wwj6591812


public RollbackToAction(
String warehouse,
String databaseName,
String tableName,
String version,
Boolean isTimestamp,
Map<String, String> catalogConfig) {
super(warehouse, databaseName, tableName, catalogConfig);
this.version = version;
this.isTimestamp = isTimestamp;
}

@Override
Expand All @@ -50,8 +57,20 @@ public void run() throws Exception {
throw new IllegalArgumentException("Unknown table: " + identifier);
}

FileStoreTable fileStoreTable = (FileStoreTable) table;
if (version.chars().allMatch(Character::isDigit)) {
table.rollbackTo(Long.parseLong(version));
if (isTimestamp != null && isTimestamp) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

boolean

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here may not be boolean, because this argument may be null.

Snapshot snapshot =
fileStoreTable
.snapshotManager()
.earlierOrEqualTimeMills(Long.parseLong(version));
Preconditions.checkNotNull(
snapshot,
String.format("count not find snapshot earlier than %s", version));
fileStoreTable.rollbackTo(snapshot.id());
} else {
table.rollbackTo(Long.parseLong(version));
}
} else {
table.rollbackTo(version);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class RollbackToActionFactory implements ActionFactory {

private static final String VERSION = "version";

private static final String IS_TIMESTAMP = "is_timestamp";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should also modify doc.


@Override
public String identifier() {
return IDENTIFIER;
Expand All @@ -41,12 +43,18 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {

checkRequiredArgument(params, VERSION);
String version = params.get(VERSION);
Boolean isTimestamp = Boolean.parseBoolean(params.get(IS_TIMESTAMP));

Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);

RollbackToAction action =
new RollbackToAction(
tablePath.f0, tablePath.f1, tablePath.f2, version, catalogConfig);
tablePath.f0,
tablePath.f1,
tablePath.f2,
version,
isTimestamp,
catalogConfig);

return Optional.of(action);
}
Expand All @@ -60,9 +68,9 @@ public void printHelp() {
System.out.println("Syntax:");
System.out.println(
" rollback_to --warehouse <warehouse_path> --database <database_name> "
+ "--table <table_name> --version <version_string>");
+ "--table <table_name> --version <version_string> --is_timestamp <is_timestamp>");
System.out.println(
" <version_string> can be a long value representing a snapshot ID or a tag name.");
" <version_string> can be a long value representing a snapshot ID or a tag name or a timestamp.");
System.out.println();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@

package org.apache.paimon.flink.procedure;

import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;

import org.apache.flink.table.annotation.ArgumentHint;
Expand Down Expand Up @@ -50,16 +53,27 @@ public class RollbackToProcedure extends ProcedureBase {
@ArgumentHint(
name = "snapshot_id",
type = @DataTypeHint("BIGINT"),
isOptional = true)
isOptional = true),
@ArgumentHint(name = "timestamp", type = @DataTypeHint("BIGINT"), isOptional = true)
})
public String[] call(
ProcedureContext procedureContext, String tableId, String tagName, Long snapshotId)
ProcedureContext procedureContext,
String tableId,
String tagName,
Long snapshotId,
Long timestamp)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
if (!StringUtils.isNullOrWhitespaceOnly(tagName)) {
table.rollbackTo(tagName);
} else {
} else if (snapshotId != null) {
table.rollbackTo(snapshotId);
} else {
FileStoreTable fileStoreTable = (FileStoreTable) table;
Snapshot snapshot = fileStoreTable.snapshotManager().earlierOrEqualTimeMills(timestamp);
Preconditions.checkNotNull(
snapshot, String.format("count not find snapshot earlier than %s", timestamp));
fileStoreTable.rollbackTo(snapshot.id());
}
return new String[] {"Success"};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,64 @@ public void setUp() {
init(warehouse);
}

@ParameterizedTest
@ValueSource(strings = {"action", "procedure_named", "procedure_indexed"})
public void rollbackToTimestampTest(String invoker) throws Exception {
FileStoreTable table =
createFileStoreTable(
ROW_TYPE,
Collections.emptyList(),
Collections.singletonList("k"),
Collections.emptyList(),
Collections.emptyMap());
StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser);
write = writeBuilder.newWrite();
commit = writeBuilder.newCommit();

writeData(rowData(1L, BinaryString.fromString("Hi")));
writeData(rowData(2L, BinaryString.fromString("Hello")));
writeData(rowData(2L, BinaryString.fromString("World")));
long ts = System.currentTimeMillis();
writeData(rowData(2L, BinaryString.fromString("Flink")));

switch (invoker) {
case "action":
createAction(
RollbackToAction.class,
"rollback_to",
"--warehouse",
warehouse,
"--database",
database,
"--table",
tableName,
"--version",
ts + "",
"--is_timestamp",
"true")
.run();
break;
case "procedure_indexed":
executeSQL(
String.format(
"CALL sys.rollback_to('%s.%s', '', null, cast(%s as bigint))",
database, tableName, ts));
break;
case "procedure_named":
executeSQL(
String.format(
"CALL sys.rollback_to(`table` => '%s.%s', `timestamp` => %s)",
database, tableName, ts));
break;
default:
throw new UnsupportedOperationException(invoker);
}

testBatchRead(
"SELECT * FROM `" + tableName + "`",
Arrays.asList(Row.of(1L, "Hi"), Row.of(2L, "World")));
}

@ParameterizedTest
@ValueSource(strings = {"action", "procedure_named", "procedure_indexed"})
public void rollbackToSnapshotTest(String invoker) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.paimon.spark.procedure;

import org.apache.paimon.table.FileStoreTable;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
Expand All @@ -26,16 +28,18 @@
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.types.DataTypes.BooleanType;
import static org.apache.spark.sql.types.DataTypes.StringType;

/** A procedure to rollback to a snapshot or a tag. */
/** A procedure to rollback to a snapshot or a tag or a timestamp. */
public class RollbackProcedure extends BaseProcedure {

private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
ProcedureParameter.required("table", StringType),
// snapshot id or tag name
ProcedureParameter.required("version", StringType)
// snapshot id or tag name or timestamp
ProcedureParameter.required("version", StringType),
ProcedureParameter.optional("is_timestamp", BooleanType)
};

private static final StructType OUTPUT_TYPE =
Expand All @@ -62,14 +66,24 @@ public StructType outputType() {
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
String version = args.getString(1);
Boolean isTimestamp = args.isNullAt(2) ? null : args.getBoolean(2);

return modifyPaimonTable(
tableIdent,
table -> {
FileStoreTable fileStoreTable = (FileStoreTable) table;
if (version.chars().allMatch(Character::isDigit)) {
table.rollbackTo(Long.parseLong(version));
if (isTimestamp != null && isTimestamp) {
fileStoreTable.rollbackTo(
fileStoreTable
.snapshotManager()
.earlierOrEqualTimeMills(Long.parseLong(version))
.id());
} else {
fileStoreTable.rollbackTo(Long.parseLong(version));
}
} else {
table.rollbackTo(version);
fileStoreTable.rollbackTo(version);
}
InternalRow outputRow = newInternalRow(true);
return new InternalRow[] {outputRow};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest {

import testImplicits._

test("Paimon Procedure: rollback to snapshot and tag") {
test("Paimon Procedure: rollback to snapshot and tag and timestamp") {
failAfter(streamingTimeout) {
withTempDir {
checkpointDir =>
Expand Down Expand Up @@ -69,16 +69,31 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest {
stream.processAllAvailable()
checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)

val ts = System.currentTimeMillis
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inline this


// snapshot-3
inputData.addData((2, "b2"))
stream.processAllAvailable()
checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)

// snapshot-4
inputData.addData((2, "b3"))
stream.processAllAvailable()
checkAnswer(query(), Row(1, "a") :: Row(2, "b3") :: Nil)

assertThrows[RuntimeException] {
spark.sql("CALL paimon.sys.rollback(table => 'test.T_exception', version => '2')")
}
// rollback to snapshot
checkAnswer(
spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '2')"),
spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '3')"),
Row(true) :: Nil)
checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)

// rollback with timestamp
checkAnswer(
spark.sql(
s"CALL paimon.sys.rollback(table => 'test.T', version => '$ts', is_timestamp => true)"),
Row(true) :: Nil)
checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)

Expand Down
Loading