Skip to content

Commit

Permalink
[core][hive] Introduce repair metastore procedure
Browse files Browse the repository at this point in the history
This closes apache#3355
  • Loading branch information
zhuangchong authored and sunxiaojian committed May 28, 2024
1 parent 6303dc2 commit bbee24b
Show file tree
Hide file tree
Showing 12 changed files with 548 additions and 52 deletions.
18 changes: 18 additions & 0 deletions docs/content/flink/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,24 @@ All available procedures are listed below.
CALL sys.expire_snapshots(`table` => 'default.T', older_than => '2024-01-01 12:00:00', retain_min => 10)<br/><br/>
CALL sys.expire_snapshots(`table` => 'default.T', older_than => '2024-01-01 12:00:00', max_deletes => 10)<br/><br/>
</td>
</tr>
<tr>
<td>repair</td>
<td>
-- repair all databases and tables in catalog<br/>
CALL sys.repair()<br/><br/>
-- repair all tables in a specific database<br/>
CALL sys.repair('databaseName')<br/><br/>
-- repair a table<br/>
CALL sys.repair('databaseName.tableName')<br/><br/>
</td>
<td>
Synchronize information from the file system to Metastore. Argument:
<li>empty: all databases and tables in catalog.</li>
<li>databaseName : the target database name.</li>
<li>tableName: the target table identifier.</li>
</td>
<td>CALL sys.repair('test_db.T')</td>
</tr>
</tbody>
</table>
10 changes: 10 additions & 0 deletions docs/content/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,5 +124,15 @@ This section introduce all available spark procedures about paimon.
CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00')
</td>
</tr>
<tr>
<td>repair</td>
<td>
Synchronize information from the file system to Metastore. Argument:
<li>database_or_table: empty or the target database name or the target table identifier.</li>
</td>
<td>
CALL sys.repair('test_db.T')
</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.lineage.LineageMetaFactory;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
Expand All @@ -48,6 +50,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;

import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
Expand Down Expand Up @@ -124,6 +127,17 @@ protected boolean lockEnabled() {
return catalogOptions.get(LOCK_ENABLED);
}

protected List<String> listDatabases(Path warehouse) {
List<String> databases = new ArrayList<>();
for (FileStatus status : uncheck(() -> fileIO.listDirectories(warehouse))) {
Path path = status.getPath();
if (status.isDir() && isDatabase(path)) {
databases.add(database(path));
}
}
return databases;
}

