diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index ee2d1f2a5d96..4886678adf6c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -19,7 +19,6 @@ package org.apache.paimon.catalog; import org.apache.paimon.CoreOptions; -import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.factories.FactoryUtil; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; @@ -346,7 +345,7 @@ private FileStoreTable getDataTable(Identifier identifier) throws TableNotExistE TableSchema tableSchema = getDataTableSchema(identifier); return FileStoreTableFactory.create( fileIO, - getDataTableLocation(identifier), + getTableLocation(identifier), tableSchema, new CatalogEnvironment( identifier, @@ -374,7 +373,7 @@ public Map> allTablePaths() { Map tableMap = allPaths.computeIfAbsent(database, d -> new HashMap<>()); for (String table : listTables(database)) { - tableMap.put(table, getDataTableLocation(Identifier.create(database, table))); + tableMap.put(table, getTableLocation(Identifier.create(database, table))); } } return allPaths; @@ -386,8 +385,8 @@ public Map> allTablePaths() { protected abstract TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException; - @VisibleForTesting - public Path getDataTableLocation(Identifier identifier) { + @Override + public Path getTableLocation(Identifier identifier) { return new Path(newDatabasePath(identifier.getDatabaseName()), identifier.getTableName()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index 7bf645e98633..106f7b15afd7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -160,6 +160,14 @@ void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) */ Table getTable(Identifier identifier) throws TableNotExistException; + /** + * Get the table location in this catalog. If the table exists, return the location of the + * table; If the table does not exist, construct the location for table. + * + * @return the table location + */ + Path getTableLocation(Identifier identifier); + /** * Get names of all tables under this database. An empty list is returned if none exists. * @@ -170,13 +178,6 @@ void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) */ List listTables(String databaseName) throws DatabaseNotExistException; - /** - * Get the table location in this catalog. - * - * @return the table location - */ - Path getDataTableLocation(Identifier identifier); - /** * Check if a table exists in this catalog. * diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java index 2080bb99991b..c2e36dea3065 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java @@ -136,8 +136,8 @@ public Table getTable(Identifier identifier) throws TableNotExistException { } @Override - public Path getDataTableLocation(Identifier identifier) { - return wrapped.getDataTableLocation(identifier); + public Path getTableLocation(Identifier identifier) { + return wrapped.getTableLocation(identifier); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index d04b975dd7c0..14b4d171835c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -99,19 +99,19 @@ public boolean tableExists(Identifier identifier) { } return tableExistsInFileSystem( - getDataTableLocation(identifier), identifier.getBranchNameOrDefault()); + getTableLocation(identifier), identifier.getBranchNameOrDefault()); } @Override public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException { return tableSchemaInFileSystem( - getDataTableLocation(identifier), identifier.getBranchNameOrDefault()) + getTableLocation(identifier), identifier.getBranchNameOrDefault()) .orElseThrow(() -> new TableNotExistException(identifier)); } @Override protected void dropTableImpl(Identifier identifier) { - Path path = getDataTableLocation(identifier); + Path path = getTableLocation(identifier); uncheck(() -> fileIO.delete(path, true)); } @@ -121,7 +121,7 @@ public void createTableImpl(Identifier identifier, Schema schema) { } private SchemaManager schemaManager(Identifier identifier) { - Path path = getDataTableLocation(identifier); + Path path = getTableLocation(identifier); CatalogLock catalogLock = lockFactory().map(fac -> fac.createLock(assertGetLockContext())).orElse(null); return new SchemaManager(fileIO, path, identifier.getBranchNameOrDefault()) @@ -135,8 +135,8 @@ private CatalogLockContext assertGetLockContext() { @Override public void renameTableImpl(Identifier fromTable, Identifier toTable) { - Path fromPath = getDataTableLocation(fromTable); - Path toPath = getDataTableLocation(toTable); + Path fromPath = getTableLocation(fromTable); + Path toPath = getTableLocation(toTable); uncheck(() -> fileIO.rename(fromPath, toPath)); } diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index da08309ad69f..bb4f61c638cf 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -220,7 +220,7 @@ protected void dropTableImpl(Identifier identifier) { LOG.info("Skipping drop, table does not exist: {}", identifier); return; } - Path path = getDataTableLocation(identifier); + Path path = getTableLocation(identifier); try { if (fileIO.exists(path)) { fileIO.deleteDirectoryQuietly(path); @@ -239,7 +239,7 @@ protected void createTableImpl(Identifier identifier, Schema schema) { // create table file getSchemaManager(identifier).createTable(schema); // Update schema metadata - Path path = getDataTableLocation(identifier); + Path path = getTableLocation(identifier); int insertRecord = connections.run( conn -> { @@ -276,11 +276,11 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) { // update table metadata info updateTable(connections, catalogKey, fromTable, toTable); - Path fromPath = getDataTableLocation(fromTable); + Path fromPath = getTableLocation(fromTable); if (!new SchemaManager(fileIO, fromPath).listAllIds().isEmpty()) { // Rename the file system's table directory. Maintain consistency between tables in // the file system and tables in the Hive Metastore. - Path toPath = getDataTableLocation(toTable); + Path toPath = getTableLocation(toTable); try { fileIO.rename(fromPath, toPath); } catch (IOException e) { @@ -311,7 +311,7 @@ protected TableSchema getDataTableSchema(Identifier identifier) throws TableNotE connections, catalogKey, identifier.getDatabaseName(), identifier.getTableName())) { throw new TableNotExistException(identifier); } - Path tableLocation = getDataTableLocation(identifier); + Path tableLocation = getTableLocation(identifier); return new SchemaManager(fileIO, tableLocation) .latest() .orElseThrow( @@ -354,8 +354,7 @@ public void close() throws Exception { } private SchemaManager getSchemaManager(Identifier identifier) { - return new SchemaManager(fileIO, getDataTableLocation(identifier)) - .withLock(lock(identifier)); + return new SchemaManager(fileIO, getTableLocation(identifier)).withLock(lock(identifier)); } private Map fetchProperties(String databaseName) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java index a3208e3f341d..d3554242994b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java @@ -67,10 +67,10 @@ public void processElement(StreamRecord streamRecord) throws Exce FileIO sourceTableFileIO = sourceCatalog.fileIO(); FileIO targetTableFileIO = targetCatalog.fileIO(); Path sourceTableRootPath = - sourceCatalog.getDataTableLocation( + sourceCatalog.getTableLocation( Identifier.fromString(cloneFileInfo.getSourceIdentifier())); Path targetTableRootPath = - targetCatalog.getDataTableLocation( + targetCatalog.getTableLocation( Identifier.fromString(cloneFileInfo.getTargetIdentifier())); String filePathExcludeTableRoot = cloneFileInfo.getFilePathExcludeTableRoot(); diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 4ed7c54d8a74..7156fbba233c 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -183,7 +183,7 @@ public Optional metastoreClientFactory(Identifier ident } @Override - public Path getDataTableLocation(Identifier identifier) { + public Path getTableLocation(Identifier identifier) { try { String databaseName = identifier.getDatabaseName(); String tableName = identifier.getTableName(); @@ -209,7 +209,7 @@ public Path getDataTableLocation(Identifier identifier) { } return Optional.empty(); }); - return tablePath.orElse(super.getDataTableLocation(identifier)); + return tablePath.orElse(super.getTableLocation(identifier)); } catch (TException e) { throw new RuntimeException("Can not get table " + identifier + " from metastore.", e); } catch (InterruptedException e) { @@ -379,8 +379,7 @@ public boolean tableExists(Identifier identifier) { return isPaimonTable(table) && tableSchemaInFileSystem( - getDataTableLocation(identifier), - identifier.getBranchNameOrDefault()) + getTableLocation(identifier), identifier.getBranchNameOrDefault()) .isPresent(); } @@ -398,7 +397,7 @@ public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExis } return tableSchemaInFileSystem( - getDataTableLocation(identifier), identifier.getBranchNameOrDefault()) + getTableLocation(identifier), identifier.getBranchNameOrDefault()) .orElseThrow(() -> new TableNotExistException(identifier)); } @@ -431,7 +430,7 @@ protected void dropTableImpl(Identifier identifier) { // Deletes table directory to avoid schema in filesystem exists after dropping hive // table successfully to keep the table consistency between which in filesystem and // which in Hive metastore. - Path path = getDataTableLocation(identifier); + Path path = getTableLocation(identifier); try { if (fileIO.exists(path)) { fileIO.deleteDirectoryQuietly(path); @@ -466,7 +465,7 @@ protected void createTableImpl(Identifier identifier, Schema schema) { try { clients.execute(client -> client.createTable(createHiveTable(identifier, tableSchema))); } catch (Exception e) { - Path path = getDataTableLocation(identifier); + Path path = getTableLocation(identifier); try { fileIO.deleteDirectoryQuietly(path); } catch (Exception ee) { @@ -495,11 +494,11 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) { table.setTableName(toTable.getTableName()); clients.execute(client -> client.alter_table(fromDB, fromTableName, table)); - Path fromPath = getDataTableLocation(fromTable); + Path fromPath = getTableLocation(fromTable); if (!new SchemaManager(fileIO, fromPath).listAllIds().isEmpty()) { // Rename the file system's table directory. Maintain consistency between tables in // the file system and tables in the Hive Metastore. - Path toPath = getDataTableLocation(toTable); + Path toPath = getTableLocation(toTable); try { fileIO.rename(fromPath, toPath); } catch (IOException e) { @@ -616,8 +615,7 @@ public void repairTable(Identifier identifier) throws TableNotExistException { TableSchema tableSchema = tableSchemaInFileSystem( - getDataTableLocation(identifier), - identifier.getBranchNameOrDefault()) + getTableLocation(identifier), identifier.getBranchNameOrDefault()) .orElseThrow(() -> new TableNotExistException(identifier)); Table newTable = createHiveTable(identifier, tableSchema); try { @@ -754,7 +752,7 @@ private void updateHmsTable(Table table, Identifier identifier, TableSchema sche } // update location - locationHelper.specifyTableLocation(table, getDataTableLocation(identifier).toString()); + locationHelper.specifyTableLocation(table, getTableLocation(identifier).toString()); } private void updateHmsTablePars(Table table, TableSchema schema) { @@ -779,9 +777,7 @@ private FieldSchema convertToFieldSchema(DataField dataField) { private SchemaManager schemaManager(Identifier identifier) { return new SchemaManager( - fileIO, - getDataTableLocation(identifier), - identifier.getBranchNameOrDefault()) + fileIO, getTableLocation(identifier), identifier.getBranchNameOrDefault()) .withLock(lock(identifier)); } diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java index f12a3d8fabe9..9b1f62449e17 100644 --- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java +++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java @@ -121,7 +121,7 @@ public void testReadWriteTableWithBitmapIndex() throws Catalog.TableNotExistExce protected void foreachIndexReader(Consumer consumer) throws Catalog.TableNotExistException { - Path tableRoot = fileSystemCatalog.getDataTableLocation(Identifier.create("db", "T")); + Path tableRoot = fileSystemCatalog.getTableLocation(Identifier.create("db", "T")); SchemaManager schemaManager = new SchemaManager(fileIO, tableRoot); FileStorePathFactory pathFactory = new FileStorePathFactory(