Skip to content

Commit

Permalink
[rest] Partition methods should check table first in RESTCatalog
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Dec 30, 2024
1 parent de7a50d commit 88be864
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,7 @@ public void createPartition(Identifier identifier, Map<String, String> partition
FileStoreTable table = (FileStoreTable) getTable(tableIdentifier);

if (table.partitionKeys().isEmpty() || !table.coreOptions().partitionedTableInMetastore()) {
throw new UnsupportedOperationException(
"The table is not partitioned table in metastore.");
return;
}

MetastoreClient.Factory metastoreFactory =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,11 @@ void createPartition(Identifier identifier, Map<String, String> partitionSpec)
* Drop the partition of the specify table.
*
* @param identifier path of the table to drop partition
* @param partitions the partition to be deleted
* @param partition the partition to be deleted
* @throws TableNotExistException if the table does not exist
* @throws PartitionNotExistException if the partition does not exist
*/
void dropPartition(Identifier identifier, Map<String, String> partitions)
void dropPartition(Identifier identifier, Map<String, String> partition)
throws TableNotExistException, PartitionNotExistException;

/**
Expand Down
150 changes: 71 additions & 79 deletions paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
Expand Down Expand Up @@ -65,7 +64,7 @@
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.object.ObjectTable;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
Expand All @@ -85,9 +84,9 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
Expand Down Expand Up @@ -360,6 +359,12 @@ public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
@Override
public void createPartition(Identifier identifier, Map<String, String> partitionSpec)
throws TableNotExistException {
Table table = getTable(identifier);
Options options = Options.fromMap(table.options());
if (!options.get(METASTORE_PARTITIONED_TABLE)) {
return;
}

try {
CreatePartitionRequest request = new CreatePartitionRequest(identifier, partitionSpec);
client.post(
Expand All @@ -376,27 +381,77 @@ public void createPartition(Identifier identifier, Map<String, String> partition
}

@Override
public void dropPartition(Identifier identifier, Map<String, String> partitions)
public void dropPartition(Identifier identifier, Map<String, String> partition)
throws TableNotExistException, PartitionNotExistException {
checkNotSystemTable(identifier, "dropPartition");
dropPartitionMetadata(identifier, partitions);

Table table = getTable(identifier);
cleanPartitionsInFileSystem(table, partitions);
Options options = Options.fromMap(table.options());
if (options.get(METASTORE_PARTITIONED_TABLE)) {
try {
client.delete(
resourcePaths.partitions(
identifier.getDatabaseName(), identifier.getTableName()),
new DropPartitionRequest(partition),
headers());
} catch (NoSuchResourceException ignore) {
throw new PartitionNotExistException(identifier, partition);
} catch (ForbiddenException e) {
throw new TableNoPermissionException(identifier, e);
}
}

try (BatchTableCommit commit =
table.newBatchWriteBuilder().withOverwrite(partition).newCommit()) {
commit.commit(Collections.emptyList());
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public List<PartitionEntry> listPartitions(Identifier identifier)
throws TableNotExistException {
FileStoreTable table = (FileStoreTable) getTable(identifier);
boolean whetherSupportListPartitions =
Boolean.parseBoolean(
table.options().get(CoreOptions.METASTORE_PARTITIONED_TABLE.key()));
if (whetherSupportListPartitions) {
RowType rowType = table.schema().logicalPartitionType();
return listPartitionsFromServer(identifier, rowType);
} else {
return getTable(identifier).newReadBuilder().newScan().listPartitionEntries();
Table table = getTable(identifier);
Options options = Options.fromMap(table.options());
if (!options.get(METASTORE_PARTITIONED_TABLE)) {
return table.newReadBuilder().newScan().listPartitionEntries();
}

ListPartitionsResponse response;
try {
response =
client.get(
resourcePaths.partitions(
identifier.getDatabaseName(), identifier.getTableName()),
ListPartitionsResponse.class,
headers());
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
} catch (ForbiddenException e) {
throw new TableNoPermissionException(identifier, e);
}

if (response == null || response.getPartitions() == null) {
return Collections.emptyList();
}

RowType partitionType = table.rowType().project(table.partitionKeys());
InternalRowSerializer serializer = new InternalRowSerializer(partitionType);
String defaultName = options.get(PARTITION_DEFAULT_NAME);
List<PartitionEntry> result = new ArrayList<>();
for (PartitionResponse partition : response.getPartitions()) {
GenericRow row =
convertSpecToInternalRow(partition.getSpec(), partitionType, defaultName);
result.add(
new PartitionEntry(
serializer.toBinaryRow(row).copy(),
partition.getRecordCount(),
partition.getFileSizeInBytes(),
partition.getFileCount(),
partition.getLastFileCreationTime()));
}
return result;
}

@Override
Expand Down Expand Up @@ -444,41 +499,6 @@ private Table getDataOrFormatTable(Identifier identifier) throws TableNotExistEx
return table;
}

private List<PartitionEntry> listPartitionsFromServer(Identifier identifier, RowType rowType)
throws TableNotExistException {
try {
ListPartitionsResponse response =
client.get(
resourcePaths.partitions(
identifier.getDatabaseName(), identifier.getTableName()),
ListPartitionsResponse.class,
headers());
if (response != null && response.getPartitions() != null) {
return response.getPartitions().stream()
.map(p -> convertToPartitionEntry(p, rowType))
.collect(Collectors.toList());
} else {
return Collections.emptyList();
}
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
} catch (ForbiddenException e) {
throw new TableNoPermissionException(identifier, e);
}
}

private void cleanPartitionsInFileSystem(Table table, Map<String, String> partitions) {
FileStoreTable fileStoreTable = (FileStoreTable) table;
try (FileStoreCommit commit =
fileStoreTable
.store()
.newCommit(
createCommitUser(fileStoreTable.coreOptions().toConfiguration()))) {
commit.dropPartitions(
Collections.singletonList(partitions), BatchWriteBuilder.COMMIT_IDENTIFIER);
}
}

private GetTableResponse getTableResponse(Identifier identifier) throws TableNotExistException {
try {
return client.get(
Expand All @@ -492,23 +512,6 @@ private GetTableResponse getTableResponse(Identifier identifier) throws TableNot
}
}

private boolean dropPartitionMetadata(Identifier identifier, Map<String, String> partitions)
throws TableNoPermissionException, PartitionNotExistException {
try {
DropPartitionRequest request = new DropPartitionRequest(partitions);
client.delete(
resourcePaths.partitions(
identifier.getDatabaseName(), identifier.getTableName()),
request,
headers());
return true;
} catch (NoSuchResourceException ignore) {
throw new PartitionNotExistException(identifier, partitions);
} catch (ForbiddenException e) {
throw new TableNoPermissionException(identifier, e);
}
}

private static Map<String, String> configHeaders(Map<String, String> properties) {
return RESTUtil.extractPrefixMap(properties, "header.");
}
Expand Down Expand Up @@ -540,17 +543,6 @@ private ScheduledExecutorService tokenRefreshExecutor() {
return refreshExecutor;
}

private PartitionEntry convertToPartitionEntry(PartitionResponse partition, RowType rowType) {
InternalRowSerializer serializer = new InternalRowSerializer(rowType);
GenericRow row = convertSpecToInternalRow(partition.getSpec(), rowType, null);
return new PartitionEntry(
serializer.toBinaryRow(row).copy(),
partition.getRecordCount(),
partition.getFileSizeInBytes(),
partition.getFileCount(),
partition.getLastFileCreationTime());
}

private static FileIO getFileIOFromOptions(CatalogContext context) {
try {
Options options = context.options();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.paimon.rest.responses.ListDatabasesResponse;
import org.apache.paimon.rest.responses.ListPartitionsResponse;
import org.apache.paimon.rest.responses.ListTablesResponse;
import org.apache.paimon.rest.responses.PartitionResponse;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.Table;

Expand Down Expand Up @@ -332,10 +331,12 @@ public void testDropTableWhenTableNotExistAndIgnoreIfNotExistsIsFalse() throws E
@Test
public void testCreatePartition() throws Exception {
String databaseName = MockRESTMessage.databaseName();
GetTableResponse response = MockRESTMessage.getTableResponse();
mockResponse(mapper.writeValueAsString(response), 200);

Map<String, String> partitionSpec = new HashMap<>();
partitionSpec.put("p1", "v1");
PartitionResponse response = MockRESTMessage.partitionResponse();
mockResponse(mapper.writeValueAsString(response), 200);
mockResponse(mapper.writeValueAsString(MockRESTMessage.partitionResponse()), 200);
assertDoesNotThrow(
() ->
restCatalog.createPartition(
Expand Down Expand Up @@ -386,11 +387,12 @@ public void testDropPartition() throws Exception {
@Test
public void testDropPartitionWhenPartitionNoExist() throws Exception {
String databaseName = MockRESTMessage.databaseName();
GetTableResponse response = MockRESTMessage.getTableResponseEnablePartition();
mockResponse(mapper.writeValueAsString(response), 200);

Map<String, String> partitionSpec = new HashMap<>();
GetTableResponse response = MockRESTMessage.getTableResponse();
partitionSpec.put(response.getSchema().primaryKeys().get(0), "1");
mockResponse(mapper.writeValueAsString(""), 404);
mockResponse(mapper.writeValueAsString(response), 200);
assertThrows(
Catalog.PartitionNotExistException.class,
() ->
Expand Down Expand Up @@ -418,7 +420,6 @@ public void testDropPartitionWhenTableNoExist() throws Exception {
Map<String, String> partitionSpec = new HashMap<>();
GetTableResponse response = MockRESTMessage.getTableResponse();
partitionSpec.put(response.getSchema().primaryKeys().get(0), "1");
mockResponse(mapper.writeValueAsString(""), 200);
mockResponse("", 404);
assertThrows(
Catalog.TableNotExistException.class,
Expand All @@ -442,7 +443,7 @@ public void testListPartitionsWhenMetastorePartitionedIsTrue() throws Exception
@Test
public void testListPartitionsFromFile() throws Exception {
String databaseName = MockRESTMessage.databaseName();
GetTableResponse response = MockRESTMessage.getTableResponse();
GetTableResponse response = MockRESTMessage.getTableResponseEnablePartition();
mockResponse(mapper.writeValueAsString(response), 200);
mockResponse(mapper.writeValueAsString(response), 200);
List<PartitionEntry> partitionEntries =
Expand Down

0 comments on commit 88be864

Please sign in to comment.