Skip to content

Commit

Permalink
[core] Introduce RollbackToTimestamp Procedure and Action
Browse files Browse the repository at this point in the history
  • Loading branch information
xuyu committed Oct 30, 2024
1 parent 8e4de02 commit c956b35
Show file tree
Hide file tree
Showing 11 changed files with 505 additions and 0 deletions.
22 changes: 22 additions & 0 deletions docs/content/flink/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,28 @@ All available procedures are listed below.
CALL sys.rollback_to(`table` => 'default.T', snapshot_id => 10)
</td>
</tr>
<tr>
<td>rollback_to_timestamp</td>
<td>
-- for Flink 1.18<br/>
-- rollback to the snapshot which earlier or equal than timestamp.<br/>
CALL sys.rollback_to('identifier', timestamp)<br/><br/>
-- for Flink 1.19 and later<br/>
-- rollback to the snapshot which earlier or equal than timestamp.<br/>
CALL sys.rollback_to(`table` => 'default.T', `timestamp` => timestamp)<br/><br/>
</td>
<td>
To rollback to the snapshot which earlier or equal than timestamp. Argument:
<li>identifier: the target table identifier. Cannot be empty.</li>
<li>timestamp (Long): Roll back to the snapshot which earlier or equal than timestamp.</li>
</td>
<td>
-- for Flink 1.18<br/>
CALL sys.rollback_to_timestamp('default.T', 10)
-- for Flink 1.19 and later<br/>
CALL sys.rollback_to(`table` => 'default.T', snapshot_id => 1730292023000)
</td>
</tr>
<tr>
<td>expire_snapshots</td>
<td>
Expand Down
11 changes: 11 additions & 0 deletions docs/content/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,17 @@ This section introduce all available spark procedures about paimon.
CALL sys.rollback(table => 'default.T', version => 10)
</td>
</tr>
<tr>
<td>rollback_to_timestamp</td>
<td>
To rollback to the snapshot which earlier or equal than timestamp. Argument:
<li>table: the target table identifier. Cannot be empty.</li>
<li>timestamp: roll back to the snapshot which earlier or equal than timestamp.</li>
</td>
<td>
CALL sys.rollback_to_timestamp(table => 'default.T', timestamp => 1730292023000)<br/><br/>
</td>
</tr>
<tr>
<td>migrate_database</td>
<td>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.Preconditions;

import org.apache.flink.table.procedure.ProcedureContext;

/**
* Rollback procedure. Usage:
*
* <pre><code>
* -- rollback to a snapshot
* CALL sys.rollback_to_timestamp('tableId', timestamp)
* </code></pre>
*/
public class RollbackToTimestampProcedure extends ProcedureBase {

public static final String IDENTIFIER = "rollback_to_timestamp";

public String[] call(ProcedureContext procedureContext, String tableId, long timestamp)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
FileStoreTable fileStoreTable = (FileStoreTable) table;
Snapshot snapshot = fileStoreTable.snapshotManager().earlierOrEqualTimeMills(timestamp);
Preconditions.checkNotNull(
snapshot, String.format("count not find snapshot earlier than %s", timestamp));
fileStoreTable.rollbackTo(snapshot.id());

return new String[] {"Success"};
}

