Skip to content

Commit

Permalink
[core] Move common code from FileSystemCatalog and HiveCatalog into A…
Browse files Browse the repository at this point in the history
…bstractCatalog (#2124)
  • Loading branch information
tsreaper authored Oct 13, 2023
1 parent a67048e commit fa68a1e
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 215 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.paimon.lineage.LineageMetaFactory;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
Expand Down Expand Up @@ -84,6 +86,149 @@ protected AbstractCatalog(FileIO fileIO, Map<String, String> options) {
options.get(key)));
}

@Override
public boolean databaseExists(String databaseName) {
if (isSystemDatabase(databaseName)) {
return true;
}

return databaseExistsImpl(databaseName);
}

protected abstract boolean databaseExistsImpl(String databaseName);

@Override
public void createDatabase(String name, boolean ignoreIfExists)
throws DatabaseAlreadyExistException {
if (isSystemDatabase(name)) {
throw new ProcessSystemDatabaseException();
}
if (databaseExists(name)) {
if (ignoreIfExists) {
return;
}
throw new DatabaseAlreadyExistException(name);
}

createDatabaseImpl(name);
}

protected abstract void createDatabaseImpl(String name);

@Override
public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
throws DatabaseNotExistException, DatabaseNotEmptyException {
if (isSystemDatabase(name)) {
throw new ProcessSystemDatabaseException();
}
if (!databaseExists(name)) {
if (ignoreIfNotExists) {
return;
}
throw new DatabaseNotExistException(name);
}

if (!cascade && listTables(name).size() > 0) {
throw new DatabaseNotEmptyException(name);
}

dropDatabaseImpl(name);
}

protected abstract void dropDatabaseImpl(String name);

@Override
public List<String> listTables(String databaseName) throws DatabaseNotExistException {
if (isSystemDatabase(databaseName)) {
return GLOBAL_TABLES;
}
if (!databaseExists(databaseName)) {
throw new DatabaseNotExistException(databaseName);
}

return listTablesImpl(databaseName);
}

protected abstract List<String> listTablesImpl(String databaseName);

@Override
public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
throws TableNotExistException {
checkNotSystemTable(identifier, "dropTable");
if (!tableExists(identifier)) {
if (ignoreIfNotExists) {
return;
}
throw new TableNotExistException(identifier);
}

dropTableImpl(identifier);
}

protected abstract void dropTableImpl(Identifier identifier);

@Override
public void createTable(Identifier identifier, Schema schema, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException {
checkNotSystemTable(identifier, "createTable");
if (!databaseExists(identifier.getDatabaseName())) {
throw new DatabaseNotExistException(identifier.getDatabaseName());
}

if (tableExists(identifier)) {
if (ignoreIfExists) {
return;
}
throw new TableAlreadyExistException(identifier);
}

copyTableDefaultOptions(schema.options());

createTableImpl(identifier, schema);
}

protected abstract void createTableImpl(Identifier identifier, Schema schema);

@Override
public void renameTable(Identifier fromTable, Identifier toTable, boolean ignoreIfNotExists)
throws TableNotExistException, TableAlreadyExistException {
checkNotSystemTable(fromTable, "renameTable");
checkNotSystemTable(toTable, "renameTable");

if (!tableExists(fromTable)) {
if (ignoreIfNotExists) {
return;
}
throw new TableNotExistException(fromTable);
}

if (tableExists(toTable)) {
throw new TableAlreadyExistException(toTable);
}

renameTableImpl(fromTable, toTable);
}

protected abstract void renameTableImpl(Identifier fromTable, Identifier toTable);

@Override
public void alterTable(
Identifier identifier, List<SchemaChange> changes, boolean ignoreIfNotExists)
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
checkNotSystemTable(identifier, "alterTable");
if (!tableExists(identifier)) {
if (ignoreIfNotExists) {
return;
}
throw new TableNotExistException(identifier);
}

alterTableImpl(identifier, changes);
}

protected abstract void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException;

