Skip to content

Commit

Permalink
support get table follow Abstract Catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
jerry-024 committed Dec 24, 2024
1 parent 4e2e82e commit 047da11
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 11 deletions.
141 changes: 139 additions & 2 deletions paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,21 @@

package org.apache.paimon.rest;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.TableType;
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogLockContext;
import org.apache.paimon.catalog.CatalogLockFactory;
import org.apache.paimon.catalog.Database;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.PropertyChange;
import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.rest.auth.AuthSession;
Expand All @@ -47,8 +54,14 @@
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.object.ObjectTable;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;

import org.apache.paimon.shade.guava30.com.google.common.annotations.VisibleForTesting;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
Expand All @@ -58,13 +71,18 @@
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;

import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
import static org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool;

/** A catalog implementation for REST. */
Expand Down Expand Up @@ -122,7 +140,7 @@ public RESTCatalog(CatalogContext catalogContext) {
options, catalogContext.preferIO(), catalogContext.fallbackIO());
this.resourcePaths =
ResourcePaths.forCatalogProperties(options.get(RESTCatalogInternalOptions.PREFIX));
this.fileIO = getFileIOFromOptions(catalogContext);
this.fileIO = getFileIOFromOptions(context);
}

// todo: whether it's ok
Expand Down Expand Up @@ -245,7 +263,13 @@ public List<String> listTables(String databaseName) throws DatabaseNotExistExcep

@Override
public Table getTable(Identifier identifier) throws TableNotExistException {
throw new UnsupportedOperationException();
if (SYSTEM_DATABASE_NAME.equals(identifier.getDatabaseName())) {
return getAllInSystemDatabase(identifier);
} else if (identifier.isSystemTable()) {
return getSystemTable(identifier);
} else {
return getDataOrFormatTable(identifier);
}
}

protected TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException {
Expand Down Expand Up @@ -358,6 +382,119 @@ private void updateTable(Identifier fromTable, Identifier toTable, List<SchemaCh
headers());
}

private Table getAllInSystemDatabase(Identifier identifier) throws TableNotExistException {
String tableName = identifier.getTableName();
Supplier<Map<String, Map<String, Path>>> getAllTablePathsFunction =
() -> {
try {
Map<String, Map<String, Path>> allPaths = new HashMap<>();
for (String database : listDatabases()) {
Map<String, Path> tableMap =
allPaths.computeIfAbsent(database, d -> new HashMap<>());
for (String table : listTables(database)) {
Path tableLocation =
getTableLocation(Identifier.create(database, table));
tableMap.put(table, tableLocation);
}
}
return allPaths;
} catch (DatabaseNotExistException e) {
throw new RuntimeException("Database is deleted while listing", e);
}
};
Table table =
SystemTableLoader.loadGlobal(
tableName, fileIO, getAllTablePathsFunction, context.options());
if (table == null) {
throw new TableNotExistException(identifier);
}
return table;
}

private Table getSystemTable(Identifier identifier) throws TableNotExistException {
Table originTable =
getDataOrFormatTable(
new Identifier(
identifier.getDatabaseName(),
identifier.getTableName(),
identifier.getBranchName(),
null));
if (!(originTable instanceof FileStoreTable)) {
throw new UnsupportedOperationException(
String.format(
"Only data table support system tables, but this table %s is %s.",
identifier, originTable.getClass()));
}
Table table =
SystemTableLoader.load(
Preconditions.checkNotNull(identifier.getSystemTableName()),
(FileStoreTable) originTable);
if (table == null) {
throw new TableNotExistException(identifier);
}
return table;
}

private Path getTableLocation(Identifier identifier) {
return new Path(
new Path(warehouse(), identifier.getDatabaseName() + DB_SUFFIX),
identifier.getTableName());
}

private Table getDataOrFormatTable(Identifier identifier) throws TableNotExistException {
Preconditions.checkArgument(identifier.getSystemTableName() == null);
TableSchema tableSchema = getDataTableSchema(identifier);
String uuid = null;
FileStoreTable table =
FileStoreTableFactory.create(
fileIO,
getTableLocation(identifier),
tableSchema,
new CatalogEnvironment(
identifier,
uuid,
Lock.factory(
lockFactory().orElse(null),
lockContext().orElse(null),
identifier),
null)); // todo: whether need MetastoreClient.Factory
CoreOptions options = table.coreOptions();
if (options.type() == TableType.OBJECT_TABLE) {
String objectLocation = options.objectLocation();
checkNotNull(objectLocation, "Object location should not be null for object table.");
table =
ObjectTable.builder()
.underlyingTable(table)
.objectLocation(objectLocation)
.objectFileIO(this.fileIO)
.build();
}
return table;
}

private boolean lockEnabled() {
return context.options().getOptional(LOCK_ENABLED).orElse(fileIO.isObjectStore());
}

