Skip to content

Commit

Permalink
[core] support to drop partition when delete tag (#3042)
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi authored Mar 20, 2024
1 parent e1afe3c commit 0e570dc
Show file tree
Hide file tree
Showing 13 changed files with 102 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ public void notifyCreation(String tagName) {
}
}

@Override
public void notifyDeletion(String tagName) {
LinkedHashMap<String, String> partitionSpec = new LinkedHashMap<>();
partitionSpec.put(partitionField, tagName);
try {
client.deletePartition(partitionSpec);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public void close() throws Exception {
client.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public interface MetastoreClient extends AutoCloseable {

void addPartition(LinkedHashMap<String, String> partitionSpec) throws Exception;

void deletePartition(LinkedHashMap<String, String> partitionSpec) throws Exception;

/** Factory to create {@link MetastoreClient}. */
interface Factory extends Serializable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,12 @@ private void createTag(String tagName, Snapshot fromSnapshot) {

@Override
public void deleteTag(String tagName) {
tagManager().deleteTag(tagName, store().newTagDeletion(), snapshotManager());
tagManager()
.deleteTag(
tagName,
store().newTagDeletion(),
snapshotManager(),
store().createTagCallbacks());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@
public interface TagCallback extends AutoCloseable {

void notifyCreation(String tagName);

void notifyDeletion(String tagName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,10 @@ private void tryToTag(Snapshot snapshot) {
int i = 0;
for (List<String> tag : tags.values()) {
tagManager.deleteTag(
checkAndGetOneAutoTag(tag), tagDeletion, snapshotManager);
checkAndGetOneAutoTag(tag),
tagDeletion,
snapshotManager,
callbacks);
i++;
if (i == toDelete) {
break;
Expand Down
20 changes: 17 additions & 3 deletions paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,10 @@ public void deleteAllTagsOfOneSnapshot(
}

public void deleteTag(
String tagName, TagDeletion tagDeletion, SnapshotManager snapshotManager) {
String tagName,
TagDeletion tagDeletion,
SnapshotManager snapshotManager,
List<TagCallback> callbacks) {
checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is blank.", tagName);
checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName);

Expand All @@ -135,12 +138,12 @@ public void deleteTag(

// skip file deletion if snapshot exists
if (snapshotManager.snapshotExists(taggedSnapshot.id())) {
fileIO.deleteQuietly(tagPath(tagName));
deleteTagMetaFile(tagName, callbacks);
return;
} else {
// FileIO discovers tags by tag file, so we should read all tags before we delete tag
SortedMap<Snapshot, List<String>> tags = tags();
fileIO.deleteQuietly(tagPath(tagName));
deleteTagMetaFile(tagName, callbacks);

// skip data file clean if more than 1 tags are created based on this snapshot
if (tags.get(taggedSnapshot).size() > 1) {
Expand All @@ -152,6 +155,17 @@ public void deleteTag(
doClean(taggedSnapshot, taggedSnapshots, snapshotManager, tagDeletion);
}

private void deleteTagMetaFile(String tagName, List<TagCallback> callbacks) {
fileIO.deleteQuietly(tagPath(tagName));
try {
callbacks.forEach(callback -> callback.notifyDeletion(tagName));
} finally {
for (TagCallback tagCallback : callbacks) {
IOUtils.closeQuietly(tagCallback);
}
}
}

private void doClean(
Snapshot taggedSnapshot,
List<Snapshot> taggedSnapshots,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,8 @@ public void testDeleteTagWithSnapshot() throws Exception {
assertPathExists(fileIO, pathFactory.toManifestListPath(manifestListName));
}

tagManager.deleteTag("tag1", store.newTagDeletion(), snapshotManager);
tagManager.deleteTag(
"tag1", store.newTagDeletion(), snapshotManager, Collections.emptyList());

// check data files
assertPathNotExists(fileIO, pathFactory.bucketPath(partition, 0));
Expand Down Expand Up @@ -501,7 +502,8 @@ public void testDeleteTagWithOtherTag() throws Exception {
assertPathExists(fileIO, pathFactory.toManifestListPath(manifestListName));
}

tagManager.deleteTag("tag2", store.newTagDeletion(), snapshotManager);
tagManager.deleteTag(
"tag2", store.newTagDeletion(), snapshotManager, Collections.emptyList());

// check data files
assertPathExists(fileIO, pathFactory.bucketPath(partition, 0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,11 @@ public void testMixedSnapshotAndTagDeletion() throws Exception {
// randomly delete tags
for (int id = 1; id <= latestSnapshotId; id++) {
if (random.nextBoolean()) {
tagManager.deleteTag("tag" + id, store.newTagDeletion(), snapshotManager);
tagManager.deleteTag(
"tag" + id,
store.newTagDeletion(),
snapshotManager,
Collections.emptyList());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception {
identifiersForTags.remove(checkpointId);
String tagName = SAVEPOINT_TAG_PREFIX + checkpointId;
if (tagManager.tagExists(tagName)) {
tagManager.deleteTag(tagName, tagDeletion, snapshotManager);
tagManager.deleteTag(tagName, tagDeletion, snapshotManager, callbacks);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,17 @@ private void createTag() {
try {
// If the tag already exists, delete the tag
if (tagManager.tagExists(tagName)) {
tagManager.deleteTag(tagName, tagDeletion, snapshotManager);
tagManager.deleteTag(
tagName, tagDeletion, snapshotManager, table.store().createTagCallbacks());
}
// Create a new tag
tagManager.createTag(snapshot, tagName, table.store().createTagCallbacks());
// Expire the tag
expireTag();
} catch (Exception e) {
if (tagManager.tagExists(tagName)) {
tagManager.deleteTag(tagName, tagDeletion, snapshotManager);
tagManager.deleteTag(
tagName, tagDeletion, snapshotManager, table.store().createTagCallbacks());
}
}
}
Expand All @@ -147,7 +149,11 @@ private void expireTag() {
} else {
List<String> sortedTagNames = tagManager.sortTagsOfOneSnapshot(tagNames);
for (String toBeDeleted : sortedTagNames) {
tagManager.deleteTag(toBeDeleted, tagDeletion, snapshotManager);
tagManager.deleteTag(
toBeDeleted,
tagDeletion,
snapshotManager,
table.store().createTagCallbacks());
tagCount--;
if (tagCount == tagNumRetainedMax) {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public void testBatchWriteGeneratorTag() throws Exception {
() ->
new VersionedSerializerWrapper<>(
new ManifestCommittableSerializer())));
committerOperator.open();

TableCommitImpl tableCommit = table.newCommit(initialCommitUser);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,20 @@ public void addPartition(LinkedHashMap<String, String> partitionSpec) throws Exc
}
}

@Override
public void deletePartition(LinkedHashMap<String, String> partitionSpec) throws Exception {
List<String> partitionValues = new ArrayList<>(partitionSpec.values());
try {
client.dropPartition(
identifier.getDatabaseName(),
identifier.getObjectName(),
partitionValues,
false);
} catch (NoSuchObjectException e) {
// do nothing if the partition not exists
}
}

@Override
public void close() throws Exception {
client.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,32 @@ public void testAddPartitionsForTag() throws Exception {
"4\t40\t2023-10-17");
}

@Test
public void testDeletePartitionForTag() throws Exception {
tEnv.executeSql(
"CREATE TABLE t (\n"
+ " k INT,\n"
+ " v BIGINT,\n"
+ " PRIMARY KEY (k) NOT ENFORCED\n"
+ ") WITH (\n"
+ " 'bucket' = '2',\n"
+ " 'metastore.tag-to-partition' = 'dt'\n"
+ ")");
tEnv.executeSql("INSERT INTO t VALUES (1, 10), (2, 20)").await();
tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-16', 1)");
tEnv.executeSql("INSERT INTO t VALUES (3, 30)").await();
tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-17', 2)");

assertThat(hiveShell.executeQuery("SHOW PARTITIONS t"))
.containsExactlyInAnyOrder("dt=2023-10-16", "dt=2023-10-17");

tEnv.executeSql("CALL sys.delete_tag('test_db.t', '2023-10-16')");
assertThat(hiveShell.executeQuery("SHOW PARTITIONS t"))
.containsExactlyInAnyOrder("dt=2023-10-17");

assertThat(hiveShell.executeQuery("SELECT k, v FROM t WHERE dt='2023-10-16'")).isEmpty();
}

@Test
public void testHistoryPartitionsCascadeToUpdate() throws Exception {
tEnv.executeSql(
Expand Down

0 comments on commit 0e570dc

Please sign in to comment.