From 5586a952947729563f1fbf366bb519c407a18a8a Mon Sep 17 00:00:00 2001 From: "hongli.wwj" Date: Mon, 5 Aug 2024 16:05:14 +0800 Subject: [PATCH] [core] add interface in Catalog for CloneAction adapter CachingCatalog --- .../org/apache/paimon/catalog/Catalog.java | 8 ++++++++ .../apache/paimon/catalog/DelegateCatalog.java | 6 ++++++ .../paimon/flink/clone/CopyFileOperator.java | 18 ++++-------------- 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index 072af2a4e7df..7bf645e98633 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -20,6 +20,7 @@ import org.apache.paimon.annotation.Public; import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; @@ -169,6 +170,13 @@ void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) */ List listTables(String databaseName) throws DatabaseNotExistException; + /** + * Get the table location in this catalog. + * + * @return the table location + */ + Path getDataTableLocation(Identifier identifier); + /** * Check if a table exists in this catalog. * diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java index 20bb99f1aa31..2080bb99991b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java @@ -19,6 +19,7 @@ package org.apache.paimon.catalog; import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; @@ -134,6 +135,11 @@ public Table getTable(Identifier identifier) throws TableNotExistException { return wrapped.getTable(identifier); } + @Override + public Path getDataTableLocation(Identifier identifier) { + return wrapped.getDataTableLocation(identifier); + } + @Override public void dropPartition(Identifier identifier, Map partitions) throws TableNotExistException, PartitionNotExistException { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java index e5c83360627e..a3208e3f341d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java @@ -34,8 +34,6 @@ import java.util.Map; -import static org.apache.paimon.CoreOptions.PATH; - /** A Operator to copy files. */ public class CopyFileOperator extends AbstractStreamOperator implements OneInputStreamOperator { @@ -69,19 +67,11 @@ public void processElement(StreamRecord streamRecord) throws Exce FileIO sourceTableFileIO = sourceCatalog.fileIO(); FileIO targetTableFileIO = targetCatalog.fileIO(); Path sourceTableRootPath = - new Path( - sourceCatalog - .getTable( - Identifier.fromString(cloneFileInfo.getSourceIdentifier())) - .options() - .get(PATH.key())); + sourceCatalog.getDataTableLocation( + Identifier.fromString(cloneFileInfo.getSourceIdentifier())); Path targetTableRootPath = - new Path( - targetCatalog - .getTable( - Identifier.fromString(cloneFileInfo.getTargetIdentifier())) - .options() - .get(PATH.key())); + targetCatalog.getDataTableLocation( + Identifier.fromString(cloneFileInfo.getTargetIdentifier())); String filePathExcludeTableRoot = cloneFileInfo.getFilePathExcludeTableRoot(); Path sourcePath = new Path(sourceTableRootPath + filePathExcludeTableRoot);