diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 39b368982ae1..d2c840ba9526 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -65,6 +65,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.SortedMap; import java.util.function.BiConsumer; import static org.apache.paimon.CoreOptions.PATH; @@ -427,11 +428,25 @@ public void rollbackTo(long snapshotId) { @Override public void createTag(String tagName, long fromSnapshotId) { SnapshotManager snapshotManager = snapshotManager(); + Snapshot snapshot = null; + if (snapshotManager.snapshotExists(fromSnapshotId)) { + snapshot = snapshotManager.snapshot(fromSnapshotId); + } else { + SortedMap> tags = tagManager().tags(); + for (Snapshot snap : tags.keySet()) { + if (snap.id() == fromSnapshotId) { + snapshot = snap; + break; + } else if (snap.id() > fromSnapshotId) { + break; + } + } + } checkArgument( - snapshotManager.snapshotExists(fromSnapshotId), + snapshot != null, "Cannot create tag because given snapshot #%s doesn't exist.", fromSnapshotId); - createTag(tagName, snapshotManager.snapshot(fromSnapshotId)); + createTag(tagName, snapshot); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index a29a3e151c76..90f690053160 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -77,18 +77,24 @@ public Path branchTagPath(String branchName, String tagName) { /** Create a tag from given snapshot and save it in the storage. */ public void createTag(Snapshot snapshot, String tagName, List callbacks) { checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is blank.", tagName); - checkArgument(!tagExists(tagName), "Tag name '%s' already exists.", tagName); - Path newTagPath = tagPath(tagName); - try { - fileIO.writeFileUtf8(newTagPath, snapshot.toJson()); - } catch (IOException e) { - throw new RuntimeException( - String.format( - "Exception occurs when committing tag '%s' (path %s). " - + "Cannot clean up because we can't determine the success.", - tagName, newTagPath), - e); + // skip create tag for the same snapshot of the same name. + if (tagExists(tagName)) { + Snapshot tagged = taggedSnapshot(tagName); + Preconditions.checkArgument( + tagged.id() == snapshot.id(), "Tag name '%s' already exists.", tagName); + } else { + Path newTagPath = tagPath(tagName); + try { + fileIO.writeFileUtf8(newTagPath, snapshot.toJson()); + } catch (IOException e) { + throw new RuntimeException( + String.format( + "Exception occurs when committing tag '%s' (path %s). " + + "Cannot clean up because we can't determine the success.", + tagName, newTagPath), + e); + } } try { diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java index 4cd019568c41..fabd61639207 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java @@ -68,6 +68,7 @@ import org.apache.paimon.utils.TagManager; import org.apache.paimon.utils.TraceableFileIO; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -915,6 +916,64 @@ public void testCreateTag() throws Exception { assertThat(tagged.equals(snapshot2)).isTrue(); } + @Test + public void testCreateTagOnExpiredSnapshot() throws Exception { + FileStoreTable table = + createFileStoreTable( + conf -> { + conf.set(SNAPSHOT_NUM_RETAINED_MAX, 1); + conf.set(SNAPSHOT_NUM_RETAINED_MIN, 1); + }); + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + // snapshot 1 + write.write(rowData(1, 10, 100L)); + commit.commit(0, write.prepareCommit(false, 1)); + table.createTag("test-tag", 1); + // verify that tag file exist + TagManager tagManager = new TagManager(new TraceableFileIO(), tablePath); + assertThat(tagManager.tagExists("test-tag")).isTrue(); + // verify that test-tag is equal to snapshot 1 + Snapshot tagged = tagManager.taggedSnapshot("test-tag"); + Snapshot snapshot1 = table.snapshotManager().snapshot(1); + assertThat(tagged.equals(snapshot1)).isTrue(); + // snapshot 2 + write.write(rowData(2, 20, 200L)); + commit.commit(1, write.prepareCommit(false, 2)); + SnapshotManager snapshotManager = new SnapshotManager(new TraceableFileIO(), tablePath); + // The snapshot 1 is expired. + assertThat(snapshotManager.snapshotExists(1)).isFalse(); + table.createTag("test-tag-2", 1); + // verify that tag file exist + assertThat(tagManager.tagExists("test-tag-2")).isTrue(); + // verify that test-tag is equal to snapshot 1 + Snapshot tag2 = tagManager.taggedSnapshot("test-tag-2"); + assertThat(tag2.equals(snapshot1)).isTrue(); + } + } + + @Test + public void testCreateSameTagName() throws Exception { + FileStoreTable table = createFileStoreTable(); + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + // snapshot 1 + write.write(rowData(1, 10, 100L)); + commit.commit(0, write.prepareCommit(false, 1)); + // snapshot 2 + write.write(rowData(1, 10, 100L)); + commit.commit(1, write.prepareCommit(false, 2)); + TagManager tagManager = new TagManager(new TraceableFileIO(), tablePath); + table.createTag("test-tag", 1); + // verify that tag file exist + assertThat(tagManager.tagExists("test-tag")).isTrue(); + // Create again + table.createTag("test-tag", 1); + Assertions.assertThatThrownBy(() -> table.createTag("test-tag", 2)) + .hasMessageContaining("Tag name 'test-tag' already exists."); + } + } + @Test public void testCreateBranch() throws Exception { FileStoreTable table = createFileStoreTable();