diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 63428635e0cd..f93198fb8f96 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -42,6 +42,7 @@ import javax.annotation.Nullable; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -50,7 +51,6 @@ import java.util.Map; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.Callable; import java.util.stream.Collectors; import static org.apache.paimon.options.CatalogOptions.LINEAGE_META; @@ -127,17 +127,6 @@ protected boolean lockEnabled() { return catalogOptions.get(LOCK_ENABLED); } - protected List listDatabases(Path warehouse) { - List databases = new ArrayList<>(); - for (FileStatus status : uncheck(() -> fileIO.listDirectories(warehouse))) { - Path path = status.getPath(); - if (status.isDir() && isDatabase(path)) { - databases.add(database(path)); - } - } - return databases; - } - @Override public boolean databaseExists(String databaseName) { if (isSystemDatabase(databaseName)) { @@ -220,22 +209,8 @@ public List listTables(String databaseName) throws DatabaseNotExistExcep return listTablesImpl(databaseName).stream().sorted().collect(Collectors.toList()); } - protected List listTablesImpl(Path databasePath) { - List tables = new ArrayList<>(); - for (FileStatus status : uncheck(() -> fileIO.listDirectories(databasePath))) { - if (status.isDir() && tableExists(status.getPath())) { - tables.add(status.getPath().getName()); - } - } - return tables; - } - protected abstract List listTablesImpl(String databaseName); - protected boolean tableExists(Path tablePath) { - return !new SchemaManager(fileIO, tablePath).listAllIds().isEmpty(); - } - @Override public void dropTable(Identifier identifier, boolean ignoreIfNotExists) throws TableNotExistException { @@ -525,20 +500,35 @@ private void validateAutoCreateClose(Map options) { CoreOptions.AUTO_CREATE.key(), Boolean.FALSE)); } - private static boolean isDatabase(Path path) { - return path.getName().endsWith(DB_SUFFIX); - } + // =============================== Meta in File System ===================================== - private static String database(Path path) { - String name = path.getName(); - return name.substring(0, name.length() - DB_SUFFIX.length()); + protected List listDatabasesInFileSystem(Path warehouse) throws IOException { + List databases = new ArrayList<>(); + for (FileStatus status : fileIO.listDirectories(warehouse)) { + Path path = status.getPath(); + if (status.isDir() && path.getName().endsWith(DB_SUFFIX)) { + String fileName = path.getName(); + databases.add(fileName.substring(0, fileName.length() - DB_SUFFIX.length())); + } + } + return databases; } - protected static T uncheck(Callable callable) { - try { - return callable.call(); - } catch (Exception e) { - throw new RuntimeException(e); + protected List listTablesInFileSystem(Path databasePath) throws IOException { + List tables = new ArrayList<>(); + for (FileStatus status : fileIO.listDirectories(databasePath)) { + if (status.isDir() && tableExistsInFileSystem(status.getPath())) { + tables.add(status.getPath().getName()); + } } + return tables; + } + + protected boolean tableExistsInFileSystem(Path tablePath) { + return !new SchemaManager(fileIO, tablePath).listAllIds().isEmpty(); + } + + public Optional tableSchemaInFileSystem(Path tablePath) { + return new SchemaManager(fileIO, tablePath).latest(); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index 60e688bf8346..f0e5572e3238 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -33,6 +33,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE; @@ -55,7 +56,7 @@ public FileSystemCatalog(FileIO fileIO, Path warehouse, Options options) { @Override public List listDatabases() { - return listDatabases(warehouse); + return uncheck(() -> listDatabasesInFileSystem(warehouse)); } @Override @@ -89,7 +90,7 @@ protected void dropDatabaseImpl(String name) { @Override protected List listTablesImpl(String databaseName) { - return listTablesImpl(newDatabasePath(databaseName)); + return uncheck(() -> listTablesInFileSystem(newDatabasePath(databaseName))); } @Override @@ -98,7 +99,7 @@ public boolean tableExists(Identifier identifier) { return super.tableExists(identifier); } - return tableExists(getDataTableLocation(identifier)); + return tableExistsInFileSystem(getDataTableLocation(identifier)); } @Override @@ -149,6 +150,14 @@ protected void alterTableImpl(Identifier identifier, List changes) schemaManager(identifier).commitChanges(changes); } + protected static T uncheck(Callable callable) { + try { + return callable.call(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + @Override public void close() throws Exception {} diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 77a3b40afae4..25cb8784d407 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -41,16 +41,9 @@ import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.table.CatalogEnvironment; -import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.TableType; -import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; -import org.apache.paimon.utils.FileStorePathFactory; -import org.apache.paimon.utils.Preconditions; -import org.apache.paimon.utils.RowDataPartitionComputer; import org.apache.flink.table.hive.LegacyHiveClasses; import org.apache.hadoop.conf.Configuration; @@ -80,13 +73,11 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.stream.Collectors; @@ -99,7 +90,6 @@ import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES; import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE; import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey; -import static org.apache.paimon.utils.FileUtils.COMMON_IO_FORK_JOIN_POOL; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly; @@ -333,12 +323,14 @@ public boolean tableExists(Identifier identifier) { e); } - return isPaimonTable(table) || LegacyHiveClasses.isPaimonTable(table); + return isPaimonTable(table); } private static boolean isPaimonTable(Table table) { - return INPUT_FORMAT_CLASS_NAME.equals(table.getSd().getInputFormat()) - && OUTPUT_FORMAT_CLASS_NAME.equals(table.getSd().getOutputFormat()); + boolean isPaimonTable = + INPUT_FORMAT_CLASS_NAME.equals(table.getSd().getInputFormat()) + && OUTPUT_FORMAT_CLASS_NAME.equals(table.getSd().getOutputFormat()); + return isPaimonTable || LegacyHiveClasses.isPaimonTable(table); } @Override @@ -347,14 +339,8 @@ public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExis throw new TableNotExistException(identifier); } Path tableLocation = getDataTableLocation(identifier); - return getDataTableSchema(tableLocation); - } - - private TableSchema getDataTableSchema(Path tableLocation) { - return new SchemaManager(fileIO, tableLocation) - .latest() - .orElseThrow( - () -> new RuntimeException("There is no paimon table in " + tableLocation)); + return tableSchemaInFileSystem(tableLocation) + .orElseThrow(() -> new TableNotExistException(identifier)); } private boolean usingExternalTable() { @@ -408,13 +394,8 @@ protected void createTableImpl(Identifier identifier, Schema schema) { e); } - Table table = - newHmsTable( - identifier, - convertToPropertiesPrefixKey(tableSchema.options(), HIVE_PREFIX)); try { - updateHmsTable(table, identifier, tableSchema); - client.createTable(table); + client.createTable(createHiveTable(identifier, tableSchema)); } catch (Exception e) { Path path = getDataTableLocation(identifier); try { @@ -426,6 +407,15 @@ protected void createTableImpl(Identifier identifier, Schema schema) { } } + private Table createHiveTable(Identifier identifier, TableSchema tableSchema) { + Table table = + newHmsTable( + identifier, + convertToPropertiesPrefixKey(tableSchema.options(), HIVE_PREFIX)); + updateHmsTable(table, identifier, tableSchema); + return table; + } + @Override protected void renameTableImpl(Identifier fromTable, Identifier toTable) { try { @@ -469,18 +459,21 @@ protected void alterTableImpl(Identifier identifier, List changes) TableSchema schema = schemaManager.commitChanges(changes); try { - // sync to hive hms Table table = client.getTable(identifier.getDatabaseName(), identifier.getObjectName()); - updateHmsTablePars(table, schema); - updateHmsTable(table, identifier, schema); - client.alter_table( - identifier.getDatabaseName(), identifier.getObjectName(), table, true); + alterTableToHms(table, identifier, schema); } catch (Exception te) { schemaManager.deleteSchema(schema.id()); throw new RuntimeException(te); } } + private void alterTableToHms(Table table, Identifier identifier, TableSchema newSchema) + throws TException { + updateHmsTablePars(table, newSchema); + updateHmsTable(table, identifier, newSchema); + client.alter_table(identifier.getDatabaseName(), identifier.getObjectName(), table, true); + } + @Override public boolean caseSensitive() { return false; @@ -488,7 +481,12 @@ public boolean caseSensitive() { @Override public void repairCatalog() { - List databases = listDatabases(new Path(warehouse)); + List databases = null; + try { + databases = listDatabasesInFileSystem(new Path(warehouse)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } for (String database : databases) { repairDatabase(database); } @@ -496,45 +494,31 @@ public void repairCatalog() { @Override public void repairDatabase(String databaseName) { - Path databasePath = repairHmsDatabase(databaseName); - List tables = listTablesImpl(databasePath); - CompletableFuture allOf = - CompletableFuture.allOf( - tables.stream() - .map(table -> Identifier.create(databaseName, table)) - .map( - identifier -> - CompletableFuture.runAsync( - () -> { - try { - repairTable(identifier); - } catch (TableNotExistException e) { - LOG.error( - "Table {} does not exist in the paimon.", - identifier.getFullName()); - // ignore - } - }, - COMMON_IO_FORK_JOIN_POOL)) - .toArray(CompletableFuture[]::new)); - allOf.join(); - } - - private Path repairHmsDatabase(String databaseName) { checkNotSystemDatabase(databaseName); + + // create database if needed if (!databaseExistsImpl(databaseName)) { createDatabaseImpl(databaseName, Collections.emptyMap()); } + // tables from file system + List tables; try { - Database database = client.getDatabase(databaseName); - return new Path(locationHelper.getDatabaseLocation(database)); + tables = + listTablesInFileSystem( + new Path( + locationHelper.getDatabaseLocation( + client.getDatabase(databaseName)))); } catch (Exception e) { - throw new RuntimeException( - "Failed to determine if database " - + databaseName - + " exists in hive metastore.", - e); + throw new RuntimeException(e); + } + + // repair tables + for (String table : tables) { + try { + repairTable(Identifier.create(databaseName, table)); + } catch (TableNotExistException ignore) { + } } } @@ -543,79 +527,43 @@ public void repairTable(Identifier identifier) throws TableNotExistException { checkNotSystemTable(identifier, "repairTable"); validateIdentifierNameCaseInsensitive(identifier); - // Get paimon table from file system. - Path paimonTableLocation = getDataTableLocation(identifier); - TableSchema tableSchema = getDataTableSchema(paimonTableLocation); - validateFieldNameCaseInsensitive(tableSchema.fieldNames()); - FileStoreTable paimonTable = - FileStoreTableFactory.create( - fileIO, - paimonTableLocation, - tableSchema, - new CatalogEnvironment( - Lock.factory( - lockFactory().orElse(null), - lockContext().orElse(null), - identifier), - super.metastoreClientFactory(identifier).orElse(null), - lineageMetaFactory)); - + TableSchema tableSchema = + tableSchemaInFileSystem(getDataTableLocation(identifier)) + .orElseThrow(() -> new TableNotExistException(identifier)); + Table newTable = createHiveTable(identifier, tableSchema); try { try { Table table = client.getTable(identifier.getDatabaseName(), identifier.getObjectName()); checkArgument( - isPaimonTable(table) || LegacyHiveClasses.isPaimonTable(table), - String.format( - "Table %s is not a paimon table in hive metastore.", - identifier.getFullName())); - updateHmsTablePars(table, tableSchema); - updateHmsTable(table, identifier, tableSchema); - client.alter_table( - identifier.getDatabaseName(), identifier.getObjectName(), table, true); + isPaimonTable(table), + "Table %s is not a paimon table in hive metastore.", + identifier.getFullName()); + if (!newTable.getSd().getCols().equals(table.getSd().getCols())) { + alterTableToHms(table, identifier, tableSchema); + } } catch (NoSuchObjectException e) { // hive table does not exist. - HashMap newOptions = new HashMap<>(paimonTable.options()); - copyTableDefaultOptions(newOptions); - tableSchema = paimonTable.schema().copy(newOptions); - Table table = - newHmsTable( - identifier, - convertToPropertiesPrefixKey(tableSchema.options(), HIVE_PREFIX)); - updateHmsTable(table, identifier, tableSchema); - client.createTable(table); + client.createTable(newTable); } // repair partitions - repairPartition(paimonTable, identifier, tableSchema); + if (!tableSchema.partitionKeys().isEmpty() && !newTable.getPartitionKeys().isEmpty()) { + // Do not close client, it is for HiveCatalog + @SuppressWarnings("resource") + HiveMetastoreClient metastoreClient = + new HiveMetastoreClient(identifier, tableSchema, client); + List partitions = + getTable(identifier).newReadBuilder().newScan().listPartitions(); + for (BinaryRow partition : partitions) { + metastoreClient.addPartition(partition); + } + } } catch (Exception e) { throw new RuntimeException(e); } } - private void repairPartition( - org.apache.paimon.table.Table paimonTable, Identifier identifier, TableSchema schema) - throws Exception { - if (!schema.partitionKeys().isEmpty()) { - MetastoreClient metastoreClient = metastoreClientFactory(identifier).get().create(); - - ReadBuilder readBuilder = paimonTable.newReadBuilder(); - List partitions = readBuilder.newScan().listPartitions(); - RowDataPartitionComputer partitionComputer = - FileStorePathFactory.getPartitionComputer( - schema.logicalPartitionType(), - new CoreOptions(schema.options()).partitionDefaultName()); - for (BinaryRow partition : partitions) { - LinkedHashMap partitionSpec = - partitionComputer.generatePartValues( - Preconditions.checkNotNull( - partition, - "Partition row data is null. This is unexpected.")); - metastoreClient.addPartition(partitionSpec); - } - } - } - @Override public void close() throws Exception { client.close(); diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java index 031b1848a01e..76b1f3cf2c96 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java @@ -45,7 +45,7 @@ public class HiveMetastoreClient implements MetastoreClient { private final IMetaStoreClient client; private final StorageDescriptor sd; - private HiveMetastoreClient(Identifier identifier, TableSchema schema, IMetaStoreClient client) + public HiveMetastoreClient(Identifier identifier, TableSchema schema, IMetaStoreClient client) throws Exception { this.identifier = identifier; this.partitionComputer =