Skip to content

Commit

Permalink
rename
Browse files Browse the repository at this point in the history
  • Loading branch information
askwang committed Sep 19, 2024
1 parent 43e512a commit 0d97581
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 48 deletions.
4 changes: 2 additions & 2 deletions docs/content/flink/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,10 @@ All available procedures are listed below.
<td>
To expire tags by time. Arguments:
<li>identifier: the target table identifier. Cannot be empty.</li>
<li>expiration_time: tagCreateTime before which tags will be removed.</li>
<li>older_than: tagCreateTime before which tags will be removed.</li>
</td>
<td>
CALL sys.expire_tags(table => 'default.T', expiration_time => '2024-09-06 11:00:00')
CALL sys.expire_tags(table => 'default.T', older_than => '2024-09-06 11:00:00')
</td>
</tr>
<tr>
Expand Down
4 changes: 2 additions & 2 deletions docs/content/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ This section introduce all available spark procedures about paimon.
<td>
To expire tags by time. Arguments:
<li>table: the target table identifier. Cannot be empty.</li>
<li>expiration_time: tagCreateTime before which tags will be removed.</li>
<li>older_than: tagCreateTime before which tags will be removed.</li>
</td>
<td>
CALL sys.expire_tags(table => 'default.T', expiration_time => '2024-09-06 11:00:00')
CALL sys.expire_tags(table => 'default.T', older_than => '2024-09-06 11:00:00')
</td>
</tr>
<tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class TagTimeExpire {
private final TagDeletion tagDeletion;
private final List<TagCallback> callbacks;

private LocalDateTime expirationTime;
private LocalDateTime olderThanTime;

private TagTimeExpire(
SnapshotManager snapshotManager,
Expand All @@ -66,9 +66,9 @@ public List<String> expire() {
if (createTime == null || timeRetained == null) {
continue;
}
if ((expirationTime == null
if ((olderThanTime == null
&& LocalDateTime.now().isAfter(createTime.plus(timeRetained)))
|| (expirationTime != null && expirationTime.isAfter(createTime))) {
|| (olderThanTime != null && olderThanTime.isAfter(createTime))) {
LOG.info(
"Delete tag {}, because its existence time has reached its timeRetained of {}.",
tagName,
Expand All @@ -80,8 +80,8 @@ public List<String> expire() {
return expired;
}

public TagTimeExpire withExpirationTime(LocalDateTime expirationTime) {
this.expirationTime = expirationTime;
public TagTimeExpire withOlderThanTime(LocalDateTime olderThanTime) {
this.olderThanTime = olderThanTime;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,19 @@
public class ExpireTagsAction extends ActionBase {

private final String table;
private final String expirationTime;
private final String olderThan;

public ExpireTagsAction(
String warehouse,
String table,
String expirationTime,
Map<String, String> catalogConfig) {
String warehouse, String table, String olderThan, Map<String, String> catalogConfig) {
super(warehouse, catalogConfig);
this.table = table;
this.expirationTime = expirationTime;
this.olderThan = olderThan;
}

@Override
public void run() throws Exception {
ExpireTagsProcedure expireTagsProcedure = new ExpireTagsProcedure();
expireTagsProcedure.withCatalog(catalog);
expireTagsProcedure.call(new DefaultProcedureContext(env), table, expirationTime);
expireTagsProcedure.call(new DefaultProcedureContext(env), table, olderThan);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class ExpireTagsActionFactory implements ActionFactory {

public static final String IDENTIFIER = "expire_tags";

private static final String EXPIRATION_TIME = "expiration_time";
private static final String OLDER_THAN = "older_than";

@Override
public String identifier() {
Expand All @@ -37,11 +37,11 @@ public String identifier() {
public Optional<Action> create(MultipleParameterToolAdapter params) {
String warehouse = params.get(WAREHOUSE);
String table = params.get(TABLE);
String expirationTime = params.get(EXPIRATION_TIME);
String olderThan = params.get(OLDER_THAN);
Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);

ExpireTagsAction expireTagsAction =
new ExpireTagsAction(warehouse, table, expirationTime, catalogConfig);
new ExpireTagsAction(warehouse, table, olderThan, catalogConfig);
return Optional.of(expireTagsAction);
}

Expand All @@ -54,6 +54,6 @@ public void printHelp() {
System.out.println(
" expire_tags --warehouse <warehouse_path> "
+ "--table <database.table_name> "
+ "[--expiration_time <expiration_time>]");
+ "[--older_than <older_than>]");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,19 @@ public class ExpireTagsProcedure extends ProcedureBase {
argument = {
@ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
@ArgumentHint(
name = "expiration_time",
name = "older_than",
type = @DataTypeHint("STRING"),
isOptional = true)
})
public @DataTypeHint("ROW<expired_tags STRING>") Row[] call(
ProcedureContext procedureContext, String tableId, @Nullable String expirationTimeStr)
ProcedureContext procedureContext, String tableId, @Nullable String olderThanStr)
throws Catalog.TableNotExistException {
TagTimeExpire tagTimeExpire = table(tableId).newExpireTags();
if (expirationTimeStr != null) {
LocalDateTime expirationTime =
DateTimeUtils.parseTimestampData(expirationTimeStr, 3, TimeZone.getDefault())
if (olderThanStr != null) {
LocalDateTime olderThanTime =
DateTimeUtils.parseTimestampData(olderThanStr, 3, TimeZone.getDefault())
.toLocalDateTime();
tagTimeExpire.withExpirationTime(expirationTime);
tagTimeExpire.withOlderThanTime(olderThanTime);
}
List<String> expired = tagTimeExpire.expire();
return expired.isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,19 +87,18 @@ public void testExpireTags() throws Exception {
assertThat(table.tagManager().tagExists("tag-4")).isFalse();
assertThat(table.tagManager().tagExists("tag-5")).isFalse();

// tag-3 as the base expiration_time
LocalDateTime expirationTime = table.tagManager().tag("tag-3").getTagCreateTime();
// tag-3 as the base older_than time
LocalDateTime olderThanTime = table.tagManager().tag("tag-3").getTagCreateTime();
java.sql.Timestamp timestamp =
new java.sql.Timestamp(
Timestamp.fromLocalDateTime(expirationTime).getMillisecond());
new java.sql.Timestamp(Timestamp.fromLocalDateTime(olderThanTime).getMillisecond());
createAction(
ExpireTagsAction.class,
"expire_tags",
"--warehouse",
warehouse,
"--table",
database + ".T",
"--expiration_time",
"--older_than",
timestamp.toString())
.run();
// tag-2 expires
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void testExpireTagsByTagCreateTimeAndTagTimeRetained() throws Exception {
}

@Test
public void testExpireTagsByExpirationTime() throws Exception {
public void testExpireTagsByOlderThanTime() throws Exception {
sql(
"CREATE TABLE T (id STRING, name STRING,"
+ " PRIMARY KEY (id) NOT ENFORCED)"
Expand All @@ -94,14 +94,13 @@ public void testExpireTagsByExpirationTime() throws Exception {
sql(
"CALL sys.create_tag(`table` => 'default.T', tag => 'tag-4', snapshot => 4, time_retained => '1d')");

// tag-4 as the base expiration_time
LocalDateTime expirationTime = table.tagManager().tag("tag-4").getTagCreateTime();
// tag-4 as the base older_than time
LocalDateTime olderThanTime = table.tagManager().tag("tag-4").getTagCreateTime();
java.sql.Timestamp timestamp =
new java.sql.Timestamp(
Timestamp.fromLocalDateTime(expirationTime).getMillisecond());
new java.sql.Timestamp(Timestamp.fromLocalDateTime(olderThanTime).getMillisecond());
assertThat(
sql(
"CALL sys.expire_tags(`table` => 'default.T', expiration_time => '"
"CALL sys.expire_tags(`table` => 'default.T', older_than => '"
+ timestamp.toString()
+ "')"))
.containsExactlyInAnyOrder(Row.of("tag-1"), Row.of("tag-2"), Row.of("tag-3"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class ExpireTagsProcedure extends BaseProcedure {
private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
ProcedureParameter.required("table", StringType),
ProcedureParameter.optional("expiration_time", StringType)
ProcedureParameter.optional("older_than", StringType)
};

private static final StructType OUTPUT_TYPE =
Expand All @@ -67,17 +67,17 @@ public StructType outputType() {
@Override
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
String expirationTimeStr = args.isNullAt(1) ? null : args.getString(1);
String olderThanStr = args.isNullAt(1) ? null : args.getString(1);
return modifyPaimonTable(
tableIdent,
table -> {
TagTimeExpire tagTimeExpire = table.newExpireTags();
if (expirationTimeStr != null) {
LocalDateTime expirationTime =
if (olderThanStr != null) {
LocalDateTime olderThanTime =
DateTimeUtils.parseTimestampData(
expirationTimeStr, 3, TimeZone.getDefault())
olderThanStr, 3, TimeZone.getDefault())
.toLocalDateTime();
tagTimeExpire.withExpirationTime(expirationTime);
tagTimeExpire.withOlderThanTime(olderThanTime);
}
List<String> expired = tagTimeExpire.expire();
return expired.isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class ExpireTagsProcedureTest extends PaimonSparkTestBase {
checkAnswer(spark.sql("select tag_name from `T$tags`"), Row("tag-1") :: Row("tag-2") :: Nil)
}

test("Paimon procedure: expire tags that createTime less than specified expirationTime") {
test("Paimon procedure: expire tags that createTime less than specified older_than") {
spark.sql(s"""
|CREATE TABLE T (id STRING, name STRING)
|USING PAIMON
Expand All @@ -91,13 +91,13 @@ class ExpireTagsProcedureTest extends PaimonSparkTestBase {
"CALL paimon.sys.create_tag(table => 'test.T', tag => 'tag-4', snapshot => 4, time_retained => '1d')")
checkAnswer(spark.sql("select count(tag_name) from `T$tags`"), Row(4) :: Nil)

// tag-4 as the base expiration_time
val expirationTime = table.tagManager().tag("tag-4").getTagCreateTime
// tag-4 as the base older_than time
val olderThanTime = table.tagManager().tag("tag-4").getTagCreateTime
val timestamp =
new java.sql.Timestamp(Timestamp.fromLocalDateTime(expirationTime).getMillisecond)
new java.sql.Timestamp(Timestamp.fromLocalDateTime(olderThanTime).getMillisecond)
checkAnswer(
spark.sql(
s"CALL paimon.sys.expire_tags(table => 'test.T', expiration_time => '${timestamp.toString}')"),
s"CALL paimon.sys.expire_tags(table => 'test.T', older_than => '${timestamp.toString}')"),
Row("tag-1") :: Row("tag-2") :: Row("tag-3") :: Nil
)
}
Expand Down

0 comments on commit 0d97581

Please sign in to comment.