Skip to content

Commit

Permalink
[core] Introduce DelegateCatalog (#3824)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Jul 29, 2024
1 parent a561ea6 commit 775c086
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,17 +129,6 @@ protected boolean lockEnabled() {
return catalogOptions.get(LOCK_ENABLED);
}

@Override
public boolean databaseExists(String databaseName) {
if (isSystemDatabase(databaseName)) {
return true;
}

return databaseExistsImpl(databaseName);
}

protected abstract boolean databaseExistsImpl(String databaseName);

@Override
public void createDatabase(String name, boolean ignoreIfExists, Map<String, String> properties)
throws DatabaseAlreadyExistException {
Expand All @@ -159,13 +148,11 @@ public Map<String, String> loadDatabaseProperties(String name)
if (isSystemDatabase(name)) {
return Collections.emptyMap();
}
if (!databaseExists(name)) {
throw new DatabaseNotExistException(name);
}
return loadDatabasePropertiesImpl(name);
}

protected abstract Map<String, String> loadDatabasePropertiesImpl(String name);
protected abstract Map<String, String> loadDatabasePropertiesImpl(String name)
throws DatabaseNotExistException;

@Override
public void dropPartition(Identifier identifier, Map<String, String> partitionSpec)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,14 @@ default Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier iden
* @param databaseName Name of the database
* @return true if the given database exists in the catalog false otherwise
*/
boolean databaseExists(String databaseName);
default boolean databaseExists(String databaseName) {
try {
loadDatabaseProperties(databaseName);
return true;
} catch (DatabaseNotExistException e) {
return false;
}
}

/**
* Create a database, see {@link Catalog#createDatabase(String name, boolean ignoreIfExists, Map
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.catalog;

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.Table;

import java.util.List;
import java.util.Map;
import java.util.Optional;

/** A {@link Catalog} to delegate all operations to another {@link Catalog}. */
public class DelegateCatalog implements Catalog {

protected final Catalog wrapped;

public DelegateCatalog(Catalog wrapped) {
this.wrapped = wrapped;
}

public Catalog wrapped() {
return wrapped;
}

@Override
public boolean caseSensitive() {
return wrapped.caseSensitive();
}

@Override
public String warehouse() {
return wrapped.warehouse();
}

@Override
public Map<String, String> options() {
return wrapped.options();
}

@Override
public FileIO fileIO() {
return wrapped.fileIO();
}

@Override
public Optional<CatalogLockFactory> lockFactory() {
return wrapped.lockFactory();
}

@Override
public Optional<CatalogLockContext> lockContext() {
return wrapped.lockContext();
}

@Override
public Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier identifier) {
return wrapped.metastoreClientFactory(identifier);
}

@Override
public List<String> listDatabases() {
return wrapped.listDatabases();
}

@Override
public void createDatabase(String name, boolean ignoreIfExists, Map<String, String> properties)
throws DatabaseAlreadyExistException {
wrapped.createDatabase(name, ignoreIfExists, properties);
}

@Override
public Map<String, String> loadDatabaseProperties(String name)
throws DatabaseNotExistException {
return wrapped.loadDatabaseProperties(name);
}

@Override
public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
throws DatabaseNotExistException, DatabaseNotEmptyException {
wrapped.dropDatabase(name, ignoreIfNotExists, cascade);
}

@Override
public List<String> listTables(String databaseName) throws DatabaseNotExistException {
return wrapped.listTables(databaseName);
}

@Override
public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
throws TableNotExistException {
wrapped.dropTable(identifier, ignoreIfNotExists);
}

@Override
public void createTable(Identifier identifier, Schema schema, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException {
wrapped.createTable(identifier, schema, ignoreIfExists);
}

@Override
public void renameTable(Identifier fromTable, Identifier toTable, boolean ignoreIfNotExists)
throws TableNotExistException, TableAlreadyExistException {
wrapped.renameTable(fromTable, toTable, ignoreIfNotExists);
}

@Override
public void alterTable(
Identifier identifier, List<SchemaChange> changes, boolean ignoreIfNotExists)
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
wrapped.alterTable(identifier, changes, ignoreIfNotExists);
}

@Override
public Table getTable(Identifier identifier) throws TableNotExistException {
return wrapped.getTable(identifier);
}

@Override
public void dropPartition(Identifier identifier, Map<String, String> partitions)
throws TableNotExistException, PartitionNotExistException {
wrapped.dropPartition(identifier, partitions);
}

@Override
public void repairCatalog() {
wrapped.repairCatalog();
}

@Override
public void repairDatabase(String databaseName) {
wrapped.repairDatabase(databaseName);
}

@Override
public void repairTable(Identifier identifier) throws TableNotExistException {
wrapped.repairTable(identifier);
}

@Override
public void close() throws Exception {
wrapped.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,6 @@ public List<String> listDatabases() {
return uncheck(() -> listDatabasesInFileSystem(warehouse));
}

@Override
protected boolean databaseExistsImpl(String databaseName) {
return uncheck(() -> fileIO.exists(newDatabasePath(databaseName)));
}

@Override
protected void createDatabaseImpl(String name, Map<String, String> properties) {
if (properties.containsKey(AbstractCatalog.DB_LOCATION_PROP)) {
Expand All @@ -81,7 +76,11 @@ protected void createDatabaseImpl(String name, Map<String, String> properties) {
}

@Override
public Map<String, String> loadDatabasePropertiesImpl(String name) {
public Map<String, String> loadDatabasePropertiesImpl(String name)
throws DatabaseNotExistException {
if (!uncheck(() -> fileIO.exists(newDatabasePath(name)))) {
throw new DatabaseNotExistException(name);
}
return Collections.emptyMap();
}

Expand Down
40 changes: 9 additions & 31 deletions paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,10 @@ public List<String> listDatabases() {
}

@Override
protected boolean databaseExistsImpl(String databaseName) {
return JdbcUtils.databaseExists(connections, catalogKey, databaseName);
}

@Override
protected Map<String, String> loadDatabasePropertiesImpl(String databaseName) {
if (!databaseExists(databaseName)) {
throw new RuntimeException(String.format("Database does not exist: %s", databaseName));
protected Map<String, String> loadDatabasePropertiesImpl(String databaseName)
throws DatabaseNotExistException {
if (!JdbcUtils.databaseExists(connections, catalogKey, databaseName)) {
throw new DatabaseNotExistException(databaseName);
}
Map<String, String> properties = Maps.newHashMap();
properties.putAll(fetchProperties(databaseName));
Expand All @@ -179,10 +175,6 @@ protected Map<String, String> loadDatabasePropertiesImpl(String databaseName) {

@Override
protected void createDatabaseImpl(String name, Map<String, String> properties) {
if (databaseExists(name)) {
throw new RuntimeException(String.format("Database already exists: %s", name));
}

Map<String, String> createProps = new HashMap<>();
createProps.put(DATABASE_EXISTS_PROPERTY, "true");
if (properties != null && !properties.isEmpty()) {
Expand All @@ -206,9 +198,6 @@ protected void dropDatabaseImpl(String name) {

@Override
protected List<String> listTablesImpl(String databaseName) {
if (!databaseExists(databaseName)) {
throw new RuntimeException(String.format("Database does not exist: %s", databaseName));
}
return fetch(
row -> row.getString(JdbcUtils.TABLE_NAME),
JdbcUtils.LIST_TABLES_SQL,
Expand Down Expand Up @@ -312,9 +301,6 @@ protected void alterTableImpl(
Identifier identifier, String branchName, List<SchemaChange> changes)
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
assertMainBranch(branchName);
if (!tableExists(identifier)) {
throw new RuntimeException("Table is not exists " + identifier.getFullName());
}
SchemaManager schemaManager = getSchemaManager(identifier);
schemaManager.commitChanges(changes);
}
Expand All @@ -323,7 +309,11 @@ protected void alterTableImpl(
protected TableSchema getDataTableSchema(Identifier identifier, String branchName)
throws TableNotExistException {
assertMainBranch(branchName);
if (!tableExists(identifier)) {
if (!JdbcUtils.tableExists(
connections,
catalogKey,
identifier.getDatabaseName(),
identifier.getObjectName())) {
throw new TableNotExistException(identifier);
}
Path tableLocation = getDataTableLocation(identifier);
Expand All @@ -333,15 +323,6 @@ protected TableSchema getDataTableSchema(Identifier identifier, String branchNam
() -> new RuntimeException("There is no paimon table in " + tableLocation));
}

@Override
public boolean tableExists(Identifier identifier) {
if (isSystemTable(identifier)) {
return super.tableExists(identifier);
}
return JdbcUtils.tableExists(
connections, catalogKey, identifier.getDatabaseName(), identifier.getObjectName());
}

@Override
public boolean caseSensitive() {
return false;
Expand Down Expand Up @@ -383,9 +364,6 @@ private SchemaManager getSchemaManager(Identifier identifier) {
}

private Map<String, String> fetchProperties(String databaseName) {
if (!databaseExists(databaseName)) {
throw new RuntimeException(String.format("Database does not exist: %s", databaseName));
}
List<Map.Entry<String, String>> entries =
fetch(
row ->
Expand Down
Loading

0 comments on commit 775c086

Please sign in to comment.