Skip to content

Commit

Permalink
Fix the review content
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzhuoyu committed Dec 19, 2023
1 parent c724c04 commit 1ece1d2
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ private TagAutoCreation(

this.periodHandler.validateDelay(delay);

SortedMap<Snapshot, String> tags = tagManager.tags();
tags.entrySet().removeIf(entry -> entry.getValue().startsWith("savepoint"));
SortedMap<Snapshot, String> tags = tagManager.tags(t -> !t.startsWith("savepoint"));

if (tags.isEmpty()) {
this.nextSnapshot =
Expand Down Expand Up @@ -123,7 +122,7 @@ private void tryToTag(Snapshot snapshot) {
nextTag = periodHandler.nextTagTime(thisTag);

if (numRetainedMax != null) {
SortedMap<Snapshot, String> tags = tagManager.tags();
SortedMap<Snapshot, String> tags = tagManager.tags(t -> !t.startsWith("savepoint"));
if (tags.size() > numRetainedMax) {
int toDelete = tags.size() - numRetainedMax;
int i = 0;
Expand Down
26 changes: 21 additions & 5 deletions paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,21 @@ public List<Snapshot> taggedSnapshots() {

/** Get all tagged snapshots with names sorted by snapshot id. */
public SortedMap<Snapshot, String> tags() {
return tags(tagName -> true);
}

/**
* Retrieves a sorted map of snapshots filtered based on a provided predicate. The predicate
* determines which tag names should be included in the result. Only snapshots with tag names
* that pass the predicate test are included.
*
* @param filter A Predicate that tests each tag name. Snapshots with tag names that fail the
* test are excluded from the result.
* @return A sorted map of filtered snapshots keyed by their IDs, each associated with its tag
* name.
* @throws RuntimeException if an IOException occurs during retrieval of snapshots.
*/
public SortedMap<Snapshot, String> tags(Predicate<String> filter) {
TreeMap<Snapshot, String> tags = new TreeMap<>(Comparator.comparingLong(Snapshot::id));
try {
List<Path> paths =
Expand All @@ -187,14 +202,15 @@ public SortedMap<Snapshot, String> tags() {
.collect(Collectors.toList());

for (Path path : paths) {
String tagName = path.getName().substring(TAG_PREFIX.length());

if (!filter.test(tagName)) {
continue;
}
// If the tag file is not found, it might be deleted by
// other processes, so just skip this tag
Snapshot.safelyFromPath(fileIO, path)
.ifPresent(
snapshot ->
tags.put(
snapshot,
path.getName().substring(TAG_PREFIX.length())));
.ifPresent(snapshot -> tags.put(snapshot, tagName));
}
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,37 @@ public void testModifyTagPeriod() {
assertThat(tagManager.tags().values()).contains("2023-07-18 11", "2023-07-19");
}

@Test
public void testSavepointTag() {
Options options = new Options();
options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.WATERMARK);
options.set(TAG_CREATION_PERIOD, TagCreationPeriod.HOURLY);
options.set(TAG_NUM_RETAINED_MAX, 3);
FileStoreTable table;
TableCommitImpl commit;
TagManager tagManager;
table = this.table.copy(options.toMap());

commit = table.newCommit(commitUser).ignoreEmptyCommit(false);
tagManager = table.store().newTagManager();

// test normal creation
commit.commit(new ManifestCommittable(0, utcMills("2023-07-18T12:12:00")));
assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11");

table.createTag("savepoint-11", 1);

// test newCommit create
commit.commit(new ManifestCommittable(1, utcMills("2023-07-18T14:00:00")));
assertThat(tagManager.tags().values()).contains("2023-07-18 11", "2023-07-18 13");

// test expire old tag
commit.commit(new ManifestCommittable(2, utcMills("2023-07-18T15:00:00")));
commit.commit(new ManifestCommittable(3, utcMills("2023-07-18T16:00:00")));
assertThat(tagManager.tags().values())
.containsOnly("savepoint-11", "2023-07-18 13", "2023-07-18 14", "2023-07-18 15");
}

private long localZoneMills(String timestamp) {
return LocalDateTime.parse(timestamp)
.atZone(ZoneId.systemDefault())
Expand Down

0 comments on commit 1ece1d2

Please sign in to comment.