Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
askwang committed Sep 25, 2024
1 parent 559f322 commit b48f58b
Show file tree
Hide file tree
Showing 13 changed files with 100 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.StreamDataTableScan;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.tag.TagTimeExpire;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.SimpleFileReader;
import org.apache.paimon.utils.TagManager;
Expand Down Expand Up @@ -195,12 +194,6 @@ public ExpireSnapshots newExpireChangelog() {
return wrapped.newExpireChangelog();
}

@Override
public TagTimeExpire newExpireTags() {
privilegeChecker.assertCanInsert(identifier);
return wrapped.newExpireTags();
}

@Override
public FileStoreTable copyWithoutTimeTravel(Map<String, String> dynamicOptions) {
return new PrivilegedFileStoreTable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
import org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanner;
import org.apache.paimon.tag.TagPreview;
import org.apache.paimon.tag.TagTimeExpire;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SegmentsCache;
Expand Down Expand Up @@ -373,15 +372,6 @@ public ExpireSnapshots newExpireChangelog() {
snapshotManager(), tagManager(), store().newChangelogDeletion());
}

@Override
public TagTimeExpire newExpireTags() {
return TagTimeExpire.create(
snapshotManager(),
tagManager(),
store().newTagDeletion(),
store().createTagCallbacks());
}

@Override
public TableCommitImpl newCommit(String commitUser) {
CoreOptions options = coreOptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.StreamDataTableScan;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.tag.TagTimeExpire;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SnapshotManager;
Expand Down Expand Up @@ -200,11 +199,6 @@ public ExpireSnapshots newExpireChangelog() {
return wrapped.newExpireChangelog();
}

@Override
public TagTimeExpire newExpireTags() {
return wrapped.newExpireTags();
}

@Override
public DataTableScan newScan() {
return wrapped.newScan();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.tag.TagTimeExpire;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SimpleFileReader;

Expand Down Expand Up @@ -307,11 +306,6 @@ default ExpireSnapshots newExpireChangelog() {
throw new UnsupportedOperationException();
}

@Override
default TagTimeExpire newExpireTags() {
throw new UnsupportedOperationException();
}

@Override
default ReadBuilder newReadBuilder() {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.sink.WriteSelector;
import org.apache.paimon.table.source.StreamDataTableScan;
import org.apache.paimon.tag.TagTimeExpire;
import org.apache.paimon.utils.SimpleFileReader;

import java.time.Duration;
Expand Down Expand Up @@ -253,12 +252,4 @@ default ExpireSnapshots newExpireChangelog() {
"Readonly Table %s does not support expireChangelog.",
this.getClass().getSimpleName()));
}

@Override
default TagTimeExpire newExpireTags() {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support expireTags.",
this.getClass().getSimpleName()));
}
}
4 changes: 0 additions & 4 deletions paimon-core/src/main/java/org/apache/paimon/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.tag.TagTimeExpire;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SimpleFileReader;

