diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index c70a9c7d69d1..ef073420108b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -18,14 +18,21 @@ package org.apache.paimon.rest; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.TableType; +import org.apache.paimon.catalog.AbstractCatalog; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogLockContext; +import org.apache.paimon.catalog.CatalogLockFactory; import org.apache.paimon.catalog.Database; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.catalog.PropertyChange; +import org.apache.paimon.factories.FactoryUtil; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.PartitionEntry; +import org.apache.paimon.operation.Lock; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; import org.apache.paimon.rest.auth.AuthSession; @@ -47,8 +54,14 @@ import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.CatalogEnvironment; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.Table; +import org.apache.paimon.table.object.ObjectTable; +import org.apache.paimon.table.system.SystemTableLoader; import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.Preconditions; import org.apache.paimon.shade.guava30.com.google.common.annotations.VisibleForTesting; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; @@ -58,13 +71,18 @@ import java.io.UncheckedIOException; import java.time.Duration; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Supplier; import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE; +import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED; +import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE; +import static org.apache.paimon.utils.Preconditions.checkNotNull; import static org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool; /** A catalog implementation for REST. */ @@ -122,7 +140,7 @@ public RESTCatalog(CatalogContext catalogContext) { options, catalogContext.preferIO(), catalogContext.fallbackIO()); this.resourcePaths = ResourcePaths.forCatalogProperties(options.get(RESTCatalogInternalOptions.PREFIX)); - this.fileIO = getFileIOFromOptions(catalogContext); + this.fileIO = getFileIOFromOptions(context); } // todo: whether it's ok @@ -245,7 +263,13 @@ public List listTables(String databaseName) throws DatabaseNotExistExcep @Override public Table getTable(Identifier identifier) throws TableNotExistException { - throw new UnsupportedOperationException(); + if (SYSTEM_DATABASE_NAME.equals(identifier.getDatabaseName())) { + return getAllInSystemDatabase(identifier); + } else if (identifier.isSystemTable()) { + return getSystemTable(identifier); + } else { + return getDataOrFormatTable(identifier); + } } protected TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException { @@ -358,6 +382,119 @@ private void updateTable(Identifier fromTable, Identifier toTable, List>> getAllTablePathsFunction = + () -> { + try { + Map> allPaths = new HashMap<>(); + for (String database : listDatabases()) { + Map tableMap = + allPaths.computeIfAbsent(database, d -> new HashMap<>()); + for (String table : listTables(database)) { + Path tableLocation = + getTableLocation(Identifier.create(database, table)); + tableMap.put(table, tableLocation); + } + } + return allPaths; + } catch (DatabaseNotExistException e) { + throw new RuntimeException("Database is deleted while listing", e); + } + }; + Table table = + SystemTableLoader.loadGlobal( + tableName, fileIO, getAllTablePathsFunction, context.options()); + if (table == null) { + throw new TableNotExistException(identifier); + } + return table; + } + + private Table getSystemTable(Identifier identifier) throws TableNotExistException { + Table originTable = + getDataOrFormatTable( + new Identifier( + identifier.getDatabaseName(), + identifier.getTableName(), + identifier.getBranchName(), + null)); + if (!(originTable instanceof FileStoreTable)) { + throw new UnsupportedOperationException( + String.format( + "Only data table support system tables, but this table %s is %s.", + identifier, originTable.getClass())); + } + Table table = + SystemTableLoader.load( + Preconditions.checkNotNull(identifier.getSystemTableName()), + (FileStoreTable) originTable); + if (table == null) { + throw new TableNotExistException(identifier); + } + return table; + } + + private Path getTableLocation(Identifier identifier) { + return new Path( + new Path(warehouse(), identifier.getDatabaseName() + DB_SUFFIX), + identifier.getTableName()); + } + + private Table getDataOrFormatTable(Identifier identifier) throws TableNotExistException { + Preconditions.checkArgument(identifier.getSystemTableName() == null); + TableSchema tableSchema = getDataTableSchema(identifier); + String uuid = null; + FileStoreTable table = + FileStoreTableFactory.create( + fileIO, + getTableLocation(identifier), + tableSchema, + new CatalogEnvironment( + identifier, + uuid, + Lock.factory( + lockFactory().orElse(null), + lockContext().orElse(null), + identifier), + null)); // todo: whether need MetastoreClient.Factory + CoreOptions options = table.coreOptions(); + if (options.type() == TableType.OBJECT_TABLE) { + String objectLocation = options.objectLocation(); + checkNotNull(objectLocation, "Object location should not be null for object table."); + table = + ObjectTable.builder() + .underlyingTable(table) + .objectLocation(objectLocation) + .objectFileIO(this.fileIO) + .build(); + } + return table; + } + + private boolean lockEnabled() { + return context.options().getOptional(LOCK_ENABLED).orElse(fileIO.isObjectStore()); + } + + private Optional lockFactory() { + if (!lockEnabled()) { + return Optional.empty(); + } + + String lock = context.options().get(LOCK_TYPE); + if (lock == null) { + return Optional.empty(); + } + + return Optional.of( + FactoryUtil.discoverFactory( + AbstractCatalog.class.getClassLoader(), CatalogLockFactory.class, lock)); + } + + private Optional lockContext() { + return Optional.of(CatalogLockContext.fromOptions(context.options())); + } + private ScheduledExecutorService tokenRefreshExecutor() { if (refreshExecutor == null) { synchronized (this) { diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java index 5a2a4fb8ac65..5f40b15f4f9d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java @@ -25,6 +25,7 @@ import org.apache.paimon.rest.responses.ErrorResponse; import org.apache.paimon.rest.responses.GetDatabaseResponse; import org.apache.paimon.rest.responses.ListDatabasesResponse; +import org.apache.paimon.rest.responses.ListTablesResponse; import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; @@ -81,4 +82,12 @@ public static AlterDatabaseResponse alterDatabaseResponse() { return new AlterDatabaseResponse( Lists.newArrayList("remove"), Lists.newArrayList("add"), new ArrayList<>()); } + + public static ListTablesResponse listTablesResponse() { + return new ListTablesResponse(Lists.newArrayList("table")); + } + + public static ListTablesResponse listTablesEmptyResponse() { + return new ListTablesResponse(Lists.newArrayList()); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java index 0bd5e8e42e8f..7fe3255f43a9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java @@ -28,6 +28,7 @@ import org.apache.paimon.rest.responses.ErrorResponse; import org.apache.paimon.rest.responses.GetDatabaseResponse; import org.apache.paimon.rest.responses.ListDatabasesResponse; +import org.apache.paimon.rest.responses.ListTablesResponse; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -36,7 +37,9 @@ import okhttp3.mockwebserver.MockWebServer; import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.IOException; import java.util.ArrayList; @@ -51,7 +54,6 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; /** Test for REST Catalog. */ public class RESTCatalogTest { @@ -61,6 +63,8 @@ public class RESTCatalogTest { private RESTCatalog restCatalog; private RESTCatalog mockRestCatalog; private CatalogContext context; + private String warehouseStr; + @Rule public TemporaryFolder folder = new TemporaryFolder(); @Before public void setUp() throws IOException { @@ -72,10 +76,14 @@ public void setUp() throws IOException { String initToken = "init_token"; options.set(RESTCatalogOptions.TOKEN, initToken); options.set(RESTCatalogOptions.THREAD_POOL_SIZE, 1); + warehouseStr = folder.getRoot().getPath(); String mockResponse = String.format( - "{\"defaults\": {\"%s\": \"%s\"}}", - RESTCatalogInternalOptions.PREFIX.key(), "prefix"); + "{\"defaults\": {\"%s\": \"%s\", \"%s\": \"%s\"}}", + RESTCatalogInternalOptions.PREFIX.key(), + "prefix", + CatalogOptions.WAREHOUSE.key(), + warehouseStr); mockResponse(mockResponse, 200); context = CatalogContext.create(options); restCatalog = new RESTCatalog(context); @@ -90,7 +98,7 @@ public void tearDown() throws IOException { @Test public void testInitFailWhenDefineWarehouse() { Options options = new Options(); - options.set(CatalogOptions.WAREHOUSE, "/a/b/c"); + options.set(CatalogOptions.WAREHOUSE, warehouseStr); assertThrows( IllegalArgumentException.class, () -> new RESTCatalog(CatalogContext.create(options))); @@ -169,8 +177,9 @@ public void testDropDatabaseWhenNoExistAndIgnoreIfNotExistsIsTrue() throws Excep public void testDropDatabaseWhenCascadeIsFalseAndNoTables() throws Exception { String name = MockRESTMessage.databaseName(); boolean cascade = false; + ListTablesResponse response = MockRESTMessage.listTablesEmptyResponse(); + mockResponse(mapper.writeValueAsString(response), 200); mockResponse("", 200); - when(mockRestCatalog.listTables(name)).thenReturn(new ArrayList<>()); assertDoesNotThrow(() -> mockRestCatalog.dropDatabase(name, false, cascade)); verify(mockRestCatalog, times(1)).dropDatabase(eq(name), eq(false), eq(cascade)); verify(mockRestCatalog, times(1)).listTables(eq(name)); @@ -180,10 +189,8 @@ public void testDropDatabaseWhenCascadeIsFalseAndNoTables() throws Exception { public void testDropDatabaseWhenCascadeIsFalseAndTablesExist() throws Exception { String name = MockRESTMessage.databaseName(); boolean cascade = false; - mockResponse("", 200); - List tables = new ArrayList<>(); - tables.add("t1"); - when(mockRestCatalog.listTables(name)).thenReturn(tables); + ListTablesResponse response = MockRESTMessage.listTablesResponse(); + mockResponse(mapper.writeValueAsString(response), 200); assertThrows( Catalog.DatabaseNotEmptyException.class, () -> mockRestCatalog.dropDatabase(name, false, cascade));