@Nullable
private LineageMeta findAndCreateLineageMeta(Options options, ClassLoader classLoader) {
return options.getOptional(LINEAGE_META)
Expand Down Expand Up @@ -175,8 +320,12 @@ private static boolean isSpecifiedSystemTable(Identifier identifier) {
return identifier.getObjectName().contains(SYSTEM_TABLE_SPLITTER);
}

protected void checkNotSystemTable(Identifier identifier, String method) {
if (isSystemDatabase(identifier.getDatabaseName()) || isSpecifiedSystemTable(identifier)) {
protected boolean isSystemTable(Identifier identifier) {
return isSystemDatabase(identifier.getDatabaseName()) || isSpecifiedSystemTable(identifier);
}

private void checkNotSystemTable(Identifier identifier, String method) {
if (isSystemTable(identifier)) {
throw new IllegalArgumentException(
String.format(
"Cannot '%s' for system table '%s', please use data table.",
Expand Down Expand Up @@ -213,7 +362,7 @@ public static Path databasePath(String warehouse, String database) {
return new Path(warehouse, database + DB_SUFFIX);
}

protected boolean isSystemDatabase(String database) {
private boolean isSystemDatabase(String database) {
return SYSTEM_DATABASE_NAME.equals(database);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,58 +65,22 @@ public List<String> listDatabases() {
}

@Override
public boolean databaseExists(String databaseName) {
if (isSystemDatabase(databaseName)) {
return true;
}
protected boolean databaseExistsImpl(String databaseName) {
return uncheck(() -> fileIO.exists(databasePath(databaseName)));
}

@Override
public void createDatabase(String name, boolean ignoreIfExists)
throws DatabaseAlreadyExistException {
if (isSystemDatabase(name)) {
throw new ProcessSystemDatabaseException();
}
if (databaseExists(name)) {
if (ignoreIfExists) {
return;
}
throw new DatabaseAlreadyExistException(name);
}
protected void createDatabaseImpl(String name) {
uncheck(() -> fileIO.mkdirs(databasePath(name)));
}

@Override
public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
throws DatabaseNotExistException, DatabaseNotEmptyException {
if (isSystemDatabase(name)) {
throw new ProcessSystemDatabaseException();
}
if (!databaseExists(name)) {
if (ignoreIfNotExists) {
return;
}

throw new DatabaseNotExistException(name);
}

if (!cascade && listTables(name).size() > 0) {
throw new DatabaseNotEmptyException(name);
}

protected void dropDatabaseImpl(String name) {
uncheck(() -> fileIO.delete(databasePath(name), true));
}

@Override
public List<String> listTables(String databaseName) throws DatabaseNotExistException {
if (isSystemDatabase(databaseName)) {
return GLOBAL_TABLES;
}
if (!databaseExists(databaseName)) {
throw new DatabaseNotExistException(databaseName);
}

protected List<String> listTablesImpl(String databaseName) {
List<String> tables = new ArrayList<>();
for (FileStatus status : uncheck(() -> fileIO.listStatus(databasePath(databaseName)))) {
if (status.isDir() && tableExists(status.getPath())) {
Expand All @@ -127,86 +91,48 @@ public List<String> listTables(String databaseName) throws DatabaseNotExistExcep
}

@Override
public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException {
Path path = getDataTableLocation(identifier);
return new SchemaManager(fileIO, path)
.latest()
.orElseThrow(() -> new TableNotExistException(identifier));
public boolean tableExists(Identifier identifier) {
if (isSystemTable(identifier)) {
return super.tableExists(identifier);
}

return tableExists(getDataTableLocation(identifier));
}

private boolean tableExists(Path tablePath) {
return new SchemaManager(fileIO, tablePath).listAllIds().size() > 0;
}

@Override
public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
throws TableNotExistException {
checkNotSystemTable(identifier, "dropTable");
public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException {
Path path = getDataTableLocation(identifier);
if (!tableExists(path)) {
if (ignoreIfNotExists) {
return;
}

throw new TableNotExistException(identifier);
}
return new SchemaManager(fileIO, path)
.latest()
.orElseThrow(() -> new TableNotExistException(identifier));
}

@Override
protected void dropTableImpl(Identifier identifier) {
Path path = getDataTableLocation(identifier);
uncheck(() -> fileIO.delete(path, true));
}

@Override
public void createTable(Identifier identifier, Schema schema, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException {
checkNotSystemTable(identifier, "createTable");
if (!databaseExists(identifier.getDatabaseName())) {
throw new DatabaseNotExistException(identifier.getDatabaseName());
}

public void createTableImpl(Identifier identifier, Schema schema) {
Path path = getDataTableLocation(identifier);
if (tableExists(path)) {
if (ignoreIfExists) {
return;
}

throw new TableAlreadyExistException(identifier);
}

copyTableDefaultOptions(schema.options());

uncheck(() -> new SchemaManager(fileIO, path).createTable(schema));
}

@Override
public void renameTable(Identifier fromTable, Identifier toTable, boolean ignoreIfNotExists)
throws TableNotExistException, TableAlreadyExistException {
checkNotSystemTable(fromTable, "renameTable");
checkNotSystemTable(toTable, "renameTable");
public void renameTableImpl(Identifier fromTable, Identifier toTable) {
Path fromPath = getDataTableLocation(fromTable);
if (!tableExists(fromPath)) {
if (ignoreIfNotExists) {
return;
}

throw new TableNotExistException(fromTable);
}

Path toPath = getDataTableLocation(toTable);
if (tableExists(toPath)) {
throw new TableAlreadyExistException(toTable);
}

uncheck(() -> fileIO.rename(fromPath, toPath));
}

@Override
public void alterTable(
Identifier identifier, List<SchemaChange> changes, boolean ignoreIfNotExists)
protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
checkNotSystemTable(identifier, "alterTable");
if (!tableExists(getDataTableLocation(identifier))) {
throw new TableNotExistException(identifier);
}

new SchemaManager(fileIO, getDataTableLocation(identifier)).commitChanges(changes);
}

Expand Down
Loading

0 comments on commit fa68a1e

Please sign in to comment.