Skip to content

Commit

Permalink
[core][hive] Optimize codes for repairing metastore procedure
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed May 24, 2024
1 parent 6011456 commit 592bd24
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -50,7 +51,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;

import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
Expand Down Expand Up @@ -127,17 +127,6 @@ protected boolean lockEnabled() {
return catalogOptions.get(LOCK_ENABLED);
}

protected List<String> listDatabases(Path warehouse) {
List<String> databases = new ArrayList<>();
for (FileStatus status : uncheck(() -> fileIO.listDirectories(warehouse))) {
Path path = status.getPath();
if (status.isDir() && isDatabase(path)) {
databases.add(database(path));
}
}
return databases;
}

@Override
public boolean databaseExists(String databaseName) {
if (isSystemDatabase(databaseName)) {
Expand Down Expand Up @@ -220,22 +209,8 @@ public List<String> listTables(String databaseName) throws DatabaseNotExistExcep
return listTablesImpl(databaseName).stream().sorted().collect(Collectors.toList());
}

protected List<String> listTablesImpl(Path databasePath) {
List<String> tables = new ArrayList<>();
for (FileStatus status : uncheck(() -> fileIO.listDirectories(databasePath))) {
if (status.isDir() && tableExists(status.getPath())) {
tables.add(status.getPath().getName());
}
}
return tables;
}

protected abstract List<String> listTablesImpl(String databaseName);

protected boolean tableExists(Path tablePath) {
return !new SchemaManager(fileIO, tablePath).listAllIds().isEmpty();
}

@Override
public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
throws TableNotExistException {
Expand Down Expand Up @@ -525,20 +500,35 @@ private void validateAutoCreateClose(Map<String, String> options) {
CoreOptions.AUTO_CREATE.key(), Boolean.FALSE));
}

private static boolean isDatabase(Path path) {
return path.getName().endsWith(DB_SUFFIX);
}
// =============================== Meta in File System =====================================

private static String database(Path path) {
String name = path.getName();
return name.substring(0, name.length() - DB_SUFFIX.length());
protected List<String> listDatabasesInFileSystem(Path warehouse) throws IOException {
List<String> databases = new ArrayList<>();
for (FileStatus status : fileIO.listDirectories(warehouse)) {
Path path = status.getPath();
if (status.isDir() && path.getName().endsWith(DB_SUFFIX)) {
String fileName = path.getName();
databases.add(fileName.substring(0, fileName.length() - DB_SUFFIX.length()));
}
}
return databases;
}

protected static <T> T uncheck(Callable<T> callable) {
try {
return callable.call();
} catch (Exception e) {
throw new RuntimeException(e);
protected List<String> listTablesInFileSystem(Path databasePath) throws IOException {
List<String> tables = new ArrayList<>();
for (FileStatus status : fileIO.listDirectories(databasePath)) {
if (status.isDir() && tableExistsInFileSystem(status.getPath())) {
tables.add(status.getPath().getName());
}
}
return tables;
}

protected boolean tableExistsInFileSystem(Path tablePath) {
return !new SchemaManager(fileIO, tablePath).listAllIds().isEmpty();
}

public Optional<TableSchema> tableSchemaInFileSystem(Path tablePath) {
return new SchemaManager(fileIO, tablePath).latest();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE;

Expand All @@ -55,7 +56,7 @@ public FileSystemCatalog(FileIO fileIO, Path warehouse, Options options) {

@Override
public List<String> listDatabases() {
return listDatabases(warehouse);
return uncheck(() -> listDatabasesInFileSystem(warehouse));
}

@Override
Expand Down Expand Up @@ -89,7 +90,7 @@ protected void dropDatabaseImpl(String name) {

@Override
protected List<String> listTablesImpl(String databaseName) {
return listTablesImpl(newDatabasePath(databaseName));
return uncheck(() -> listTablesInFileSystem(newDatabasePath(databaseName)));
}

@Override
Expand All @@ -98,7 +99,7 @@ public boolean tableExists(Identifier identifier) {
return super.tableExists(identifier);
}

return tableExists(getDataTableLocation(identifier));
return tableExistsInFileSystem(getDataTableLocation(identifier));
}

@Override
Expand Down Expand Up @@ -149,6 +150,14 @@ protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
schemaManager(identifier).commitChanges(changes);
}

protected static <T> T uncheck(Callable<T> callable) {
try {
return callable.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public void close() throws Exception {}

Expand Down
Loading

0 comments on commit 592bd24

Please sign in to comment.