From 88be8648c9973e5711ff7ffe540b87865c985dbd Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Mon, 30 Dec 2024 19:52:14 +0800 Subject: [PATCH] [rest] Partition methods should check table first in RESTCatalog --- .../paimon/catalog/AbstractCatalog.java | 3 +- .../org/apache/paimon/catalog/Catalog.java | 4 +- .../org/apache/paimon/rest/RESTCatalog.java | 150 +++++++++--------- .../apache/paimon/rest/RESTCatalogTest.java | 15 +- 4 files changed, 82 insertions(+), 90 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index c63d92a144c9..02e662350ffd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -166,8 +166,7 @@ public void createPartition(Identifier identifier, Map partition FileStoreTable table = (FileStoreTable) getTable(tableIdentifier); if (table.partitionKeys().isEmpty() || !table.coreOptions().partitionedTableInMetastore()) { - throw new UnsupportedOperationException( - "The table is not partitioned table in metastore."); + return; } MetastoreClient.Factory metastoreFactory = diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index 904c96910746..0e1482c87b80 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -247,11 +247,11 @@ void createPartition(Identifier identifier, Map partitionSpec) * Drop the partition of the specify table. * * @param identifier path of the table to drop partition - * @param partitions the partition to be deleted + * @param partition the partition to be deleted * @throws TableNotExistException if the table does not exist * @throws PartitionNotExistException if the partition does not exist */ - void dropPartition(Identifier identifier, Map partitions) + void dropPartition(Identifier identifier, Map partition) throws TableNotExistException, PartitionNotExistException; /** diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index c430e303b276..1a3d47cb2603 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -31,7 +31,6 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.PartitionEntry; -import org.apache.paimon.operation.FileStoreCommit; import org.apache.paimon.operation.Lock; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; @@ -65,7 +64,7 @@ import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.Table; import org.apache.paimon.table.object.ObjectTable; -import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.sink.BatchTableCommit; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Preconditions; @@ -85,9 +84,9 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; -import java.util.stream.Collectors; -import static org.apache.paimon.CoreOptions.createCommitUser; +import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE; +import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME; import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase; import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable; import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase; @@ -360,6 +359,12 @@ public void dropTable(Identifier identifier, boolean ignoreIfNotExists) @Override public void createPartition(Identifier identifier, Map partitionSpec) throws TableNotExistException { + Table table = getTable(identifier); + Options options = Options.fromMap(table.options()); + if (!options.get(METASTORE_PARTITIONED_TABLE)) { + return; + } + try { CreatePartitionRequest request = new CreatePartitionRequest(identifier, partitionSpec); client.post( @@ -376,27 +381,77 @@ public void createPartition(Identifier identifier, Map partition } @Override - public void dropPartition(Identifier identifier, Map partitions) + public void dropPartition(Identifier identifier, Map partition) throws TableNotExistException, PartitionNotExistException { checkNotSystemTable(identifier, "dropPartition"); - dropPartitionMetadata(identifier, partitions); + Table table = getTable(identifier); - cleanPartitionsInFileSystem(table, partitions); + Options options = Options.fromMap(table.options()); + if (options.get(METASTORE_PARTITIONED_TABLE)) { + try { + client.delete( + resourcePaths.partitions( + identifier.getDatabaseName(), identifier.getTableName()), + new DropPartitionRequest(partition), + headers()); + } catch (NoSuchResourceException ignore) { + throw new PartitionNotExistException(identifier, partition); + } catch (ForbiddenException e) { + throw new TableNoPermissionException(identifier, e); + } + } + + try (BatchTableCommit commit = + table.newBatchWriteBuilder().withOverwrite(partition).newCommit()) { + commit.commit(Collections.emptyList()); + } catch (Exception e) { + throw new RuntimeException(e); + } } @Override public List listPartitions(Identifier identifier) throws TableNotExistException { - FileStoreTable table = (FileStoreTable) getTable(identifier); - boolean whetherSupportListPartitions = - Boolean.parseBoolean( - table.options().get(CoreOptions.METASTORE_PARTITIONED_TABLE.key())); - if (whetherSupportListPartitions) { - RowType rowType = table.schema().logicalPartitionType(); - return listPartitionsFromServer(identifier, rowType); - } else { - return getTable(identifier).newReadBuilder().newScan().listPartitionEntries(); + Table table = getTable(identifier); + Options options = Options.fromMap(table.options()); + if (!options.get(METASTORE_PARTITIONED_TABLE)) { + return table.newReadBuilder().newScan().listPartitionEntries(); } + + ListPartitionsResponse response; + try { + response = + client.get( + resourcePaths.partitions( + identifier.getDatabaseName(), identifier.getTableName()), + ListPartitionsResponse.class, + headers()); + } catch (NoSuchResourceException e) { + throw new TableNotExistException(identifier); + } catch (ForbiddenException e) { + throw new TableNoPermissionException(identifier, e); + } + + if (response == null || response.getPartitions() == null) { + return Collections.emptyList(); + } + + RowType partitionType = table.rowType().project(table.partitionKeys()); + InternalRowSerializer serializer = new InternalRowSerializer(partitionType); + String defaultName = options.get(PARTITION_DEFAULT_NAME); + List result = new ArrayList<>(); + for (PartitionResponse partition : response.getPartitions()) { + GenericRow row = + convertSpecToInternalRow(partition.getSpec(), partitionType, defaultName); + result.add( + new PartitionEntry( + serializer.toBinaryRow(row).copy(), + partition.getRecordCount(), + partition.getFileSizeInBytes(), + partition.getFileCount(), + partition.getLastFileCreationTime())); + } + return result; } @Override @@ -444,41 +499,6 @@ private Table getDataOrFormatTable(Identifier identifier) throws TableNotExistEx return table; } - private List listPartitionsFromServer(Identifier identifier, RowType rowType) - throws TableNotExistException { - try { - ListPartitionsResponse response = - client.get( - resourcePaths.partitions( - identifier.getDatabaseName(), identifier.getTableName()), - ListPartitionsResponse.class, - headers()); - if (response != null && response.getPartitions() != null) { - return response.getPartitions().stream() - .map(p -> convertToPartitionEntry(p, rowType)) - .collect(Collectors.toList()); - } else { - return Collections.emptyList(); - } - } catch (NoSuchResourceException e) { - throw new TableNotExistException(identifier); - } catch (ForbiddenException e) { - throw new TableNoPermissionException(identifier, e); - } - } - - private void cleanPartitionsInFileSystem(Table table, Map partitions) { - FileStoreTable fileStoreTable = (FileStoreTable) table; - try (FileStoreCommit commit = - fileStoreTable - .store() - .newCommit( - createCommitUser(fileStoreTable.coreOptions().toConfiguration()))) { - commit.dropPartitions( - Collections.singletonList(partitions), BatchWriteBuilder.COMMIT_IDENTIFIER); - } - } - private GetTableResponse getTableResponse(Identifier identifier) throws TableNotExistException { try { return client.get( @@ -492,23 +512,6 @@ private GetTableResponse getTableResponse(Identifier identifier) throws TableNot } } - private boolean dropPartitionMetadata(Identifier identifier, Map partitions) - throws TableNoPermissionException, PartitionNotExistException { - try { - DropPartitionRequest request = new DropPartitionRequest(partitions); - client.delete( - resourcePaths.partitions( - identifier.getDatabaseName(), identifier.getTableName()), - request, - headers()); - return true; - } catch (NoSuchResourceException ignore) { - throw new PartitionNotExistException(identifier, partitions); - } catch (ForbiddenException e) { - throw new TableNoPermissionException(identifier, e); - } - } - private static Map configHeaders(Map properties) { return RESTUtil.extractPrefixMap(properties, "header."); } @@ -540,17 +543,6 @@ private ScheduledExecutorService tokenRefreshExecutor() { return refreshExecutor; } - private PartitionEntry convertToPartitionEntry(PartitionResponse partition, RowType rowType) { - InternalRowSerializer serializer = new InternalRowSerializer(rowType); - GenericRow row = convertSpecToInternalRow(partition.getSpec(), rowType, null); - return new PartitionEntry( - serializer.toBinaryRow(row).copy(), - partition.getRecordCount(), - partition.getFileSizeInBytes(), - partition.getFileCount(), - partition.getLastFileCreationTime()); - } - private static FileIO getFileIOFromOptions(CatalogContext context) { try { Options options = context.options(); diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java index 67103aaa5204..c41c1d2a9c1b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java @@ -34,7 +34,6 @@ import org.apache.paimon.rest.responses.ListDatabasesResponse; import org.apache.paimon.rest.responses.ListPartitionsResponse; import org.apache.paimon.rest.responses.ListTablesResponse; -import org.apache.paimon.rest.responses.PartitionResponse; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.table.Table; @@ -332,10 +331,12 @@ public void testDropTableWhenTableNotExistAndIgnoreIfNotExistsIsFalse() throws E @Test public void testCreatePartition() throws Exception { String databaseName = MockRESTMessage.databaseName(); + GetTableResponse response = MockRESTMessage.getTableResponse(); + mockResponse(mapper.writeValueAsString(response), 200); + Map partitionSpec = new HashMap<>(); partitionSpec.put("p1", "v1"); - PartitionResponse response = MockRESTMessage.partitionResponse(); - mockResponse(mapper.writeValueAsString(response), 200); + mockResponse(mapper.writeValueAsString(MockRESTMessage.partitionResponse()), 200); assertDoesNotThrow( () -> restCatalog.createPartition( @@ -386,11 +387,12 @@ public void testDropPartition() throws Exception { @Test public void testDropPartitionWhenPartitionNoExist() throws Exception { String databaseName = MockRESTMessage.databaseName(); + GetTableResponse response = MockRESTMessage.getTableResponseEnablePartition(); + mockResponse(mapper.writeValueAsString(response), 200); + Map partitionSpec = new HashMap<>(); - GetTableResponse response = MockRESTMessage.getTableResponse(); partitionSpec.put(response.getSchema().primaryKeys().get(0), "1"); mockResponse(mapper.writeValueAsString(""), 404); - mockResponse(mapper.writeValueAsString(response), 200); assertThrows( Catalog.PartitionNotExistException.class, () -> @@ -418,7 +420,6 @@ public void testDropPartitionWhenTableNoExist() throws Exception { Map partitionSpec = new HashMap<>(); GetTableResponse response = MockRESTMessage.getTableResponse(); partitionSpec.put(response.getSchema().primaryKeys().get(0), "1"); - mockResponse(mapper.writeValueAsString(""), 200); mockResponse("", 404); assertThrows( Catalog.TableNotExistException.class, @@ -442,7 +443,7 @@ public void testListPartitionsWhenMetastorePartitionedIsTrue() throws Exception @Test public void testListPartitionsFromFile() throws Exception { String databaseName = MockRESTMessage.databaseName(); - GetTableResponse response = MockRESTMessage.getTableResponse(); + GetTableResponse response = MockRESTMessage.getTableResponseEnablePartition(); mockResponse(mapper.writeValueAsString(response), 200); mockResponse(mapper.writeValueAsString(response), 200); List partitionEntries =