Skip to content

Commit

Permalink
move some methods to CatalogUtils from AbstractCatalog
Browse files Browse the repository at this point in the history
  • Loading branch information
jerry-024 committed Dec 24, 2024
1 parent 047da11 commit 0d0aa61
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.TableType;
import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
Expand Down Expand Up @@ -59,8 +58,10 @@

import static org.apache.paimon.CoreOptions.TYPE;
import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.lockFactory;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
Expand Down Expand Up @@ -94,31 +95,16 @@ public FileIO fileIO() {
return fileIO;
}

public Optional<CatalogLockFactory> lockFactory() {
if (!lockEnabled()) {
return Optional.empty();
}

String lock = catalogOptions.get(LOCK_TYPE);
if (lock == null) {
return defaultLockFactory();
}

return Optional.of(
FactoryUtil.discoverFactory(
AbstractCatalog.class.getClassLoader(), CatalogLockFactory.class, lock));
}

public Optional<CatalogLockFactory> defaultLockFactory() {
return Optional.empty();
}

public Optional<CatalogLockContext> lockContext() {
return Optional.of(CatalogLockContext.fromOptions(catalogOptions));
return CatalogUtils.lockContext(catalogOptions);
}

protected boolean lockEnabled() {
return catalogOptions.getOptional(LOCK_ENABLED).orElse(fileIO.isObjectStore());
return CatalogUtils.lockEnabled(catalogOptions, fileIO);
}

