Clean up code and add more tests
tsreaper committed Jun 3, 2024
1 parent b51eb15 commit 24137af
Showing 19 changed files with 737 additions and 692 deletions.
52 changes: 9 additions & 43 deletions docs/content/migration/clone-tables.md
Original file line number Diff line number Diff line change
@@ -26,31 +26,12 @@ under the License.

# Clone Tables

Paimon supports clone tables of the latest Snapshot for data migration.
Paimon supports cloning tables for data migration.
Currently, only table files used by the latest snapshot will be cloned.

{{< hint info >}}
1、Clone Tables only support batch mode yet. Please use -D execution.runtime-mode=batch or -yD execution.runtime-mode=batch (for the ON-YARN scenario) to run clone job.

2、If you want clone job runs quickly, you can add parameter parallelism.

3、Only support Flink now.
{{< /hint >}}

## Clone Table
The target table needs to be a non-existent table, and it will have the exact same schema (only the schema for current snapshot) as the source table.

To run a Flink batch job for clone, follow these instructions.

### LatestSnapshot
Clone the latest snapshot of the source table, copying all the files required for the snapshot to the new target table.

{{< tabs "clone-tables" >}}

{{< tab "Flink" >}}

Flink SQL currently does not support statements related to clone, so we have to submit the clone job through `flink run`.

Run the following command to submit a clone job for the table's latest Snapshot.
To clone a table, run the following command to submit a clone job.
If the table you clone is not modified at the same time, it is recommended to submit a Flink batch job for better performance.
However, if you want to clone the table while writing it at the same time, submit a Flink streaming job for automatic failure recovery.

```bash
<FLINK_HOME>/bin/flink run \
@@ -64,15 +45,15 @@ Run the following command to submit a clone job for the table's latest Snapshot.
[--target_database <target-database>] \
[--target_table <target-table-name>] \
[--target_catalog_conf <target-paimon-catalog-conf> [--target_catalog_conf <target-paimon-catalog-conf> ...]]
[--parallelism 128 ]
[--parallelism <parallelism>]
```
{{< hint info >}}
1If the database parameter is not passed, then all tables of all databases will be cloned.
2If the table parameter is not passed, then all tables of the database will be cloned.
1. If `database` is not specified, all tables in all databases of the specified warehouse will be cloned.
2. If `table` is not specified, all tables of the specified database will be cloned.
{{< /hint >}}
Example: clone table latest Snapshot.
Example: Clone `test_db.test_table` from source warehouse to target warehouse.
```bash
<FLINK_HOME>/bin/flink run \
@@ -99,18 +80,3 @@ For more usage of the clone action, see
/path/to/paimon-flink-action-{{< version >}}.jar \
clone --help
```
{{< /tab >}}
{{< tab "Flink Procedure" >}}
Run the following command to submit a clone job for the table's latest Snapshot.
```bash
CALL sys.clone('source_warehouse', 'source_database', 'source_table', '', 'target_warehouse', 'target_database', 'target_table', '', '')
```
{{< /tab >}}
{{< /tabs >}}
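
Review note: the scoping rules in the documentation hint (`database` omitted, `table` omitted, both given) can be sketched in plain Java. This is a minimal illustration, not the Paimon implementation: `CloneScopeSketch`, `resolve`, and the `Map` standing in for a catalog are hypothetical names, and the argument checks the real builder performs are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CloneScopeSketch {

    /**
     * Returns "source -> target" identifier pairs for a clone request.
     * The catalog is modelled as database name -> table names (an assumption for this sketch).
     */
    static List<String> resolve(
            Map<String, List<String>> catalog,
            String database,
            String table,
            String targetDatabase,
            String targetTable) {
        List<String> pairs = new ArrayList<>();
        if (database == null) {
            // No database given: every table of every database, target name equals source name.
            for (Map.Entry<String, List<String>> db : catalog.entrySet()) {
                for (String t : db.getValue()) {
                    String id = db.getKey() + "." + t;
                    pairs.add(id + " -> " + id);
                }
            }
        } else if (table == null) {
            // Database given, table omitted: every table of that database.
            for (String t : catalog.getOrDefault(database, new ArrayList<>())) {
                pairs.add(database + "." + t + " -> " + targetDatabase + "." + t);
            }
        } else {
            // Both given: exactly one table.
            pairs.add(database + "." + table + " -> " + targetDatabase + "." + targetTable);
        }
        return pairs;
    }

    public static void main(String[] args) {
        Map<String, List<String>> catalog = new LinkedHashMap<>();
        catalog.put("db1", Arrays.asList("a", "b"));
        catalog.put("db2", Arrays.asList("c"));
        System.out.println(resolve(catalog, null, null, null, null));
        System.out.println(resolve(catalog, "db1", null, "copy", null));
        System.out.println(resolve(catalog, "db1", "a", "copy", "a2"));
    }
}
```

With the sample catalog, the three calls yield three pairs, two pairs, and one pair respectively.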
Original file line number Diff line number Diff line change
@@ -94,10 +94,14 @@ public CloneAction(

@Override
public void build() {
buildCloneFlinkJob(env);
try {
buildCloneFlinkJob(env);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private void buildCloneFlinkJob(StreamExecutionEnvironment env) {
private void buildCloneFlinkJob(StreamExecutionEnvironment env) throws Exception {
DataStream<Tuple2<String, String>> cloneSource =
new CloneSourceBuilder(
env,
Original file line number Diff line number Diff line change
@@ -59,15 +59,15 @@ public void printHelp() {

System.out.println("Syntax:");
System.out.println(
" clone --warehouse <warehouse_path>"
+ "[--database <database_name>]"
+ "[--table <table_name>]"
+ "[--catalog_conf <source-paimon-catalog-conf> [--catalog_conf <source-paimon-catalog-conf> ...]]"
+ "--target_warehouse <target_warehouse_path>"
+ "[--target_database <target_database_name>]"
+ "[--target_table <target_table_name>]"
+ "[--target_catalog_conf <target-paimon-catalog-conf> [--target_catalog_conf <target-paimon-catalog-conf> ...]]");
" clone --warehouse <warehouse_path> "
+ "[--database <database_name>] "
+ "[--table <table_name>] "
+ "[--catalog_conf <source-paimon-catalog-conf> [--catalog_conf <source-paimon-catalog-conf> ...]] "
+ "--target_warehouse <target_warehouse_path> "
+ "[--target_database <target_database_name>] "
+ "[--target_table <target_table_name>] "
+ "[--target_catalog_conf <target-paimon-catalog-conf> [--target_catalog_conf <target-paimon-catalog-conf> ...]] "
+ "[--parallelism <parallelism>]");

System.out.println();

Original file line number Diff line number Diff line change
@@ -18,23 +18,21 @@

package org.apache.paimon.flink.clone;

import org.apache.paimon.fs.Path;

/** The information of copy file. */
public class CloneFileInfo {

private final Path filePathExcludeTableRoot;
private final String filePathExcludeTableRoot;
private final String sourceIdentifier;
private final String targetIdentifier;

public CloneFileInfo(
Path filePathExcludeTableRoot, String sourceIdentifier, String targetIdentifier) {
String filePathExcludeTableRoot, String sourceIdentifier, String targetIdentifier) {
this.filePathExcludeTableRoot = filePathExcludeTableRoot;
this.sourceIdentifier = sourceIdentifier;
this.targetIdentifier = targetIdentifier;
}

public Path getFilePathExcludeTableRoot() {
public String getFilePathExcludeTableRoot() {
return filePathExcludeTableRoot;
}

@@ -45,4 +43,11 @@ public String getSourceIdentifier() {
public String getTargetIdentifier() {
return targetIdentifier;
}

@Override
public String toString() {
return String.format(
"{ filePath: %s, sourceIdentifier: %s, targetIdentifier: %s }",
filePathExcludeTableRoot, sourceIdentifier, targetIdentifier);
}
}
Original file line number Diff line number Diff line change
@@ -66,10 +66,14 @@ public CloneSourceBuilder(
this.targetTableName = targetTableName;
}

public DataStream<Tuple2<String, String>> build() {
Catalog sourceCatalog =
FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(sourceCatalogConfig));
public DataStream<Tuple2<String, String>> build() throws Exception {
try (Catalog sourceCatalog =
FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(sourceCatalogConfig))) {
return build(sourceCatalog);
}
}

private DataStream<Tuple2<String, String>> build(Catalog sourceCatalog) throws Exception {
List<Tuple2<String, String>> result = new ArrayList<>();

if (database == null) {
@@ -83,13 +87,9 @@ public DataStream<Tuple2<String, String>> build() {
StringUtils.isBlank(targetTableName),
"targetTableName must be blank when clone all tables in a catalog.");
for (String db : sourceCatalog.listDatabases()) {
try {
for (String table : sourceCatalog.listTables(db)) {
String s = db + "." + table;
result.add(new Tuple2<>(s, s));
}
} catch (Exception e) {
throw new RuntimeException("ListTable error.", e);
for (String table : sourceCatalog.listTables(db)) {
String s = db + "." + table;
result.add(new Tuple2<>(s, s));
}
}
} else if (tableName == null) {
@@ -99,13 +99,8 @@ public DataStream<Tuple2<String, String>> build() {
checkArgument(
StringUtils.isBlank(targetTableName),
"targetTableName must be blank when clone all tables in a catalog.");
try {
for (String table : sourceCatalog.listTables(database)) {
result.add(new Tuple2<>(database + "." + table, targetDatabase + "." + table));
}
} catch (Exception e) {
throw new RuntimeException(
"Unexpected error when trying to list tables in database " + database, e);
for (String table : sourceCatalog.listTables(database)) {
result.add(new Tuple2<>(database + "." + table, targetDatabase + "." + table));
}
} else {
checkArgument(
@@ -119,8 +114,9 @@ public DataStream<Tuple2<String, String>> build() {
database + "." + tableName, targetDatabase + "." + targetTableName));
}

LOG.info("The clone identifier of source table and target table is : " + result);

if (LOG.isDebugEnabled()) {
LOG.debug("The clone identifiers of source table and target table are: {}", result);
}
return env.fromCollection(result).forceNonParallel().forward();
}
}
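
Review note: wrapping the source catalog in try-with-resources means it is closed even when listing databases or tables throws, which is also why the inner `try`/`catch` blocks could be dropped. The sketch below demonstrates that guarantee with a stand-in resource; `FakeCatalog` and `closedAfterFailure` are hypothetical names and do not exist in Paimon.

```java
class TryWithResourcesSketch {

    /** Stand-in for a catalog: listing always fails, close() records that it ran. */
    static class FakeCatalog implements AutoCloseable {
        boolean closed = false;

        void listDatabases() {
            throw new IllegalStateException("listing failed");
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Returns whether the resource was closed after its body threw. */
    static boolean closedAfterFailure() {
        FakeCatalog catalog = new FakeCatalog();
        try (FakeCatalog c = catalog) {
            c.listDatabases();
        } catch (IllegalStateException expected) {
            // close() has already run by the time this handler executes.
        }
        return catalog.closed;
    }

    public static void main(String[] args) {
        System.out.println("closed after failure: " + closedAfterFailure());
    }
}
```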
Original file line number Diff line number Diff line change
@@ -48,14 +48,12 @@ public class CopyFileOperator extends AbstractStreamOperator<CloneFileInfo>

public CopyFileOperator(
Map<String, String> sourceCatalogConfig, Map<String, String> targetCatalogConfig) {
super();
this.sourceCatalogConfig = sourceCatalogConfig;
this.targetCatalogConfig = targetCatalogConfig;
}

@Override
public void open() throws Exception {
super.open();
sourceCatalog =
(AbstractCatalog)
FlinkCatalogFactory.createPaimonCatalog(
@@ -79,24 +77,26 @@ public void processElement(StreamRecord<CloneFileInfo> streamRecord) throws Exce
targetCatalog.getDataTableLocation(
Identifier.fromString(cloneFileInfo.getTargetIdentifier()));

Path filePathExcludeTableRoot = cloneFileInfo.getFilePathExcludeTableRoot();
Path sourcePath = new Path(sourceTableRootPath.toString() + filePathExcludeTableRoot);
Path targetPath = new Path(targetTableRootPath.toString() + filePathExcludeTableRoot);

/**
* We still send record to SnapshotHintOperator to avoid the following corner case: When
* clone two tables under a catalog, after clone Table A is completed, the job fails due to
* FileNotFound(snapshot expiration) when clone Table B. And then after restarts the clone
* job, the relevant file information of Table A will not be sent to SnapshotHintOperator,
* resulting in the job successfully ending but the snapshot hint file of Table A is missing
* created.
*/
String filePathExcludeTableRoot = cloneFileInfo.getFilePathExcludeTableRoot();
Path sourcePath = new Path(sourceTableRootPath + filePathExcludeTableRoot);
Path targetPath = new Path(targetTableRootPath + filePathExcludeTableRoot);

if (targetTableFileIO.exists(targetPath)
&& targetTableFileIO.getFileSize(targetPath)
== sourceTableFileIO.getFileSize(sourcePath)) {
LOG.info(
"Skipping clone target file {} because it already exists and has the same size.",
targetPath);

if (LOG.isDebugEnabled()) {
LOG.debug(
"Skipping clone target file {} because it already exists and has the same size.",
targetPath);
}

// We still send record to SnapshotHintOperator to avoid the following corner case:
//
// When cloning two tables under a catalog, after clone table A is completed,
// the job fails due to snapshot expiration when cloning table B.
// If we don't re-send file information of table A to SnapshotHintOperator,
// the snapshot hint file of A will not be created after the restart.
output.collect(streamRecord);
return;
}
@@ -113,4 +113,14 @@ public void processElement(StreamRecord<CloneFileInfo> streamRecord) throws Exce

output.collect(streamRecord);
}

@Override
public void close() throws Exception {
if (sourceCatalog != null) {
sourceCatalog.close();
}
if (targetCatalog != null) {
targetCatalog.close();
}
}
}
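
Review note: the skip check in `processElement` makes a restarted job cheap, because a file whose target already exists with the same size is not copied again while its record is still forwarded. The sketch below reproduces only the skip decision with `java.nio.file` on the local file system. It does not use Paimon's `FileIO`, and `CopyIfNeededSketch`, `copyIfNeeded`, and `runTwice` are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

class CopyIfNeededSketch {

    /** Copies source to target unless target exists with the same size. Returns true if copied. */
    static boolean copyIfNeeded(Path source, Path target) {
        try {
            if (Files.exists(target) && Files.size(target) == Files.size(source)) {
                // Skipped; in the operator the record is still sent downstream.
                return false;
            }
            Files.createDirectories(target.getParent());
            Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Runs the copy twice against temporary files and reports whether each attempt copied. */
    static boolean[] runTwice() {
        try {
            Path dir = Files.createTempDirectory("clone-sketch");
            Path source = dir.resolve("source/data-0.orc");
            Files.createDirectories(source.getParent());
            Files.write(source, "hello".getBytes(StandardCharsets.UTF_8));
            Path target = dir.resolve("target/data-0.orc");
            return new boolean[] {copyIfNeeded(source, target), copyIfNeeded(source, target)};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] copied = runTwice();
        System.out.println("first attempt copied: " + copied[0]);
        System.out.println("second attempt copied: " + copied[1]);
    }
}
```

Comparing sizes is a heuristic rather than a content check: a target file of equal size but different content would be skipped as well.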
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -37,8 +38,6 @@
import java.util.List;
import java.util.Map;

import static org.apache.paimon.utils.Preconditions.checkState;

/**
* Pick the files to be cloned of a table based on the input record. The record type it produce is
* CloneFileInfo that indicate the information of copy file.
@@ -62,7 +61,6 @@ public PickFilesForCloneOperator(

@Override
public void open() throws Exception {
super.open();
sourceCatalog =
FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(sourceCatalogConfig));
targetCatalog =
@@ -71,15 +69,6 @@ public void open() throws Exception {

@Override
public void processElement(StreamRecord<Tuple2<String, String>> streamRecord) throws Exception {
try {
processElementImpl(streamRecord);
} catch (Exception e) {
throw new RuntimeException("Failed to pick files", e);
}
}

private void processElementImpl(StreamRecord<Tuple2<String, String>> streamRecord)
throws Exception {
String sourceIdentifierStr = streamRecord.getValue().f0;
Identifier sourceIdentifier = Identifier.fromString(sourceIdentifierStr);
String targetIdentifierStr = streamRecord.getValue().f1;
@@ -97,7 +86,9 @@ private void processElementImpl(StreamRecord<Tuple2<String, String>> streamRecor
sourceIdentifierStr,
targetIdentifierStr);

LOG.info("The CloneFileInfo of table {} is {} : ", sourceTable.location(), result);
if (LOG.isDebugEnabled()) {
LOG.debug("The CloneFileInfo of table {} is {} : ", sourceTable.location(), result);
}

for (CloneFileInfo info : result) {
output.collect(new StreamRecord<>(info));
@@ -112,7 +103,8 @@ private List<CloneFileInfo> toCloneFileInfos(
List<CloneFileInfo> result = new ArrayList<>();
for (Path file : files) {
Path relativePath = getPathExcludeTableRoot(file, sourceTableRoot);
result.add(new CloneFileInfo(relativePath, sourceIdentifier, targetIdentifier));
result.add(
new CloneFileInfo(relativePath.toString(), sourceIdentifier, targetIdentifier));
}
return result;
}
@@ -121,13 +113,24 @@ private Path getPathExcludeTableRoot(Path absolutePath, Path sourceTableRoot) {
String fileAbsolutePath = absolutePath.toUri().toString();
String sourceTableRootPath = sourceTableRoot.toString();

checkState(
Preconditions.checkState(
fileAbsolutePath.startsWith(sourceTableRootPath),
"This is a bug, please report. fileAbsolutePath is : "
"File absolute path does not start with source table root path. This is unexpected. "
+ "fileAbsolutePath is: "
+ fileAbsolutePath
+ ", sourceTableRootPath is : "
+ ", sourceTableRootPath is: "
+ sourceTableRootPath);

return new Path(fileAbsolutePath.substring(sourceTableRootPath.length()));
}

@Override
public void close() throws Exception {
if (sourceCatalog != null) {
sourceCatalog.close();
}
if (targetCatalog != null) {
targetCatalog.close();
}
}
}
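
Review note: with `CloneFileInfo` now carrying the relative path as a `String`, the round trip is simple. `getPathExcludeTableRoot` strips the source table root, and `CopyFileOperator` prepends the source or target root again. Below is a minimal sketch of that round trip using plain strings. `RelativePathSketch`, `rebase`, and the sample URIs are hypothetical; the real code works with Paimon `Path` objects and `Preconditions.checkState`.

```java
class RelativePathSketch {

    /** Strips the table root from an absolute file path, failing if the prefix does not match. */
    static String pathExcludeTableRoot(String fileAbsolutePath, String sourceTableRoot) {
        if (!fileAbsolutePath.startsWith(sourceTableRoot)) {
            throw new IllegalStateException(
                    "File absolute path does not start with source table root path. "
                            + "fileAbsolutePath is: "
                            + fileAbsolutePath
                            + ", sourceTableRootPath is: "
                            + sourceTableRoot);
        }
        return fileAbsolutePath.substring(sourceTableRoot.length());
    }

    /** Prepends a table root to a relative path produced by pathExcludeTableRoot. */
    static String rebase(String tableRoot, String relativePath) {
        return tableRoot + relativePath;
    }

    public static void main(String[] args) {
        String sourceRoot = "file:/src/warehouse/db.db/t";
        String targetRoot = "file:/dst/warehouse/db.db/t";
        String relative = pathExcludeTableRoot(sourceRoot + "/snapshot/snapshot-1", sourceRoot);
        System.out.println(relative);
        System.out.println(rebase(targetRoot, relative));
    }
}
```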