Expand Down Expand Up @@ -165,9 +164,6 @@ default void deleteBranches(String branchNames) {
@Experimental
ExpireSnapshots newExpireChangelog();

/** Expire tags. */
TagTimeExpire newExpireTags();

// =============== Read & Write Operations ==================

/** Returns a new read builder. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,8 @@ public static TagAutoManager create(
public TagAutoCreation getTagAutoCreation() {
return tagAutoCreation;
}

public TagTimeExpire getTagTimeExpire() {
return tagTimeExpire;
}
}
21 changes: 16 additions & 5 deletions paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExpire.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@

package org.apache.paimon.tag;

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
Expand Down Expand Up @@ -57,17 +61,24 @@ private TagTimeExpire(

public List<String> expire() {
List<Pair<Tag, String>> tags = tagManager.tagObjects();
FileIO fileIO = snapshotManager.fileIO();
List<String> expired = new ArrayList<>();
for (Pair<Tag, String> pair : tags) {
Tag tag = pair.getLeft();
String tagName = pair.getRight();
LocalDateTime createTime = tag.getTagCreateTime();
Duration timeRetained = tag.getTagTimeRetained();
if (createTime == null || timeRetained == null) {
continue;
if (createTime == null) {
FileStatus tagFileStatus;
try {
tagFileStatus = fileIO.getFileStatus(tagManager.tagPath(tagName));
} catch (IOException e) {
LOG.warn("Tag path {} not exist, skip expire it.", tagManager.tagPath(tagName));
continue;
}
createTime = DateTimeUtils.toLocalDateTime(tagFileStatus.getModificationTime());
}
if ((olderThanTime == null
&& LocalDateTime.now().isAfter(createTime.plus(timeRetained)))
Duration timeRetained = tag.getTagTimeRetained();
if ((timeRetained != null && LocalDateTime.now().isAfter(createTime.plus(timeRetained)))
|| (olderThanTime != null && olderThanTime.isAfter(createTime))) {
LOG.info(
"Delete tag {}, because its existence time has reached its timeRetained of {}.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

package org.apache.paimon.flink.procedure;

import org.apache.paimon.FileStore;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.tag.TagTimeExpire;
import org.apache.paimon.utils.DateTimeUtils;

Expand Down Expand Up @@ -50,7 +52,9 @@ public class ExpireTagsProcedure extends ProcedureBase {
public @DataTypeHint("ROW<expired_tags STRING>") Row[] call(
ProcedureContext procedureContext, String tableId, @Nullable String olderThanStr)
throws Catalog.TableNotExistException {
TagTimeExpire tagTimeExpire = table(tableId).newExpireTags();
FileStoreTable fileStoreTable = (FileStoreTable) table(tableId);
FileStore fileStore = fileStoreTable.store();
TagTimeExpire tagTimeExpire = fileStore.newTagCreationManager().getTagTimeExpire();
if (olderThanStr != null) {
LocalDateTime olderThanTime =
DateTimeUtils.parseTimestampData(olderThanStr, 3, TimeZone.getDefault())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import static org.assertj.core.api.Assertions.assertThat;

/** IT cases for {@link ExpireTagsAction}. */
public class ExpireTagsActionITTest extends ActionITCaseBase {
public class ExpireTagsActionTest extends ActionITCaseBase {

@BeforeEach
public void setUp() {
Expand Down Expand Up @@ -101,8 +101,8 @@ public void testExpireTags() throws Exception {
"--older_than",
timestamp.toString())
.run();
// tag-2 expires
assertThat(table.tagManager().tags().size()).isEqualTo(2);
assertThat(table.tagManager().tagExists("tag-2")).isFalse();
// tag-1,tag-2 expires. tag-1 expired by its file creation time.
assertThat(table.tagManager().tags().size()).isEqualTo(1);
assertThat(table.tagManager().tagExists("tag-3")).isTrue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.io.IOException;
import java.time.LocalDateTime;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -85,25 +86,51 @@ public void testExpireTagsByOlderThanTime() throws Exception {
}
checkSnapshots(snapshotManager, 1, 5);

sql(
"CALL sys.create_tag(`table` => 'default.T', tag => 'tag-1', snapshot_id => 1, time_retained => '1d')");
sql("CALL sys.create_tag(`table` => 'default.T', tag => 'tag-1', snapshot_id => 1)");
sql(
"CALL sys.create_tag(`table` => 'default.T', tag => 'tag-2', snapshot_id => 2, time_retained => '1d')");
sql(
"CALL sys.create_tag(`table` => 'default.T', tag => 'tag-3', snapshot_id => 3, time_retained => '1d')");
sql(
"CALL sys.create_tag(`table` => 'default.T', tag => 'tag-4', snapshot_id => 4, time_retained => '1d')");
List<Row> sql = sql("select count(tag_name) from `T$tags`");
assertThat(sql("select count(tag_name) from `T$tags`")).containsExactly(Row.of(4L));

// tag-4 as the base older_than time
LocalDateTime olderThanTime = table.tagManager().tag("tag-4").getTagCreateTime();
java.sql.Timestamp timestamp =
new java.sql.Timestamp(Timestamp.fromLocalDateTime(olderThanTime).getMillisecond());
// no tags expired
assertThat(sql("CALL sys.expire_tags(`table` => 'default.T')"))
.containsExactlyInAnyOrder(Row.of("No expired tags."));

// tag-2 as the base older_than time.
// tag-1 expired by its file creation time.
LocalDateTime olderThanTime1 = table.tagManager().tag("tag-2").getTagCreateTime();
java.sql.Timestamp timestamp1 =
new java.sql.Timestamp(
Timestamp.fromLocalDateTime(olderThanTime1).getMillisecond());
assertThat(
sql(
"CALL sys.expire_tags(`table` => 'default.T', older_than => '"
+ timestamp.toString()
+ timestamp1.toString()
+ "')"))
.containsExactlyInAnyOrder(Row.of("tag-1"), Row.of("tag-2"), Row.of("tag-3"));
.containsExactlyInAnyOrder(Row.of("tag-1"));

sql(
"CALL sys.create_tag(`table` => 'default.T', tag => 'tag-5', snapshot_id => 5, time_retained => '1s')");
Thread.sleep(1000);

// tag-4 as the base older_than time.
// tag-2,tag-3,tag-5 expired, tag-5 reached its tagTimeRetained.
LocalDateTime olderThanTime2 = table.tagManager().tag("tag-4").getTagCreateTime();
java.sql.Timestamp timestamp2 =
new java.sql.Timestamp(
Timestamp.fromLocalDateTime(olderThanTime2).getMillisecond());
assertThat(
sql(
"CALL sys.expire_tags(`table` => 'default.T', older_than => '"
+ timestamp2.toString()
+ "')"))
.containsExactlyInAnyOrder(Row.of("tag-2"), Row.of("tag-3"), Row.of("tag-5"));

assertThat(sql("select tag_name from `T$tags`")).containsExactly(Row.of("tag-4"));
}

private void checkSnapshots(SnapshotManager sm, int earliest, int latest) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.paimon.spark.procedure;

import org.apache.paimon.FileStore;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.tag.TagTimeExpire;
import org.apache.paimon.utils.DateTimeUtils;

Expand Down Expand Up @@ -71,7 +73,10 @@ public InternalRow[] call(InternalRow args) {
return modifyPaimonTable(
tableIdent,
table -> {
TagTimeExpire tagTimeExpire = table.newExpireTags();
FileStoreTable fileStoreTable = (FileStoreTable) table;
FileStore fileStore = fileStoreTable.store();
TagTimeExpire tagTimeExpire =
fileStore.newTagCreationManager().getTagTimeExpire();
if (olderThanStr != null) {
LocalDateTime olderThanTime =
DateTimeUtils.parseTimestampData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ class ExpireTagsProcedureTest extends PaimonSparkTestBase {
}
checkSnapshots(snapshotManager, 1, 5)

spark.sql(
"CALL paimon.sys.create_tag(table => 'test.T', tag => 'tag-1', snapshot => 1, time_retained => '1d')")
spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => 'tag-1', snapshot => 1)")
spark.sql(
"CALL paimon.sys.create_tag(table => 'test.T', tag => 'tag-2', snapshot => 2, time_retained => '1d')")
spark.sql(
Expand All @@ -91,15 +90,38 @@ class ExpireTagsProcedureTest extends PaimonSparkTestBase {
"CALL paimon.sys.create_tag(table => 'test.T', tag => 'tag-4', snapshot => 4, time_retained => '1d')")
checkAnswer(spark.sql("select count(tag_name) from `T$tags`"), Row(4) :: Nil)

// tag-4 as the base older_than time
val olderThanTime = table.tagManager().tag("tag-4").getTagCreateTime
val timestamp =
new java.sql.Timestamp(Timestamp.fromLocalDateTime(olderThanTime).getMillisecond)
// no tags expired
checkAnswer(
spark.sql("CALL paimon.sys.expire_tags(table => 'test.T')"),
Row("No expired tags.") :: Nil)

// tag-2 as the base older_than time.
// tag-1 expired by its file creation time.
val olderThanTime1 = table.tagManager().tag("tag-2").getTagCreateTime
val timestamp1 =
new java.sql.Timestamp(Timestamp.fromLocalDateTime(olderThanTime1).getMillisecond)
checkAnswer(
spark.sql(
s"CALL paimon.sys.expire_tags(table => 'test.T', older_than => '${timestamp.toString}')"),
Row("tag-1") :: Row("tag-2") :: Row("tag-3") :: Nil
s"CALL paimon.sys.expire_tags(table => 'test.T', older_than => '${timestamp1.toString}')"),
Row("tag-1") :: Nil
)

spark.sql(
"CALL paimon.sys.create_tag(table => 'test.T', tag => 'tag-5', snapshot => 5, time_retained => '1s')")
Thread.sleep(1000)

// tag-4 as the base older_than time.
// tag-2,tag-3,tag-5 expired, tag-5 reached its tagTimeRetained.
val olderThanTime2 = table.tagManager().tag("tag-4").getTagCreateTime
val timestamp2 =
new java.sql.Timestamp(Timestamp.fromLocalDateTime(olderThanTime2).getMillisecond)
checkAnswer(
spark.sql(
s"CALL paimon.sys.expire_tags(table => 'test.T', older_than => '${timestamp2.toString}')"),
Row("tag-2") :: Row("tag-3") :: Row("tag-5") :: Nil
)

checkAnswer(spark.sql("select tag_name from `T$tags`"), Row("tag-4") :: Nil)
}

private def checkSnapshots(sm: SnapshotManager, earliest: Int, latest: Int): Unit = {
Expand Down

0 comments on commit b48f58b

Please sign in to comment.