diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index c70a9c7d69d1..61103559b15b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -18,14 +18,21 @@ package org.apache.paimon.rest; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.TableType; +import org.apache.paimon.catalog.AbstractCatalog; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogLockContext; +import org.apache.paimon.catalog.CatalogLockFactory; import org.apache.paimon.catalog.Database; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.catalog.PropertyChange; +import org.apache.paimon.factories.FactoryUtil; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.PartitionEntry; +import org.apache.paimon.operation.Lock; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; import org.apache.paimon.rest.auth.AuthSession; @@ -47,8 +54,14 @@ import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.CatalogEnvironment; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.Table; +import org.apache.paimon.table.object.ObjectTable; +import org.apache.paimon.table.system.SystemTableLoader; import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.Preconditions; import org.apache.paimon.shade.guava30.com.google.common.annotations.VisibleForTesting; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; @@ -58,13 +71,18 @@ import java.io.UncheckedIOException; import java.time.Duration; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Supplier; import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE; +import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED; +import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE; +import static org.apache.paimon.utils.Preconditions.checkNotNull; import static org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool; /** A catalog implementation for REST. */ @@ -245,7 +263,13 @@ public List listTables(String databaseName) throws DatabaseNotExistExcep @Override public Table getTable(Identifier identifier) throws TableNotExistException { - throw new UnsupportedOperationException(); + if (SYSTEM_DATABASE_NAME.equals(identifier.getDatabaseName())) { + return getAllInSystemDatabase(identifier); + } else if (identifier.isSystemTable()) { + return getSystemTable(identifier); + } else { + return getDataOrFormatTable(identifier); + } } protected TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException { @@ -358,6 +382,119 @@ private void updateTable(Identifier fromTable, Identifier toTable, List>> getAllTablePathsFunction = + () -> { + try { + Map> allPaths = new HashMap<>(); + for (String database : listDatabases()) { + Map tableMap = + allPaths.computeIfAbsent(database, d -> new HashMap<>()); + for (String table : listTables(database)) { + Path tableLocation = + getTableLocation(Identifier.create(database, table)); + tableMap.put(table, tableLocation); + } + } + return allPaths; + } catch (DatabaseNotExistException e) { + throw new RuntimeException("Database is deleted while listing", e); + } + }; + Table table = + SystemTableLoader.loadGlobal( + tableName, fileIO, getAllTablePathsFunction, context.options()); + if (table == null) { + throw new TableNotExistException(identifier); + } + return table; + } + + private Table getSystemTable(Identifier identifier) throws TableNotExistException { + Table originTable = + getDataOrFormatTable( + new Identifier( + identifier.getDatabaseName(), + identifier.getTableName(), + identifier.getBranchName(), + null)); + if (!(originTable instanceof FileStoreTable)) { + throw new UnsupportedOperationException( + String.format( + "Only data table support system tables, but this table %s is %s.", + identifier, originTable.getClass())); + } + Table table = + SystemTableLoader.load( + Preconditions.checkNotNull(identifier.getSystemTableName()), + (FileStoreTable) originTable); + if (table == null) { + throw new TableNotExistException(identifier); + } + return table; + } + + private Path getTableLocation(Identifier identifier) { + return new Path( + new Path(warehouse(), identifier.getDatabaseName() + DB_SUFFIX), + identifier.getTableName()); + } + + private Table getDataOrFormatTable(Identifier identifier) throws TableNotExistException { + Preconditions.checkArgument(identifier.getSystemTableName() == null); + TableSchema tableSchema = getDataTableSchema(identifier); + String uuid = null; + FileStoreTable table = + FileStoreTableFactory.create( + fileIO, + getTableLocation(identifier), + tableSchema, + new CatalogEnvironment( + identifier, + uuid, + Lock.factory( + lockFactory().orElse(null), + lockContext().orElse(null), + identifier), + null)); // todo: whether need MetastoreClient.Factory + CoreOptions options = table.coreOptions(); + if (options.type() == TableType.OBJECT_TABLE) { + String objectLocation = options.objectLocation(); + checkNotNull(objectLocation, "Object location should not be null for object table."); + table = + ObjectTable.builder() + .underlyingTable(table) + .objectLocation(objectLocation) + .objectFileIO(this.fileIO) + .build(); + } + return table; + } + + private boolean lockEnabled() { + return context.options().getOptional(LOCK_ENABLED).orElse(fileIO.isObjectStore()); + } + + private Optional lockFactory() { + if (!lockEnabled()) { + return Optional.empty(); + } + + String lock = context.options().get(LOCK_TYPE); + if (lock == null) { + return Optional.empty(); + } + + return Optional.of( + FactoryUtil.discoverFactory( + AbstractCatalog.class.getClassLoader(), CatalogLockFactory.class, lock)); + } + + private Optional lockContext() { + return Optional.of(CatalogLockContext.fromOptions(context.options())); + } + private ScheduledExecutorService tokenRefreshExecutor() { if (refreshExecutor == null) { synchronized (this) {