diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 5b1acb59e1ec..a1a8bfe37a42 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -237,6 +237,24 @@ All available procedures are listed below.
CALL sys.expire_snapshots(`table` => 'default.T', older_than => '2024-01-01 12:00:00', retain_min => 10)
CALL sys.expire_snapshots(`table` => 'default.T', older_than => '2024-01-01 12:00:00', max_deletes => 10)
+    <tr>
+      <td>repair</td>
+      <td>
+         -- repair all databases and tables in catalog<br/>
+         CALL sys.repair()<br/><br/>
+         -- repair all tables in a specific database<br/>
+         CALL sys.repair('databaseName')<br/><br/>
+         -- repair a table<br/>
+         CALL sys.repair('databaseName.tableName')
+      </td>
+      <td>
+         Synchronize information from the file system to Metastore. Argument:
+            <li>empty: all databases and tables in catalog.</li>
+            <li>databaseName: the target database name.</li>
+            <li>tableName: the target table identifier.</li>
+      </td>
+      <td>CALL sys.repair('test_db.T')</td>
+    </tr>
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index be1551daf0c4..81d829583b63 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -124,5 +124,15 @@ This section introduces all available spark procedures about paimon.
CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00')
+    <tr>
+      <td>repair</td>
+      <td>
+         Synchronize information from the file system to Metastore. Argument:
+            <li>database_or_table: empty or the target database name or the target table identifier.</li>
+      </td>
+      <td>
+         CALL sys.repair('test_db.T')
+      </td>
+    </tr>
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 605fd14f644d..63428635e0cd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -22,6 +22,7 @@
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.lineage.LineageMetaFactory;
import org.apache.paimon.operation.FileStoreCommit;
@@ -29,6 +30,7 @@
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
@@ -48,6 +50,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
+import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
@@ -124,6 +127,17 @@ protected boolean lockEnabled() {
return catalogOptions.get(LOCK_ENABLED);
}
+ protected List<String> listDatabases(Path warehouse) {
+ List<String> databases = new ArrayList<>();
+ for (FileStatus status : uncheck(() -> fileIO.listDirectories(warehouse))) {
+ Path path = status.getPath();
+ if (status.isDir() && isDatabase(path)) {
+ databases.add(database(path));
+ }
+ }
+ return databases;
+ }
+
@Override
public boolean databaseExists(String databaseName) {
if (isSystemDatabase(databaseName)) {
@@ -138,9 +152,7 @@ public boolean databaseExists(String databaseName) {
@Override
public void createDatabase(String name, boolean ignoreIfExists, Map<String, String> properties)
throws DatabaseAlreadyExistException {
- if (isSystemDatabase(name)) {
- throw new ProcessSystemDatabaseException();
- }
+ checkNotSystemDatabase(name);
if (databaseExists(name)) {
if (ignoreIfExists) {
return;
@@ -179,9 +191,7 @@ public void dropPartition(Identifier identifier, Map<String, String> partitionSp
@Override
public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
throws DatabaseNotExistException, DatabaseNotEmptyException {
- if (isSystemDatabase(name)) {
- throw new ProcessSystemDatabaseException();
- }
+ checkNotSystemDatabase(name);
if (!databaseExists(name)) {
if (ignoreIfNotExists) {
return;
@@ -210,8 +220,22 @@ public List<String> listTables(String databaseName) throws DatabaseNotExistExcep
return listTablesImpl(databaseName).stream().sorted().collect(Collectors.toList());
}
+ protected List<String> listTablesImpl(Path databasePath) {
+ List<String> tables = new ArrayList<>();
+ for (FileStatus status : uncheck(() -> fileIO.listDirectories(databasePath))) {
+ if (status.isDir() && tableExists(status.getPath())) {
+ tables.add(status.getPath().getName());
+ }
+ }
+ return tables;
+ }
+
protected abstract List<String> listTablesImpl(String databaseName);
+ protected boolean tableExists(Path tablePath) {
+ return !new SchemaManager(fileIO, tablePath).listAllIds().isEmpty();
+ }
+
@Override
public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
throws TableNotExistException {
@@ -396,7 +420,7 @@ protected boolean isSystemTable(Identifier identifier) {
return isSystemDatabase(identifier.getDatabaseName()) || isSpecifiedSystemTable(identifier);
}
- private void checkNotSystemTable(Identifier identifier, String method) {
+ protected void checkNotSystemTable(Identifier identifier, String method) {
if (isSystemTable(identifier)) {
throw new IllegalArgumentException(
String.format(
@@ -439,6 +463,13 @@ private boolean isSystemDatabase(String database) {
return SYSTEM_DATABASE_NAME.equals(database);
}
+ /** Validate database cannot be a system database. */
+ protected void checkNotSystemDatabase(String database) {
+ if (isSystemDatabase(database)) {
+ throw new ProcessSystemDatabaseException();
+ }
+ }
+
/** Validate database, table and field names must be lowercase when not case-sensitive. */
public static void validateCaseInsensitive(
boolean caseSensitive, String type, String... names) {
@@ -460,7 +491,7 @@ public static void validateCaseInsensitive(
type, illegalNames));
}
- private void validateIdentifierNameCaseInsensitive(Identifier identifier) {
+ protected void validateIdentifierNameCaseInsensitive(Identifier identifier) {
validateCaseInsensitive(caseSensitive(), "Database", identifier.getDatabaseName());
validateCaseInsensitive(caseSensitive(), "Table", identifier.getObjectName());
}
@@ -479,7 +510,7 @@ private void validateFieldNameCaseInsensitiveInSchemaChange(List<SchemaChange> c
validateFieldNameCaseInsensitive(fieldNames);
}
- private void validateFieldNameCaseInsensitive(List<String> fieldNames) {
+ protected void validateFieldNameCaseInsensitive(List<String> fieldNames) {
validateCaseInsensitive(caseSensitive(), "Field", fieldNames);
}
@@ -493,4 +524,21 @@ private void validateAutoCreateClose(Map<String, String> options) {
"The value of %s property should be %s.",
CoreOptions.AUTO_CREATE.key(), Boolean.FALSE));
}
+
+ private static boolean isDatabase(Path path) {
+ return path.getName().endsWith(DB_SUFFIX);
+ }
+
+ private static String database(Path path) {
+ String name = path.getName();
+ return name.substring(0, name.length() - DB_SUFFIX.length());
+ }
+
+ protected static <T> T uncheck(Callable<T> callable) {
+ try {
+ return callable.call();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 4f63414171f5..96e85d20a009 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -258,6 +258,18 @@ default boolean caseSensitive() {
return true;
}
+ default void repairCatalog() {
+ throw new UnsupportedOperationException();
+ }
+
+ default void repairDatabase(String databaseName) {
+ throw new UnsupportedOperationException();
+ }
+
+ default void repairTable(Identifier identifier) throws TableNotExistException {
+ throw new UnsupportedOperationException();
+ }
+
/** Exception for trying to drop on a database that is not empty. */
class DatabaseNotEmptyException extends Exception {
private static final String MSG = "Database %s is not empty.";
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index 1e4e5b0ebaaa..60e688bf8346 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -19,7 +19,6 @@
package org.apache.paimon.catalog;
import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.Options;
@@ -31,11 +30,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Callable;
import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE;
@@ -58,14 +55,7 @@ public FileSystemCatalog(FileIO fileIO, Path warehouse, Options options) {
@Override
public List<String> listDatabases() {
- List<String> databases = new ArrayList<>();
- for (FileStatus status : uncheck(() -> fileIO.listDirectories(warehouse))) {
- Path path = status.getPath();
- if (status.isDir() && isDatabase(path)) {
- databases.add(database(path));
- }
- }
- return databases;
+ return listDatabases(warehouse);
}
@Override
@@ -99,14 +89,7 @@ protected void dropDatabaseImpl(String name) {
@Override
protected List<String> listTablesImpl(String databaseName) {
- List<String> tables = new ArrayList<>();
- for (FileStatus status :
- uncheck(() -> fileIO.listDirectories(newDatabasePath(databaseName)))) {
- if (status.isDir() && tableExists(status.getPath())) {
- tables.add(status.getPath().getName());
- }
- }
- return tables;
+ return listTablesImpl(newDatabasePath(databaseName));
}
@Override
@@ -118,10 +101,6 @@ public boolean tableExists(Identifier identifier) {
return tableExists(getDataTableLocation(identifier));
}
- private boolean tableExists(Path tablePath) {
- return new SchemaManager(fileIO, tablePath).listAllIds().size() > 0;
- }
-
@Override
public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException {
return schemaManager(identifier)
@@ -170,23 +149,6 @@ protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
schemaManager(identifier).commitChanges(changes);
}
- private static <T> T uncheck(Callable<T> callable) {
- try {
- return callable.call();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- private static boolean isDatabase(Path path) {
- return path.getName().endsWith(DB_SUFFIX);
- }
-
- private static String database(Path path) {
- String name = path.getName();
- return name.substring(0, name.length() - DB_SUFFIX.length());
- }
-
@Override
public void close() throws Exception {}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RepairProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RepairProcedure.java
new file mode 100644
index 000000000000..d637eb0b70e1
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RepairProcedure.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Repair procedure. Usage:
+ *
+ * <pre><code>
+ * -- repair all databases and tables in catalog
+ * CALL sys.repair()
+ *
+ * -- repair all tables in a specific database
+ * CALL sys.repair('databaseName')
+ *
+ * -- repair a table
+ * CALL sys.repair('databaseName.tableName')
+ * </code></pre>
+ */
+public class RepairProcedure extends ProcedureBase {
+
+ public static final String IDENTIFIER = "repair";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ public String[] call(ProcedureContext procedureContext)
+ throws Catalog.TableNotExistException, Catalog.DatabaseNotExistException {
+ return call(procedureContext, null);
+ }
+
+ public String[] call(ProcedureContext procedureContext, String identifier)
+ throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
+ if (StringUtils.isBlank(identifier)) {
+ catalog.repairCatalog();
+ return new String[] {"Success"};
+ }
+ String[] paths = identifier.split("\\.");
+ switch (paths.length) {
+ case 1:
+ catalog.repairDatabase(paths[0]);
+ break;
+ case 2:
+ catalog.repairTable(Identifier.create(paths[0], paths[1]));
+ break;
+ default:
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot get splits from '%s' to get database and table",
+ identifier));
+ }
+
+ return new String[] {"Success"};
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 943da3e16e6d..848dd317d43b 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -50,3 +50,4 @@ org.apache.paimon.flink.procedure.privilege.CreatePrivilegedUserProcedure
org.apache.paimon.flink.procedure.privilege.DropPrivilegedUserProcedure
org.apache.paimon.flink.procedure.privilege.GrantPrivilegeToUserProcedure
org.apache.paimon.flink.procedure.privilege.RevokePrivilegeFromUserProcedure
+org.apache.paimon.flink.procedure.RepairProcedure
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 0d71e70a6479..77a3b40afae4 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -26,6 +26,7 @@
import org.apache.paimon.catalog.CatalogLockContext;
import org.apache.paimon.catalog.CatalogLockFactory;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.metastore.MetastoreClient;
@@ -40,9 +41,16 @@
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.CatalogEnvironment;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.TableType;
+import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.RowDataPartitionComputer;
import org.apache.flink.table.hive.LegacyHiveClasses;
import org.apache.hadoop.conf.Configuration;
@@ -72,11 +80,13 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -89,6 +99,7 @@
import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES;
import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
+import static org.apache.paimon.utils.FileUtils.COMMON_IO_FORK_JOIN_POOL;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
@@ -336,6 +347,10 @@ public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExis
throw new TableNotExistException(identifier);
}
Path tableLocation = getDataTableLocation(identifier);
+ return getDataTableSchema(tableLocation);
+ }
+
+ private TableSchema getDataTableSchema(Path tableLocation) {
return new SchemaManager(fileIO, tableLocation)
.latest()
.orElseThrow(
@@ -422,7 +437,7 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) {
client.alter_table(fromDB, fromTableName, table);
Path fromPath = getDataTableLocation(fromTable);
- if (new SchemaManager(fileIO, fromPath).listAllIds().size() > 0) {
+ if (!new SchemaManager(fileIO, fromPath).listAllIds().isEmpty()) {
// Rename the file system's table directory. Maintain consistency between tables in
// the file system and tables in the Hive Metastore.
Path toPath = getDataTableLocation(toTable);
@@ -471,6 +486,136 @@ public boolean caseSensitive() {
return false;
}
+ @Override
+ public void repairCatalog() {
+ List<String> databases = listDatabases(new Path(warehouse));
+ for (String database : databases) {
+ repairDatabase(database);
+ }
+ }
+
+ @Override
+ public void repairDatabase(String databaseName) {
+ Path databasePath = repairHmsDatabase(databaseName);
+ List<String> tables = listTablesImpl(databasePath);
+ CompletableFuture<Void> allOf =
+ CompletableFuture.allOf(
+ tables.stream()
+ .map(table -> Identifier.create(databaseName, table))
+ .map(
+ identifier ->
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ repairTable(identifier);
+ } catch (TableNotExistException e) {
+ LOG.error(
+ "Table {} does not exist in the paimon.",
+ identifier.getFullName());
+ // ignore
+ }
+ },
+ COMMON_IO_FORK_JOIN_POOL))
+ .toArray(CompletableFuture[]::new));
+ allOf.join();
+ }
+
+ private Path repairHmsDatabase(String databaseName) {
+ checkNotSystemDatabase(databaseName);
+ if (!databaseExistsImpl(databaseName)) {
+ createDatabaseImpl(databaseName, Collections.emptyMap());
+ }
+
+ try {
+ Database database = client.getDatabase(databaseName);
+ return new Path(locationHelper.getDatabaseLocation(database));
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to determine if database "
+ + databaseName
+ + " exists in hive metastore.",
+ e);
+ }
+ }
+
+ @Override
+ public void repairTable(Identifier identifier) throws TableNotExistException {
+ checkNotSystemTable(identifier, "repairTable");
+ validateIdentifierNameCaseInsensitive(identifier);
+
+ // Get paimon table from file system.
+ Path paimonTableLocation = getDataTableLocation(identifier);
+ TableSchema tableSchema = getDataTableSchema(paimonTableLocation);
+ validateFieldNameCaseInsensitive(tableSchema.fieldNames());
+ FileStoreTable paimonTable =
+ FileStoreTableFactory.create(
+ fileIO,
+ paimonTableLocation,
+ tableSchema,
+ new CatalogEnvironment(
+ Lock.factory(
+ lockFactory().orElse(null),
+ lockContext().orElse(null),
+ identifier),
+ super.metastoreClientFactory(identifier).orElse(null),
+ lineageMetaFactory));
+
+ try {
+ try {
+ Table table =
+ client.getTable(identifier.getDatabaseName(), identifier.getObjectName());
+ checkArgument(
+ isPaimonTable(table) || LegacyHiveClasses.isPaimonTable(table),
+ String.format(
+ "Table %s is not a paimon table in hive metastore.",
+ identifier.getFullName()));
+ updateHmsTablePars(table, tableSchema);
+ updateHmsTable(table, identifier, tableSchema);
+ client.alter_table(
+ identifier.getDatabaseName(), identifier.getObjectName(), table, true);
+ } catch (NoSuchObjectException e) {
+ // hive table does not exist.
+ HashMap<String, String> newOptions = new HashMap<>(paimonTable.options());
+ copyTableDefaultOptions(newOptions);
+ tableSchema = paimonTable.schema().copy(newOptions);
+ Table table =
+ newHmsTable(
+ identifier,
+ convertToPropertiesPrefixKey(tableSchema.options(), HIVE_PREFIX));
+ updateHmsTable(table, identifier, tableSchema);
+ client.createTable(table);
+ }
+
+ // repair partitions
+ repairPartition(paimonTable, identifier, tableSchema);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void repairPartition(
+ org.apache.paimon.table.Table paimonTable, Identifier identifier, TableSchema schema)
+ throws Exception {
+ if (!schema.partitionKeys().isEmpty()) {
+ MetastoreClient metastoreClient = metastoreClientFactory(identifier).get().create();
+
+ ReadBuilder readBuilder = paimonTable.newReadBuilder();
+ List<BinaryRow> partitions = readBuilder.newScan().listPartitions();
+ RowDataPartitionComputer partitionComputer =
+ FileStorePathFactory.getPartitionComputer(
+ schema.logicalPartitionType(),
+ new CoreOptions(schema.options()).partitionDefaultName());
+ for (BinaryRow partition : partitions) {
+ LinkedHashMap<String, String> partitionSpec =
+ partitionComputer.generatePartValues(
+ Preconditions.checkNotNull(
+ partition,
+ "Partition row data is null. This is unexpected."));
+ metastoreClient.addPartition(partitionSpec);
+ }
+ }
+ }
+
@Override
public void close() throws Exception {
client.close();
@@ -523,7 +668,7 @@ private void updateHmsTable(Table table, Identifier identifier, TableSchema sche
sd.setSerdeInfo(serDeInfo);
CoreOptions options = new CoreOptions(schema.options());
- if (options.partitionedTableInMetastore() && schema.partitionKeys().size() > 0) {
+ if (options.partitionedTableInMetastore() && !schema.partitionKeys().isEmpty()) {
Map<String, DataField> fieldMap =
schema.fields().stream()
.collect(Collectors.toMap(DataField::name, Function.identity()));
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index 5a915d200cbc..ac140d9496a3 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -1167,6 +1167,115 @@ public void testMarkDone() throws Exception {
insertSql.getJobClient().get().cancel();
}
+ @Test
+ public void testRepairTable() throws Exception {
+ TableEnvironment fileCatalog = useFileCatalog();
+ // Database test_db exists in hive metastore
+ hiveShell.execute("use test_db");
+ // When the Hive table does not exist, repairing the paimon table creates it in the hive
+ // metastore.
+ tEnv.executeSql("CALL sys.repair('test_db.t_repair_hive')");
+
+ assertThat(hiveShell.executeQuery("SHOW PARTITIONS t_repair_hive"))
+ .containsExactlyInAnyOrder("dt=2020-01-02/hh=09");
+
+ alterTableInFileSystem(fileCatalog);
+
+ // When the Hive table exists, repairing the paimon table updates it in the hive
+ // metastore.
+ tEnv.executeSql("CALL sys.repair('test_db.t_repair_hive')");
+ assertThat(
+ hiveShell
+ .executeQuery("DESC FORMATTED t_repair_hive")
+ .contains("item_id\tbigint\titem id"))
+ .isTrue();
+ assertThat(hiveShell.executeQuery("SHOW PARTITIONS t_repair_hive"))
+ .containsExactlyInAnyOrder("dt=2020-01-02/hh=09", "dt=2020-01-03/hh=10");
+ }
+
+ @Test
+ public void testRepairTableWithCustomLocation() throws Exception {
+ TableEnvironment fileCatalog = useFileCatalog();
+ // Database exists in hive metastore and uses custom location.
+ String databaseLocation = path + "test_db.db";
+ hiveShell.execute("CREATE DATABASE my_database\n" + "LOCATION '" + databaseLocation + "';");
+ hiveShell.execute("USE my_database");
+
+ // When the Hive table does not exist, repairing the paimon table creates it in the hive
+ // metastore.
+ tEnv.executeSql("CALL sys.repair('my_database.t_repair_hive')").await();
+
+ String tableLocation = databaseLocation + "/t_repair_hive";
+ assertThat(
+ hiveShell
+ .executeQuery("DESC FORMATTED t_repair_hive")
+ .contains("Location: \t" + tableLocation + "\tNULL"))
+ .isTrue();
+ assertThat(hiveShell.executeQuery("SHOW PARTITIONS t_repair_hive"))
+ .containsExactlyInAnyOrder("dt=2020-01-02/hh=09");
+
+ alterTableInFileSystem(fileCatalog);
+
+ // When the Hive table exists, repairing the paimon table updates it in the hive
+ // metastore.
+ tEnv.executeSql("CALL sys.repair('my_database.t_repair_hive')");
+ assertThat(
+ hiveShell
+ .executeQuery("DESC FORMATTED t_repair_hive")
+ .contains("Location: \t" + tableLocation + "\tNULL"))
+ .isTrue();
+ assertThat(
+ hiveShell
+ .executeQuery("DESC FORMATTED t_repair_hive")
+ .contains("item_id\tbigint\titem id"))
+ .isTrue();
+ assertThat(hiveShell.executeQuery("SHOW PARTITIONS t_repair_hive"))
+ .containsExactlyInAnyOrder("dt=2020-01-02/hh=09", "dt=2020-01-03/hh=10");
+ }
+
+ /** Alter the paimon table in the file system and write new data for repair to sync. */
+ private void alterTableInFileSystem(TableEnvironment tEnv) throws Exception {
+ tEnv.executeSql(
+ "ALTER TABLE t_repair_hive ADD item_id BIGINT COMMENT 'item id' AFTER user_id")
+ .await();
+ tEnv.executeSql("INSERT INTO t_repair_hive VALUES(2, 1, 'click', '2020-01-03', '10')")
+ .await();
+ }
+
+ private TableEnvironment useFileCatalog() throws Exception {
+ String fileCatalog =
+ "CREATE CATALOG my_file WITH ( "
+ + "'type' = 'paimon',\n"
+ + "'warehouse' = '"
+ + path
+ + "' "
+ + ")";
+ TableEnvironment tEnv =
+ TableEnvironmentImpl.create(
+ EnvironmentSettings.newInstance().inBatchMode().build());
+ tEnv.executeSql(fileCatalog).await();
+
+ tEnv.executeSql("USE CATALOG my_file").await();
+
+ // Prepare a partitioned paimon table in the paimon file system.
+ tEnv.executeSql("CREATE DATABASE IF NOT EXISTS test_db;").await();
+ tEnv.executeSql("USE test_db").await();
+ tEnv.executeSql(
+ "CREATE TABLE t_repair_hive (\n"
+ + " user_id BIGINT,\n"
+ + " behavior STRING,\n"
+ + " dt STRING,\n"
+ + " hh STRING,\n"
+ + " PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n"
+ + ") PARTITIONED BY (dt, hh)"
+ + " WITH (\n"
+ + "'metastore.partitioned-table' = 'true'\n"
+ + ");")
+ .await();
+ tEnv.executeSql("INSERT INTO t_repair_hive VALUES(1, 'login', '2020-01-02', '09')").await();
+ return tEnv;
+ }
+
private void assertNoPrivilege(Executable executable) {
Exception e = assertThrows(Exception.class, executable);
if (e.getCause() != null) {
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index 4af371c69b6e..a61642beaa91 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -27,6 +27,7 @@
import org.apache.paimon.spark.procedure.Procedure;
import org.apache.paimon.spark.procedure.ProcedureBuilder;
import org.apache.paimon.spark.procedure.RemoveOrphanFilesProcedure;
+import org.apache.paimon.spark.procedure.RepairProcedure;
import org.apache.paimon.spark.procedure.RollbackProcedure;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
@@ -58,6 +59,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
procedureBuilders.put("migrate_file", MigrateFileProcedure::builder);
procedureBuilders.put("remove_orphan_files", RemoveOrphanFilesProcedure::builder);
procedureBuilders.put("expire_snapshots", ExpireSnapshotsProcedure::builder);
+ procedureBuilders.put("repair", RepairProcedure::builder);
return procedureBuilders.build();
}
}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java
index 881e9d7d3c19..87090dd921be 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java
@@ -81,7 +81,7 @@ public InternalRow[] call(InternalRow args) {
String format = args.getString(0);
String sourceTable = args.getString(1);
String properties = args.isNullAt(2) ? null : args.getString(2);
- boolean deleteNeed = args.isNullAt(3) ? true : args.getBoolean(3);
+ boolean deleteNeed = args.isNullAt(3) || args.getBoolean(3);
String targetTable = args.isNullAt(4) ? null : args.getString(4);
Identifier sourceTableId = Identifier.fromString(sourceTable);
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RepairProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RepairProcedure.java
new file mode 100644
index 000000000000..47e5ceb4fb4f
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RepairProcedure.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.spark.catalog.WithPaimonCatalog;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Repair Procedure. Usage:
+ *
+ * <pre><code>
+ * CALL sys.repair([database_or_table => 'tableId'])
+ * </code></pre>
+ */
+public class RepairProcedure extends BaseProcedure {
+
+ private static final ProcedureParameter[] PARAMETERS =
+ new ProcedureParameter[] {ProcedureParameter.required("database_or_table", StringType)};
+
+ private static final StructType OUTPUT_TYPE =
+ new StructType(
+ new StructField[] {
+ new StructField("result", DataTypes.BooleanType, true, Metadata.empty())
+ });
+
+ protected RepairProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) {
+ Catalog paimonCatalog = ((WithPaimonCatalog) tableCatalog()).paimonCatalog();
+ String identifier = args.getString(0);
+ try {
+ if (StringUtils.isBlank(identifier)) {
+ paimonCatalog.repairCatalog();
+ return new InternalRow[] {newInternalRow(true)};
+ }
+
+ String[] paths = identifier.split("\\.");
+ switch (paths.length) {
+ case 1:
+ paimonCatalog.repairDatabase(paths[0]);
+ break;
+ case 2:
+ paimonCatalog.repairTable(Identifier.create(paths[0], paths[1]));
+ break;
+ default:
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot get splits from '%s' to get database and table",
+ identifier));
+ }
+
+ } catch (Exception e) {
+ throw new RuntimeException("Call repair error", e);
+ }
+ return new InternalRow[] {newInternalRow(true)};
+ }
+
+ public static ProcedureBuilder builder() {
+ return new BaseProcedure.Builder() {
+ @Override
+ public RepairProcedure doBuild() {
+ return new RepairProcedure(tableCatalog());
+ }
+ };
+ }
+
+ @Override
+ public String description() {
+ return "RepairProcedure";
+ }
+}