From 034753f92a0adb4364245386969e6667793d47ed Mon Sep 17 00:00:00 2001 From: wangwj Date: Tue, 30 Apr 2024 09:08:36 +0800 Subject: [PATCH] support clone latest snapshot --- docs/content/migration/clone-tables.md | 116 ++++++++++ .../paimon/operation/OrphanFilesClean.java | 3 +- .../java/org/apache/paimon/schema/Schema.java | 9 + .../paimon/flink/action/CloneAction.java | 144 +++++++++++++ .../flink/action/CloneActionFactory.java | 88 ++++++++ .../paimon/flink/clone/CloneFileInfo.java | 48 +++++ .../flink/clone/CloneSourceBuilder.java | 99 +++++++++ .../paimon/flink/clone/CopyFileOperator.java | 99 +++++++++ .../clone/PickFilesForCloneOperator.java | 127 +++++++++++ .../paimon/flink/clone/PickFilesUtil.java | 200 ++++++++++++++++++ .../clone/SnapshotHintChannelComputer.java | 43 ++++ .../flink/clone/SnapshotHintOperator.java | 98 +++++++++ .../flink/procedure/CloneProcedure.java | 107 ++++++++++ .../org.apache.paimon.factories.Factory | 2 + .../paimon/flink/action/ActionITCaseBase.java | 16 -- .../flink/action/CloneActionITCase.java | 147 +++++++++++++ .../flink/action/CompactActionITCaseBase.java | 32 --- .../flink/procedure/CloneProcedureITCase.java | 105 +++++++++ .../paimon/flink/util/AbstractTestBase.java | 50 +++++ 19 files changed, 1483 insertions(+), 50 deletions(-) create mode 100644 docs/content/migration/clone-tables.md create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneSourceBuilder.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java create mode 100644 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/SnapshotHintChannelComputer.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/SnapshotHintOperator.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CloneActionITCase.java create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CloneProcedureITCase.java diff --git a/docs/content/migration/clone-tables.md b/docs/content/migration/clone-tables.md new file mode 100644 index 0000000000000..286a111a8af24 --- /dev/null +++ b/docs/content/migration/clone-tables.md @@ -0,0 +1,116 @@ +--- +title: "Clone Tables" +weight: 3 +type: docs +aliases: +- /migration/clone-tables.html +--- + + +# Clone Tables + +Paimon supports cloning the latest snapshot of tables for data migration. + +{{< hint info >}} +1. Clone Tables only supports batch mode for now. Please use `-D execution.runtime-mode=batch` or `-yD execution.runtime-mode=batch` (for the ON-YARN scenario) to run the clone job. + +2. To make the clone job run faster, you can set the `parallelism` parameter. + +3. Only Flink is supported for now. +{{< /hint >}} + +## Clone Table +The target table must not exist yet, and it will have the exact same schema (only the schema of the current snapshot) as the source table. + +To run a Flink batch job for clone, follow these instructions. + +### LatestSnapshot +Clone the latest snapshot of the source table, copying all the files required for the snapshot to the new target table. 
+ +{{< tabs "clone-tables" >}} + +{{< tab "Flink" >}} + +Flink SQL currently does not support statements related to clone, so we have to submit the clone job through `flink run`. + +Run the following command to submit a clone job for the table's latest snapshot. + +```bash +<FLINK_HOME>/bin/flink run \ + /path/to/paimon-flink-action-{{< version >}}.jar \ + clone \ + --warehouse <source-warehouse-path> \ + [--database <source-database-name>] \ + [--table <source-table-name>] \ + [--catalog_conf <source-paimon-catalog-conf> [--catalog_conf <source-paimon-catalog-conf> ...]] \ + --target_warehouse <target-warehouse-path> \ + --target_database <target-database-name> \ + --target_table <target-table-name> \ + [--target_catalog_conf <target-paimon-catalog-conf> [--target_catalog_conf <target-paimon-catalog-conf> ...]] \ + [--parallelism 128 ] +``` + +{{< hint info >}} +1. If the database parameter is not passed, then all tables of all databases will be cloned. +2. If the table parameter is not passed, then all tables of the database will be cloned. +{{< /hint >}} + +Example: clone the latest snapshot of a table. + +```bash +<FLINK_HOME>/bin/flink run \ + /path/to/paimon-flink-action-{{< version >}}.jar \ + clone \ + --warehouse s3:///path/to/warehouse_source \ + --database test_db \ + --table test_table \ + --catalog_conf s3.endpoint=https://****.com \ + --catalog_conf s3.access-key=***** \ + --catalog_conf s3.secret-key=***** \ + --target_warehouse s3:///path/to/warehouse_target \ + --target_database test_db \ + --target_table test_table \ + --target_catalog_conf s3.endpoint=https://****.com \ + --target_catalog_conf s3.access-key=***** \ + --target_catalog_conf s3.secret-key=***** +``` + +For more usage of the clone action, see + +```bash +<FLINK_HOME>/bin/flink run \ + /path/to/paimon-flink-action-{{< version >}}.jar \ + clone --help +``` + +{{< /tab >}} + +{{< tab "Flink Procedure" >}} + +Run the following command to submit a clone job for the table's latest snapshot. 
+ +```bash +CALL sys.clone('source_warehouse', 'source_database', 'source_table', '', 'target_warehouse', 'target_database', 'target_table', '', '') +``` + +{{< /tab >}} + +{{< /tabs >}} + diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index e78174db7ac9d..f407c73bf46bf 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -499,8 +499,7 @@ private void deleteFileOrDirQuietly(Path path) { /** A helper functional interface for method {@link #retryReadingFiles}. */ @FunctionalInterface - private interface ReaderWithIOException { - + interface ReaderWithIOException { T read() throws IOException; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java index 824dff5c4c97e..b7575837471c8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java @@ -329,4 +329,13 @@ public Schema build() { return new Schema(columns, partitionKeys, primaryKeys, options, comment); } } + /** + * Builds a {@link Schema} from the fields, partition keys, primary keys, options and comment of the given {@link TableSchema}. + * + * @param tableSchema the table schema whose parts are passed to the {@link Schema} constructor + * @return a new {@link Schema} built from those parts + */ + public static Schema fromTableSchema(TableSchema tableSchema) { + return new Schema( + tableSchema.fields(), + tableSchema.partitionKeys(), + tableSchema.primaryKeys(), + tableSchema.options(), + tableSchema.comment()); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java new file mode 100644 index 0000000000000..7cc78c29f45a9 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import org.apache.paimon.flink.clone.CloneFileInfo; +import org.apache.paimon.flink.clone.CloneSourceBuilder; +import org.apache.paimon.flink.clone.CopyFileOperator; +import org.apache.paimon.flink.clone.PickFilesForCloneOperator; +import org.apache.paimon.flink.clone.SnapshotHintChannelComputer; +import org.apache.paimon.flink.clone.SnapshotHintOperator; +import org.apache.paimon.flink.sink.FlinkStreamPartitioner; +import org.apache.paimon.options.CatalogOptions; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.paimon.utils.Preconditions.checkNotNull; +import static org.apache.paimon.utils.StringUtils.isBlank; + +/** Snapshot/Tag/Table clone action for Flink. 
*/ +public class CloneAction extends ActionBase { + + private final int parallelism; + + private Map sourceCatalogConfig = new HashMap<>(); + private final String database; + private final String tableName; + + private Map targetCatalogConfig = new HashMap<>(); + private final String targetDatabase; + private final String targetTableName; + /** + * Creates the action from raw user parameters. + * + * <p>{@code warehouse} and {@code targetWarehouse} must not be null; each one is written into the matching catalog configuration under {@code CatalogOptions.WAREHOUSE.key()}. A blank {@code parallelismStr} falls back to {@code env.getParallelism()}, any other value is parsed with {@code Integer.parseInt}. + * + * <p>NOTE(review): a non-empty {@code sourceCatalogConfig} or {@code targetCatalogConfig} is stored by reference and then receives the warehouse entry, so the caller's map is modified. The null checks also run only after {@code super(warehouse, sourceCatalogConfig)}. Confirm both are acceptable. + */ + public CloneAction( + String warehouse, + String database, + String tableName, + Map sourceCatalogConfig, + String targetWarehouse, + String targetDatabase, + String targetTableName, + Map targetCatalogConfig, + String parallelismStr) { + super(warehouse, sourceCatalogConfig); + + checkNotNull(warehouse, "warehouse must not be null."); + checkNotNull(targetWarehouse, "targetWarehouse must not be null."); + + this.parallelism = + isBlank(parallelismStr) ? env.getParallelism() : Integer.parseInt(parallelismStr); + + if (!sourceCatalogConfig.isEmpty()) { + this.sourceCatalogConfig = sourceCatalogConfig; + } + this.sourceCatalogConfig.put(CatalogOptions.WAREHOUSE.key(), warehouse); + this.database = database; + this.tableName = tableName; + + if (!targetCatalogConfig.isEmpty()) { + this.targetCatalogConfig = targetCatalogConfig; + } + this.targetCatalogConfig.put(CatalogOptions.WAREHOUSE.key(), targetWarehouse); + this.targetDatabase = targetDatabase; + this.targetTableName = targetTableName; + } + + // ------------------------------------------------------------------------ + // Java API + // ------------------------------------------------------------------------ + /** Builds the clone pipeline by passing {@code env} to {@code buildCloneFlinkJob}. */ + @Override + public void build() { + buildCloneFlinkJob(env); + } + /** + * Wires the job: the table pairs produced by {@code CloneSourceBuilder} feed a non-parallel "Pick Files" operator, its output is rebalanced into "Copy Files", then partitioned with {@code SnapshotHintChannelComputer} into "Recreate Snapshot Hint", and finally discarded by a sink named "end" with parallelism 1. + */ + private void buildCloneFlinkJob(StreamExecutionEnvironment env) { + DataStream> cloneSource = + new CloneSourceBuilder( + env, + sourceCatalogConfig, + database, + tableName, + targetDatabase, + targetTableName) + .build(); + + SingleOutputStreamOperator pickFilesForClone = + cloneSource + .transform( + "Pick Files", + TypeInformation.of(CloneFileInfo.class), + new PickFilesForCloneOperator( + 
sourceCatalogConfig, targetCatalogConfig)) + .forceNonParallel(); + + SingleOutputStreamOperator copyFiles = + pickFilesForClone + .rebalance() + .transform( + "Copy Files", + TypeInformation.of(CloneFileInfo.class), + new CopyFileOperator(sourceCatalogConfig, targetCatalogConfig)) + .setParallelism(parallelism); + + SingleOutputStreamOperator snapshotHintOperator = + FlinkStreamPartitioner.partition( + copyFiles, new SnapshotHintChannelComputer(), parallelism) + .transform( + "Recreate Snapshot Hint", + TypeInformation.of(CloneFileInfo.class), + new SnapshotHintOperator(targetCatalogConfig)) + .setParallelism(parallelism); + + snapshotHintOperator.addSink(new DiscardingSink<>()).name("end").setParallelism(1); + } + /** Builds the pipeline and executes it under the job name "Clone job". */ + @Override + public void run() throws Exception { + build(); + execute("Clone job"); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java new file mode 100644 index 0000000000000..49d02c3de5094 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import java.util.Optional; + +/** Factory to create {@link CloneAction}. */ +public class CloneActionFactory implements ActionFactory { + + private static final String IDENTIFIER = "clone"; + private static final String PARALLELISM = "parallelism"; + private static final String TARGET_WAREHOUSE = "target_warehouse"; + private static final String TARGET_DATABASE = "target_database"; + private static final String TARGET_TABLE = "target_table"; + private static final String TARGET_CATALOG_CONF = "target_catalog_conf"; + /** Returns the identifier of this action, {@code "clone"}. */ + @Override + public String identifier() { + return IDENTIFIER; + } + /** + * Creates a {@link CloneAction} from the parsed arguments: the {@code WAREHOUSE}, {@code DATABASE}, {@code TABLE} and {@code CATALOG_CONF} keys describe the source side, the {@code target_*} keys describe the target side, and {@code parallelism} is passed through as a string. The result is always a non-empty {@link Optional}. + */ + @Override + public Optional create(MultipleParameterToolAdapter params) { + CloneAction cloneAction = + new CloneAction( + params.get(WAREHOUSE), + params.get(DATABASE), + params.get(TABLE), + optionalConfigMap(params, CATALOG_CONF), + params.get(TARGET_WAREHOUSE), + params.get(TARGET_DATABASE), + params.get(TARGET_TABLE), + optionalConfigMap(params, TARGET_CATALOG_CONF), + params.get(PARALLELISM)); + + return Optional.of(cloneAction); + } + /** + * Prints the syntax and one example of the {@code clone} action to standard output. + * + * <p>NOTE(review): several fragments of the syntax string are concatenated without a separating space, so the printed options run together. Confirm and add the missing spaces. + */ + @Override + public void printHelp() { + System.out.println("Action \"clone\" runs a batch job for clone the latest Snapshot."); + System.out.println(); + + System.out.println("Syntax:"); + System.out.println( + " clone --warehouse " + + "[--database ]" + + "[--table ]" + + "[--catalog_conf [--catalog_conf ...]]" + + "--target_warehouse " + + "--target_database " + + "--target_table " + + "[--target_catalog_conf [--target_catalog_conf ...]]"); + + System.out.println(); + + System.out.println("Examples:"); + System.out.println( + " clone --warehouse s3:///path1/from/warehouse " + + "--database test_db " + + "--table test_table " + + "--catalog_conf s3.endpoint=https://****.com " + + "--catalog_conf s3.access-key=***** " + + "--catalog_conf s3.secret-key=***** " + + "--target_warehouse 
s3:///path2/to/warehouse " + + "--target_database test_db_copy " + + "--target_table test_table_copy " + + "--target_catalog_conf s3.endpoint=https://****.com " + + "--target_catalog_conf s3.access-key=***** " + + "--target_catalog_conf s3.secret-key=***** "); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java new file mode 100644 index 0000000000000..ddef4cdde1910 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.clone; + +import org.apache.paimon.fs.Path; + +/** The information of copy file. 
*/ +public class CloneFileInfo { + + private final Path filePathExcludeTableRoot; + private final String sourceIdentifier; + private final String targetIdentifier; + /** + * Creates the descriptor of one file to copy. + * + * @param filePathExcludeTableRoot path of the file with the table root prefix excluded + * @param sourceIdentifier identifier string of the source table + * @param targetIdentifier identifier string of the target table + */ + public CloneFileInfo( + Path filePathExcludeTableRoot, String sourceIdentifier, String targetIdentifier) { + this.filePathExcludeTableRoot = filePathExcludeTableRoot; + this.sourceIdentifier = sourceIdentifier; + this.targetIdentifier = targetIdentifier; + } + /** Returns the file path with the table root prefix excluded. */ + public Path getFilePathExcludeTableRoot() { + return filePathExcludeTableRoot; + } + /** Returns the identifier string of the source table. */ + public String sourceIdentifier() { + return sourceIdentifier; + } + /** Returns the identifier string of the target table. */ + public String targetIdentifier() { + return targetIdentifier; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneSourceBuilder.java new file mode 100644 index 0000000000000..f8b98ca29a0b8 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneSourceBuilder.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.clone; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.options.Options; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** + * Pick the tables to be cloned based on the user input parameters. The record type of the build + * DataStream is Tuple2. The left element is the identifier of source table and the right element is + * the identifier of target table. + */ +public class CloneSourceBuilder { + + private final StreamExecutionEnvironment env; + private final Map sourceCatalogConfig; + private final String database; + private final String tableName; + private final String targetDatabase; + private final String targetTableName; + /** Stores the environment, the source catalog configuration and the source and target database and table names; nothing is validated here. */ + public CloneSourceBuilder( + StreamExecutionEnvironment env, + Map sourceCatalogConfig, + String database, + String tableName, + String targetDatabase, + String targetTableName) { + this.env = env; + this.sourceCatalogConfig = sourceCatalogConfig; + this.database = database; + this.tableName = tableName; + this.targetDatabase = targetDatabase; + this.targetTableName = targetTableName; + } + /** + * Resolves the (source, target) identifier pairs and returns them as a non-parallel stream. + * + * <p>When {@code database} is null, {@code tableName} must be null too, and every table of every database is paired with an identical target identifier; a database whose tables cannot be listed is silently skipped. When only {@code tableName} is null, every table of {@code database} is mapped to the same table name in {@code targetDatabase}, and a listing failure is rethrown as a {@link RuntimeException}. Otherwise the single pair {@code database.tableName} to {@code targetDatabase.targetTableName} is produced. + * + * <p>NOTE(review): the first branch ignores {@code targetDatabase} and swallows listing errors, and the catalog created here is not closed in this method. Confirm these are intended. + */ + public DataStream> build() { + Catalog sourceCatalog = + FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(sourceCatalogConfig)); + + List> result = new ArrayList<>(); + + if (database == null) { + checkArgument(tableName == null, "tableName must be null when database is null."); + for (String db : sourceCatalog.listDatabases()) { + try { + for (String table : sourceCatalog.listTables(db)) { + String s = db + "." 
+ table; + result.add(new Tuple2<>(s, s)); + } + } catch (Exception e) { + // ignore + } + } + } else if (tableName == null) { + try { + for (String table : sourceCatalog.listTables(database)) { + result.add(new Tuple2<>(database + "." + table, targetDatabase + "." + table)); + } + } catch (Exception e) { + throw new RuntimeException( + "Unexpected error when trying to list tables in database " + database, e); + } + } else { + result.add( + new Tuple2<>( + database + "." + tableName, targetDatabase + "." + targetTableName)); + } + + return env.fromCollection(result).forceNonParallel().forward(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java new file mode 100644 index 0000000000000..1d60df896cd49 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.clone; + +import org.apache.paimon.catalog.AbstractCatalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.options.Options; +import org.apache.paimon.utils.IOUtils; + +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.util.Map; + +/** An operator that copies files from the source table location to the target table location. */ +public class CopyFileOperator extends AbstractStreamOperator + implements OneInputStreamOperator { + + private final Map sourceCatalogConfig; + private final Map targetCatalogConfig; + + private AbstractCatalog sourceCatalog; + private AbstractCatalog targetCatalog; + /** Keeps the source and target catalog option maps; the catalogs themselves are created in {@code open()}. */ + public CopyFileOperator( + Map sourceCatalogConfig, Map targetCatalogConfig) { + super(); + this.sourceCatalogConfig = sourceCatalogConfig; + this.targetCatalogConfig = targetCatalogConfig; + } + /** Creates the source and target catalogs from their option maps and casts both to {@code AbstractCatalog}. */ + @Override + public void open() throws Exception { + super.open(); + sourceCatalog = + (AbstractCatalog) + FlinkCatalogFactory.createPaimonCatalog( + Options.fromMap(sourceCatalogConfig)); + targetCatalog = + (AbstractCatalog) + FlinkCatalogFactory.createPaimonCatalog( + Options.fromMap(targetCatalogConfig)); + } + /** + * Copies the file described by the incoming {@code CloneFileInfo} from the source table location to the same relative path under the target table location, then forwards the record downstream. + * + * <p>If the target file already exists and has the same size as the source file, nothing is copied and the record is not forwarded. + * + * <p>NOTE(review): the two streams are handed directly to {@code IOUtils.copyBytes}; whether they get closed depends on that helper. Also confirm that dropping the record on the skip path is what downstream operators expect. + */ + @Override + public void processElement(StreamRecord streamRecord) throws Exception { + CloneFileInfo cloneFileInfo = streamRecord.getValue(); + + FileIO sourceTableFileIO = sourceCatalog.fileIO(); + FileIO targetTableFileIO = targetCatalog.fileIO(); + Path sourceTableRootPath = + sourceCatalog.getDataTableLocation( + Identifier.fromString(cloneFileInfo.sourceIdentifier())); + Path targetTableRootPath = + targetCatalog.getDataTableLocation( + Identifier.fromString(cloneFileInfo.targetIdentifier())); + + Path filePathExcludeTableRoot = 
cloneFileInfo.getFilePathExcludeTableRoot(); + Path sourcePath = new Path(sourceTableRootPath.toString() + filePathExcludeTableRoot); + Path targetPath = new Path(targetTableRootPath.toString() + filePathExcludeTableRoot); + + if (targetTableFileIO.exists(targetPath) + && targetTableFileIO.getFileSize(targetPath) + == sourceTableFileIO.getFileSize(sourcePath)) { + LOG.info( + "Skipping target file {} because it already exists and has the same size.", + targetPath); + return; + } + + LOG.debug("Begin copy file from {} to {}.", sourcePath, targetPath); + IOUtils.copyBytes( + sourceTableFileIO.newInputStream(sourcePath), + targetTableFileIO.newOutputStream(targetPath, true)); + LOG.debug("End copy file from {} to {}.", sourcePath, targetPath); + + output.collect(streamRecord); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java new file mode 100644 index 0000000000000..d6d535483e9bf --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.clone; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.fs.Path; +import org.apache.paimon.options.Options; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.FileStoreTable; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.apache.paimon.utils.Preconditions.checkState; + +/** + * Pick the files to be cloned of a table based on the input record. The record type it produce is + * CloneFileInfo that indicate the information of copy file. 
+ */ +public class PickFilesForCloneOperator extends AbstractStreamOperator + implements OneInputStreamOperator, CloneFileInfo> { + + private final Map sourceCatalogConfig; + private final Map targetCatalogConfig; + + private Catalog sourceCatalog; + private Catalog targetCatalog; + /** Keeps the source and target catalog option maps; the catalogs themselves are created in {@code open()}. */ + public PickFilesForCloneOperator( + Map sourceCatalogConfig, Map targetCatalogConfig) { + this.sourceCatalogConfig = sourceCatalogConfig; + this.targetCatalogConfig = targetCatalogConfig; + } + /** Creates the source and target catalogs from their option maps. */ + @Override + public void open() throws Exception { + super.open(); + sourceCatalog = + FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(sourceCatalogConfig)); + targetCatalog = + FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(targetCatalogConfig)); + } + /** Delegates to {@code processElementImpl} and rethrows any exception as a {@link RuntimeException} with the message "Failed to pick files". */ + @Override + public void processElement(StreamRecord> streamRecord) throws Exception { + try { + processElementImpl(streamRecord); + } catch (Exception e) { + throw new RuntimeException("Failed to pick files", e); + } + } + /** + * Handles one (source, target) identifier pair: loads the source table, creates the target database and a target table with the source table's schema, then emits one {@code CloneFileInfo} per file returned by {@code PickFilesUtil.getUsedFilesForSnapshot}. + * + * <p>NOTE(review): both create calls pass {@code true} as their last argument, presumably an ignore-if-exists flag. Confirm against the catalog API. + */ + private void processElementImpl(StreamRecord> streamRecord) + throws Exception { + String sourceIdentifierStr = streamRecord.getValue().f0; + Identifier sourceIdentifier = Identifier.fromString(sourceIdentifierStr); + String targetIdentifierStr = streamRecord.getValue().f1; + Identifier targetIdentifier = Identifier.fromString(targetIdentifierStr); + + FileStoreTable sourceTable = (FileStoreTable) sourceCatalog.getTable(sourceIdentifier); + targetCatalog.createDatabase(targetIdentifier.getDatabaseName(), true); + targetCatalog.createTable( + targetIdentifier, Schema.fromTableSchema(sourceTable.schema()), true); + + List result = + toCloneFileInfos( + PickFilesUtil.getUsedFilesForSnapshot(sourceTable), + sourceTable.location(), + sourceIdentifierStr, + targetIdentifierStr); + + for (CloneFileInfo info : result) { + output.collect(new StreamRecord<>(info)); + } + } + /** Converts each file path into a {@code CloneFileInfo} that holds the path with the table root stripped plus both table identifiers. */ + private List toCloneFileInfos( + List files, + Path sourceTableRoot, + String sourceIdentifier, + String targetIdentifier) { + List result = 
new ArrayList<>(); + for (Path file : files) { + Path relativePath = getPathExcludeTableRoot(file, sourceTableRoot); + result.add(new CloneFileInfo(relativePath, sourceIdentifier, targetIdentifier)); + } + return result; + } + /** + * Strips {@code sourceTableRoot} from the front of {@code absolutePath} and returns the remainder as a {@code Path}; a state check fails if the file path does not start with the root. + * + * <p>NOTE(review): the file path is rendered with {@code toUri().toString()} while the root uses {@code toString()}. Confirm the two forms always share a prefix. + */ + private Path getPathExcludeTableRoot(Path absolutePath, Path sourceTableRoot) { + String fileAbsolutePath = absolutePath.toUri().toString(); + String sourceTableRootPath = sourceTableRoot.toString(); + + checkState( + fileAbsolutePath.startsWith(sourceTableRootPath), + "This is a bug, please report. fileAbsolutePath is : " + + fileAbsolutePath + + ", sourceTableRootPath is : " + + sourceTableRootPath); + + return new Path(fileAbsolutePath.substring(sourceTableRootPath.length())); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java new file mode 100644 index 0000000000000..879b0da34fe65 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.paimon.flink.clone;
+
+import org.apache.paimon.FileStore;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.manifest.SimpleFileEntry;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.SnapshotManager;
+
+import javax.annotation.Nullable;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/** Collects the files that the latest snapshot of a table refers to, so they can be cloned. */
+public class PickFilesUtil {
+
+    /** Maximum number of attempts for reading a manifest-like file. */
+    private static final int READ_FILE_RETRY_NUM = 3;
+
+    /** Pause between two read attempts, in milliseconds. */
+    private static final int READ_FILE_RETRY_INTERVAL = 5;
+
+    /**
+     * Lists the files used by the latest snapshot of {@code table} plus all of its schema files.
+     *
+     * <p>When the table has no snapshot, only the schema files are returned.
+     *
+     * @param table table whose latest snapshot is inspected
+     * @return snapshot file, manifest lists, manifests, data files, index files, statistics file
+     *     (when present) and every schema file
+     */
+    public static List<Path> getUsedFilesForSnapshot(FileStoreTable table) {
+        FileStore<?> store = table.store();
+        SnapshotManager snapshotManager = store.snapshotManager();
+        Snapshot snapshot = snapshotManager.latestSnapshot();
+        ManifestList manifestList = store.manifestListFactory().create();
+        SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location());
+        IndexFileHandler indexFileHandler = store.newIndexFileHandler();
+
+        List<Path> files = new ArrayList<>();
+        if (snapshot != null) {
+            files.add(snapshotManager.snapshotPath(snapshot.id()));
+            files.addAll(
+                    getUsedFilesInternal(
+                            snapshot,
+                            store.pathFactory(),
+                            store.newScan(),
+                            manifestList,
+                            indexFileHandler));
+        }
+        for (long id : schemaManager.listAllIds()) {
+            files.add(schemaManager.toSchemaPath(id));
+        }
+        return files;
+    }
+
+    /**
+     * Resolves every file referenced by {@code snapshot}.
+     *
+     * @return the referenced files, or an empty list when a manifest list or the index manifest
+     *     can no longer be found while reading
+     */
+    private static List<Path> getUsedFilesInternal(
+            Snapshot snapshot,
+            FileStorePathFactory pathFactory,
+            FileStoreScan scan,
+            ManifestList manifestList,
+            IndexFileHandler indexFileHandler) {
+        List<Path> files = new ArrayList<>();
+        addManifestList(files, snapshot, pathFactory);
+
+        try {
+            // try to read manifests
+            List<ManifestFileMeta> manifestFileMetas =
+                    retryReadingFiles(
+                            () -> readAllManifestsWithIOException(snapshot, manifestList));
+            if (manifestFileMetas == null) {
+                return Collections.emptyList();
+            }
+            files.addAll(
+                    manifestFileMetas.stream()
+                            .map(ManifestFileMeta::fileName)
+                            .map(pathFactory::toManifestFilePath)
+                            .collect(Collectors.toList()));
+
+            // try to read data files
+            List<Path> dataFiles = new ArrayList<>();
+            List<SimpleFileEntry> simpleFileEntries =
+                    scan.withSnapshot(snapshot).readSimpleEntries();
+            for (SimpleFileEntry entry : simpleFileEntries) {
+                dataFiles.add(
+                        pathFactory
+                                .createDataFilePathFactory(entry.partition(), entry.bucket())
+                                .toPath(entry.fileName()));
+            }
+            // In order to make files created earlier perform copying earlier.
+            Collections.reverse(dataFiles);
+            files.addAll(dataFiles);
+
+            // try to read index files
+            String indexManifest = snapshot.indexManifest();
+            if (indexManifest != null && indexFileHandler.existsManifest(indexManifest)) {
+                files.add(pathFactory.indexManifestFileFactory().toPath(indexManifest));
+
+                List<IndexManifestEntry> indexManifestEntries =
+                        retryReadingFiles(
+                                () -> indexFileHandler.readManifestWithIOException(indexManifest));
+                if (indexManifestEntries == null) {
+                    return Collections.emptyList();
+                }
+
+                indexManifestEntries.stream()
+                        .map(IndexManifestEntry::indexFile)
+                        .map(indexFileHandler::filePath)
+                        .forEach(files::add);
+            }
+
+            // add statistic file
+            if (snapshot.statistics() != null) {
+                files.add(pathFactory.statsFileFactory().toPath(snapshot.statistics()));
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        return files;
+    }
+
+    /** Adds the base, delta and (when present) changelog manifest lists of {@code snapshot}. */
+    private static void addManifestList(
+            List<Path> used, Snapshot snapshot, FileStorePathFactory pathFactory) {
+        used.add(pathFactory.toManifestListPath(snapshot.baseManifestList()));
+        used.add(pathFactory.toManifestListPath(snapshot.deltaManifestList()));
+        String changelogManifestList = snapshot.changelogManifestList();
+        if (changelogManifestList != null) {
+            used.add(pathFactory.toManifestListPath(changelogManifestList));
+        }
+    }
+
+    /** Reads the manifest metas of the base, delta and (when present) changelog lists. */
+    private static List<ManifestFileMeta> readAllManifestsWithIOException(
+            Snapshot snapshot, ManifestList manifestList) throws IOException {
+        List<ManifestFileMeta> result = new ArrayList<>();
+
+        result.addAll(manifestList.readWithIOException(snapshot.baseManifestList()));
+        result.addAll(manifestList.readWithIOException(snapshot.deltaManifestList()));
+
+        String changelogManifestList = snapshot.changelogManifestList();
+        if (changelogManifestList != null) {
+            result.addAll(manifestList.readWithIOException(changelogManifestList));
+        }
+
+        return result;
+    }
+
+    /**
+     * Runs {@code reader}, retrying up to {@link #READ_FILE_RETRY_NUM} times on {@link
+     * IOException}.
+     *
+     * @return the value read, or {@code null} when the file does not exist
+     * @throws IOException the last failure once all attempts are used up
+     */
+    @Nullable
+    private static <T> T retryReadingFiles(ReaderWithIOException<T> reader) throws IOException {
+        IOException caught = null;
+        for (int attempt = 1; attempt <= READ_FILE_RETRY_NUM; attempt++) {
+            try {
+                return reader.read();
+            } catch (FileNotFoundException e) {
+                // A missing file is reported to the caller as null, not retried.
+                return null;
+            } catch (IOException e) {
+                caught = e;
+            }
+            if (attempt == READ_FILE_RETRY_NUM) {
+                // No further attempt follows, so waiting would only delay the failure.
+                break;
+            }
+            try {
+                TimeUnit.MILLISECONDS.sleep(READ_FILE_RETRY_INTERVAL);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+        }
+
+        throw caught;
+    }
+
+    /** A helper functional interface for method {@link #retryReadingFiles}. */
+    @FunctionalInterface
+    interface ReaderWithIOException<T> {
+        T read() throws IOException;
+    }
+}
+diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/SnapshotHintChannelComputer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/SnapshotHintChannelComputer.java new file mode 100644 index 0000000000000..6f6561ef3c29a --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/SnapshotHintChannelComputer.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.clone; + +import org.apache.paimon.table.sink.ChannelComputer; + +/** SnapshotHintChannelComputer. 
*/
+public class SnapshotHintChannelComputer implements ChannelComputer<CloneFileInfo> {
+
+    private static final long serialVersionUID = 1L;
+
+    private int numChannels;
+
+    /** Remembers the number of downstream channels used as the hash modulus. */
+    @Override
+    public void setup(int numChannels) {
+        this.numChannels = numChannels;
+    }
+
+    /**
+     * Picks a channel from the target table identifier, so that all records of the same target
+     * table go to the same channel.
+     *
+     * @param record file to route
+     * @return channel index in {@code [0, numChannels)}
+     */
+    @Override
+    public int channel(CloneFileInfo record) {
+        String identifier = record.targetIdentifier();
+        int hash = 0;
+        for (int i = 0; i < identifier.length(); i++) {
+            // Reducing modulo numChannels at every step keeps the value non-negative and small.
+            hash = (hash * 131 + identifier.charAt(i)) % numChannels;
+        }
+        return hash;
+    }
+}
+diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/SnapshotHintOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/SnapshotHintOperator.java new file mode 100644 index 0000000000000..0c5bb2baddedf --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/SnapshotHintOperator.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.paimon.flink.clone;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.Set;
+
+/**
+ * Once all files of a table have been copied, writes the snapshot hints of the target table,
+ * which marks the table as ready for use.
+ */
+public class SnapshotHintOperator extends AbstractStreamOperator<CloneFileInfo>
+        implements OneInputStreamOperator<CloneFileInfo, CloneFileInfo>, BoundedOneInput {
+
+    private final Map<String, String> targetCatalogConfig;
+
+    private Catalog targetCatalog;
+    private Set<String> identifiers;
+
+    public SnapshotHintOperator(Map<String, String> targetCatalogConfig) {
+        this.targetCatalogConfig = targetCatalogConfig;
+    }
+
+    /** Creates the target catalog and the set that collects the target table identifiers. */
+    @Override
+    public void open() throws Exception {
+        super.open();
+        targetCatalog =
+                FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(targetCatalogConfig));
+        identifiers = new HashSet<>();
+    }
+
+    /** Records the target table of every copied file; nothing is emitted downstream. */
+    @Override
+    public void processElement(StreamRecord<CloneFileInfo> streamRecord) throws Exception {
+        identifiers.add(streamRecord.getValue().targetIdentifier());
+    }
+
+    /** Commits the snapshot hints of every target table seen by this subtask. */
+    @Override
+    public void endInput() throws Exception {
+        for (String identifier : identifiers) {
+            FileStoreTable targetTable =
+                    (FileStoreTable) targetCatalog.getTable(Identifier.fromString(identifier));
+            commitSnapshotHintInTargetTable(targetTable.snapshotManager());
+        }
+    }
+
+    /** Releases the catalog created in {@link #open()}. */
+    @Override
+    public void close() throws Exception {
+        try {
+            if (targetCatalog != null) {
+                targetCatalog.close();
+                targetCatalog = null;
+            }
+        } finally {
+            super.close();
+        }
+    }
+
+    /**
+     * Points the earliest and latest hints at the newest snapshot of the target table and removes
+     * every other snapshot file found there. Does nothing when the table has no snapshot.
+     */
+    private void commitSnapshotHintInTargetTable(SnapshotManager targetTableSnapshotManager)
+            throws IOException {
+        // List the snapshots once and reuse the result for both the max lookup and the cleanup.
+        List<Snapshot> snapshots = targetTableSnapshotManager.safelyGetAllSnapshots();
+        OptionalLong optionalSnapshotId = snapshots.stream().mapToLong(Snapshot::id).max();
+        if (!optionalSnapshotId.isPresent()) {
+            return;
+        }
+
+        long snapshotId = optionalSnapshotId.getAsLong();
+        targetTableSnapshotManager.commitEarliestHint(snapshotId);
+        targetTableSnapshotManager.commitLatestHint(snapshotId);
+        for (Snapshot snapshot : snapshots) {
+            if (snapshot.id() != snapshotId) {
+                targetTableSnapshotManager
+                        .fileIO()
+                        .deleteQuietly(targetTableSnapshotManager.snapshotPath(snapshot.id()));
+            }
+        }
+    }
+}
+diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java new file mode 100644 index 0000000000000..01a7c6be30d4f --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.flink.action.CloneAction;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValues;
+import static org.apache.paimon.utils.StringUtils.isBlank;
+
+/**
+ * Clone snapshot/tag/table procedure. Usage:
+ *
+ * <pre><code>
+ *  CALL sys.clone('warehouse', 'database', 'table', 'catalog_config', 'target_warehouse', 'target_database',
+ *      'target_table', 'target_catalog_config', 'parallelism')
+ * </code></pre>
+ */
+public class CloneProcedure extends ProcedureBase {
+
+    public static final String IDENTIFIER = "clone";
+
+    /**
+     * Builds a {@link CloneAction} from the procedure arguments and executes it.
+     *
+     * @param catalogConfig comma separated key-value pairs for the source catalog; blank means
+     *     no extra configuration
+     * @param targetCatalogConfig comma separated key-value pairs for the target catalog; blank
+     *     means no extra configuration
+     * @param parallelism passed to {@link CloneAction} unchanged
+     * @return the result of executing the action
+     */
+    public String[] call(
+            ProcedureContext procedureContext,
+            String warehouse,
+            String database,
+            String table,
+            String catalogConfig,
+            String targetWarehouse,
+            String targetDatabase,
+            String targetTable,
+            String targetCatalogConfig,
+            String parallelism)
+            throws Exception {
+
+        CloneAction cloneAction =
+                new CloneAction(
+                        warehouse,
+                        database,
+                        table,
+                        getCatalogConfigMap(catalogConfig),
+                        targetWarehouse,
+                        targetDatabase,
+                        targetTable,
+                        getCatalogConfigMap(targetCatalogConfig),
+                        parallelism);
+
+        return execute(procedureContext, cloneAction, "Clone");
+    }
+
+    /** Parses a comma separated key-value string; a blank string yields an empty map. */
+    private Map<String, String> getCatalogConfigMap(String catalogConfig) {
+        return isBlank(catalogConfig)
+                ? Collections.emptyMap()
+                : parseCommaSeparatedKeyValues(catalogConfig);
+    }
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+}
+diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 943da3e16e6d6..fb55c746cda3f 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -14,6 +14,7 @@ # limitations under the License. 
### action factories +org.apache.paimon.flink.action.CloneActionFactory org.apache.paimon.flink.action.CompactActionFactory org.apache.paimon.flink.action.CompactDatabaseActionFactory org.apache.paimon.flink.action.DropPartitionActionFactory @@ -29,6 +30,7 @@ org.apache.paimon.flink.action.RemoveOrphanFilesActionFactory org.apache.paimon.flink.action.QueryServiceActionFactory ### procedure factories +org.apache.paimon.flink.procedure.CloneProcedure org.apache.paimon.flink.procedure.CompactDatabaseProcedure org.apache.paimon.flink.procedure.CompactProcedure org.apache.paimon.flink.procedure.CreateTagProcedure diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java index 7926e80d442e5..57b5618b59c34 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java @@ -22,18 +22,13 @@ import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.data.DataFormatTestUtil; import org.apache.paimon.data.GenericRow; -import org.apache.paimon.data.InternalRow; import org.apache.paimon.flink.util.AbstractTestBase; import org.apache.paimon.fs.Path; -import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.StreamTableCommit; import org.apache.paimon.table.sink.StreamTableWrite; -import org.apache.paimon.table.source.Split; -import org.apache.paimon.table.source.TableRead; import org.apache.paimon.types.RowType; import org.apache.flink.table.api.TableEnvironment; @@ -42,7 +37,6 @@ import org.junit.jupiter.api.BeforeEach; import java.io.IOException; -import 
java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -136,16 +130,6 @@ protected void writeData(GenericRow... data) throws Exception { incrementalIdentifier++; } - protected List getResult(TableRead read, List splits, RowType rowType) - throws Exception { - try (RecordReader recordReader = read.createReader(splits)) { - List result = new ArrayList<>(); - recordReader.forEachRemaining( - row -> result.add(DataFormatTestUtil.internalRowToString(row, rowType))); - return result; - } - } - @Override protected TableEnvironmentBuilder tableEnvironmentBuilder() { return super.tableEnvironmentBuilder() diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CloneActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CloneActionITCase.java new file mode 100644 index 0000000000000..a534719a06fa6 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CloneActionITCase.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for {@link CloneAction}. */
+public class CloneActionITCase extends ActionITCaseBase {
+
+    private static final DataType[] FIELD_TYPES =
+            new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.STRING()};
+
+    private static final RowType ROW_TYPE =
+            RowType.of(FIELD_TYPES, new String[] {"k", "v", "hh", "dt"});
+
+    private final String targetTableName = "copy_table";
+
+    /** Writes two commits, clones the table and checks that source and clone read the same. */
+    @Test
+    // JUnit 5 interprets a bare @Timeout value as seconds.
+    @Timeout(60)
+    public void testCloneLatestSnapshot() throws Exception {
+        FileStoreTable table =
+                prepareTable(
+                        Arrays.asList("dt", "hh"),
+                        Arrays.asList("dt", "hh", "k"),
+                        Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true"));
+
+        writeData(
+                rowData(1, 100, 15, BinaryString.fromString("20221208")),
+                rowData(1, 100, 16, BinaryString.fromString("20221208")),
+                rowData(1, 100, 15, BinaryString.fromString("20221209")));
+        writeData(
+                rowData(2, 100, 15, BinaryString.fromString("20221208")),
+                rowData(2, 100, 16, BinaryString.fromString("20221208")),
+                rowData(2, 100, 15, BinaryString.fromString("20221209")));
+
+        checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
+
+        // do clone
+        runCloneLatestSnapshotAction();
+
+        FileStoreTable targetTable = getFileStoreTable(targetTableName);
+        checkLatestSnapshot(targetTable, 2, Snapshot.CommitKind.APPEND);
+
+        List<DataSplit> splits1 = table.newSnapshotReader().read().dataSplits();
+        assertThat(splits1).hasSize(3);
+        List<DataSplit> splits2 = targetTable.newSnapshotReader().read().dataSplits();
+        assertThat(splits2).hasSize(3);
+
+        TableScan sourceTableScan = table.newReadBuilder().newScan();
+        TableScan targetTableScan = targetTable.newReadBuilder().newScan();
+
+        // validateResult sorts the rows it reads, so the expectation is listed in sorted order.
+        List<String> scanResult =
+                Arrays.asList(
+                        "+I[1, 100, 15, 20221208]",
+                        "+I[1, 100, 15, 20221209]",
+                        "+I[1, 100, 16, 20221208]",
+                        "+I[2, 100, 15, 20221208]",
+                        "+I[2, 100, 15, 20221209]",
+                        "+I[2, 100, 16, 20221208]");
+        validateResult(table, ROW_TYPE, sourceTableScan, scanResult, 60_000);
+        validateResult(targetTable, ROW_TYPE, targetTableScan, scanResult, 60_000);
+    }
+
+    /** Creates the source table and initializes the shared writer and committer for it. */
+    private FileStoreTable prepareTable(
+            List<String> partitionKeys, List<String> primaryKeys, Map<String, String> tableOptions)
+            throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        ROW_TYPE,
+                        partitionKeys,
+                        primaryKeys,
+                        Collections.emptyList(),
+                        tableOptions);
+        StreamWriteBuilder streamWriteBuilder =
+                table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = streamWriteBuilder.newWrite();
+        commit = streamWriteBuilder.newCommit();
+        return table;
+    }
+
+    /** Asserts the id and commit kind of the latest snapshot of {@code table}. */
+    private void checkLatestSnapshot(
+            FileStoreTable table, long snapshotId, Snapshot.CommitKind commitKind) {
+        SnapshotManager snapshotManager = table.snapshotManager();
+        Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+        assertThat(snapshot.id()).isEqualTo(snapshotId);
+        assertThat(snapshot.commitKind()).isEqualTo(commitKind);
+    }
+
+    /** Runs the clone action in batch mode, cloning into the same warehouse and database. */
+    private void runCloneLatestSnapshotAction() throws Exception {
+        StreamExecutionEnvironment env = streamExecutionEnvironmentBuilder().batchMode().build();
+        CloneAction action =
+                createAction(
+                        CloneAction.class,
+                        "clone",
+                        "--warehouse",
+                        warehouse,
+                        "--database",
+                        database,
+                        "--table",
+                        tableName,
+                        "--target_warehouse",
+                        warehouse,
+                        "--target_database",
+                        database,
+                        "--target_table",
+                        targetTableName);
+        action.withStreamExecutionEnvironment(env).build();
+        env.execute();
+    }
+}
+diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java index 4c646444cb72c..63dc671938125 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java @@ -22,48 +22,16 @@ import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.operation.FileStoreScan; import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.table.source.StreamTableScan; -import org.apache.paimon.table.source.TableScan; -import org.apache.paimon.types.RowType; import org.apache.paimon.utils.SnapshotManager; -import java.util.ArrayList; import java.util.List; import java.util.Objects; -import java.util.concurrent.TimeoutException; import static org.assertj.core.api.Assertions.assertThat; /** Base IT cases for {@link CompactAction} and {@link CompactDatabaseAction} . 
*/ public class CompactActionITCaseBase extends ActionITCaseBase { - protected void validateResult( - FileStoreTable table, - RowType rowType, - StreamTableScan scan, - List expected, - long timeout) - throws Exception { - List actual = new ArrayList<>(); - long start = System.currentTimeMillis(); - while (actual.size() != expected.size()) { - TableScan.Plan plan = scan.plan(); - actual.addAll(getResult(table.newReadBuilder().newRead(), plan.splits(), rowType)); - - if (System.currentTimeMillis() - start > timeout) { - break; - } - } - if (actual.size() != expected.size()) { - throw new TimeoutException( - String.format( - "Cannot collect %s records in %s milliseconds.", - expected.size(), timeout)); - } - actual.sort(String::compareTo); - assertThat(actual).isEqualTo(expected); - } - protected void checkFileAndRowSize( FileStoreTable table, Long expectedSnapshotId, Long timeout, int fileNum, long rowCount) throws Exception { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CloneProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CloneProcedureITCase.java new file mode 100644 index 0000000000000..b57d203240aad --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CloneProcedureITCase.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.flink.CatalogITCaseBase; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.TableScan; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.SnapshotManager; + +import org.apache.flink.table.api.config.TableConfigOptions; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** IT Case for {@link CloneProcedure}. 
*/
+public class CloneProcedureITCase extends CatalogITCaseBase {
+
+    private static final DataType[] FIELD_TYPES =
+            new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.STRING()};
+
+    private static final RowType ROW_TYPE =
+            RowType.of(FIELD_TYPES, new String[] {"k", "v", "hh", "dt"});
+
+    /** Inserts two batches, calls {@code sys.clone} and compares source and cloned table. */
+    @Test
+    public void testCloneLatestSnapshot() throws Exception {
+        sql(
+                "CREATE TABLE T ("
+                        + " k INT,"
+                        + " v INT,"
+                        + " hh INT,"
+                        + " dt STRING,"
+                        + " PRIMARY KEY (k, dt, hh) NOT ENFORCED"
+                        + ") PARTITIONED BY (dt, hh) WITH ("
+                        + " 'write-only' = 'true',"
+                        + " 'bucket' = '1'"
+                        + ")");
+        FileStoreTable table = paimonTable("T");
+
+        sql(
+                "INSERT INTO T VALUES (1, 100, 15, '20221208'), (1, 100, 16, '20221208'), (1, 100, 15, '20221209')");
+        sql(
+                "INSERT INTO T VALUES (2, 100, 15, '20221208'), (2, 100, 16, '20221208'), (2, 100, 15, '20221209')");
+
+        checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
+
+        tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
+        sql(
+                "CALL sys.clone('%s', '%s', 'T', '', '%s', '%s', 'T_copy', '', '')",
+                path, tEnv.getCurrentDatabase(), path, tEnv.getCurrentDatabase());
+
+        FileStoreTable targetTable = paimonTable("T_copy");
+        checkLatestSnapshot(targetTable, 2, Snapshot.CommitKind.APPEND);
+
+        List<DataSplit> splits1 = table.newSnapshotReader().read().dataSplits();
+        assertThat(splits1).hasSize(3);
+        List<DataSplit> splits2 = targetTable.newSnapshotReader().read().dataSplits();
+        assertThat(splits2).hasSize(3);
+
+        // Read both tables with the same kind of scan so the comparison is like for like.
+        TableScan sourceTableScan = table.newReadBuilder().newScan();
+        TableScan targetTableScan = targetTable.newReadBuilder().newScan();
+
+        // validateResult sorts the rows it reads, so the expectation is listed in sorted order.
+        List<String> scanResult =
+                Arrays.asList(
+                        "+I[1, 100, 15, 20221208]",
+                        "+I[1, 100, 15, 20221209]",
+                        "+I[1, 100, 16, 20221208]",
+                        "+I[2, 100, 15, 20221208]",
+                        "+I[2, 100, 15, 20221209]",
+                        "+I[2, 100, 16, 20221208]");
+        validateResult(table, ROW_TYPE, sourceTableScan, scanResult, 60_000);
+        validateResult(targetTable, ROW_TYPE, targetTableScan, scanResult, 60_000);
+    }
+
+    /** Asserts the id and commit kind of the latest snapshot of {@code table}. */
+    private void checkLatestSnapshot(
+            FileStoreTable table, long snapshotId, Snapshot.CommitKind commitKind) {
+        SnapshotManager snapshotManager = table.snapshotManager();
+        Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+        assertThat(snapshot.id()).isEqualTo(snapshotId);
+        assertThat(snapshot.commitKind()).isEqualTo(commitKind);
+    }
+}
+diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java index d3efddecc5676..ab60038404697 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java @@ -18,6 +18,14 @@ package org.apache.paimon.flink.util; +import org.apache.paimon.data.DataFormatTestUtil; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.table.source.TableScan; +import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileIOUtils; import org.apache.flink.api.common.RuntimeExecutionMode; @@ -41,13 +49,55 @@ import java.io.IOException; import java.nio.file.Path; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.UUID; +import java.util.concurrent.TimeoutException; + +import static org.assertj.core.api.Assertions.assertThat; /** Similar to Flink's AbstractTestBase but using Junit5. 
*/ public class AbstractTestBase { private static final int DEFAULT_PARALLELISM = 16;
+    /**
+     * Plans {@code scan} and reads the planned splits repeatedly until the number of collected
+     * rows equals {@code expected.size()} or {@code timeout} milliseconds have passed, then
+     * compares the sorted rows with {@code expected}.
+     *
+     * @param expected expected rows in string form, already sorted
+     * @param timeout maximum time to keep collecting, in milliseconds
+     * @throws TimeoutException if the row count still differs once collecting stops
+     */
+    protected void validateResult(
+            FileStoreTable table,
+            RowType rowType,
+            TableScan scan,
+            List<String> expected,
+            long timeout)
+            throws Exception {
+        List<String> actual = new ArrayList<>();
+        long start = System.currentTimeMillis();
+        while (actual.size() != expected.size()) {
+            TableScan.Plan plan = scan.plan();
+            actual.addAll(getResult(table.newReadBuilder().newRead(), plan.splits(), rowType));
+
+            if (System.currentTimeMillis() - start > timeout) {
+                break;
+            }
+        }
+        if (actual.size() != expected.size()) {
+            throw new TimeoutException(
+                    String.format(
+                            "Cannot collect %s records in %s milliseconds.",
+                            expected.size(), timeout));
+        }
+        actual.sort(String::compareTo);
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    /** Reads every row of {@code splits} and renders each one as a string. */
+    protected List<String> getResult(TableRead read, List<Split> splits, RowType rowType)
+            throws Exception {
+        try (RecordReader<InternalRow> recordReader = read.createReader(splits)) {
+            List<String> result = new ArrayList<>();
+            recordReader.forEachRemaining(
+                    row -> result.add(DataFormatTestUtil.internalRowToString(row, rowType)));
+            return result;
+        }
+    }
+
 @RegisterExtension protected static final MiniClusterWithClientExtension MINI_CLUSTER_EXTENSION = new MiniClusterWithClientExtension(