diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java index 33f5ed5a94f7..70efe68e83f3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java +++ b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java @@ -44,6 +44,17 @@ public void notifyCreation(String tagName) { } } + @Override + public void notifyDeletion(String tagName) { + LinkedHashMap partitionSpec = new LinkedHashMap<>(); + partitionSpec.put(partitionField, tagName); + try { + client.deletePartition(partitionSpec); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + @Override public void close() throws Exception { client.close(); diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java index 615e783303c9..9247d49232c5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java +++ b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java @@ -33,6 +33,8 @@ public interface MetastoreClient extends AutoCloseable { void addPartition(LinkedHashMap partitionSpec) throws Exception; + void deletePartition(LinkedHashMap partitionSpec) throws Exception; + /** Factory to create {@link MetastoreClient}. */ interface Factory extends Serializable { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index d2c840ba9526..ac0f798a4b4a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -462,7 +462,12 @@ private void createTag(String tagName, Snapshot fromSnapshot) { @Override public void deleteTag(String tagName) { - tagManager().deleteTag(tagName, store().newTagDeletion(), snapshotManager()); + tagManager() + .deleteTag( + tagName, + store().newTagDeletion(), + snapshotManager(), + store().createTagCallbacks()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TagCallback.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TagCallback.java index 397b341d9590..1d20bb89db88 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TagCallback.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TagCallback.java @@ -26,4 +26,6 @@ public interface TagCallback extends AutoCloseable { void notifyCreation(String tagName); + + void notifyDeletion(String tagName); } diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java index 31a9b19974f3..505454313bd2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java @@ -155,7 +155,10 @@ private void tryToTag(Snapshot snapshot) { int i = 0; for (List tag : tags.values()) { tagManager.deleteTag( - checkAndGetOneAutoTag(tag), tagDeletion, snapshotManager); + checkAndGetOneAutoTag(tag), + tagDeletion, + snapshotManager, + callbacks); i++; if (i == toDelete) { break; diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index 90f690053160..134dea459f02 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -126,7 +126,10 @@ public void deleteAllTagsOfOneSnapshot( } public void deleteTag( - String tagName, TagDeletion tagDeletion, SnapshotManager snapshotManager) { + String tagName, + TagDeletion tagDeletion, + SnapshotManager snapshotManager, + List callbacks) { checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is blank.", tagName); checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName); @@ -135,12 +138,12 @@ public void deleteTag( // skip file deletion if snapshot exists if (snapshotManager.snapshotExists(taggedSnapshot.id())) { - fileIO.deleteQuietly(tagPath(tagName)); + deleteTagMetaFile(tagName, callbacks); return; } else { // FileIO discovers tags by tag file, so we should read all tags before we delete tag SortedMap> tags = tags(); - fileIO.deleteQuietly(tagPath(tagName)); + deleteTagMetaFile(tagName, callbacks); // skip data file clean if more than 1 tags are created based on this snapshot if (tags.get(taggedSnapshot).size() > 1) { @@ -152,6 +155,17 @@ public void deleteTag( doClean(taggedSnapshot, taggedSnapshots, snapshotManager, tagDeletion); } + private void deleteTagMetaFile(String tagName, List callbacks) { + fileIO.deleteQuietly(tagPath(tagName)); + try { + callbacks.forEach(callback -> callback.notifyDeletion(tagName)); + } finally { + for (TagCallback tagCallback : callbacks) { + IOUtils.closeQuietly(tagCallback); + } + } + } + private void doClean( Snapshot taggedSnapshot, List taggedSnapshots, diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java index 9994c0809a88..ec73eb317e4a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java @@ -425,7 +425,8 @@ public void testDeleteTagWithSnapshot() throws Exception { assertPathExists(fileIO, pathFactory.toManifestListPath(manifestListName)); } - tagManager.deleteTag("tag1", store.newTagDeletion(), snapshotManager); + tagManager.deleteTag( + "tag1", store.newTagDeletion(), snapshotManager, Collections.emptyList()); // check data files assertPathNotExists(fileIO, pathFactory.bucketPath(partition, 0)); @@ -501,7 +502,8 @@ public void testDeleteTagWithOtherTag() throws Exception { assertPathExists(fileIO, pathFactory.toManifestListPath(manifestListName)); } - tagManager.deleteTag("tag2", store.newTagDeletion(), snapshotManager); + tagManager.deleteTag( + "tag2", store.newTagDeletion(), snapshotManager, Collections.emptyList()); // check data files assertPathExists(fileIO, pathFactory.bucketPath(partition, 0)); diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java index ce93166a5bcd..9f5ccb81c8a2 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java @@ -109,7 +109,11 @@ public void testMixedSnapshotAndTagDeletion() throws Exception { // randomly delete tags for (int id = 1; id <= latestSnapshotId; id++) { if (random.nextBoolean()) { - tagManager.deleteTag("tag" + id, store.newTagDeletion(), snapshotManager); + tagManager.deleteTag( + "tag" + id, + store.newTagDeletion(), + snapshotManager, + Collections.emptyList()); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java index da3425e9b0d3..dcf2c8b0045c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java @@ -156,7 +156,7 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception { identifiersForTags.remove(checkpointId); String tagName = SAVEPOINT_TAG_PREFIX + checkpointId; if (tagManager.tagExists(tagName)) { - tagManager.deleteTag(tagName, tagDeletion, snapshotManager); + tagManager.deleteTag(tagName, tagDeletion, snapshotManager, callbacks); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java index d65ab74140fe..2c898831ec2c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java @@ -114,7 +114,8 @@ private void createTag() { try { // If the tag already exists, delete the tag if (tagManager.tagExists(tagName)) { - tagManager.deleteTag(tagName, tagDeletion, snapshotManager); + tagManager.deleteTag( + tagName, tagDeletion, snapshotManager, table.store().createTagCallbacks()); } // Create a new tag tagManager.createTag(snapshot, tagName, table.store().createTagCallbacks()); @@ -122,7 +123,8 @@ private void createTag() { expireTag(); } catch (Exception e) { if (tagManager.tagExists(tagName)) { - tagManager.deleteTag(tagName, tagDeletion, snapshotManager); + tagManager.deleteTag( + tagName, tagDeletion, snapshotManager, table.store().createTagCallbacks()); } } } @@ -147,7 +149,11 @@ private void expireTag() { } else { List sortedTagNames = tagManager.sortTagsOfOneSnapshot(tagNames); for (String toBeDeleted : sortedTagNames) { - tagManager.deleteTag(toBeDeleted, tagDeletion, snapshotManager); + tagManager.deleteTag( + toBeDeleted, + tagDeletion, + snapshotManager, + table.store().createTagCallbacks()); tagCount--; if (tagCount == tagNumRetainedMax) { break; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java index f020f65bd427..d6d7f434bac5 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java @@ -63,6 +63,7 @@ public void testBatchWriteGeneratorTag() throws Exception { () -> new VersionedSerializerWrapper<>( new ManifestCommittableSerializer()))); + committerOperator.open(); TableCommitImpl tableCommit = table.newCommit(initialCommitUser); diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java index d24944b34f5d..031b1848a01e 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java @@ -91,6 +91,20 @@ public void addPartition(LinkedHashMap partitionSpec) throws Exc } } + @Override + public void deletePartition(LinkedHashMap partitionSpec) throws Exception { + List partitionValues = new ArrayList<>(partitionSpec.values()); + try { + client.dropPartition( + identifier.getDatabaseName(), + identifier.getObjectName(), + partitionValues, + false); + } catch (NoSuchObjectException e) { + // do nothing if the partition not exists + } + } + @Override public void close() throws Exception { client.close(); diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java index 80d6c546035f..aacd9087c746 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java @@ -933,6 +933,32 @@ public void testAddPartitionsForTag() throws Exception { "4\t40\t2023-10-17"); } + @Test + public void testDeletePartitionForTag() throws Exception { + tEnv.executeSql( + "CREATE TABLE t (\n" + + " k INT,\n" + + " v BIGINT,\n" + + " PRIMARY KEY (k) NOT ENFORCED\n" + + ") WITH (\n" + + " 'bucket' = '2',\n" + + " 'metastore.tag-to-partition' = 'dt'\n" + + ")"); + tEnv.executeSql("INSERT INTO t VALUES (1, 10), (2, 20)").await(); + tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-16', 1)"); + tEnv.executeSql("INSERT INTO t VALUES (3, 30)").await(); + tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-17', 2)"); + + assertThat(hiveShell.executeQuery("SHOW PARTITIONS t")) + .containsExactlyInAnyOrder("dt=2023-10-16", "dt=2023-10-17"); + + tEnv.executeSql("CALL sys.delete_tag('test_db.t', '2023-10-16')"); + assertThat(hiveShell.executeQuery("SHOW PARTITIONS t")) + .containsExactlyInAnyOrder("dt=2023-10-17"); + + assertThat(hiveShell.executeQuery("SELECT k, v FROM t WHERE dt='2023-10-16'")).isEmpty(); + } + @Test public void testHistoryPartitionsCascadeToUpdate() throws Exception { tEnv.executeSql(