Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PIP-22] Introduce repair metastore procedure #3355

Closed
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions docs/content/flink/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,24 @@ All available procedures are listed below.
CALL sys.expire_snapshots(`table` => 'default.T', older_than => '2024-01-01 12:00:00', retain_min => 10)<br/><br/>
CALL sys.expire_snapshots(`table` => 'default.T', older_than => '2024-01-01 12:00:00', max_deletes => 10)<br/><br/>
</td>
</tr>
<tr>
<td>repair</td>
<td>
-- repair all databases and tables in catalog<br/>
CALL sys.repair()<br/><br/>
-- repair all tables in a specific database<br/>
CALL sys.repair('databaseName')<br/><br/>
-- repair a table<br/>
CALL sys.repair('databaseName.tableName')<br/><br/>
</td>
<td>
Synchronize information from the file system to Metastore. Argument:
<li>empty: all databases and tables in catalog.</li>
<li>databaseName : the target database name.</li>
<li>tableName: the target table identifier.</li>
</td>
<td>CALL sys.repair('test_db.T')</td>
</tr>
</tbody>
</table>
10 changes: 10 additions & 0 deletions docs/content/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,5 +124,15 @@ This section introduce all available spark procedures about paimon.
CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00')
</td>
</tr>
<tr>
<td>repair</td>
<td>
Synchronize information from the file system to Metastore. Argument:
<li>database_or_table: empty or the target database name or the target table identifier.</li>
</td>
<td>
CALL sys.repair('test_db.T')
</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.lineage.LineageMetaFactory;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
Expand All @@ -48,6 +51,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;

import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
Expand Down Expand Up @@ -124,6 +128,17 @@ protected boolean lockEnabled() {
return catalogOptions.get(LOCK_ENABLED);
}

protected List<String> listDatabases(Path warehouse) {
List<String> databases = new ArrayList<>();
for (FileStatus status : uncheck(() -> fileIO.listDirectories(warehouse))) {
Path path = status.getPath();
if (status.isDir() && isDatabase(path)) {
databases.add(database(path));
}
}
return databases;
}