@Override
public boolean databaseExists(String databaseName) {
if (isSystemDatabase(databaseName)) {
Expand All @@ -138,9 +152,7 @@ public boolean databaseExists(String databaseName) {
@Override
public void createDatabase(String name, boolean ignoreIfExists, Map<String, String> properties)
throws DatabaseAlreadyExistException {
if (isSystemDatabase(name)) {
throw new ProcessSystemDatabaseException();
}
checkNotSystemDatabase(name);
if (databaseExists(name)) {
if (ignoreIfExists) {
return;
Expand Down Expand Up @@ -179,9 +191,7 @@ public void dropPartition(Identifier identifier, Map<String, String> partitionSp
@Override
public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
throws DatabaseNotExistException, DatabaseNotEmptyException {
if (isSystemDatabase(name)) {
throw new ProcessSystemDatabaseException();
}
checkNotSystemDatabase(name);
if (!databaseExists(name)) {
if (ignoreIfNotExists) {
return;
Expand Down Expand Up @@ -210,8 +220,22 @@ public List<String> listTables(String databaseName) throws DatabaseNotExistExcep
return listTablesImpl(databaseName).stream().sorted().collect(Collectors.toList());
}

protected List<String> listTablesImpl(Path databasePath) {
List<String> tables = new ArrayList<>();
for (FileStatus status : uncheck(() -> fileIO.listDirectories(databasePath))) {
if (status.isDir() && tableExists(status.getPath())) {
tables.add(status.getPath().getName());
}
}
return tables;
}

protected abstract List<String> listTablesImpl(String databaseName);

protected boolean tableExists(Path tablePath) {
return !new SchemaManager(fileIO, tablePath).listAllIds().isEmpty();
}

@Override
public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
throws TableNotExistException {
Expand Down Expand Up @@ -396,7 +420,7 @@ protected boolean isSystemTable(Identifier identifier) {
return isSystemDatabase(identifier.getDatabaseName()) || isSpecifiedSystemTable(identifier);
}

private void checkNotSystemTable(Identifier identifier, String method) {
protected void checkNotSystemTable(Identifier identifier, String method) {
if (isSystemTable(identifier)) {
throw new IllegalArgumentException(
String.format(
Expand Down Expand Up @@ -439,6 +463,13 @@ private boolean isSystemDatabase(String database) {
return SYSTEM_DATABASE_NAME.equals(database);
}

/** Validate database cannot be a system database. */
protected void checkNotSystemDatabase(String database) {
if (isSystemDatabase(database)) {
throw new ProcessSystemDatabaseException();
}
}

/** Validate database, table and field names must be lowercase when not case-sensitive. */
public static void validateCaseInsensitive(
boolean caseSensitive, String type, String... names) {
Expand All @@ -460,7 +491,7 @@ public static void validateCaseInsensitive(
type, illegalNames));
}

private void validateIdentifierNameCaseInsensitive(Identifier identifier) {
protected void validateIdentifierNameCaseInsensitive(Identifier identifier) {
validateCaseInsensitive(caseSensitive(), "Database", identifier.getDatabaseName());
validateCaseInsensitive(caseSensitive(), "Table", identifier.getObjectName());
}
Expand All @@ -479,7 +510,7 @@ private void validateFieldNameCaseInsensitiveInSchemaChange(List<SchemaChange> c
validateFieldNameCaseInsensitive(fieldNames);
}

private void validateFieldNameCaseInsensitive(List<String> fieldNames) {
protected void validateFieldNameCaseInsensitive(List<String> fieldNames) {
validateCaseInsensitive(caseSensitive(), "Field", fieldNames);
}

Expand All @@ -493,4 +524,21 @@ private void validateAutoCreateClose(Map<String, String> options) {
"The value of %s property should be %s.",
CoreOptions.AUTO_CREATE.key(), Boolean.FALSE));
}

private static boolean isDatabase(Path path) {
return path.getName().endsWith(DB_SUFFIX);
}

private static String database(Path path) {
String name = path.getName();
return name.substring(0, name.length() - DB_SUFFIX.length());
}

protected static <T> T uncheck(Callable<T> callable) {
try {
return callable.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
12 changes: 12 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,18 @@ default boolean caseSensitive() {
return true;
}

default void repairCatalog() {
throw new UnsupportedOperationException();
}

default void repairDatabase(String databaseName) {
throw new UnsupportedOperationException();
}

default void repairTable(Identifier identifier) throws TableNotExistException {
throw new UnsupportedOperationException();
}

/** Exception for trying to drop on a database that is not empty. */
class DatabaseNotEmptyException extends Exception {
private static final String MSG = "Database %s is not empty.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.paimon.catalog;

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.Options;
Expand All @@ -31,11 +30,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE;

Expand All @@ -58,14 +55,7 @@ public FileSystemCatalog(FileIO fileIO, Path warehouse, Options options) {

@Override
public List<String> listDatabases() {
List<String> databases = new ArrayList<>();
for (FileStatus status : uncheck(() -> fileIO.listDirectories(warehouse))) {
Path path = status.getPath();
if (status.isDir() && isDatabase(path)) {
databases.add(database(path));
}
}
return databases;
return listDatabases(warehouse);
}

@Override
Expand Down Expand Up @@ -99,14 +89,7 @@ protected void dropDatabaseImpl(String name) {

@Override
protected List<String> listTablesImpl(String databaseName) {
List<String> tables = new ArrayList<>();
for (FileStatus status :
uncheck(() -> fileIO.listDirectories(newDatabasePath(databaseName)))) {
if (status.isDir() && tableExists(status.getPath())) {
tables.add(status.getPath().getName());
}
}
return tables;
return listTablesImpl(newDatabasePath(databaseName));
}

@Override
Expand All @@ -118,10 +101,6 @@ public boolean tableExists(Identifier identifier) {
return tableExists(getDataTableLocation(identifier));
}

private boolean tableExists(Path tablePath) {
return new SchemaManager(fileIO, tablePath).listAllIds().size() > 0;
}

@Override
public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException {
return schemaManager(identifier)
Expand Down Expand Up @@ -170,23 +149,6 @@ protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
schemaManager(identifier).commitChanges(changes);
}

private static <T> T uncheck(Callable<T> callable) {
try {
return callable.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private static boolean isDatabase(Path path) {
return path.getName().endsWith(DB_SUFFIX);
}

private static String database(Path path) {
String name = path.getName();
return name.substring(0, name.length() - DB_SUFFIX.length());
}

@Override
public void close() throws Exception {}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.utils.StringUtils;

import org.apache.flink.table.procedure.ProcedureContext;

/**
* Repair procedure. Usage:
*
* <pre><code>
* -- repair all databases and tables in catalog
* CALL sys.repair()
*
* -- repair all tables in a specific database
* CALL sys.repair('databaseName')
*
* -- repair a table
* CALL sys.repair('databaseName.tableName')
* </code></pre>
*/
public class RepairProcedure extends ProcedureBase {

public static final String IDENTIFIER = "repair";

@Override
public String identifier() {
return IDENTIFIER;
}

public String[] call(ProcedureContext procedureContext)
throws Catalog.TableNotExistException, Catalog.DatabaseNotExistException {
return call(procedureContext, null);
}

public String[] call(ProcedureContext procedureContext, String identifier)
throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
if (StringUtils.isBlank(identifier)) {
catalog.repairCatalog();
return new String[] {"Success"};
}
String[] paths = identifier.split("\\.");
switch (paths.length) {
case 1:
catalog.repairDatabase(paths[0]);
break;
case 2:
catalog.repairTable(Identifier.create(paths[0], paths[1]));
break;
default:
throw new IllegalArgumentException(
String.format(
"Cannot get splits from '%s' to get database and table",
identifier));
}

return new String[] {"Success"};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,4 @@ org.apache.paimon.flink.procedure.privilege.CreatePrivilegedUserProcedure
org.apache.paimon.flink.procedure.privilege.DropPrivilegedUserProcedure
org.apache.paimon.flink.procedure.privilege.GrantPrivilegeToUserProcedure
org.apache.paimon.flink.procedure.privilege.RevokePrivilegeFromUserProcedure
org.apache.paimon.flink.procedure.RepairProcedure
Loading

0 comments on commit bbee24b

Please sign in to comment.