From fa68a1e91ee77aeef0c0b99160cc093e56d2405a Mon Sep 17 00:00:00 2001 From: tsreaper Date: Fri, 13 Oct 2023 09:53:07 +0800 Subject: [PATCH] [core] Move common code from FileSystemCatalog and HiveCatalog into AbstractCatalog (#2124) --- .../paimon/catalog/AbstractCatalog.java | 155 ++++++++++++++++- .../paimon/catalog/FileSystemCatalog.java | 116 +++---------- .../org/apache/paimon/hive/HiveCatalog.java | 157 +++++------------- 3 files changed, 213 insertions(+), 215 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 29cd157470b1..0c12b6131a94 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -26,6 +26,8 @@ import org.apache.paimon.lineage.LineageMetaFactory; import org.apache.paimon.operation.Lock; import org.apache.paimon.options.Options; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.CatalogEnvironment; import org.apache.paimon.table.FileStoreTable; @@ -84,6 +86,149 @@ protected AbstractCatalog(FileIO fileIO, Map options) { options.get(key))); } + @Override + public boolean databaseExists(String databaseName) { + if (isSystemDatabase(databaseName)) { + return true; + } + + return databaseExistsImpl(databaseName); + } + + protected abstract boolean databaseExistsImpl(String databaseName); + + @Override + public void createDatabase(String name, boolean ignoreIfExists) + throws DatabaseAlreadyExistException { + if (isSystemDatabase(name)) { + throw new ProcessSystemDatabaseException(); + } + if (databaseExists(name)) { + if (ignoreIfExists) { + return; + } + throw new DatabaseAlreadyExistException(name); + } + + createDatabaseImpl(name); + } + + protected abstract void createDatabaseImpl(String name); + + @Override + public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) + throws DatabaseNotExistException, DatabaseNotEmptyException { + if (isSystemDatabase(name)) { + throw new ProcessSystemDatabaseException(); + } + if (!databaseExists(name)) { + if (ignoreIfNotExists) { + return; + } + throw new DatabaseNotExistException(name); + } + + if (!cascade && listTables(name).size() > 0) { + throw new DatabaseNotEmptyException(name); + } + + dropDatabaseImpl(name); + } + + protected abstract void dropDatabaseImpl(String name); + + @Override + public List listTables(String databaseName) throws DatabaseNotExistException { + if (isSystemDatabase(databaseName)) { + return GLOBAL_TABLES; + } + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(databaseName); + } + + return listTablesImpl(databaseName); + } + + protected abstract List listTablesImpl(String databaseName); + + @Override + public void dropTable(Identifier identifier, boolean ignoreIfNotExists) + throws TableNotExistException { + checkNotSystemTable(identifier, "dropTable"); + if (!tableExists(identifier)) { + if (ignoreIfNotExists) { + return; + } + throw new TableNotExistException(identifier); + } + + dropTableImpl(identifier); + } + + protected abstract void dropTableImpl(Identifier identifier); + + @Override + public void createTable(Identifier identifier, Schema schema, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException { + checkNotSystemTable(identifier, "createTable"); + if (!databaseExists(identifier.getDatabaseName())) { + throw new DatabaseNotExistException(identifier.getDatabaseName()); + } + + if (tableExists(identifier)) { + if (ignoreIfExists) { + return; + } + throw new TableAlreadyExistException(identifier); + } + + copyTableDefaultOptions(schema.options()); + + createTableImpl(identifier, schema); + } + + protected abstract void createTableImpl(Identifier identifier, Schema schema); + + @Override + public void renameTable(Identifier fromTable, Identifier toTable, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException { + checkNotSystemTable(fromTable, "renameTable"); + checkNotSystemTable(toTable, "renameTable"); + + if (!tableExists(fromTable)) { + if (ignoreIfNotExists) { + return; + } + throw new TableNotExistException(fromTable); + } + + if (tableExists(toTable)) { + throw new TableAlreadyExistException(toTable); + } + + renameTableImpl(fromTable, toTable); + } + + protected abstract void renameTableImpl(Identifier fromTable, Identifier toTable); + + @Override + public void alterTable( + Identifier identifier, List changes, boolean ignoreIfNotExists) + throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException { + checkNotSystemTable(identifier, "alterTable"); + if (!tableExists(identifier)) { + if (ignoreIfNotExists) { + return; + } + throw new TableNotExistException(identifier); + } + + alterTableImpl(identifier, changes); + } + + protected abstract void alterTableImpl(Identifier identifier, List changes) + throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException; + @Nullable private LineageMeta findAndCreateLineageMeta(Options options, ClassLoader classLoader) { return options.getOptional(LINEAGE_META) @@ -175,8 +320,12 @@ private static boolean isSpecifiedSystemTable(Identifier identifier) { return identifier.getObjectName().contains(SYSTEM_TABLE_SPLITTER); } - protected void checkNotSystemTable(Identifier identifier, String method) { - if (isSystemDatabase(identifier.getDatabaseName()) || isSpecifiedSystemTable(identifier)) { + protected boolean isSystemTable(Identifier identifier) { + return isSystemDatabase(identifier.getDatabaseName()) || isSpecifiedSystemTable(identifier); + } + + private void checkNotSystemTable(Identifier identifier, String method) { + if (isSystemTable(identifier)) { throw new IllegalArgumentException( String.format( "Cannot '%s' for system table '%s', please use data table.", @@ -213,7 +362,7 @@ public static Path databasePath(String warehouse, String database) { return new Path(warehouse, database + DB_SUFFIX); } - protected boolean isSystemDatabase(String database) { + private boolean isSystemDatabase(String database) { return SYSTEM_DATABASE_NAME.equals(database); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index a9dfb6e290eb..e2e88ce0c212 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -65,58 +65,22 @@ public List listDatabases() { } @Override - public boolean databaseExists(String databaseName) { - if (isSystemDatabase(databaseName)) { - return true; - } + protected boolean databaseExistsImpl(String databaseName) { return uncheck(() -> fileIO.exists(databasePath(databaseName))); } @Override - public void createDatabase(String name, boolean ignoreIfExists) - throws DatabaseAlreadyExistException { - if (isSystemDatabase(name)) { - throw new ProcessSystemDatabaseException(); - } - if (databaseExists(name)) { - if (ignoreIfExists) { - return; - } - throw new DatabaseAlreadyExistException(name); - } + protected void createDatabaseImpl(String name) { uncheck(() -> fileIO.mkdirs(databasePath(name))); } @Override - public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) - throws DatabaseNotExistException, DatabaseNotEmptyException { - if (isSystemDatabase(name)) { - throw new ProcessSystemDatabaseException(); - } - if (!databaseExists(name)) { - if (ignoreIfNotExists) { - return; - } - - throw new DatabaseNotExistException(name); - } - - if (!cascade && listTables(name).size() > 0) { - throw new DatabaseNotEmptyException(name); - } - + protected void dropDatabaseImpl(String name) { uncheck(() -> fileIO.delete(databasePath(name), true)); } @Override - public List listTables(String databaseName) throws DatabaseNotExistException { - if (isSystemDatabase(databaseName)) { - return GLOBAL_TABLES; - } - if (!databaseExists(databaseName)) { - throw new DatabaseNotExistException(databaseName); - } - + protected List listTablesImpl(String databaseName) { List tables = new ArrayList<>(); for (FileStatus status : uncheck(() -> fileIO.listStatus(databasePath(databaseName)))) { if (status.isDir() && tableExists(status.getPath())) { @@ -127,11 +91,12 @@ public List listTables(String databaseName) throws DatabaseNotExistExcep } @Override - public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException { - Path path = getDataTableLocation(identifier); - return new SchemaManager(fileIO, path) - .latest() - .orElseThrow(() -> new TableNotExistException(identifier)); + public boolean tableExists(Identifier identifier) { + if (isSystemTable(identifier)) { + return super.tableExists(identifier); + } + + return tableExists(getDataTableLocation(identifier)); } private boolean tableExists(Path tablePath) { @@ -139,74 +104,35 @@ private boolean tableExists(Path tablePath) { } @Override - public void dropTable(Identifier identifier, boolean ignoreIfNotExists) - throws TableNotExistException { - checkNotSystemTable(identifier, "dropTable"); + public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException { Path path = getDataTableLocation(identifier); - if (!tableExists(path)) { - if (ignoreIfNotExists) { - return; - } - - throw new TableNotExistException(identifier); - } + return new SchemaManager(fileIO, path) + .latest() + .orElseThrow(() -> new TableNotExistException(identifier)); + } + @Override + protected void dropTableImpl(Identifier identifier) { + Path path = getDataTableLocation(identifier); uncheck(() -> fileIO.delete(path, true)); } @Override - public void createTable(Identifier identifier, Schema schema, boolean ignoreIfExists) - throws TableAlreadyExistException, DatabaseNotExistException { - checkNotSystemTable(identifier, "createTable"); - if (!databaseExists(identifier.getDatabaseName())) { - throw new DatabaseNotExistException(identifier.getDatabaseName()); - } - + public void createTableImpl(Identifier identifier, Schema schema) { Path path = getDataTableLocation(identifier); - if (tableExists(path)) { - if (ignoreIfExists) { - return; - } - - throw new TableAlreadyExistException(identifier); - } - - copyTableDefaultOptions(schema.options()); - uncheck(() -> new SchemaManager(fileIO, path).createTable(schema)); } @Override - public void renameTable(Identifier fromTable, Identifier toTable, boolean ignoreIfNotExists) - throws TableNotExistException, TableAlreadyExistException { - checkNotSystemTable(fromTable, "renameTable"); - checkNotSystemTable(toTable, "renameTable"); + public void renameTableImpl(Identifier fromTable, Identifier toTable) { Path fromPath = getDataTableLocation(fromTable); - if (!tableExists(fromPath)) { - if (ignoreIfNotExists) { - return; - } - - throw new TableNotExistException(fromTable); - } - Path toPath = getDataTableLocation(toTable); - if (tableExists(toPath)) { - throw new TableAlreadyExistException(toTable); - } - uncheck(() -> fileIO.rename(fromPath, toPath)); } @Override - public void alterTable( - Identifier identifier, List changes, boolean ignoreIfNotExists) + protected void alterTableImpl(Identifier identifier, List changes) throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException { - checkNotSystemTable(identifier, "alterTable"); - if (!tableExists(getDataTableLocation(identifier))) { - throw new TableNotExistException(identifier); - } - new SchemaManager(fileIO, getDataTableLocation(identifier)).commitChanges(changes); } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 8893960eaf57..59aaebe65b89 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -42,14 +42,12 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.api.UnknownDBException; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.thrift.TException; import org.slf4j.Logger; @@ -171,10 +169,7 @@ public List listDatabases() { } @Override - public boolean databaseExists(String databaseName) { - if (isSystemDatabase(databaseName)) { - return true; - } + protected boolean databaseExistsImpl(String databaseName) { try { client.getDatabase(databaseName); return true; @@ -187,93 +182,81 @@ public boolean databaseExists(String databaseName) { } @Override - public void createDatabase(String name, boolean ignoreIfExists) - throws DatabaseAlreadyExistException { - if (isSystemDatabase(name)) { - throw new ProcessSystemDatabaseException(); - } + protected void createDatabaseImpl(String name) { try { client.createDatabase(convertToDatabase(name)); - locationHelper.createPathIfRequired(databasePath(name), fileIO); - } catch (AlreadyExistsException e) { - if (!ignoreIfExists) { - throw new DatabaseAlreadyExistException(name, e); - } } catch (TException | IOException e) { throw new RuntimeException("Failed to create database " + name, e); } } @Override - public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) - throws DatabaseNotExistException, DatabaseNotEmptyException { - if (isSystemDatabase(name)) { - throw new ProcessSystemDatabaseException(); - } + protected void dropDatabaseImpl(String name) { try { - if (!cascade && client.getAllTables(name).size() > 0) { - throw new DatabaseNotEmptyException(name); - } - locationHelper.dropPathIfRequired(databasePath(name), fileIO); client.dropDatabase(name, true, false, true); - } catch (NoSuchObjectException | UnknownDBException e) { - if (!ignoreIfNotExists) { - throw new DatabaseNotExistException(name, e); - } } catch (TException | IOException e) { throw new RuntimeException("Failed to drop database " + name, e); } } @Override - public List listTables(String databaseName) throws DatabaseNotExistException { - if (isSystemDatabase(databaseName)) { - return GLOBAL_TABLES; - } + protected List listTablesImpl(String databaseName) { try { return client.getAllTables(databaseName).stream() .filter( tableName -> { Identifier identifier = new Identifier(databaseName, tableName); // the environment here may not be able to access non-paimon - // tables. - // so we just check the schema file first - return schemaFileExists(identifier) - && paimonTableExists(identifier); + // tables, so we just check the schema file first + return schemaFileExists(identifier) && tableExists(identifier); }) .collect(Collectors.toList()); - } catch (UnknownDBException e) { - throw new DatabaseNotExistException(databaseName, e); } catch (TException e) { throw new RuntimeException("Failed to list all tables in database " + databaseName, e); } } + @Override + public boolean tableExists(Identifier identifier) { + if (isSystemTable(identifier)) { + return super.tableExists(identifier); + } + + Table table; + try { + table = client.getTable(identifier.getDatabaseName(), identifier.getObjectName()); + } catch (NoSuchObjectException e) { + return false; + } catch (TException e) { + throw new RuntimeException( + "Cannot determine if table " + identifier.getFullName() + " is a paimon table.", + e); + } + + return isPaimonTable(table) || LegacyHiveClasses.isPaimonTable(table); + } + + private static boolean isPaimonTable(Table table) { + return INPUT_FORMAT_CLASS_NAME.equals(table.getSd().getInputFormat()) + && OUTPUT_FORMAT_CLASS_NAME.equals(table.getSd().getOutputFormat()); + } + @Override public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException { - if (!paimonTableExists(identifier)) { + if (!tableExists(identifier)) { throw new TableNotExistException(identifier); } Path tableLocation = getDataTableLocation(identifier); return new SchemaManager(fileIO, tableLocation) .latest() - .orElseThrow(() -> new RuntimeException("There is no paimond in " + tableLocation)); + .orElseThrow( + () -> new RuntimeException("There is no paimon table in " + tableLocation)); } @Override - public void dropTable(Identifier identifier, boolean ignoreIfNotExists) - throws TableNotExistException { - checkNotSystemTable(identifier, "dropTable"); - if (!paimonTableExists(identifier)) { - if (ignoreIfNotExists) { - return; - } else { - throw new TableNotExistException(identifier); - } - } - + protected void dropTableImpl(Identifier identifier) { try { client.dropTable( identifier.getDatabaseName(), identifier.getObjectName(), true, false, true); @@ -294,27 +277,11 @@ public void dropTable(Identifier identifier, boolean ignoreIfNotExists) } @Override - public void createTable(Identifier identifier, Schema schema, boolean ignoreIfExists) - throws TableAlreadyExistException, DatabaseNotExistException { - checkNotSystemTable(identifier, "createTable"); - String databaseName = identifier.getDatabaseName(); - if (!databaseExists(databaseName)) { - throw new DatabaseNotExistException(databaseName); - } - if (paimonTableExists(identifier)) { - if (ignoreIfExists) { - return; - } else { - throw new TableAlreadyExistException(identifier); - } - } - + protected void createTableImpl(Identifier identifier, Schema schema) { checkFieldNamesUpperCase(schema.rowType().getFieldNames()); + // first commit changes to underlying files // if changes on Hive fails there is no harm to perform the same changes to files again - - copyTableDefaultOptions(schema.options()); - TableSchema tableSchema; try { tableSchema = schemaManager(identifier).createTable(schema); @@ -325,6 +292,7 @@ public void createTable(Identifier identifier, Schema schema, boolean ignoreIfEx + " to underlying files.", e); } + Table table = newHmsTable( identifier, @@ -351,22 +319,7 @@ public void createTable(Identifier identifier, Schema schema, boolean ignoreIfEx } @Override - public void renameTable(Identifier fromTable, Identifier toTable, boolean ignoreIfNotExists) - throws TableNotExistException, TableAlreadyExistException { - checkNotSystemTable(fromTable, "renameTable"); - checkNotSystemTable(toTable, "renameTable"); - if (!paimonTableExists(fromTable)) { - if (ignoreIfNotExists) { - return; - } else { - throw new TableNotExistException(fromTable); - } - } - - if (paimonTableExists(toTable)) { - throw new TableAlreadyExistException(toTable); - } - + protected void renameTableImpl(Identifier fromTable, Identifier toTable) { try { checkIdentifierUpperCase(toTable); String fromDB = fromTable.getDatabaseName(); @@ -401,18 +354,8 @@ public void renameTable(Identifier fromTable, Identifier toTable, boolean ignore } @Override - public void alterTable( - Identifier identifier, List changes, boolean ignoreIfNotExists) + protected void alterTableImpl(Identifier identifier, List changes) throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException { - checkNotSystemTable(identifier, "alterTable"); - if (!paimonTableExists(identifier)) { - if (ignoreIfNotExists) { - return; - } else { - throw new TableNotExistException(identifier); - } - } - checkFieldNamesUpperCaseInSchemaChange(changes); final SchemaManager schemaManager = schemaManager(identifier); @@ -579,26 +522,6 @@ private boolean schemaFileExists(Identifier identifier) { return new SchemaManager(fileIO, getDataTableLocation(identifier)).latest().isPresent(); } - private boolean paimonTableExists(Identifier identifier) { - Table table; - try { - table = client.getTable(identifier.getDatabaseName(), identifier.getObjectName()); - } catch (NoSuchObjectException e) { - return false; - } catch (TException e) { - throw new RuntimeException( - "Cannot determine if table " + identifier.getFullName() + " is a paimon table.", - e); - } - - return isPaimonTable(table) || LegacyHiveClasses.isPaimonTable(table); - } - - private static boolean isPaimonTable(Table table) { - return INPUT_FORMAT_CLASS_NAME.equals(table.getSd().getInputFormat()) - && OUTPUT_FORMAT_CLASS_NAME.equals(table.getSd().getOutputFormat()); - } - private SchemaManager schemaManager(Identifier identifier) { checkIdentifierUpperCase(identifier); return new SchemaManager(fileIO, getDataTableLocation(identifier))