Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
askwang committed Sep 27, 2024
1 parent 64772ed commit b5362c7
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 23 deletions.
39 changes: 25 additions & 14 deletions paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExpire.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.paimon.tag;

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.table.sink.TagCallback;
Expand Down Expand Up @@ -61,29 +60,41 @@ private TagTimeExpire(

public List<String> expire() {
List<Pair<Tag, String>> tags = tagManager.tagObjects();
FileIO fileIO = snapshotManager.fileIO();
List<String> expired = new ArrayList<>();
for (Pair<Tag, String> pair : tags) {
Tag tag = pair.getLeft();
String tagName = pair.getRight();
LocalDateTime createTime = tag.getTagCreateTime();
if (createTime == null && olderThanTime != null) {
FileStatus tagFileStatus;
try {
tagFileStatus = fileIO.getFileStatus(tagManager.tagPath(tagName));
} catch (IOException e) {
LOG.warn("Tag path {} not exist, skip expire it.", tagManager.tagPath(tagName));
Duration timeRetained = tag.getTagTimeRetained();
if (createTime == null || timeRetained == null) {
if (olderThanTime != null) {
FileStatus tagFileStatus;
try {
tagFileStatus =
snapshotManager.fileIO().getFileStatus(tagManager.tagPath(tagName));
} catch (IOException e) {
LOG.warn(
"Tag path {} not exist, skip expire it.",
tagManager.tagPath(tagName));
continue;
}
createTime = DateTimeUtils.toLocalDateTime(tagFileStatus.getModificationTime());
} else {
continue;
}
createTime = DateTimeUtils.toLocalDateTime(tagFileStatus.getModificationTime());
}
Duration timeRetained = tag.getTagTimeRetained();
if ((timeRetained != null && LocalDateTime.now().isAfter(createTime.plus(timeRetained)))
|| (olderThanTime != null && olderThanTime.isAfter(createTime))) {
boolean isReachTimeRetained =
timeRetained != null
&& LocalDateTime.now().isAfter(createTime.plus(timeRetained));
boolean isOlderThan = olderThanTime != null && olderThanTime.isAfter(createTime);
if (isReachTimeRetained || isOlderThan) {
LOG.info(
"Delete tag {}, because its existence time has reached its timeRetained of {}.",
"Delete tag {}, because its existence time has reached its timeRetained of {} or"
+ " its createTime {} is olderThan olderThanTime {}.",
tagName,
timeRetained);
timeRetained,
createTime,
olderThanTime);
tagManager.deleteTag(tagName, tagDeletion, snapshotManager, callbacks);
expired.add(tagName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
/** Factory to create {@link ExpireTagsAction}. */
public class ExpireTagsActionFactory implements ActionFactory {

public static final String IDENTIFIER = "expire_tags";
private static final String IDENTIFIER = "expire_tags";

private static final String OLDER_THAN = "older_than";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.paimon.flink.procedure;

import org.apache.paimon.FileStore;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.tag.TagTimeExpire;
Expand All @@ -39,7 +38,7 @@
/** A procedure to expire tags by time. */
public class ExpireTagsProcedure extends ProcedureBase {

public static final String IDENTIFIER = "expire_tags";
private static final String IDENTIFIER = "expire_tags";

@ProcedureHint(
argument = {
Expand All @@ -53,8 +52,8 @@ public class ExpireTagsProcedure extends ProcedureBase {
ProcedureContext procedureContext, String tableId, @Nullable String olderThanStr)
throws Catalog.TableNotExistException {
FileStoreTable fileStoreTable = (FileStoreTable) table(tableId);
FileStore fileStore = fileStoreTable.store();
TagTimeExpire tagTimeExpire = fileStore.newTagCreationManager().getTagTimeExpire();
TagTimeExpire tagTimeExpire =
fileStoreTable.store().newTagCreationManager().getTagTimeExpire();
if (olderThanStr != null) {
LocalDateTime olderThanTime =
DateTimeUtils.parseTimestampData(olderThanStr, 3, TimeZone.getDefault())
Expand All @@ -64,7 +63,7 @@ public class ExpireTagsProcedure extends ProcedureBase {
List<String> expired = tagTimeExpire.expire();
return expired.isEmpty()
? new Row[] {Row.of("No expired tags.")}
: expired.stream().map(x -> Row.of(x)).toArray(Row[]::new);
: expired.stream().map(Row::of).toArray(Row[]::new);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.paimon.spark.procedure;

import org.apache.paimon.FileStore;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.tag.TagTimeExpire;
import org.apache.paimon.utils.DateTimeUtils;
Expand Down Expand Up @@ -74,9 +73,8 @@ public InternalRow[] call(InternalRow args) {
tableIdent,
table -> {
FileStoreTable fileStoreTable = (FileStoreTable) table;
FileStore fileStore = fileStoreTable.store();
TagTimeExpire tagTimeExpire =
fileStore.newTagCreationManager().getTagTimeExpire();
fileStoreTable.store().newTagCreationManager().getTagTimeExpire();
if (olderThanStr != null) {
LocalDateTime olderThanTime =
DateTimeUtils.parseTimestampData(
Expand Down

0 comments on commit b5362c7

Please sign in to comment.