Skip to content

Commit

Permalink
support get table follow Abstract Catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
jerry-024 committed Dec 19, 2024
1 parent 67067d3 commit a682c0f
Showing 1 changed file with 138 additions and 1 deletion.
139 changes: 138 additions & 1 deletion paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,21 @@

package org.apache.paimon.rest;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.TableType;
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogLockContext;
import org.apache.paimon.catalog.CatalogLockFactory;
import org.apache.paimon.catalog.Database;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.PropertyChange;
import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.rest.auth.AuthSession;
Expand All @@ -47,8 +54,14 @@
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.object.ObjectTable;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;

import org.apache.paimon.shade.guava30.com.google.common.annotations.VisibleForTesting;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
Expand All @@ -58,13 +71,18 @@
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;

import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
import static org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool;

/** A catalog implementation for REST. */
Expand Down Expand Up @@ -245,7 +263,13 @@ public List<String> listTables(String databaseName) throws DatabaseNotExistExcep

@Override
public Table getTable(Identifier identifier) throws TableNotExistException {
throw new UnsupportedOperationException();
if (SYSTEM_DATABASE_NAME.equals(identifier.getDatabaseName())) {
return getAllInSystemDatabase(identifier);
} else if (identifier.isSystemTable()) {
return getSystemTable(identifier);
} else {
return getDataOrFormatTable(identifier);
}
}

protected TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException {
Expand Down Expand Up @@ -358,6 +382,119 @@ private void updateTable(Identifier fromTable, Identifier toTable, List<SchemaCh
headers());
}

private Table getAllInSystemDatabase(Identifier identifier) throws TableNotExistException {
String tableName = identifier.getTableName();
Supplier<Map<String, Map<String, Path>>> getAllTablePathsFunction =
() -> {
try {
Map<String, Map<String, Path>> allPaths = new HashMap<>();
for (String database : listDatabases()) {
Map<String, Path> tableMap =
allPaths.computeIfAbsent(database, d -> new HashMap<>());
for (String table : listTables(database)) {
Path tableLocation =
getTableLocation(Identifier.create(database, table));
tableMap.put(table, tableLocation);
}
}
return allPaths;
} catch (DatabaseNotExistException e) {
throw new RuntimeException("Database is deleted while listing", e);
}
};
Table table =
SystemTableLoader.loadGlobal(
tableName, fileIO, getAllTablePathsFunction, context.options());
if (table == null) {
throw new TableNotExistException(identifier);
}
return table;
}

private Table getSystemTable(Identifier identifier) throws TableNotExistException {
Table originTable =
getDataOrFormatTable(
new Identifier(
identifier.getDatabaseName(),
identifier.getTableName(),
identifier.getBranchName(),
null));
if (!(originTable instanceof FileStoreTable)) {
throw new UnsupportedOperationException(
String.format(
"Only data table support system tables, but this table %s is %s.",
identifier, originTable.getClass()));
}
Table table =
SystemTableLoader.load(
Preconditions.checkNotNull(identifier.getSystemTableName()),
(FileStoreTable) originTable);
if (table == null) {
throw new TableNotExistException(identifier);
}
return table;
}

private Path getTableLocation(Identifier identifier) {
return new Path(
new Path(warehouse(), identifier.getDatabaseName() + DB_SUFFIX),
identifier.getTableName());
}

private Table getDataOrFormatTable(Identifier identifier) throws TableNotExistException {
Preconditions.checkArgument(identifier.getSystemTableName() == null);
TableSchema tableSchema = getDataTableSchema(identifier);
String uuid = null;
FileStoreTable table =
FileStoreTableFactory.create(
fileIO,
getTableLocation(identifier),
tableSchema,
new CatalogEnvironment(
identifier,
uuid,
Lock.factory(
lockFactory().orElse(null),
lockContext().orElse(null),
identifier),
null)); // todo: whether need MetastoreClient.Factory
CoreOptions options = table.coreOptions();
if (options.type() == TableType.OBJECT_TABLE) {
String objectLocation = options.objectLocation();
checkNotNull(objectLocation, "Object location should not be null for object table.");
table =
ObjectTable.builder()
.underlyingTable(table)
.objectLocation(objectLocation)
.objectFileIO(this.fileIO)
.build();
}
return table;
}

private boolean lockEnabled() {
return context.options().getOptional(LOCK_ENABLED).orElse(fileIO.isObjectStore());
}

private Optional<CatalogLockFactory> lockFactory() {
if (!lockEnabled()) {
return Optional.empty();
}

String lock = context.options().get(LOCK_TYPE);
if (lock == null) {
return Optional.empty();
}

return Optional.of(
FactoryUtil.discoverFactory(
AbstractCatalog.class.getClassLoader(), CatalogLockFactory.class, lock));
}

private Optional<CatalogLockContext> lockContext() {
return Optional.of(CatalogLockContext.fromOptions(context.options()));
}

private ScheduledExecutorService tokenRefreshExecutor() {
if (refreshExecutor == null) {
synchronized (this) {
Expand Down

0 comments on commit a682c0f

Please sign in to comment.