protected boolean allowCustomTablePath() {
Expand Down Expand Up @@ -428,7 +414,8 @@ protected Table getDataOrFormatTable(Identifier identifier) throws TableNotExist
identifier,
tableMeta.uuid,
Lock.factory(
lockFactory().orElse(null),
lockFactory(catalogOptions, fileIO(), defaultLockFactory())
.orElse(null),
lockContext().orElse(null),
identifier),
metastoreClientFactory(identifier).orElse(null)));
Expand Down Expand Up @@ -472,7 +459,7 @@ public void createFormatTable(Identifier identifier, Schema schema) {
* @return The warehouse path for the database
*/
public Path newDatabasePath(String database) {
return newDatabasePath(warehouse(), database);
return CatalogUtils.newDatabasePath(warehouse(), database);
}

public Map<String, Map<String, Path>> allTablePaths() {
Expand Down Expand Up @@ -507,16 +494,6 @@ public Path getTableLocation(Identifier identifier) {
return new Path(newDatabasePath(identifier.getDatabaseName()), identifier.getTableName());
}

protected static void checkNotBranch(Identifier identifier, String method) {
if (identifier.getBranchName() != null) {
throw new IllegalArgumentException(
String.format(
"Cannot '%s' for branch table '%s', "
+ "please modify the table with the default branch.",
method, identifier));
}
}

protected void assertMainBranch(Identifier identifier) {
if (identifier.getBranchName() != null
&& !DEFAULT_MAIN_BRANCH.equals(identifier.getBranchName())) {
Expand All @@ -525,39 +502,10 @@ protected void assertMainBranch(Identifier identifier) {
}
}

protected static boolean isTableInSystemDatabase(Identifier identifier) {
return isSystemDatabase(identifier.getDatabaseName()) || identifier.isSystemTable();
}

protected static void checkNotSystemTable(Identifier identifier, String method) {
if (isTableInSystemDatabase(identifier)) {
throw new IllegalArgumentException(
String.format(
"Cannot '%s' for system table '%s', please use data table.",
method, identifier));
}
}

private void copyTableDefaultOptions(Map<String, String> options) {
tableDefaultOptions.forEach(options::putIfAbsent);
}

public static Path newTableLocation(String warehouse, Identifier identifier) {
checkNotBranch(identifier, "newTableLocation");
checkNotSystemTable(identifier, "newTableLocation");
return new Path(
newDatabasePath(warehouse, identifier.getDatabaseName()),
identifier.getTableName());
}

public static Path newDatabasePath(String warehouse, String database) {
return new Path(warehouse, database + DB_SUFFIX);
}

public static boolean isSystemDatabase(String database) {
return SYSTEM_DATABASE_NAME.equals(database);
}

/** Validate database cannot be a system database. */
protected void checkNotSystemDatabase(String database) {
if (isSystemDatabase(database)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,20 @@

package org.apache.paimon.catalog;

import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaManager;

import java.util.Map;
import java.util.Optional;

import static org.apache.paimon.catalog.Catalog.DB_SUFFIX;
import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
import static org.apache.paimon.catalog.Catalog.TABLE_DEFAULT_OPTION_PREFIX;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;

/** Utils for {@link Catalog}. */
Expand Down Expand Up @@ -60,4 +68,68 @@ public static String table(String path) {
public static Map<String, String> tableDefaultOptions(Map<String, String> options) {
return convertToPropertiesPrefixKey(options, TABLE_DEFAULT_OPTION_PREFIX);
}

public static boolean isSystemDatabase(String database) {
return SYSTEM_DATABASE_NAME.equals(database);
}

public static boolean isTableInSystemDatabase(Identifier identifier) {
return isSystemDatabase(identifier.getDatabaseName()) || identifier.isSystemTable();
}

public static void checkNotSystemTable(Identifier identifier, String method) {
if (isTableInSystemDatabase(identifier)) {
throw new IllegalArgumentException(
String.format(
"Cannot '%s' for system table '%s', please use data table.",
method, identifier));
}
}

public static Path newDatabasePath(String warehouse, String database) {
return new Path(warehouse, database + DB_SUFFIX);
}

public static Path newTableLocation(String warehouse, Identifier identifier) {
checkNotBranch(identifier, "newTableLocation");
checkNotSystemTable(identifier, "newTableLocation");
return new Path(
newDatabasePath(warehouse, identifier.getDatabaseName()),
identifier.getTableName());
}

public static void checkNotBranch(Identifier identifier, String method) {
if (identifier.getBranchName() != null) {
throw new IllegalArgumentException(
String.format(
"Cannot '%s' for branch table '%s', "
+ "please modify the table with the default branch.",
method, identifier));
}
}

public static Optional<CatalogLockFactory> lockFactory(
Options options, FileIO fileIO, Optional<CatalogLockFactory> defaultLockFactoryOpt) {
boolean lockEnabled = lockEnabled(options, fileIO);
if (!lockEnabled) {
return Optional.empty();
}

String lock = options.get(LOCK_TYPE);
if (lock == null) {
return defaultLockFactoryOpt;
}

return Optional.of(
FactoryUtil.discoverFactory(
AbstractCatalog.class.getClassLoader(), CatalogLockFactory.class, lock));
}

public static Optional<CatalogLockContext> lockContext(Options options) {
return Optional.of(CatalogLockContext.fromOptions(options));
}

public static boolean lockEnabled(Options options, FileIO fileIO) {
return options.getOptional(LOCK_ENABLED).orElse(fileIO != null && fileIO.isObjectStore());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Map;
import java.util.concurrent.Callable;

import static org.apache.paimon.catalog.CatalogUtils.lockFactory;
import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;

/** A catalog implementation for {@link FileIO}. */
Expand Down Expand Up @@ -123,7 +124,9 @@ public void createTableImpl(Identifier identifier, Schema schema) {
private SchemaManager schemaManager(Identifier identifier) {
Path path = getTableLocation(identifier);
CatalogLock catalogLock =
lockFactory().map(fac -> fac.createLock(assertGetLockContext())).orElse(null);
lockFactory(catalogOptions, fileIO(), defaultLockFactory())
.map(fac -> fac.createLock(assertGetLockContext()))
.orElse(null);
return new SchemaManager(fileIO, path, identifier.getBranchNameOrDefault())
.withLock(catalogLock == null ? null : Lock.fromCatalog(catalogLock, identifier));
}
Expand Down
48 changes: 9 additions & 39 deletions paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,11 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.TableType;
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogLockContext;
import org.apache.paimon.catalog.CatalogLockFactory;
import org.apache.paimon.catalog.Database;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.PropertyChange;
import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
Expand Down Expand Up @@ -79,9 +75,10 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;

import static org.apache.paimon.catalog.CatalogUtils.lockContext;
import static org.apache.paimon.catalog.CatalogUtils.lockFactory;
import static org.apache.paimon.catalog.CatalogUtils.newTableLocation;
import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
import static org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool;

Expand Down Expand Up @@ -393,7 +390,8 @@ private Table getAllInSystemDatabase(Identifier identifier) throws TableNotExist
allPaths.computeIfAbsent(database, d -> new HashMap<>());
for (String table : listTables(database)) {
Path tableLocation =
getTableLocation(Identifier.create(database, table));
newTableLocation(
warehouse(), Identifier.create(database, table));
tableMap.put(table, tableLocation);
}
}
Expand Down Expand Up @@ -435,27 +433,22 @@ private Table getSystemTable(Identifier identifier) throws TableNotExistExceptio
return table;
}

private Path getTableLocation(Identifier identifier) {
return new Path(
new Path(warehouse(), identifier.getDatabaseName() + DB_SUFFIX),
identifier.getTableName());
}

private Table getDataOrFormatTable(Identifier identifier) throws TableNotExistException {
Preconditions.checkArgument(identifier.getSystemTableName() == null);
TableSchema tableSchema = getDataTableSchema(identifier);
String uuid = null;
FileStoreTable table =
FileStoreTableFactory.create(
fileIO,
getTableLocation(identifier),
newTableLocation(warehouse(), identifier),
tableSchema,
new CatalogEnvironment(
identifier,
uuid,
Lock.factory(
lockFactory().orElse(null),
lockContext().orElse(null),
lockFactory(context.options(), fileIO, Optional.empty())
.orElse(null),
lockContext(context.options()).orElse(null),
identifier),
null)); // todo: whether need MetastoreClient.Factory
CoreOptions options = table.coreOptions();
Expand All @@ -472,29 +465,6 @@ private Table getDataOrFormatTable(Identifier identifier) throws TableNotExistEx
return table;
}

private boolean lockEnabled() {
return context.options().getOptional(LOCK_ENABLED).orElse(fileIO.isObjectStore());
}

private Optional<CatalogLockFactory> lockFactory() {
if (!lockEnabled()) {
return Optional.empty();
}

String lock = context.options().get(LOCK_TYPE);
if (lock == null) {
return Optional.empty();
}

return Optional.of(
FactoryUtil.discoverFactory(
AbstractCatalog.class.getClassLoader(), CatalogLockFactory.class, lock));
}

private Optional<CatalogLockContext> lockContext() {
return Optional.of(CatalogLockContext.fromOptions(context.options()));
}

private ScheduledExecutorService tokenRefreshExecutor() {
if (refreshExecutor == null) {
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@
import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_TIME;
import static org.apache.paimon.CoreOptions.TYPE;
import static org.apache.paimon.TableType.FORMAT_TABLE;
import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.lockFactory;
import static org.apache.paimon.hive.HiveCatalogLock.acquireTimeout;
import static org.apache.paimon.hive.HiveCatalogLock.checkMaxSleep;
import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR;
Expand Down Expand Up @@ -632,7 +636,8 @@ public org.apache.paimon.table.Table getDataOrFormatTable(Identifier identifier)
identifier,
tableMeta.uuid(),
Lock.factory(
lockFactory().orElse(null),
lockFactory(catalogOptions, fileIO(), defaultLockFactory())
.orElse(null),
lockContext().orElse(null),
identifier),
metastoreClientFactory(identifier).orElse(null)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package org.apache.paimon.hive;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogUtils;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
Expand Down Expand Up @@ -87,7 +87,7 @@ public void preCreateTable(Table table) throws MetaException {
org.apache.hadoop.fs.Path hadoopPath =
getDnsPath(new org.apache.hadoop.fs.Path(warehouse), conf);
warehouse = hadoopPath.toUri().toString();
location = AbstractCatalog.newTableLocation(warehouse, identifier).toUri().toString();
location = CatalogUtils.newTableLocation(warehouse, identifier).toUri().toString();
table.getSd().setLocation(location);
}

Expand Down
Loading

0 comments on commit 0d0aa61

Please sign in to comment.