From 7df888005e9c82fc337480956ddca6e54529fc55 Mon Sep 17 00:00:00 2001 From: tsreaper Date: Thu, 1 Aug 2024 12:34:00 +0800 Subject: [PATCH] [core] Identifier can now recognize branch and system tables (#3862) --- .../org/apache/paimon/utils/StringUtils.java | 4 +- .../paimon/catalog/AbstractCatalog.java | 125 +++++++----------- .../apache/paimon/catalog/CachingCatalog.java | 17 ++- .../org/apache/paimon/catalog/Catalog.java | 2 +- .../apache/paimon/catalog/CatalogUtils.java | 8 +- .../paimon/catalog/FileSystemCatalog.java | 33 ++--- .../org/apache/paimon/catalog/Identifier.java | 103 +++++++++++++-- .../org/apache/paimon/jdbc/JdbcCatalog.java | 21 ++- .../apache/paimon/schema/SchemaManager.java | 37 ++++-- .../paimon/table/AbstractFileStoreTable.java | 14 +- .../paimon/catalog/CatalogTestBase.java | 10 +- .../paimon/table/SchemaEvolutionTest.java | 2 +- .../apache/paimon/flink/BranchSqlITCase.java | 11 +- .../paimon/flink/CatalogTableITCase.java | 5 +- .../org/apache/paimon/hive/HiveCatalog.java | 47 +++---- 15 files changed, 243 insertions(+), 196 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java index aeea97ffd9f6..4d0d5b6722b2 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java @@ -373,7 +373,7 @@ public static String randomNumericString(int len) { * @return an array of parsed Strings, {@code null} if null String input */ public static String[] split(final String str, final String separatorChars) { - return splitWorker(str, separatorChars, -1, false); + return split(str, separatorChars, -1, false); } /** @@ -388,7 +388,7 @@ public static String[] split(final String str, final String separatorChars) { * separators; if {@code false}, adjacent separators are treated as one separator. * @return an array of parsed Strings, {@code null} if null String input */ - private static String[] splitWorker( + public static String[] split( final String str, final String separatorChars, final int max, diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 062e935328fc..ee2d1f2a5d96 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -38,8 +38,7 @@ import org.apache.paimon.table.Table; import org.apache.paimon.table.sink.BatchWriteBuilder; import org.apache.paimon.table.system.SystemTableLoader; -import org.apache.paimon.utils.Pair; -import org.apache.paimon.utils.StringUtils; +import org.apache.paimon.utils.Preconditions; import javax.annotation.Nullable; @@ -156,6 +155,7 @@ protected abstract Map loadDatabasePropertiesImpl(String name) @Override public void dropPartition(Identifier identifier, Map partitionSpec) throws TableNotExistException { + checkNotSystemTable(identifier, "dropPartition"); Table table = getTable(identifier); FileStoreTable fileStoreTable = (FileStoreTable) table; try (FileStoreCommit commit = @@ -181,7 +181,7 @@ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade throw new DatabaseNotExistException(name); } - if (!cascade && listTables(name).size() > 0) { + if (!cascade && !listTables(name).isEmpty()) { throw new DatabaseNotEmptyException(name); } @@ -282,14 +282,6 @@ public void alterTable( validateIdentifierNameCaseInsensitive(identifier); validateFieldNameCaseInsensitiveInSchemaChange(changes); - Optional> optionalBranchName = - getOriginalIdentifierAndBranch(identifier); - String branchName = DEFAULT_MAIN_BRANCH; - if (optionalBranchName.isPresent()) { - identifier = optionalBranchName.get().getLeft(); - branchName = optionalBranchName.get().getRight(); - } - if (!tableExists(identifier)) { if (ignoreIfNotExists) { return; @@ -297,11 +289,10 @@ public void alterTable( throw new TableNotExistException(identifier); } - alterTableImpl(identifier, branchName, changes); + alterTableImpl(identifier, changes); } - protected abstract void alterTableImpl( - Identifier identifier, String branchName, List changes) + protected abstract void alterTableImpl(Identifier identifier, List changes) throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException; @Nullable @@ -317,7 +308,7 @@ private LineageMetaFactory findAndCreateLineageMeta(Options options, ClassLoader @Override public Table getTable(Identifier identifier) throws TableNotExistException { if (isSystemDatabase(identifier.getDatabaseName())) { - String tableName = identifier.getObjectName(); + String tableName = identifier.getTableName(); Table table = SystemTableLoader.loadGlobal( tableName, @@ -330,12 +321,17 @@ public Table getTable(Identifier identifier) throws TableNotExistException { } return table; } else if (isSpecifiedSystemTable(identifier)) { - String[] splits = tableAndSystemName(identifier); - String tableName = splits[0]; - String type = splits[1]; FileStoreTable originTable = - getDataTable(new Identifier(identifier.getDatabaseName(), tableName)); - Table table = SystemTableLoader.load(type, originTable); + getDataTable( + new Identifier( + identifier.getDatabaseName(), + identifier.getTableName(), + identifier.getBranchName(), + null)); + Table table = + SystemTableLoader.load( + Preconditions.checkNotNull(identifier.getSystemTableName()), + originTable); if (table == null) { throw new TableNotExistException(identifier); } @@ -346,15 +342,8 @@ public Table getTable(Identifier identifier) throws TableNotExistException { } private FileStoreTable getDataTable(Identifier identifier) throws TableNotExistException { - Optional> optionalBranchName = - getOriginalIdentifierAndBranch(identifier); - String branch = DEFAULT_MAIN_BRANCH; - if (optionalBranchName.isPresent()) { - identifier = optionalBranchName.get().getLeft(); - branch = optionalBranchName.get().getRight(); - } - - TableSchema tableSchema = getDataTableSchema(identifier, branch); + Preconditions.checkArgument(identifier.getSystemTableName() == null); + TableSchema tableSchema = getDataTableSchema(identifier); return FileStoreTableFactory.create( fileIO, getDataTableLocation(identifier), @@ -394,35 +383,16 @@ public Map> allTablePaths() { } } - protected abstract TableSchema getDataTableSchema(Identifier identifier, String branchName) + protected abstract TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException; @VisibleForTesting public Path getDataTableLocation(Identifier identifier) { - return new Path(newDatabasePath(identifier.getDatabaseName()), identifier.getObjectName()); - } - - private static Optional> getOriginalIdentifierAndBranch( - Identifier identifier) { - String tableName = identifier.getObjectName(); - if (tableName.contains(BRANCH_PREFIX)) { - int idx = tableName.indexOf(BRANCH_PREFIX); - String branchName = tableName.substring(idx + BRANCH_PREFIX.length()); - if (StringUtils.isNullOrWhitespaceOnly(branchName)) { - return Optional.empty(); - } else { - return Optional.of( - Pair.of( - Identifier.create( - identifier.getDatabaseName(), tableName.substring(0, idx)), - branchName)); - } - } - return Optional.empty(); + return new Path(newDatabasePath(identifier.getDatabaseName()), identifier.getTableName()); } - protected void checkNotBranch(Identifier identifier, String method) { - if (getOriginalIdentifierAndBranch(identifier).isPresent()) { + protected static void checkNotBranch(Identifier identifier, String method) { + if (identifier.getBranchName() != null) { throw new IllegalArgumentException( String.format( "Cannot '%s' for branch table '%s', " @@ -431,23 +401,23 @@ protected void checkNotBranch(Identifier identifier, String method) { } } - protected void assertMainBranch(String branchName) { - if (!DEFAULT_MAIN_BRANCH.equals(branchName)) { + protected void assertMainBranch(Identifier identifier) { + if (identifier.getBranchName() != null + && !DEFAULT_MAIN_BRANCH.equals(identifier.getBranchName())) { throw new UnsupportedOperationException( this.getClass().getName() + " currently does not support table branches"); } } public static boolean isSpecifiedSystemTable(Identifier identifier) { - return identifier.getObjectName().contains(SYSTEM_TABLE_SPLITTER) - && !getOriginalIdentifierAndBranch(identifier).isPresent(); + return identifier.getSystemTableName() != null; } protected static boolean isSystemTable(Identifier identifier) { return isSystemDatabase(identifier.getDatabaseName()) || isSpecifiedSystemTable(identifier); } - protected void checkNotSystemTable(Identifier identifier, String method) { + protected static void checkNotSystemTable(Identifier identifier, String method) { if (isSystemTable(identifier)) { throw new IllegalArgumentException( String.format( @@ -460,26 +430,12 @@ private void copyTableDefaultOptions(Map options) { tableDefaultOptions.forEach(options::putIfAbsent); } - public static String[] tableAndSystemName(Identifier identifier) { - String[] splits = StringUtils.split(identifier.getObjectName(), SYSTEM_TABLE_SPLITTER); - if (splits.length != 2) { - throw new IllegalArgumentException( - "System table can only contain one '$' separator, but this is: " - + identifier.getObjectName()); - } - return splits; - } - public static Path newTableLocation(String warehouse, Identifier identifier) { - if (isSpecifiedSystemTable(identifier)) { - throw new IllegalArgumentException( - String.format( - "Table name[%s] cannot contain '%s' separator", - identifier.getObjectName(), SYSTEM_TABLE_SPLITTER)); - } + checkNotBranch(identifier, "newTableLocation"); + checkNotSystemTable(identifier, "newTableLocation"); return new Path( newDatabasePath(warehouse, identifier.getDatabaseName()), - identifier.getObjectName()); + identifier.getTableName()); } public static Path newDatabasePath(String warehouse, String database) { @@ -548,18 +504,29 @@ protected List listDatabasesInFileSystem(Path warehouse) throws IOExcept protected List listTablesInFileSystem(Path databasePath) throws IOException { List tables = new ArrayList<>(); for (FileStatus status : fileIO.listDirectories(databasePath)) { - if (status.isDir() && tableExistsInFileSystem(status.getPath())) { + if (status.isDir() && tableExistsInFileSystem(status.getPath(), DEFAULT_MAIN_BRANCH)) { tables.add(status.getPath().getName()); } } return tables; } - protected boolean tableExistsInFileSystem(Path tablePath) { - return !new SchemaManager(fileIO, tablePath).listAllIds().isEmpty(); + protected boolean tableExistsInFileSystem(Path tablePath, String branchName) { + return !new SchemaManager(fileIO, tablePath, branchName).listAllIds().isEmpty(); } - public Optional tableSchemaInFileSystem(Path tablePath) { - return new SchemaManager(fileIO, tablePath).latest(); + public Optional tableSchemaInFileSystem(Path tablePath, String branchName) { + return new SchemaManager(fileIO, tablePath, branchName) + .latest() + .map( + s -> { + if (!DEFAULT_MAIN_BRANCH.equals(branchName)) { + Options branchOptions = new Options(s.options()); + branchOptions.set(CoreOptions.BRANCH, branchName); + return s.copy(branchOptions.toMap()); + } else { + return s; + } + }); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java index b11765a9a1f9..b229963f515f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java @@ -22,6 +22,7 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.system.SystemTableLoader; +import org.apache.paimon.utils.Preconditions; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; @@ -39,7 +40,6 @@ import java.util.Map; import static org.apache.paimon.catalog.AbstractCatalog.isSpecifiedSystemTable; -import static org.apache.paimon.catalog.AbstractCatalog.tableAndSystemName; import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS; import static org.apache.paimon.table.system.SystemTableLoader.SYSTEM_TABLES; @@ -142,18 +142,21 @@ public Table getTable(Identifier identifier) throws TableNotExistException { } if (isSpecifiedSystemTable(identifier)) { - String[] splits = tableAndSystemName(identifier); - String tableName = splits[0]; - String type = splits[1]; - Identifier originIdentifier = - Identifier.create(identifier.getDatabaseName(), tableName); + new Identifier( + identifier.getDatabaseName(), + identifier.getTableName(), + identifier.getBranchName(), + null); Table originTable = tableCache.getIfPresent(originIdentifier); if (originTable == null) { originTable = wrapped.getTable(originIdentifier); tableCache.put(originIdentifier, originTable); } - table = SystemTableLoader.load(type, (FileStoreTable) originTable); + table = + SystemTableLoader.load( + Preconditions.checkNotNull(identifier.getSystemTableName()), + (FileStoreTable) originTable); if (table == null) { throw new TableNotExistException(identifier); } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index ac229464b008..072af2a4e7df 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -48,9 +48,9 @@ public interface Catalog extends AutoCloseable { String DEFAULT_DATABASE = "default"; - String BRANCH_PREFIX = "$branch_"; String SYSTEM_TABLE_SPLITTER = "$"; String SYSTEM_DATABASE_NAME = "sys"; + String SYSTEM_BRANCH_PREFIX = "branch_"; String COMMENT_PROP = "comment"; String TABLE_DEFAULT_OPTION_PREFIX = "table-default."; String DB_LOCATION_PROP = "location"; diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java index ec263f90a566..39f81833a9eb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java @@ -37,18 +37,18 @@ public static String warehouse(String path) { } public static String database(Path path) { - return SchemaManager.fromPath(path.toString(), false).getDatabaseName(); + return SchemaManager.identifierFromPath(path.toString(), false).getDatabaseName(); } public static String database(String path) { - return SchemaManager.fromPath(path, false).getDatabaseName(); + return SchemaManager.identifierFromPath(path, false).getDatabaseName(); } public static String table(Path path) { - return SchemaManager.fromPath(path.toString(), false).getObjectName(); + return SchemaManager.identifierFromPath(path.toString(), false).getObjectName(); } public static String table(String path) { - return SchemaManager.fromPath(path, false).getObjectName(); + return SchemaManager.identifierFromPath(path, false).getObjectName(); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index 64f38a106c93..d04b975dd7c0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -18,7 +18,6 @@ package org.apache.paimon.catalog; -import org.apache.paimon.CoreOptions; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.operation.Lock; @@ -37,7 +36,6 @@ import java.util.concurrent.Callable; import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE; -import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; /** A catalog implementation for {@link FileIO}. */ public class FileSystemCatalog extends AbstractCatalog { @@ -100,24 +98,14 @@ public boolean tableExists(Identifier identifier) { return super.tableExists(identifier); } - return tableExistsInFileSystem(getDataTableLocation(identifier)); + return tableExistsInFileSystem( + getDataTableLocation(identifier), identifier.getBranchNameOrDefault()); } @Override - public TableSchema getDataTableSchema(Identifier identifier, String branchName) - throws TableNotExistException { - return schemaManager(identifier, branchName) - .latest() - .map( - s -> { - if (!DEFAULT_MAIN_BRANCH.equals(branchName)) { - Options branchOptions = new Options(s.options()); - branchOptions.set(CoreOptions.BRANCH, branchName); - return s.copy(branchOptions.toMap()); - } else { - return s; - } - }) + public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException { + return tableSchemaInFileSystem( + getDataTableLocation(identifier), identifier.getBranchNameOrDefault()) .orElseThrow(() -> new TableNotExistException(identifier)); } @@ -129,14 +117,14 @@ protected void dropTableImpl(Identifier identifier) { @Override public void createTableImpl(Identifier identifier, Schema schema) { - uncheck(() -> schemaManager(identifier, DEFAULT_MAIN_BRANCH).createTable(schema)); + uncheck(() -> schemaManager(identifier).createTable(schema)); } - private SchemaManager schemaManager(Identifier identifier, String branchName) { + private SchemaManager schemaManager(Identifier identifier) { Path path = getDataTableLocation(identifier); CatalogLock catalogLock = lockFactory().map(fac -> fac.createLock(assertGetLockContext())).orElse(null); - return new SchemaManager(fileIO, path, branchName) + return new SchemaManager(fileIO, path, identifier.getBranchNameOrDefault()) .withLock(catalogLock == null ? null : Lock.fromCatalog(catalogLock, identifier)); } @@ -153,10 +141,9 @@ public void renameTableImpl(Identifier fromTable, Identifier toTable) { } @Override - protected void alterTableImpl( - Identifier identifier, String branchName, List changes) + protected void alterTableImpl(Identifier identifier, List changes) throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException { - schemaManager(identifier, branchName).commitChanges(changes); + schemaManager(identifier).commitChanges(changes); } protected static T uncheck(Callable callable) { diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java index 15eb31b002e8..c5473a7143c7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java @@ -21,8 +21,12 @@ import org.apache.paimon.annotation.Public; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BranchManager; +import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.StringUtils; +import javax.annotation.Nullable; + import java.io.Serializable; import java.util.Objects; @@ -41,11 +45,35 @@ public class Identifier implements Serializable { public static final String UNKNOWN_DATABASE = "unknown"; private final String database; - private final String table; + private final String object; + + private transient String table; + private transient String branch; + private transient String systemTable; + + public Identifier(String database, String object) { + this.database = database; + this.object = object; + } - public Identifier(String database, String table) { + public Identifier( + String database, String table, @Nullable String branch, @Nullable String systemTable) { this.database = database; + + StringBuilder builder = new StringBuilder(table); + if (branch != null) { + builder.append(Catalog.SYSTEM_TABLE_SPLITTER) + .append(Catalog.SYSTEM_BRANCH_PREFIX) + .append(branch); + } + if (systemTable != null) { + builder.append(Catalog.SYSTEM_TABLE_SPLITTER).append(systemTable); + } + this.object = builder.toString(); + this.table = table; + this.branch = branch; + this.systemTable = systemTable; } public String getDatabaseName() { @@ -53,13 +81,64 @@ public String getDatabaseName() { } public String getObjectName() { - return table; + return object; } public String getFullName() { return UNKNOWN_DATABASE.equals(this.database) - ? table - : String.format("%s.%s", database, table); + ? object + : String.format("%s.%s", database, object); + } + + public String getTableName() { + splitObjectName(); + return table; + } + + public @Nullable String getBranchName() { + splitObjectName(); + return branch; + } + + public String getBranchNameOrDefault() { + String branch = getBranchName(); + return branch == null ? BranchManager.DEFAULT_MAIN_BRANCH : branch; + } + + public @Nullable String getSystemTableName() { + splitObjectName(); + return systemTable; + } + + private void splitObjectName() { + if (table != null) { + return; + } + + String[] splits = StringUtils.split(object, Catalog.SYSTEM_TABLE_SPLITTER, -1, true); + if (splits.length == 1) { + table = object; + branch = null; + systemTable = null; + } else if (splits.length == 2) { + table = splits[0]; + if (splits[1].startsWith(Catalog.SYSTEM_BRANCH_PREFIX)) { + branch = splits[1].substring(Catalog.SYSTEM_BRANCH_PREFIX.length()); + systemTable = null; + } else { + branch = null; + systemTable = splits[1]; + } + } else if (splits.length == 3) { + Preconditions.checkArgument( + splits[1].startsWith(Catalog.SYSTEM_BRANCH_PREFIX), + "System table can only contain one '$' separator, but this is: " + object); + table = splits[0]; + branch = splits[1].substring(Catalog.SYSTEM_BRANCH_PREFIX.length()); + systemTable = splits[2]; + } else { + throw new IllegalArgumentException("Invalid object name: " + object); + } } public String getEscapedFullName() { @@ -68,11 +147,11 @@ public String getEscapedFullName() { public String getEscapedFullName(char escapeChar) { return String.format( - "%c%s%c.%c%s%c", escapeChar, database, escapeChar, escapeChar, table, escapeChar); + "%c%s%c.%c%s%c", escapeChar, database, escapeChar, escapeChar, object, escapeChar); } - public static Identifier create(String db, String table) { - return new Identifier(db, table); + public static Identifier create(String db, String object) { + return new Identifier(db, object); } public static Identifier fromString(String fullName) { @@ -84,7 +163,7 @@ public static Identifier fromString(String fullName) { if (paths.length != 2) { throw new IllegalArgumentException( String.format( - "Cannot get splits from '%s' to get database and table", fullName)); + "Cannot get splits from '%s' to get database and object", fullName)); } return new Identifier(paths[0], paths[1]); @@ -99,17 +178,17 @@ public boolean equals(Object o) { return false; } Identifier that = (Identifier) o; - return Objects.equals(database, that.database) && Objects.equals(table, that.table); + return Objects.equals(database, that.database) && Objects.equals(object, that.object); } @Override public int hashCode() { - return Objects.hash(database, table); + return Objects.hash(database, object); } @Override public String toString() { - return "Identifier{" + "database='" + database + '\'' + ", table='" + table + '\'' + '}'; + return String.format("Identifier{database='%s', object='%s'}", database, object); } public static RowType schema() { diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index 556c071f2e75..da08309ad69f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -214,7 +214,7 @@ protected void dropTableImpl(Identifier identifier) { JdbcUtils.DROP_TABLE_SQL, catalogKey, identifier.getDatabaseName(), - identifier.getObjectName()); + identifier.getTableName()); if (deletedRecords == 0) { LOG.info("Skipping drop, table does not exist: {}", identifier); @@ -248,7 +248,7 @@ protected void createTableImpl(Identifier identifier, Schema schema) { JdbcUtils.DO_COMMIT_CREATE_TABLE_SQL)) { sql.setString(1, catalogKey); sql.setString(2, identifier.getDatabaseName()); - sql.setString(3, identifier.getObjectName()); + sql.setString(3, identifier.getTableName()); return sql.executeUpdate(); } }); @@ -277,7 +277,7 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) { updateTable(connections, catalogKey, fromTable, toTable); Path fromPath = getDataTableLocation(fromTable); - if (new SchemaManager(fileIO, fromPath).listAllIds().size() > 0) { + if (!new SchemaManager(fileIO, fromPath).listAllIds().isEmpty()) { // Rename the file system's table directory. Maintain consistency between tables in // the file system and tables in the Hive Metastore. Path toPath = getDataTableLocation(toTable); @@ -297,23 +297,18 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) { } @Override - protected void alterTableImpl( - Identifier identifier, String branchName, List changes) + protected void alterTableImpl(Identifier identifier, List changes) throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException { - assertMainBranch(branchName); + assertMainBranch(identifier); SchemaManager schemaManager = getSchemaManager(identifier); schemaManager.commitChanges(changes); } @Override - protected TableSchema getDataTableSchema(Identifier identifier, String branchName) - throws TableNotExistException { - assertMainBranch(branchName); + protected TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException { + assertMainBranch(identifier); if (!JdbcUtils.tableExists( - connections, - catalogKey, - identifier.getDatabaseName(), - identifier.getObjectName())) { + connections, catalogKey, identifier.getDatabaseName(), identifier.getTableName())) { throw new TableNotExistException(identifier); } Path tableLocation = getDataTableLocation(identifier); diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 3c9db09aa0a6..56773c8215b9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -193,7 +193,8 @@ public TableSchema commitChanges(List changes) latest().orElseThrow( () -> new Catalog.TableNotExistException( - fromPath(branchPath(), true))); + identifierFromPath( + tableRoot.toString(), true, branch))); Map newOptions = new HashMap<>(oldTableSchema.options()); List newFields = new ArrayList<>(oldTableSchema.fields()); AtomicInteger highestFieldId = new AtomicInteger(oldTableSchema.highestFieldId()); @@ -219,13 +220,14 @@ public TableSchema commitChanges(List changes) SchemaChange.Move move = addColumn.move(); if (newFields.stream().anyMatch(f -> f.name().equals(addColumn.fieldName()))) { throw new Catalog.ColumnAlreadyExistException( - fromPath(branchPath(), true), addColumn.fieldName()); + identifierFromPath(tableRoot.toString(), true, branch), + addColumn.fieldName()); } Preconditions.checkArgument( addColumn.dataType().isNullable(), "Column %s cannot specify NOT NULL in the %s table.", addColumn.fieldName(), - fromPath(branchPath(), true).getFullName()); + identifierFromPath(tableRoot.toString(), true, branch).getFullName()); int id = highestFieldId.incrementAndGet(); DataType dataType = ReassignFieldId.reassign(addColumn.dataType(), highestFieldId); @@ -256,7 +258,8 @@ public TableSchema commitChanges(List changes) validateNotPrimaryAndPartitionKey(oldTableSchema, rename.fieldName()); if (newFields.stream().anyMatch(f -> f.name().equals(rename.newName()))) { throw new Catalog.ColumnAlreadyExistException( - fromPath(branchPath(), true), rename.fieldName()); + identifierFromPath(tableRoot.toString(), true, branch), + rename.fieldName()); } updateNestedColumn( @@ -275,7 +278,8 @@ public TableSchema commitChanges(List changes) if (!newFields.removeIf( f -> f.name().equals(((DropColumn) change).fieldName()))) { throw new Catalog.ColumnNotExistException( - fromPath(branchPath(), true), drop.fieldName()); + identifierFromPath(tableRoot.toString(), true, branch), + drop.fieldName()); } if (newFields.isEmpty()) { throw new IllegalArgumentException("Cannot drop all fields in table"); @@ -511,7 +515,8 @@ private void updateNestedColumn( } if (!found) { throw new Catalog.ColumnNotExistException( - fromPath(branchPath(), true), Arrays.toString(updateFieldNames)); + identifierFromPath(tableRoot.toString(), true, branch), + Arrays.toString(updateFieldNames)); } } @@ -587,13 +592,22 @@ public static void checkAlterTablePath(String key) { } } - public static Identifier fromPath(String tablePath, boolean ignoreIfUnknownDatabase) { + public static Identifier identifierFromPath(String tablePath, boolean ignoreIfUnknownDatabase) { + return identifierFromPath(tablePath, ignoreIfUnknownDatabase, null); + } + + public static Identifier identifierFromPath( + String tablePath, boolean ignoreIfUnknownDatabase, @Nullable String branchName) { + if (DEFAULT_MAIN_BRANCH.equals(branchName)) { + branchName = null; + } + String[] paths = tablePath.split("/"); if (paths.length < 2) { if (!ignoreIfUnknownDatabase) { throw new IllegalArgumentException( String.format( - "Path '%s' is not a legacy path, please use catalog table path instead: 'warehouse_path/your_database.db/your_table'.", + "Path '%s' is not a valid path, please use catalog table path instead: 'warehouse_path/your_database.db/your_table'.", tablePath)); } return new Identifier(UNKNOWN_DATABASE, paths[0]); @@ -605,12 +619,13 @@ public static Identifier fromPath(String tablePath, boolean ignoreIfUnknownDatab if (!ignoreIfUnknownDatabase) { throw new IllegalArgumentException( String.format( - "Path '%s' is not a legacy path, please use catalog table path instead: 'warehouse_path/your_database.db/your_table'.", + "Path '%s' is not a valid path, please use catalog table path instead: 'warehouse_path/your_database.db/your_table'.", tablePath)); } - return new Identifier(UNKNOWN_DATABASE, paths[paths.length - 1]); + return new Identifier(UNKNOWN_DATABASE, paths[paths.length - 1], branchName, null); } database = database.substring(0, index); - return new Identifier(database, paths[paths.length - 1]); + + return new Identifier(database, paths[paths.length - 1], branchName, null); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index d30fd73081a1..bad718a04585 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -116,16 +116,22 @@ public OptionalLong latestSnapshotId() { @Override public String name() { - Identifier identifier = catalogEnvironment.identifier(); - return identifier == null ? location().getName() : identifier.getObjectName(); + return identifier().getObjectName(); } @Override public String fullName() { + return identifier().getFullName(); + } + + public Identifier identifier() { Identifier identifier = catalogEnvironment.identifier(); return identifier == null - ? SchemaManager.fromPath(location().toUri().toString(), true).getFullName() - : identifier.getFullName(); + ? SchemaManager.identifierFromPath( + location().toUri().toString(), + true, + options().get(CoreOptions.BRANCH.key())) + : identifier; } @Override diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java index 19c9e83c45bc..dd0d180506e5 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java @@ -239,7 +239,7 @@ public void testCreateTable() throws Exception { DEFAULT_TABLE_SCHEMA, false)) .withMessage( - "Cannot 'createTable' for system table 'Identifier{database='test_db', table='$system_table'}', please use data table."); + "Cannot 'createTable' for system table 'Identifier{database='test_db', object='$system_table'}', please use data table."); // Create table throws DatabaseNotExistException when database does not exist assertThatExceptionOfType(Catalog.DatabaseNotExistException.class) @@ -367,7 +367,7 @@ public void testDropTable() throws Exception { catalog.dropTable( Identifier.create("test_db", "$system_table"), false)) .withMessage( - "Cannot 'dropTable' for system table 'Identifier{database='test_db', table='$system_table'}', please use data table."); + "Cannot 'dropTable' for system table 'Identifier{database='test_db', object='$system_table'}', please use data table."); // Drop table throws TableNotExistException when table does not exist and ignoreIfNotExists // is false @@ -402,7 +402,7 @@ public void testRenameTable() throws Exception { toTable, false)) .withMessage( - "Cannot 'renameTable' for system table 'Identifier{database='test_db', table='$system_table'}', please use data table."); + "Cannot 'renameTable' for system table 'Identifier{database='test_db', object='$system_table'}', please use data table."); assertThatExceptionOfType(IllegalArgumentException.class) .isThrownBy( @@ -412,7 +412,7 @@ public void testRenameTable() throws Exception { Identifier.create("test_db", "$system_table"), false)) .withMessage( - "Cannot 'renameTable' for system table 'Identifier{database='test_db', table='$system_table'}', please use data table."); + "Cannot 'renameTable' for system table 'Identifier{database='test_db', object='$system_table'}', please use data table."); // Rename table throws TableNotExistException when table does not exist assertThatExceptionOfType(Catalog.TableNotExistException.class) @@ -466,7 +466,7 @@ public void testAlterTable() throws Exception { SchemaChange.addColumn("col2", DataTypes.DATE())), false)) .withMessage( - "Cannot 'alterTable' for system table 'Identifier{database='test_db', table='$system_table'}', please use data table."); + "Cannot 'alterTable' for system table 'Identifier{database='test_db', object='$system_table'}', please use data table."); // Alter table throws TableNotExistException when table does not exist assertThatExceptionOfType(Catalog.TableNotExistException.class) diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java index 4db178eab84a..8771377f2c02 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java @@ -76,7 +76,7 @@ public class SchemaEvolutionTest { @BeforeEach public void beforeEach() { tablePath = new Path(tempDir.toUri()); - identifier = SchemaManager.fromPath(tablePath.toString(), true); + identifier = SchemaManager.identifierFromPath(tablePath.toString(), true); schemaManager = new SchemaManager(LocalFileIO.create(), tablePath); commitUser = UUID.randomUUID().toString(); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java index 80ca03d8ce95..d005dc4e50a8 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java @@ -32,7 +32,7 @@ import java.util.List; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.fail; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** IT cases for table with branches using SQL. */ public class BranchSqlITCase extends CatalogITCaseBase { @@ -378,13 +378,8 @@ public void testDifferentRowTypes() throws Exception { sql("ALTER TABLE `t$branch_pk` ADD (v2 INT)"); sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'pk' )"); - try { - sql("INSERT INTO t VALUES (1, 10, 'apple')"); - fail("Expecting exceptions"); - } catch (Exception e) { - assertThat(e) - .hasMessageContaining("Branch main and pk does not have the same row type"); - } + assertThatThrownBy(() -> sql("INSERT INTO t VALUES (1, 10, 'apple')")) + .hasMessageContaining("Branch main and pk does not have the same row type"); } private List collectResult(String sql) throws Exception { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java index cb39ae3c0f47..199f26ad17a7 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java @@ -172,11 +172,10 @@ public void testCreateSystemTable() { assertThatThrownBy(() -> sql("CREATE TABLE T$snapshots (a INT, b INT)")) .hasRootCauseMessage( "Cannot 'createTable' for system table " - + "'Identifier{database='default', table='T$snapshots'}', please use data table."); + + "'Identifier{database='default', object='T$snapshots'}', please use data table."); assertThatThrownBy(() -> sql("CREATE TABLE T$aa$bb (a INT, b INT)")) .hasRootCauseMessage( - "Cannot 'createTable' for system table " - + "'Identifier{database='default', table='T$aa$bb'}', please use data table."); + "System table can only contain one '$' separator, but this is: T$aa$bb"); } @Test diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index e9fc9f32cc2c..667311ad42c3 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -185,7 +185,7 @@ public Optional metastoreClientFactory(Identifier ident public Path getDataTableLocation(Identifier identifier) { try { String databaseName = identifier.getDatabaseName(); - String tableName = identifier.getObjectName(); + String tableName = identifier.getTableName(); Optional tablePath = clients.run( client -> { @@ -363,7 +363,7 @@ public boolean tableExists(Identifier identifier) { client -> client.getTable( identifier.getDatabaseName(), - identifier.getObjectName())); + identifier.getTableName())); } catch (NoSuchObjectException e) { return false; } catch (TException e) { @@ -376,7 +376,11 @@ public boolean tableExists(Identifier identifier) { "Interrupted in call to tableExists " + identifier.getFullName(), e); } - return isPaimonTable(table); + return isPaimonTable(table) + && tableSchemaInFileSystem( + getDataTableLocation(identifier), + identifier.getBranchNameOrDefault()) + .isPresent(); } private static boolean isPaimonTable(Table table) { @@ -387,19 +391,15 @@ private static boolean isPaimonTable(Table table) { } @Override - public TableSchema getDataTableSchema(Identifier identifier, String branchName) - throws TableNotExistException { - assertMainBranch(branchName); - return getDataTableSchema(identifier); - } + public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException { + assertMainBranch(identifier); - private TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException { if (!tableExists(identifier)) { throw new TableNotExistException(identifier); } - Path tableLocation = getDataTableLocation(identifier); - return tableSchemaInFileSystem(tableLocation) + return tableSchemaInFileSystem( + getDataTableLocation(identifier), identifier.getBranchNameOrDefault()) .orElseThrow(() -> new TableNotExistException(identifier)); } @@ -418,7 +418,7 @@ protected void dropTableImpl(Identifier identifier) { client -> client.dropTable( identifier.getDatabaseName(), - identifier.getObjectName(), + identifier.getTableName(), true, false, true)); @@ -490,10 +490,10 @@ private Table createHiveTable(Identifier identifier, TableSchema tableSchema) { protected void renameTableImpl(Identifier fromTable, Identifier toTable) { try { String fromDB = fromTable.getDatabaseName(); - String fromTableName = fromTable.getObjectName(); + String fromTableName = fromTable.getTableName(); Table table = clients.run(client -> client.getTable(fromDB, fromTableName)); table.setDbName(toTable.getDatabaseName()); - table.setTableName(toTable.getObjectName()); + table.setTableName(toTable.getTableName()); clients.execute(client -> client.alter_table(fromDB, fromTableName, table)); Path fromPath = getDataTableLocation(fromTable); @@ -516,7 +516,7 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) { clients.execute( client -> client.alter_table( - toTable.getDatabaseName(), toTable.getObjectName(), table)); + toTable.getDatabaseName(), toTable.getTableName(), table)); } } catch (TException e) { throw new RuntimeException("Failed to rename table " + fromTable.getFullName(), e); @@ -527,10 +527,9 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) { } @Override - protected void alterTableImpl( - Identifier identifier, String branchName, List changes) + protected void alterTableImpl(Identifier identifier, List changes) throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException { - assertMainBranch(branchName); + assertMainBranch(identifier); final SchemaManager schemaManager = schemaManager(identifier); // first commit changes to underlying files @@ -542,7 +541,7 @@ protected void alterTableImpl( client -> client.getTable( identifier.getDatabaseName(), - identifier.getObjectName())); + identifier.getTableName())); alterTableToHms(table, identifier, schema); } catch (Exception te) { schemaManager.deleteSchema(schema.id()); @@ -558,7 +557,7 @@ private void alterTableToHms(Table table, Identifier identifier, TableSchema new client -> client.alter_table( identifier.getDatabaseName(), - identifier.getObjectName(), + identifier.getTableName(), table, true)); } @@ -615,7 +614,9 @@ public void repairTable(Identifier identifier) throws TableNotExistException { validateIdentifierNameCaseInsensitive(identifier); TableSchema tableSchema = - tableSchemaInFileSystem(getDataTableLocation(identifier)) + tableSchemaInFileSystem( + getDataTableLocation(identifier), + identifier.getBranchNameOrDefault()) .orElseThrow(() -> new TableNotExistException(identifier)); Table newTable = createHiveTable(identifier, tableSchema); try { @@ -625,7 +626,7 @@ public void repairTable(Identifier identifier) throws TableNotExistException { client -> client.getTable( identifier.getDatabaseName(), - identifier.getObjectName())); + identifier.getTableName())); checkArgument( isPaimonTable(table), "Table %s is not a paimon table in hive metastore.", @@ -673,7 +674,7 @@ private Table newHmsTable(Identifier identifier, Map tableParame TableType.class); Table table = new Table( - identifier.getObjectName(), + identifier.getTableName(), identifier.getDatabaseName(), // current linux user System.getProperty("user.name"),