Skip to content

Commit

Permalink
drop partition when 404 throw exception
Browse files Browse the repository at this point in the history
  • Loading branch information
jerry-024 committed Dec 30, 2024
1 parent ccd53b2 commit d84198d
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 56 deletions.
40 changes: 17 additions & 23 deletions paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.TableType;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogUtils;
Expand Down Expand Up @@ -429,16 +428,14 @@ public void close() throws Exception {
}
}

@VisibleForTesting
Map<String, String> fetchOptionsFromServer(
protected Map<String, String> fetchOptionsFromServer(
Map<String, String> headers, Map<String, String> clientProperties) {
ConfigResponse response =
client.get(ResourcePaths.V1_CONFIG, ConfigResponse.class, headers);
return response.merge(clientProperties);
}

@VisibleForTesting
Table getDataOrFormatTable(Identifier identifier) throws TableNotExistException {
protected Table getDataOrFormatTable(Identifier identifier) throws TableNotExistException {
Preconditions.checkArgument(identifier.getSystemTableName() == null);
GetTableResponse response = getTableResponse(identifier);
FileStoreTable table =
Expand All @@ -461,8 +458,7 @@ Table getDataOrFormatTable(Identifier identifier) throws TableNotExistException
return table;
}

@VisibleForTesting
public List<PartitionEntry> listPartitionsFromServer(Identifier identifier, RowType rowType)
protected List<PartitionEntry> listPartitionsFromServer(Identifier identifier, RowType rowType)
throws TableNotExistException {
try {
ListPartitionsResponse response =
Expand All @@ -485,20 +481,7 @@ public List<PartitionEntry> listPartitionsFromServer(Identifier identifier, RowT
}
}

@VisibleForTesting
PartitionEntry convertToPartitionEntry(PartitionResponse partition, RowType rowType) {
InternalRowSerializer serializer = new InternalRowSerializer(rowType);
GenericRow row = convertSpecToInternalRow(partition.getSpec(), rowType, null);
return new PartitionEntry(
serializer.toBinaryRow(row).copy(),
partition.getRecordCount(),
partition.getFileSizeInBytes(),
partition.getFileCount(),
partition.getLastFileCreationTime());
}

@VisibleForTesting
void cleanPartitionsInFileSystem(Table table, Map<String, String> partitions) {
protected void cleanPartitionsInFileSystem(Table table, Map<String, String> partitions) {
FileStoreTable fileStoreTable = (FileStoreTable) table;
try (FileStoreCommit commit =
fileStoreTable
Expand All @@ -525,7 +508,7 @@ protected GetTableResponse getTableResponse(Identifier identifier)
}

protected boolean dropPartitionMetadata(Identifier identifier, Map<String, String> partitions)
throws TableNoPermissionException {
throws TableNoPermissionException, PartitionNotExistException {
try {
DropPartitionRequest request = new DropPartitionRequest(partitions);
client.delete(
Expand All @@ -535,7 +518,7 @@ protected boolean dropPartitionMetadata(Identifier identifier, Map<String, Strin
headers());
return true;
} catch (NoSuchResourceException ignore) {
return true;
throw new PartitionNotExistException(identifier, partitions);
} catch (ForbiddenException e) {
throw new TableNoPermissionException(identifier, e);
}
Expand Down Expand Up @@ -571,4 +554,15 @@ private ScheduledExecutorService tokenRefreshExecutor() {

return refreshExecutor;
}

private PartitionEntry convertToPartitionEntry(PartitionResponse partition, RowType rowType) {
InternalRowSerializer serializer = new InternalRowSerializer(rowType);
GenericRow row = convertSpecToInternalRow(partition.getSpec(), rowType, null);
return new PartitionEntry(
serializer.toBinaryRow(row).copy(),
partition.getRecordCount(),
partition.getFileSizeInBytes(),
partition.getFileCount(),
partition.getLastFileCreationTime());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,6 @@
import org.apache.paimon.rest.responses.PartitionResponse;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.InternalRowPartitionComputer;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -424,13 +419,13 @@ public void testDropPartitionWhenPartitionNoExist() throws Exception {
mockResponse(mapper.writeValueAsString(""), 404);
mockResponse(mapper.writeValueAsString(response), 200);
doNothing().when(mockRestCatalog).cleanPartitionsInFileSystem(any(), any());
assertDoesNotThrow(
assertThrows(
Catalog.PartitionNotExistException.class,
() ->
mockRestCatalog.dropPartition(
Identifier.create(databaseName, "table"), partitionSpec));
verify(mockRestCatalog, times(1)).dropPartitionMetadata(any(), any());
verify(mockRestCatalog, times(1)).getTable(any());
verify(mockRestCatalog, times(1)).cleanPartitionsInFileSystem(any(), any());
verify(mockRestCatalog, times(0)).cleanPartitionsInFileSystem(any(), any());
}

@Test
Expand Down Expand Up @@ -494,31 +489,6 @@ public void testListPartitionsFromFile() throws Exception {
verify(mockRestCatalog, times(0)).listPartitionsFromServer(any(), any());
}

@Test
public void convertToPartitionEntryTest() {
Map<String, String> spec = new HashMap<>();
spec.put("a", "1");
spec.put("b", "2");
List<DataField> fields = new ArrayList<>();
fields.add(new DataField(0, "a", DataTypes.INT()));
fields.add(new DataField(1, "b", DataTypes.STRING()));
RowType partitionRowType = new RowType(false, fields);
PartitionResponse partition = new PartitionResponse(spec, 1, 1, 1, 1);
PartitionEntry partitionEntry =
mockRestCatalog.convertToPartitionEntry(partition, partitionRowType);
InternalRowPartitionComputer partitionComputer =
FileStorePathFactory.getPartitionComputer(partitionRowType, null, false);
Map<String, String> partValues =
partitionComputer.generatePartValues(partitionEntry.partition());
for (Map.Entry<String, String> entry : spec.entrySet()) {
assertEquals(entry.getValue(), partValues.get(entry.getKey()));
}
assertEquals(partitionEntry.recordCount(), partition.getRecordCount());
assertEquals(partitionEntry.fileSizeInBytes(), partition.getFileSizeInBytes());
assertEquals(partitionEntry.fileCount(), partition.getFileCount());
assertEquals(partitionEntry.lastFileCreationTime(), partition.getLastFileCreationTime());
}

private void mockResponse(String mockResponse, int httpCode) {
MockResponse mockResponseObj =
new MockResponse()
Expand Down

0 comments on commit d84198d

Please sign in to comment.