diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index b56fec279ab1..a1b41e3b8a41 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -496,7 +496,6 @@ public Optional metastoreClientFactory( return Optional.empty(); } - @Override public Path getTableLocation(Identifier identifier) { return new Path(newDatabasePath(identifier.getDatabaseName()), identifier.getTableName()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index d919c5978297..c3808caa135a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -20,7 +20,6 @@ import org.apache.paimon.annotation.Public; import org.apache.paimon.fs.FileIO; -import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; @@ -136,14 +135,6 @@ void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) */ Table getTable(Identifier identifier) throws TableNotExistException; - /** - * Get the table location in this catalog. If the table exists, return the location of the - * table; If the table does not exist, construct the location for table. - * - * @return the table location - */ - Path getTableLocation(Identifier identifier); - /** * Get names of all tables under this database. An empty list is returned if none exists. * diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java index ec14d53a2b03..2298626b0e48 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java @@ -19,7 +19,6 @@ package org.apache.paimon.catalog; import org.apache.paimon.fs.FileIO; -import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; @@ -147,11 +146,6 @@ public void renameView(Identifier fromView, Identifier toView, boolean ignoreIfN wrapped.renameView(fromView, toView, ignoreIfNotExists); } - @Override - public Path getTableLocation(Identifier identifier) { - return wrapped.getTableLocation(identifier); - } - @Override public void createPartition(Identifier identifier, Map partitions) throws TableNotExistException { diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index 03b257efbf86..86b87e25e832 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -22,7 +22,6 @@ import org.apache.paimon.catalog.Database; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.fs.FileIO; -import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; @@ -177,11 +176,6 @@ public Table getTable(Identifier identifier) throws TableNotExistException { throw new UnsupportedOperationException(); } - @Override - public Path getTableLocation(Identifier identifier) { - throw new UnsupportedOperationException(); - } - @Override public List listTables(String databaseName) throws DatabaseNotExistException { return new ArrayList(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index 3a7f9790ccca..dd95c48af8d1 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -25,7 +25,6 @@ import org.apache.paimon.flink.procedure.ProcedureUtil; import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil; import org.apache.paimon.flink.utils.FlinkDescriptorProperties; -import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.operation.FileStoreCommit; import org.apache.paimon.options.Options; @@ -525,20 +524,6 @@ protected Schema buildPaimonSchema( registerLogSystem(catalog, identifier, options, classLoader); } - // remove table path - String path = options.remove(PATH.key()); - if (path != null) { - Path expectedPath = catalog.getTableLocation(identifier); - if (!new Path(path).equals(expectedPath)) { - throw new CatalogException( - String.format( - "You specified the Path when creating the table, " - + "but the Path '%s' is different from where it should be '%s'. " - + "Please remove the Path.", - path, expectedPath)); - } - } - if (catalogTable instanceof CatalogTable) { return fromCatalogTable(((CatalogTable) catalogTable).copy(options)); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java index d3554242994b..8bcaa2a2071f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java @@ -18,12 +18,14 @@ package org.apache.paimon.flink.clone; +import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.FlinkCatalogFactory; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.options.Options; +import org.apache.paimon.table.Table; import org.apache.paimon.utils.IOUtils; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; @@ -32,6 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; import java.util.Map; /** A Operator to copy files. */ @@ -43,8 +46,11 @@ public class CopyFileOperator extends AbstractStreamOperator private final Map sourceCatalogConfig; private final Map targetCatalogConfig; - private Catalog sourceCatalog; - private Catalog targetCatalog; + private transient Catalog sourceCatalog; + private transient Catalog targetCatalog; + + private transient Map srcLocations; + private transient Map targetLocations; public CopyFileOperator( Map sourceCatalogConfig, Map targetCatalogConfig) { @@ -58,6 +64,8 @@ public void open() throws Exception { FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(sourceCatalogConfig)); targetCatalog = FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(targetCatalogConfig)); + srcLocations = new HashMap<>(); + targetLocations = new HashMap<>(); } @Override @@ -66,12 +74,29 @@ public void processElement(StreamRecord streamRecord) throws Exce FileIO sourceTableFileIO = sourceCatalog.fileIO(); FileIO targetTableFileIO = targetCatalog.fileIO(); + Path sourceTableRootPath = - sourceCatalog.getTableLocation( - Identifier.fromString(cloneFileInfo.getSourceIdentifier())); + srcLocations.computeIfAbsent( + cloneFileInfo.getSourceIdentifier(), + key -> { + try { + return pathOfTable( + sourceCatalog.getTable(Identifier.fromString(key))); + } catch (Catalog.TableNotExistException e) { + throw new RuntimeException(e); + } + }); Path targetTableRootPath = - targetCatalog.getTableLocation( - Identifier.fromString(cloneFileInfo.getTargetIdentifier())); + targetLocations.computeIfAbsent( + cloneFileInfo.getTargetIdentifier(), + key -> { + try { + return pathOfTable( + targetCatalog.getTable(Identifier.fromString(key))); + } catch (Catalog.TableNotExistException e) { + throw new RuntimeException(e); + } + }); String filePathExcludeTableRoot = cloneFileInfo.getFilePathExcludeTableRoot(); Path sourcePath = new Path(sourceTableRootPath + filePathExcludeTableRoot); @@ -110,6 +135,10 @@ public void processElement(StreamRecord streamRecord) throws Exce output.collect(streamRecord); } + private Path pathOfTable(Table table) { + return new Path(table.options().get(CoreOptions.PATH.key())); + } + @Override public void close() throws Exception { if (sourceCatalog != null) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java index e4286eb18172..734a47dead06 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java @@ -102,6 +102,7 @@ /** Test for {@link FlinkCatalog}. */ public class FlinkCatalogTest { + private static final String TESTING_LOG_STORE = "testing"; private final ObjectPath path1 = new ObjectPath("db1", "t1"); @@ -348,12 +349,7 @@ public void testCreateFlinkTableWithPath() throws Exception { CatalogTable table1 = createTable(options); assertThatThrownBy(() -> catalog.createTable(this.path1, table1, false)) .hasMessageContaining( - "You specified the Path when creating the table, " - + "but the Path '/unknown/path' is different from where it should be"); - - options.put(PATH.key(), warehouse + "/db1.db/t1"); - CatalogTable table2 = createTable(options); - catalog.createTable(this.path1, table2, false); + "The current catalog FileSystemCatalog does not support specifying the table path when creating a table."); } @ParameterizedTest diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java index f0f4596c61bb..9e5fe7ff9ff7 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java @@ -83,7 +83,7 @@ private void innerTest(boolean deletionVectors) throws Exception { .build(); catalog.createTable(identifier, schema, true); FileStoreTable table = (FileStoreTable) catalog.getTable(identifier); - Path location = catalog.getTableLocation(identifier); + Path location = table.location(); Path successFile = new Path(location, "a=0/_SUCCESS"); PartitionMarkDone markDone = PartitionMarkDone.create(false, false, new MockOperatorStateStore(), table).get();