diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index f436254f1dcd..5e9eec28c91a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -299,7 +299,9 @@ public void createTable(Identifier identifier, Schema schema, boolean ignoreIfEx GetTableResponse.class, headers()); } catch (AlreadyExistsException e) { - throw new TableAlreadyExistException(identifier); + if (!ignoreIfExists) { + throw new TableAlreadyExistException(identifier); + } } } @@ -389,12 +391,49 @@ Map fetchOptionsFromServer( return response.merge(clientProperties); } - private static Map configHeaders(Map properties) { - return RESTUtil.extractPrefixMap(properties, "header."); + // todo: how know which exception to throw + @VisibleForTesting + void updateTable(Identifier fromTable, Identifier toTable, List changes) { + UpdateTableRequest request = + new UpdateTableRequest(fromTable, toTable, new SchemaChanges(changes)); + client.post( + resourcePaths.table(fromTable.getDatabaseName(), fromTable.getTableName()), + request, + GetTableResponse.class, + headers()); } - private Map headers() { - return catalogAuth.getHeaders(); + @VisibleForTesting + Table getDataOrFormatTable(Identifier identifier) throws TableNotExistException { + Preconditions.checkArgument(identifier.getSystemTableName() == null); + TableSchema tableSchema = getDataTableSchema(identifier); + String uuid = null; + FileStoreTable table = + FileStoreTableFactory.create( + fileIO, + newTableLocation(warehouse(), identifier), + tableSchema, + new CatalogEnvironment( + identifier, + uuid, + Lock.factory( + lockFactory(context.options(), fileIO, Optional.empty()) + .orElse(null), + lockContext(context.options()).orElse(null), + identifier), + null)); // todo: whether need MetastoreClient.Factory + CoreOptions options = table.coreOptions(); + if (options.type() == TableType.OBJECT_TABLE) { + String objectLocation = options.objectLocation(); + checkNotNull(objectLocation, "Object location should not be null for object table."); + table = + ObjectTable.builder() + .underlyingTable(table) + .objectLocation(objectLocation) + .objectFileIO(this.fileIO) + .build(); + } + return table; } protected TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException { @@ -413,15 +452,12 @@ protected TableSchema getDataTableSchema(Identifier identifier) throws TableNotE } } - // todo: how know which exception to throw - private void updateTable(Identifier fromTable, Identifier toTable, List changes) { - UpdateTableRequest request = - new UpdateTableRequest(fromTable, toTable, new SchemaChanges(changes)); - client.post( - resourcePaths.table(fromTable.getDatabaseName(), fromTable.getTableName()), - request, - GetTableResponse.class, - headers()); + private static Map configHeaders(Map properties) { + return RESTUtil.extractPrefixMap(properties, "header."); + } + + private Map headers() { + return catalogAuth.getHeaders(); } private Table getAllInSystemDatabase(Identifier identifier) throws TableNotExistException { @@ -465,38 +501,6 @@ private Table getSystemTable(Identifier identifier) throws TableNotExistExceptio return CatalogUtils.getSystemTable(identifier, originTable); } - private Table getDataOrFormatTable(Identifier identifier) throws TableNotExistException { - Preconditions.checkArgument(identifier.getSystemTableName() == null); - TableSchema tableSchema = getDataTableSchema(identifier); - String uuid = null; - FileStoreTable table = - FileStoreTableFactory.create( - fileIO, - newTableLocation(warehouse(), identifier), - tableSchema, - new CatalogEnvironment( - identifier, - uuid, - Lock.factory( - lockFactory(context.options(), fileIO, Optional.empty()) - .orElse(null), - lockContext(context.options()).orElse(null), - identifier), - null)); // todo: whether need MetastoreClient.Factory - CoreOptions options = table.coreOptions(); - if (options.type() == TableType.OBJECT_TABLE) { - String objectLocation = options.objectLocation(); - checkNotNull(objectLocation, "Object location should not be null for object table."); - table = - ObjectTable.builder() - .underlyingTable(table) - .objectLocation(objectLocation) - .objectFileIO(this.fileIO) - .build(); - } - return table; - } - private ScheduledExecutorService tokenRefreshExecutor() { if (refreshExecutor == null) { synchronized (this) { diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTObjectMapper.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTObjectMapper.java index fa66e754ac85..d9220bceb4dd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTObjectMapper.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTObjectMapper.java @@ -18,6 +18,8 @@ package org.apache.paimon.rest; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.schema.TableSchemaSerializer; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypeJsonParser; @@ -44,6 +46,11 @@ public static ObjectMapper create() { public static Module createPaimonRestJacksonModule() { SimpleModule module = new SimpleModule("Paimon_REST"); + registerJsonObjects( + module, + TableSchema.class, + TableSchemaSerializer.INSTANCE, + TableSchemaSerializer.INSTANCE); registerJsonObjects( module, DataField.class, diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java index 0a9c5bc2d3b6..45cce4c1a786 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java @@ -18,6 +18,7 @@ package org.apache.paimon.rest; +import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.rest.requests.AlterDatabaseRequest; import org.apache.paimon.rest.requests.CreateDatabaseRequest; @@ -28,17 +29,23 @@ import org.apache.paimon.rest.responses.CreateDatabaseResponse; import org.apache.paimon.rest.responses.ErrorResponse; import org.apache.paimon.rest.responses.GetDatabaseResponse; +import org.apache.paimon.rest.responses.GetTableResponse; import org.apache.paimon.rest.responses.ListDatabasesResponse; import org.apache.paimon.rest.responses.ListTablesResponse; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.IntType; import org.apache.paimon.types.RowType; import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -120,6 +127,11 @@ public static CreateTableRequest createTableRequest(String name) { public static UpdateTableRequest updateTableRequest(String fromTableName, String toTableName) { Identifier fromIdentifier = Identifier.create(databaseName(), fromTableName); Identifier toIdentifier = Identifier.create(databaseName(), toTableName); + SchemaChanges changes = new SchemaChanges(getChanges()); + return new UpdateTableRequest(fromIdentifier, toIdentifier, changes); + } + + public static List getChanges() { // add option SchemaChange addOption = SchemaChange.setOption("snapshot.time-retained", "2h"); // remove option @@ -127,6 +139,9 @@ public static UpdateTableRequest updateTableRequest(String fromTableName, String // add column SchemaChange addColumn = SchemaChange.addColumn("col1_after", DataTypes.ARRAY(DataTypes.STRING())); + SchemaChange addColumnMap = + SchemaChange.addColumn( + "col1_map_type", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())); RowType rowType = RowType.of( new DataType[] { @@ -139,37 +154,32 @@ public static UpdateTableRequest updateTableRequest(String fromTableName, String DataTypes.MULTISET(DataTypes.VARCHAR(8)) }, new String[] {"pt", "a", "b", "c", "d", "e", "f"}); - SchemaChange addColumnMap = - SchemaChange.addColumn( - "col11_map_type", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())); - SchemaChange addColumnRowType = SchemaChange.addColumn("col11_row_type", rowType); - // // add a column after col1 - // SchemaChange.Move after = SchemaChange.Move.after("col1_after", "col1"); - // SchemaChange addColumnAfterField = - // SchemaChange.addColumn("col7", DataTypes.STRING(), "", after); - // // rename column - // SchemaChange renameColumn = SchemaChange.renameColumn("col3", "col3_new_name"); - // // drop column - // SchemaChange dropColumn = SchemaChange.dropColumn("col6"); - // // update column comment - // SchemaChange updateColumnComment = - // SchemaChange.updateColumnComment(new String[] {"col4"}, "col4 field"); - // // update nested column comment - // SchemaChange updateNestedColumnComment = - // SchemaChange.updateColumnComment(new String[] {"col5", "f1"}, "col5 f1 - // field"); - // // update column type - // SchemaChange updateColumnType = SchemaChange.updateColumnType("col4", - // DataTypes.DOUBLE()); - // // update column position, you need to pass in a parameter of type Move - // SchemaChange updateColumnPosition = - // SchemaChange.updateColumnPosition(SchemaChange.Move.first("col4")); - // // update column nullability - // SchemaChange updateColumnNullability = - // SchemaChange.updateColumnNullability(new String[] {"col4"}, false); - // // update nested column nullability - // SchemaChange updateNestedColumnNullability = - // SchemaChange.updateColumnNullability(new String[] {"col5", "f2"}, false); + SchemaChange addColumnRowType = SchemaChange.addColumn("col_row_type", rowType); + // add a column after col1 + SchemaChange.Move after = SchemaChange.Move.after("col1_after", "col1"); + SchemaChange addColumnAfterField = + SchemaChange.addColumn("col7", DataTypes.STRING(), "", after); + // rename column + SchemaChange renameColumn = SchemaChange.renameColumn("col3", "col3_new_name"); + // drop column + SchemaChange dropColumn = SchemaChange.dropColumn("col6"); + // update column comment + SchemaChange updateColumnComment = + SchemaChange.updateColumnComment(new String[] {"col4"}, "col4 field"); + // update nested column comment + SchemaChange updateNestedColumnComment = + SchemaChange.updateColumnComment(new String[] {"col5", "f1"}, "col5 f1 field"); + // update column type + SchemaChange updateColumnType = SchemaChange.updateColumnType("col4", DataTypes.DOUBLE()); + // update column position, you need to pass in a parameter of type Move + SchemaChange updateColumnPosition = + SchemaChange.updateColumnPosition(SchemaChange.Move.first("col4")); + // update column nullability + SchemaChange updateColumnNullability = + SchemaChange.updateColumnNullability(new String[] {"col4"}, false); + // update nested column nullability + SchemaChange updateNestedColumnNullability = + SchemaChange.updateColumnNullability(new String[] {"col5", "f2"}, false); List schemaChanges = new ArrayList<>(); schemaChanges.add(addOption); @@ -177,16 +187,34 @@ public static UpdateTableRequest updateTableRequest(String fromTableName, String schemaChanges.add(addColumn); schemaChanges.add(addColumnMap); schemaChanges.add(addColumnRowType); - // schemaChanges.add(addColumnAfterField); - // schemaChanges.add(renameColumn); - // schemaChanges.add(dropColumn); - // schemaChanges.add(updateColumnComment); - // schemaChanges.add(updateNestedColumnComment); - // schemaChanges.add(updateColumnType); - // schemaChanges.add(updateColumnPosition); - // schemaChanges.add(updateColumnNullability); - // schemaChanges.add(updateNestedColumnNullability); - SchemaChanges changes = new SchemaChanges(schemaChanges); - return new UpdateTableRequest(fromIdentifier, toIdentifier, changes); + schemaChanges.add(addColumnAfterField); + schemaChanges.add(renameColumn); + schemaChanges.add(dropColumn); + schemaChanges.add(updateColumnComment); + schemaChanges.add(updateNestedColumnComment); + schemaChanges.add(updateColumnType); + schemaChanges.add(updateColumnPosition); + schemaChanges.add(updateColumnNullability); + schemaChanges.add(updateNestedColumnNullability); + return schemaChanges; + } + + public static GetTableResponse getTableResponse() { + return new GetTableResponse("location", tableSchema()); + } + + private static TableSchema tableSchema() { + List fields = + Arrays.asList( + new DataField(0, "f0", new IntType()), + new DataField(1, "f1", new IntType())); + List partitionKeys = Collections.singletonList("f0"); + List primaryKeys = Arrays.asList("f0", "f1"); + Map options = new HashMap<>(); + options.put("option-1", "value-1"); + options.put("option-2", "value-2"); + // set path for test as if not set system will add one + options.put(CoreOptions.PATH.key(), "/a/b/c"); + return new TableSchema(1, fields, 1, partitionKeys, primaryKeys, options, "comment"); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java index 7fe3255f43a9..a3c4ab34012d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java @@ -21,14 +21,19 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.Database; +import org.apache.paimon.catalog.Identifier; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; +import org.apache.paimon.rest.requests.CreateTableRequest; import org.apache.paimon.rest.responses.AlterDatabaseResponse; import org.apache.paimon.rest.responses.CreateDatabaseResponse; import org.apache.paimon.rest.responses.ErrorResponse; import org.apache.paimon.rest.responses.GetDatabaseResponse; +import org.apache.paimon.rest.responses.GetTableResponse; import org.apache.paimon.rest.responses.ListDatabasesResponse; import org.apache.paimon.rest.responses.ListTablesResponse; +import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.table.Table; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -50,6 +55,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -225,6 +232,133 @@ public void testAlterDatabaseWhenDatabaseNotExistAndIgnoreIfNotExistsIsTrue() th assertDoesNotThrow(() -> mockRestCatalog.alterDatabase(name, new ArrayList<>(), true)); } + @Test + public void testListTables() throws Exception { + String databaseName = MockRESTMessage.databaseName(); + ListTablesResponse response = MockRESTMessage.listTablesResponse(); + mockResponse(mapper.writeValueAsString(response), 200); + List result = restCatalog.listTables(databaseName); + assertEquals(response.getTables().size(), result.size()); + } + + @Test + public void testGetTable() throws Exception { + String databaseName = MockRESTMessage.databaseName(); + GetTableResponse response = MockRESTMessage.getTableResponse(); + mockResponse(mapper.writeValueAsString(response), 200); + Table result = mockRestCatalog.getTable(Identifier.create(databaseName, "table")); + assertEquals(response.getSchema().options().size(), result.options().size()); + verify(mockRestCatalog, times(1)).getDataOrFormatTable(any()); + } + + @Test + public void testCreateTable() throws Exception { + CreateTableRequest request = MockRESTMessage.createTableRequest("table"); + GetTableResponse response = MockRESTMessage.getTableResponse(); + mockResponse(mapper.writeValueAsString(response), 200); + assertDoesNotThrow( + () -> restCatalog.createTable(request.getIdentifier(), request.getSchema(), false)); + } + + @Test + public void testCreateTableWhenTableAlreadyExistAndIgnoreIfExistsIsFalse() throws Exception { + CreateTableRequest request = MockRESTMessage.createTableRequest("table"); + mockResponse("", 409); + assertThrows( + Catalog.TableAlreadyExistException.class, + () -> restCatalog.createTable(request.getIdentifier(), request.getSchema(), false)); + } + + @Test + public void testRenameTable() throws Exception { + String databaseName = MockRESTMessage.databaseName(); + String fromTableName = "fromTable"; + String toTableName = "toTable"; + GetTableResponse response = MockRESTMessage.getTableResponse(); + mockResponse(mapper.writeValueAsString(response), 200); + assertDoesNotThrow( + () -> + mockRestCatalog.renameTable( + Identifier.create(databaseName, fromTableName), + Identifier.create(databaseName, toTableName), + true)); + verify(mockRestCatalog, times(1)).updateTable(any(), any(), anyList()); + } + + @Test + public void testRenameTableWhenTableNotExistAndIgnoreIfNotExistsIsFalse() throws Exception { + String databaseName = MockRESTMessage.databaseName(); + String fromTableName = "fromTable"; + String toTableName = "toTable"; + mockResponse("", 404); + assertThrows( + Catalog.TableNotExistException.class, + () -> + mockRestCatalog.renameTable( + Identifier.create(databaseName, fromTableName), + Identifier.create(databaseName, toTableName), + false)); + } + + @Test + public void testRenameTableWhenToTableAlreadyExist() throws Exception { + String databaseName = MockRESTMessage.databaseName(); + String fromTableName = "fromTable"; + String toTableName = "toTable"; + mockResponse("", 409); + assertThrows( + Catalog.TableAlreadyExistException.class, + () -> + mockRestCatalog.renameTable( + Identifier.create(databaseName, fromTableName), + Identifier.create(databaseName, toTableName), + false)); + } + + @Test + public void testAlterTable() throws Exception { + String databaseName = MockRESTMessage.databaseName(); + List changes = MockRESTMessage.getChanges(); + GetTableResponse response = MockRESTMessage.getTableResponse(); + mockResponse(mapper.writeValueAsString(response), 200); + assertDoesNotThrow( + () -> + mockRestCatalog.alterTable( + Identifier.create(databaseName, "t1"), changes, true)); + verify(mockRestCatalog, times(1)).updateTable(any(), any(), anyList()); + } + + @Test + public void testAlterTableWhenTableNotExistAndIgnoreIfNotExistsIsFalse() throws Exception { + String databaseName = MockRESTMessage.databaseName(); + List changes = MockRESTMessage.getChanges(); + mockResponse("", 404); + assertThrows( + Catalog.TableNotExistException.class, + () -> + mockRestCatalog.alterTable( + Identifier.create(databaseName, "t1"), changes, false)); + } + + @Test + public void testDropTable() throws Exception { + String databaseName = MockRESTMessage.databaseName(); + String tableName = "table"; + mockResponse("", 200); + assertDoesNotThrow( + () -> restCatalog.dropTable(Identifier.create(databaseName, tableName), true)); + } + + @Test + public void testDropTableWhenTableNotExistAndIgnoreIfNotExistsIsFalse() throws Exception { + String databaseName = MockRESTMessage.databaseName(); + String tableName = "table"; + mockResponse("", 404); + assertThrows( + Catalog.TableNotExistException.class, + () -> restCatalog.dropTable(Identifier.create(databaseName, tableName), false)); + } + private void mockResponse(String mockResponse, int httpCode) { MockResponse mockResponseObj = new MockResponse() diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java index 987bb1cd4c5f..86de472a0c02 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java @@ -27,7 +27,9 @@ import org.apache.paimon.rest.responses.CreateDatabaseResponse; import org.apache.paimon.rest.responses.ErrorResponse; import org.apache.paimon.rest.responses.GetDatabaseResponse; +import org.apache.paimon.rest.responses.GetTableResponse; import org.apache.paimon.rest.responses.ListDatabasesResponse; +import org.apache.paimon.rest.responses.ListTablesResponse; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.IntType; @@ -165,5 +167,23 @@ public void updateTableRequestParseTest() throws Exception { UpdateTableRequest parseData = mapper.readValue(requestStr, UpdateTableRequest.class); assertEquals(request.getFromIdentifier(), parseData.getFromIdentifier()); assertEquals(request.getToIdentifier(), parseData.getToIdentifier()); + assertEquals(request.getChanges(), parseData.getChanges()); + } + + @Test + public void getTableResponseParseTest() throws Exception { + GetTableResponse response = MockRESTMessage.getTableResponse(); + String responseStr = mapper.writeValueAsString(response); + GetTableResponse parseData = mapper.readValue(responseStr, GetTableResponse.class); + assertEquals(response.getLocation(), parseData.getLocation()); + assertEquals(response.getSchema(), parseData.getSchema()); + } + + @Test + public void listTablesResponseParseTest() throws Exception { + ListTablesResponse response = MockRESTMessage.listTablesResponse(); + String responseStr = mapper.writeValueAsString(response); + ListTablesResponse parseData = mapper.readValue(responseStr, ListTablesResponse.class); + assertEquals(response.getTables(), parseData.getTables()); } }