private Optional<CatalogLockFactory> lockFactory() {
if (!lockEnabled()) {
return Optional.empty();
}

String lock = context.options().get(LOCK_TYPE);
if (lock == null) {
return Optional.empty();
}

return Optional.of(
FactoryUtil.discoverFactory(
AbstractCatalog.class.getClassLoader(), CatalogLockFactory.class, lock));
}

private Optional<CatalogLockContext> lockContext() {
return Optional.of(CatalogLockContext.fromOptions(context.options()));
}

private ScheduledExecutorService tokenRefreshExecutor() {
if (refreshExecutor == null) {
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.paimon.rest.responses.ErrorResponse;
import org.apache.paimon.rest.responses.GetDatabaseResponse;
import org.apache.paimon.rest.responses.ListDatabasesResponse;
import org.apache.paimon.rest.responses.ListTablesResponse;

import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;

Expand Down Expand Up @@ -81,4 +82,12 @@ public static AlterDatabaseResponse alterDatabaseResponse() {
return new AlterDatabaseResponse(
Lists.newArrayList("remove"), Lists.newArrayList("add"), new ArrayList<>());
}

public static ListTablesResponse listTablesResponse() {
return new ListTablesResponse(Lists.newArrayList("table"));
}

public static ListTablesResponse listTablesEmptyResponse() {
return new ListTablesResponse(Lists.newArrayList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.paimon.rest.responses.ErrorResponse;
import org.apache.paimon.rest.responses.GetDatabaseResponse;
import org.apache.paimon.rest.responses.ListDatabasesResponse;
import org.apache.paimon.rest.responses.ListTablesResponse;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -36,7 +37,9 @@
import okhttp3.mockwebserver.MockWebServer;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -51,7 +54,6 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/** Test for REST Catalog. */
public class RESTCatalogTest {
Expand All @@ -61,6 +63,8 @@ public class RESTCatalogTest {
private RESTCatalog restCatalog;
private RESTCatalog mockRestCatalog;
private CatalogContext context;
private String warehouseStr;
@Rule public TemporaryFolder folder = new TemporaryFolder();

@Before
public void setUp() throws IOException {
Expand All @@ -72,10 +76,14 @@ public void setUp() throws IOException {
String initToken = "init_token";
options.set(RESTCatalogOptions.TOKEN, initToken);
options.set(RESTCatalogOptions.THREAD_POOL_SIZE, 1);
warehouseStr = folder.getRoot().getPath();
String mockResponse =
String.format(
"{\"defaults\": {\"%s\": \"%s\"}}",
RESTCatalogInternalOptions.PREFIX.key(), "prefix");
"{\"defaults\": {\"%s\": \"%s\", \"%s\": \"%s\"}}",
RESTCatalogInternalOptions.PREFIX.key(),
"prefix",
CatalogOptions.WAREHOUSE.key(),
warehouseStr);
mockResponse(mockResponse, 200);
context = CatalogContext.create(options);
restCatalog = new RESTCatalog(context);
Expand All @@ -90,7 +98,7 @@ public void tearDown() throws IOException {
@Test
public void testInitFailWhenDefineWarehouse() {
Options options = new Options();
options.set(CatalogOptions.WAREHOUSE, "/a/b/c");
options.set(CatalogOptions.WAREHOUSE, warehouseStr);
assertThrows(
IllegalArgumentException.class,
() -> new RESTCatalog(CatalogContext.create(options)));
Expand Down Expand Up @@ -169,8 +177,9 @@ public void testDropDatabaseWhenNoExistAndIgnoreIfNotExistsIsTrue() throws Excep
public void testDropDatabaseWhenCascadeIsFalseAndNoTables() throws Exception {
String name = MockRESTMessage.databaseName();
boolean cascade = false;
ListTablesResponse response = MockRESTMessage.listTablesEmptyResponse();
mockResponse(mapper.writeValueAsString(response), 200);
mockResponse("", 200);
when(mockRestCatalog.listTables(name)).thenReturn(new ArrayList<>());
assertDoesNotThrow(() -> mockRestCatalog.dropDatabase(name, false, cascade));
verify(mockRestCatalog, times(1)).dropDatabase(eq(name), eq(false), eq(cascade));
verify(mockRestCatalog, times(1)).listTables(eq(name));
Expand All @@ -180,10 +189,8 @@ public void testDropDatabaseWhenCascadeIsFalseAndNoTables() throws Exception {
public void testDropDatabaseWhenCascadeIsFalseAndTablesExist() throws Exception {
String name = MockRESTMessage.databaseName();
boolean cascade = false;
mockResponse("", 200);
List<String> tables = new ArrayList<>();
tables.add("t1");
when(mockRestCatalog.listTables(name)).thenReturn(tables);
ListTablesResponse response = MockRESTMessage.listTablesResponse();
mockResponse(mapper.writeValueAsString(response), 200);
assertThrows(
Catalog.DatabaseNotEmptyException.class,
() -> mockRestCatalog.dropDatabase(name, false, cascade));
Expand Down

0 comments on commit 047da11

Please sign in to comment.