@Override
public String identifier() {
return IDENTIFIER;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import org.apache.paimon.Snapshot;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/** Rollback to specific timestamp action for Flink. */
public class RollbackToTimestampAction extends TableActionBase {

private static final Logger LOG = LoggerFactory.getLogger(RollbackToTimestampAction.class);

private final Long timestamp;

public RollbackToTimestampAction(
String warehouse,
String databaseName,
String tableName,
Long timestamp,
Map<String, String> catalogConfig) {
super(warehouse, databaseName, tableName, catalogConfig);
this.timestamp = timestamp;
}

@Override
public void run() throws Exception {
LOG.debug("Run rollback-to-timestamp action with timestamp '{}'.", timestamp);

if (!(table instanceof DataTable)) {
throw new IllegalArgumentException("Unknown table: " + identifier);
}

FileStoreTable fileStoreTable = (FileStoreTable) table;
Snapshot snapshot = fileStoreTable.snapshotManager().earlierOrEqualTimeMills(timestamp);
Preconditions.checkNotNull(
snapshot, String.format("count not find snapshot earlier than %s", timestamp));
fileStoreTable.rollbackTo(snapshot.id());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import org.apache.flink.api.java.tuple.Tuple3;

import java.util.Map;
import java.util.Optional;

/** Factory to create {@link RollbackToTimestampAction}. */
public class RollbackToTimestampActionFactory implements ActionFactory {

public static final String IDENTIFIER = "rollback_to_timestamp";

private static final String TIMESTAMP = "timestamp";

@Override
public String identifier() {
return IDENTIFIER;
}

@Override
public Optional<Action> create(MultipleParameterToolAdapter params) {
Tuple3<String, String, String> tablePath = getTablePath(params);

checkRequiredArgument(params, TIMESTAMP);
String timestamp = params.get(TIMESTAMP);

Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);

RollbackToTimestampAction action =
new RollbackToTimestampAction(
tablePath.f0,
tablePath.f1,
tablePath.f2,
Long.parseLong(timestamp),
catalogConfig);

return Optional.of(action);
}

@Override
public void printHelp() {
System.out.println(
"Action \"rollback_to_timestamp\" roll back a table to a specific timestamp.");
System.out.println();

System.out.println("Syntax:");
System.out.println(
" rollback_to --warehouse <warehouse_path> --database <database_name> "
+ "--table <table_name> --timestamp <timestamp_string>");
System.out.println(" <timestamp_string> can be a long value representing a timestamp.");
System.out.println();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.Preconditions;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;

/**
* Rollback to timestamp procedure. Usage:
*
* <pre><code>
* -- rollback to a timestamp
* CALL sys.rollback_to_timestamp(`table` => 'tableId', timestamp => timestamp)
* </code></pre>
*/
public class RollbackToTimestampProcedure extends ProcedureBase {

public static final String IDENTIFIER = "rollback_to_timestamp";

@ProcedureHint(
argument = {
@ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
@ArgumentHint(name = "timestamp", type = @DataTypeHint("BIGINT"))
})
public String[] call(ProcedureContext procedureContext, String tableId, Long timestamp)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
FileStoreTable fileStoreTable = (FileStoreTable) table;
Snapshot snapshot = fileStoreTable.snapshotManager().earlierOrEqualTimeMills(timestamp);
Preconditions.checkNotNull(
snapshot, String.format("count not find snapshot earlier than %s", timestamp));
fileStoreTable.rollbackTo(snapshot.id());
return new String[] {"Success"};
}

@Override
public String identifier() {
return IDENTIFIER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ org.apache.paimon.flink.action.DropPartitionActionFactory
org.apache.paimon.flink.action.DeleteActionFactory
org.apache.paimon.flink.action.MergeIntoActionFactory
org.apache.paimon.flink.action.RollbackToActionFactory
org.apache.paimon.flink.action.RollbackToTimestampActionFactory
org.apache.paimon.flink.action.CreateTagActionFactory
org.apache.paimon.flink.action.CreateTagFromTimestampActionFactory
org.apache.paimon.flink.action.CreateTagFromWatermarkActionFactory
Expand Down Expand Up @@ -57,6 +58,7 @@ org.apache.paimon.flink.procedure.DropPartitionProcedure
org.apache.paimon.flink.procedure.MergeIntoProcedure
org.apache.paimon.flink.procedure.ResetConsumerProcedure
org.apache.paimon.flink.procedure.RollbackToProcedure
org.apache.paimon.flink.procedure.RollbackToTimestampProcedure
org.apache.paimon.flink.procedure.MigrateTableProcedure
org.apache.paimon.flink.procedure.MigrateDatabaseProcedure
org.apache.paimon.flink.procedure.MigrateFileProcedure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,4 +160,59 @@ public void rollbackToTagTest(String invoker) throws Exception {
"SELECT * FROM `" + tableName + "`",
Arrays.asList(Row.of(1L, "Hi"), Row.of(2L, "Apache")));
}

@ParameterizedTest
@ValueSource(strings = {"action", "procedure_named", "procedure_indexed"})
public void rollbackToTimestampTest(String invoker) throws Exception {
FileStoreTable table =
createFileStoreTable(
ROW_TYPE,
Collections.emptyList(),
Collections.singletonList("k"),
Collections.emptyList(),
Collections.emptyMap());
StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser);
write = writeBuilder.newWrite();
commit = writeBuilder.newCommit();

writeData(rowData(1L, BinaryString.fromString("Hi")));
writeData(rowData(2L, BinaryString.fromString("Apache")));
long timestamp = System.currentTimeMillis();
writeData(rowData(2L, BinaryString.fromString("Paimon")));

switch (invoker) {
case "action":
createAction(
RollbackToTimestampAction.class,
"rollback_to_timestamp",
"--warehouse",
warehouse,
"--database",
database,
"--table",
tableName,
"--timestamp",
timestamp + "")
.run();
break;
case "procedure_indexed":
executeSQL(
String.format(
"CALL sys.rollback_to_timestamp('%s.%s', %s)",
database, tableName, timestamp));
break;
case "procedure_named":
executeSQL(
String.format(
"CALL sys.rollback_to_timestamp(`table` => '%s.%s', `timestamp` => %s)",
database, tableName, timestamp));
break;
default:
throw new UnsupportedOperationException(invoker);
}

testBatchRead(
"SELECT * FROM `" + tableName + "`",
Arrays.asList(Row.of(1L, "Hi"), Row.of(2L, "Apache")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.paimon.spark.procedure.RepairProcedure;
import org.apache.paimon.spark.procedure.ResetConsumerProcedure;
import org.apache.paimon.spark.procedure.RollbackProcedure;
import org.apache.paimon.spark.procedure.RollbackToTimestampProcedure;

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;

Expand All @@ -63,6 +64,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
ImmutableMap.Builder<String, Supplier<ProcedureBuilder>> procedureBuilders =
ImmutableMap.builder();
procedureBuilders.put("rollback", RollbackProcedure::builder);
procedureBuilders.put("rollback_to_timestamp", RollbackToTimestampProcedure::builder);
procedureBuilders.put("create_tag", CreateTagProcedure::builder);
procedureBuilders.put("rename_tag", RenameTagProcedure::builder);
procedureBuilders.put(
Expand Down
Loading

0 comments on commit c956b35

Please sign in to comment.