diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExpire.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExpire.java index 2258c01851a4..0b57dfa884e9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExpire.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExpire.java @@ -18,7 +18,6 @@ package org.apache.paimon.tag; -import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; import org.apache.paimon.operation.TagDeletion; import org.apache.paimon.table.sink.TagCallback; @@ -61,29 +60,41 @@ private TagTimeExpire( public List expire() { List> tags = tagManager.tagObjects(); - FileIO fileIO = snapshotManager.fileIO(); List expired = new ArrayList<>(); for (Pair pair : tags) { Tag tag = pair.getLeft(); String tagName = pair.getRight(); LocalDateTime createTime = tag.getTagCreateTime(); - if (createTime == null && olderThanTime != null) { - FileStatus tagFileStatus; - try { - tagFileStatus = fileIO.getFileStatus(tagManager.tagPath(tagName)); - } catch (IOException e) { - LOG.warn("Tag path {} not exist, skip expire it.", tagManager.tagPath(tagName)); + Duration timeRetained = tag.getTagTimeRetained(); + if (createTime == null || timeRetained == null) { + if (olderThanTime != null) { + FileStatus tagFileStatus; + try { + tagFileStatus = + snapshotManager.fileIO().getFileStatus(tagManager.tagPath(tagName)); + } catch (IOException e) { + LOG.warn( + "Tag path {} not exist, skip expire it.", + tagManager.tagPath(tagName)); + continue; + } + createTime = DateTimeUtils.toLocalDateTime(tagFileStatus.getModificationTime()); + } else { continue; } - createTime = DateTimeUtils.toLocalDateTime(tagFileStatus.getModificationTime()); } - Duration timeRetained = tag.getTagTimeRetained(); - if ((timeRetained != null && LocalDateTime.now().isAfter(createTime.plus(timeRetained))) - || (olderThanTime != null && olderThanTime.isAfter(createTime))) { + boolean isReachTimeRetained = + timeRetained != null + && LocalDateTime.now().isAfter(createTime.plus(timeRetained)); + boolean isOlderThan = olderThanTime != null && olderThanTime.isAfter(createTime); + if (isReachTimeRetained || isOlderThan) { LOG.info( - "Delete tag {}, because its existence time has reached its timeRetained of {}.", + "Delete tag {}, because its existence time has reached its timeRetained of {} or" + + " its createTime {} is olderThan olderThanTime {}.", tagName, - timeRetained); + timeRetained, + createTime, + olderThanTime); tagManager.deleteTag(tagName, tagDeletion, snapshotManager, callbacks); expired.add(tagName); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireTagsActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireTagsActionFactory.java index 6cfe3dbaa55f..e9bbb0a3bdc7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireTagsActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireTagsActionFactory.java @@ -24,7 +24,7 @@ /** Factory to create {@link ExpireTagsAction}. */ public class ExpireTagsActionFactory implements ActionFactory { - public static final String IDENTIFIER = "expire_tags"; + private static final String IDENTIFIER = "expire_tags"; private static final String OLDER_THAN = "older_than"; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java index 5178a256bec1..3d8af1de70cc 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java @@ -18,7 +18,6 @@ package org.apache.paimon.flink.procedure; -import org.apache.paimon.FileStore; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.tag.TagTimeExpire; @@ -39,7 +38,7 @@ /** A procedure to expire tags by time. */ public class ExpireTagsProcedure extends ProcedureBase { - public static final String IDENTIFIER = "expire_tags"; + private static final String IDENTIFIER = "expire_tags"; @ProcedureHint( argument = { @@ -53,8 +52,8 @@ public class ExpireTagsProcedure extends ProcedureBase { ProcedureContext procedureContext, String tableId, @Nullable String olderThanStr) throws Catalog.TableNotExistException { FileStoreTable fileStoreTable = (FileStoreTable) table(tableId); - FileStore fileStore = fileStoreTable.store(); - TagTimeExpire tagTimeExpire = fileStore.newTagCreationManager().getTagTimeExpire(); + TagTimeExpire tagTimeExpire = + fileStoreTable.store().newTagCreationManager().getTagTimeExpire(); if (olderThanStr != null) { LocalDateTime olderThanTime = DateTimeUtils.parseTimestampData(olderThanStr, 3, TimeZone.getDefault()) @@ -64,7 +63,7 @@ public class ExpireTagsProcedure extends ProcedureBase { List expired = tagTimeExpire.expire(); return expired.isEmpty() ? new Row[] {Row.of("No expired tags.")} - : expired.stream().map(x -> Row.of(x)).toArray(Row[]::new); + : expired.stream().map(Row::of).toArray(Row[]::new); } @Override diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireTagsProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireTagsProcedure.java index 0d872853e88b..d75ca5ee0aac 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireTagsProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireTagsProcedure.java @@ -18,7 +18,6 @@ package org.apache.paimon.spark.procedure; -import org.apache.paimon.FileStore; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.tag.TagTimeExpire; import org.apache.paimon.utils.DateTimeUtils; @@ -74,9 +73,8 @@ public InternalRow[] call(InternalRow args) { tableIdent, table -> { FileStoreTable fileStoreTable = (FileStoreTable) table; - FileStore fileStore = fileStoreTable.store(); TagTimeExpire tagTimeExpire = - fileStore.newTagCreationManager().getTagTimeExpire(); + fileStoreTable.store().newTagCreationManager().getTagTimeExpire(); if (olderThanStr != null) { LocalDateTime olderThanTime = DateTimeUtils.parseTimestampData(