@Override
public boolean databaseExists(String databaseName) {
if (isSystemDatabase(databaseName)) {
Expand All @@ -138,9 +153,7 @@ public boolean databaseExists(String databaseName) {
@Override
public void createDatabase(String name, boolean ignoreIfExists, Map<String, String> properties)
throws DatabaseAlreadyExistException {
if (isSystemDatabase(name)) {
throw new ProcessSystemDatabaseException();
}
checkNotSystemDatabase(name);
if (databaseExists(name)) {
if (ignoreIfExists) {
return;
Expand Down Expand Up @@ -179,9 +192,7 @@ public void dropPartition(Identifier identifier, Map<String, String> partitionSp
@Override
public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
throws DatabaseNotExistException, DatabaseNotEmptyException {
if (isSystemDatabase(name)) {
throw new ProcessSystemDatabaseException();
}
checkNotSystemDatabase(name);
if (!databaseExists(name)) {
if (ignoreIfNotExists) {
return;
Expand All @@ -207,11 +218,30 @@ public List<String> listTables(String databaseName) throws DatabaseNotExistExcep
throw new DatabaseNotExistException(databaseName);
}

return listTablesImpl(databaseName).stream().sorted().collect(Collectors.toList());
return listTables(databaseName, false).stream().sorted().collect(Collectors.toList());
}

protected List<String> listTables(String databaseName, boolean ignoreMetastore) {
if (!ignoreMetastore) {
return listTablesImpl(databaseName);
}

List<String> tables = new ArrayList<>();
for (FileStatus status :
uncheck(() -> fileIO.listDirectories(newDatabasePath(databaseName)))) {
if (status.isDir() && tableExists(status.getPath())) {
tables.add(status.getPath().getName());
}
}
return tables;
}

protected abstract List<String> listTablesImpl(String databaseName);

protected boolean tableExists(Path tablePath) {
return !new SchemaManager(fileIO, tablePath).listAllIds().isEmpty();
}

@Override
public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
throws TableNotExistException {
Expand Down Expand Up @@ -339,16 +369,23 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
}
}

private FileStoreTable getDataTable(Identifier identifier) throws TableNotExistException {
TableSchema tableSchema = getDataTableSchema(identifier);
protected FileStoreTable getDataTable(Identifier identifier) throws TableNotExistException {
return getDataTable(identifier, false);
}

protected FileStoreTable getDataTable(Identifier identifier, boolean ignoreIfMetastoreNotExists)
throws TableNotExistException {
TableSchema tableSchema = getDataTableSchema(identifier, ignoreIfMetastoreNotExists);
MetastoreClient.Factory metastoreClientFactory =
ignoreIfMetastoreNotExists ? null : metastoreClientFactory(identifier).orElse(null);
return FileStoreTableFactory.create(
fileIO,
getDataTableLocation(identifier),
tableSchema,
new CatalogEnvironment(
Lock.factory(
lockFactory().orElse(null), lockContext().orElse(null), identifier),
metastoreClientFactory(identifier).orElse(null),
metastoreClientFactory,
lineageMetaFactory));
}

Expand Down Expand Up @@ -379,8 +416,22 @@ public Map<String, Map<String, Path>> allTablePaths() {
}
}

protected abstract TableSchema getDataTableSchema(Identifier identifier)
throws TableNotExistException;
protected TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException {
return getDataTableSchema(identifier, false);
}

protected TableSchema getDataTableSchema(
Identifier identifier, boolean ignoreIfMetastoreNotExists)
throws TableNotExistException {
if (!ignoreIfMetastoreNotExists && !tableExists(identifier)) {
throw new TableNotExistException(identifier);
}
Path tableLocation = getDataTableLocation(identifier);
return new SchemaManager(fileIO, tableLocation)
.latest()
.orElseThrow(
() -> new RuntimeException("There is no paimon table in " + tableLocation));
}

@VisibleForTesting
public Path getDataTableLocation(Identifier identifier) {
Expand All @@ -395,7 +446,7 @@ protected boolean isSystemTable(Identifier identifier) {
return isSystemDatabase(identifier.getDatabaseName()) || isSpecifiedSystemTable(identifier);
}

private void checkNotSystemTable(Identifier identifier, String method) {
protected void checkNotSystemTable(Identifier identifier, String method) {
if (isSystemTable(identifier)) {
throw new IllegalArgumentException(
String.format(
Expand Down Expand Up @@ -438,6 +489,12 @@ private boolean isSystemDatabase(String database) {
return SYSTEM_DATABASE_NAME.equals(database);
}

protected void checkNotSystemDatabase(String database) {
if (isSystemDatabase(database)) {
throw new ProcessSystemDatabaseException();
}
}

/** Validate database, table and field names must be lowercase when not case-sensitive. */
public static void validateCaseInsensitive(
boolean caseSensitive, String type, String... names) {
Expand All @@ -459,7 +516,7 @@ public static void validateCaseInsensitive(
type, illegalNames));
}

private void validateIdentifierNameCaseInsensitive(Identifier identifier) {
protected void validateIdentifierNameCaseInsensitive(Identifier identifier) {
validateCaseInsensitive(caseSensitive(), "Database", identifier.getDatabaseName());
validateCaseInsensitive(caseSensitive(), "Table", identifier.getObjectName());
}
Expand All @@ -478,7 +535,7 @@ private void validateFieldNameCaseInsensitiveInSchemaChange(List<SchemaChange> c
validateFieldNameCaseInsensitive(fieldNames);
}

private void validateFieldNameCaseInsensitive(List<String> fieldNames) {
protected void validateFieldNameCaseInsensitive(List<String> fieldNames) {
validateCaseInsensitive(caseSensitive(), "Field", fieldNames);
}

Expand All @@ -492,4 +549,21 @@ private void validateAutoCreateClose(Map<String, String> options) {
"The value of %s property should be %s.",
CoreOptions.AUTO_CREATE.key(), Boolean.FALSE));
}

private static boolean isDatabase(Path path) {
return path.getName().endsWith(DB_SUFFIX);
}

private static String database(Path path) {
String name = path.getName();
return name.substring(0, name.length() - DB_SUFFIX.length());
}

protected static <T> T uncheck(Callable<T> callable) {
try {
return callable.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
12 changes: 12 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,18 @@ default boolean caseSensitive() {
return true;
}

default void repairCatalog() {
throw new UnsupportedOperationException();
}

default void repairDatabase(String databaseName) {
throw new UnsupportedOperationException();
}

default void repairTable(Identifier identifier) throws TableNotExistException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add some comments.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Querying the paimon table by identifier in the file system may not exist. TableNotExistException is the exception thrown on the getDataTable method.

throw new UnsupportedOperationException();
}

/** Exception for trying to drop on a database that is not empty. */
class DatabaseNotEmptyException extends Exception {
private static final String MSG = "Database %s is not empty.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.paimon.catalog;

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.Options;
Expand All @@ -31,11 +30,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE;

Expand All @@ -58,14 +55,7 @@ public FileSystemCatalog(FileIO fileIO, Path warehouse, Options options) {

@Override
public List<String> listDatabases() {
List<String> databases = new ArrayList<>();
for (FileStatus status : uncheck(() -> fileIO.listDirectories(warehouse))) {
Path path = status.getPath();
if (status.isDir() && isDatabase(path)) {
databases.add(database(path));
}
}
return databases;
return listDatabases(warehouse);
}

@Override
Expand Down Expand Up @@ -99,14 +89,7 @@ protected void dropDatabaseImpl(String name) {

@Override
protected List<String> listTablesImpl(String databaseName) {
List<String> tables = new ArrayList<>();
for (FileStatus status :
uncheck(() -> fileIO.listDirectories(newDatabasePath(databaseName)))) {
if (status.isDir() && tableExists(status.getPath())) {
tables.add(status.getPath().getName());
}
}
return tables;
return listTables(databaseName, true);
}

@Override
Expand All @@ -118,10 +101,6 @@ public boolean tableExists(Identifier identifier) {
return tableExists(getDataTableLocation(identifier));
}

private boolean tableExists(Path tablePath) {
return new SchemaManager(fileIO, tablePath).listAllIds().size() > 0;
}

@Override
public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException {
return schemaManager(identifier)
Expand Down Expand Up @@ -170,23 +149,6 @@ protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
schemaManager(identifier).commitChanges(changes);
}

private static <T> T uncheck(Callable<T> callable) {
try {
return callable.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private static boolean isDatabase(Path path) {
return path.getName().endsWith(DB_SUFFIX);
}

private static String database(Path path) {
String name = path.getName();
return name.substring(0, name.length() - DB_SUFFIX.length());
}

@Override
public void close() throws Exception {}

Expand Down
13 changes: 0 additions & 13 deletions paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.utils.Preconditions;

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -317,18 +316,6 @@ protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
schemaManager.commitChanges(changes);
}

@Override
protected TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException {
if (!tableExists(identifier)) {
throw new TableNotExistException(identifier);
}
Path tableLocation = getDataTableLocation(identifier);
return new SchemaManager(fileIO, tableLocation)
.latest()
.orElseThrow(
() -> new RuntimeException("There is no paimon table in " + tableLocation));
}

@Override
public boolean tableExists(Identifier identifier) {
if (isSystemTable(identifier)) {
Expand Down
Loading
Loading