diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md index 4514ea4ade1a..fc9af524a045 100644 --- a/docs/content/flink/procedures.md +++ b/docs/content/flink/procedures.md @@ -190,10 +190,10 @@ All available procedures are listed below. To expire tags by time. Arguments:
  • identifier: the target table identifier. Cannot be empty.
  • -
  • expiration_time: tagCreateTime before which tags will be removed.
  • +
  • older_than: tagCreateTime before which tags will be removed.
  • - CALL sys.expire_tags(table => 'default.T', expiration_time => '2024-09-06 11:00:00') + CALL sys.expire_tags(table => 'default.T', older_than => '2024-09-06 11:00:00') diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md index 1d5bf51d800f..6cbbb51892be 100644 --- a/docs/content/spark/procedures.md +++ b/docs/content/spark/procedures.md @@ -123,10 +123,10 @@ This section introduce all available spark procedures about paimon. To expire tags by time. Arguments:
  • table: the target table identifier. Cannot be empty.
  • -
  • expiration_time: tagCreateTime before which tags will be removed.
  • +
  • older_than: tagCreateTime before which tags will be removed.
  • - CALL sys.expire_tags(table => 'default.T', expiration_time => '2024-09-06 11:00:00') + CALL sys.expire_tags(table => 'default.T', older_than => '2024-09-06 11:00:00') diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExpire.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExpire.java index 594fdba37cf1..f7fa4b6518cd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExpire.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExpire.java @@ -42,7 +42,7 @@ public class TagTimeExpire { private final TagDeletion tagDeletion; private final List callbacks; - private LocalDateTime expirationTime; + private LocalDateTime olderThanTime; private TagTimeExpire( SnapshotManager snapshotManager, @@ -66,9 +66,9 @@ public List expire() { if (createTime == null || timeRetained == null) { continue; } - if ((expirationTime == null + if ((olderThanTime == null && LocalDateTime.now().isAfter(createTime.plus(timeRetained))) - || (expirationTime != null && expirationTime.isAfter(createTime))) { + || (olderThanTime != null && olderThanTime.isAfter(createTime))) { LOG.info( "Delete tag {}, because its existence time has reached its timeRetained of {}.", tagName, @@ -80,8 +80,8 @@ public List expire() { return expired; } - public TagTimeExpire withExpirationTime(LocalDateTime expirationTime) { - this.expirationTime = expirationTime; + public TagTimeExpire withOlderThanTime(LocalDateTime olderThanTime) { + this.olderThanTime = olderThanTime; return this; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireTagsAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireTagsAction.java index 0a0972e3c839..c1231ed3ad54 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireTagsAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireTagsAction.java @@ -28,22 +28,19 @@ public class ExpireTagsAction extends ActionBase { private final String table; - private final String expirationTime; + private final String olderThan; public ExpireTagsAction( - String warehouse, - String table, - String expirationTime, - Map catalogConfig) { + String warehouse, String table, String olderThan, Map catalogConfig) { super(warehouse, catalogConfig); this.table = table; - this.expirationTime = expirationTime; + this.olderThan = olderThan; } @Override public void run() throws Exception { ExpireTagsProcedure expireTagsProcedure = new ExpireTagsProcedure(); expireTagsProcedure.withCatalog(catalog); - expireTagsProcedure.call(new DefaultProcedureContext(env), table, expirationTime); + expireTagsProcedure.call(new DefaultProcedureContext(env), table, olderThan); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireTagsActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireTagsActionFactory.java index 4fa4459a0dbe..6cfe3dbaa55f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireTagsActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireTagsActionFactory.java @@ -26,7 +26,7 @@ public class ExpireTagsActionFactory implements ActionFactory { public static final String IDENTIFIER = "expire_tags"; - private static final String EXPIRATION_TIME = "expiration_time"; + private static final String OLDER_THAN = "older_than"; @Override public String identifier() { @@ -37,11 +37,11 @@ public String identifier() { public Optional create(MultipleParameterToolAdapter params) { String warehouse = params.get(WAREHOUSE); String table = params.get(TABLE); - String expirationTime = params.get(EXPIRATION_TIME); + String olderThan = params.get(OLDER_THAN); Map catalogConfig = optionalConfigMap(params, CATALOG_CONF); ExpireTagsAction expireTagsAction = - new ExpireTagsAction(warehouse, table, expirationTime, catalogConfig); + new ExpireTagsAction(warehouse, table, olderThan, catalogConfig); return Optional.of(expireTagsAction); } @@ -54,6 +54,6 @@ public void printHelp() { System.out.println( " expire_tags --warehouse " + "--table " - + "[--expiration_time ]"); + + "[--older_than ]"); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java index 87e7bed4e559..2080108ff643 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java @@ -43,19 +43,19 @@ public class ExpireTagsProcedure extends ProcedureBase { argument = { @ArgumentHint(name = "table", type = @DataTypeHint("STRING")), @ArgumentHint( - name = "expiration_time", + name = "older_than", type = @DataTypeHint("STRING"), isOptional = true) }) public @DataTypeHint("ROW") Row[] call( - ProcedureContext procedureContext, String tableId, @Nullable String expirationTimeStr) + ProcedureContext procedureContext, String tableId, @Nullable String olderThanStr) throws Catalog.TableNotExistException { TagTimeExpire tagTimeExpire = table(tableId).newExpireTags(); - if (expirationTimeStr != null) { - LocalDateTime expirationTime = - DateTimeUtils.parseTimestampData(expirationTimeStr, 3, TimeZone.getDefault()) + if (olderThanStr != null) { + LocalDateTime olderThanTime = + DateTimeUtils.parseTimestampData(olderThanStr, 3, TimeZone.getDefault()) .toLocalDateTime(); - tagTimeExpire.withExpirationTime(expirationTime); + tagTimeExpire.withOlderThanTime(olderThanTime); } List expired = tagTimeExpire.expire(); return expired.isEmpty() diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionITTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionITTest.java index 88572a6240bb..ec37aa858b48 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionITTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionITTest.java @@ -87,11 +87,10 @@ public void testExpireTags() throws Exception { assertThat(table.tagManager().tagExists("tag-4")).isFalse(); assertThat(table.tagManager().tagExists("tag-5")).isFalse(); - // tag-3 as the base expiration_time - LocalDateTime expirationTime = table.tagManager().tag("tag-3").getTagCreateTime(); + // tag-3 as the base older_than time + LocalDateTime olderThanTime = table.tagManager().tag("tag-3").getTagCreateTime(); java.sql.Timestamp timestamp = - new java.sql.Timestamp( - Timestamp.fromLocalDateTime(expirationTime).getMillisecond()); + new java.sql.Timestamp(Timestamp.fromLocalDateTime(olderThanTime).getMillisecond()); createAction( ExpireTagsAction.class, "expire_tags", @@ -99,7 +98,7 @@ public void testExpireTags() throws Exception { warehouse, "--table", database + ".T", - "--expiration_time", + "--older_than", timestamp.toString()) .run(); // tag-2 expires diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireTagsProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireTagsProcedureITCase.java index bb3d61c33c96..5e58d4aa106f 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireTagsProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireTagsProcedureITCase.java @@ -70,7 +70,7 @@ public void testExpireTagsByTagCreateTimeAndTagTimeRetained() throws Exception { } @Test - public void testExpireTagsByExpirationTime() throws Exception { + public void testExpireTagsByOlderThanTime() throws Exception { sql( "CREATE TABLE T (id STRING, name STRING," + " PRIMARY KEY (id) NOT ENFORCED)" @@ -94,14 +94,13 @@ public void testExpireTagsByExpirationTime() throws Exception { sql( "CALL sys.create_tag(`table` => 'default.T', tag => 'tag-4', snapshot => 4, time_retained => '1d')"); - // tag-4 as the base expiration_time - LocalDateTime expirationTime = table.tagManager().tag("tag-4").getTagCreateTime(); + // tag-4 as the base older_than time + LocalDateTime olderThanTime = table.tagManager().tag("tag-4").getTagCreateTime(); java.sql.Timestamp timestamp = - new java.sql.Timestamp( - Timestamp.fromLocalDateTime(expirationTime).getMillisecond()); + new java.sql.Timestamp(Timestamp.fromLocalDateTime(olderThanTime).getMillisecond()); assertThat( sql( - "CALL sys.expire_tags(`table` => 'default.T', expiration_time => '" + "CALL sys.expire_tags(`table` => 'default.T', older_than => '" + timestamp.toString() + "')")) .containsExactlyInAnyOrder(Row.of("tag-1"), Row.of("tag-2"), Row.of("tag-3")); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireTagsProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireTagsProcedure.java index 6b42984600f5..c296f69b22ca 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireTagsProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireTagsProcedure.java @@ -41,7 +41,7 @@ public class ExpireTagsProcedure extends BaseProcedure { private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[] { ProcedureParameter.required("table", StringType), - ProcedureParameter.optional("expiration_time", StringType) + ProcedureParameter.optional("older_than", StringType) }; private static final StructType OUTPUT_TYPE = @@ -67,17 +67,17 @@ public StructType outputType() { @Override public InternalRow[] call(InternalRow args) { Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); - String expirationTimeStr = args.isNullAt(1) ? null : args.getString(1); + String olderThanStr = args.isNullAt(1) ? null : args.getString(1); return modifyPaimonTable( tableIdent, table -> { TagTimeExpire tagTimeExpire = table.newExpireTags(); - if (expirationTimeStr != null) { - LocalDateTime expirationTime = + if (olderThanStr != null) { + LocalDateTime olderThanTime = DateTimeUtils.parseTimestampData( - expirationTimeStr, 3, TimeZone.getDefault()) + olderThanStr, 3, TimeZone.getDefault()) .toLocalDateTime(); - tagTimeExpire.withExpirationTime(expirationTime); + tagTimeExpire.withOlderThanTime(olderThanTime); } List expired = tagTimeExpire.expire(); return expired.isEmpty() diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireTagsProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireTagsProcedureTest.scala index 3a7f0d878019..7e63e7489d34 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireTagsProcedureTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireTagsProcedureTest.scala @@ -66,7 +66,7 @@ class ExpireTagsProcedureTest extends PaimonSparkTestBase { checkAnswer(spark.sql("select tag_name from `T$tags`"), Row("tag-1") :: Row("tag-2") :: Nil) } - test("Paimon procedure: expire tags that createTime less than specified expirationTime") { + test("Paimon procedure: expire tags that createTime less than specified older_than") { spark.sql(s""" |CREATE TABLE T (id STRING, name STRING) |USING PAIMON @@ -91,13 +91,13 @@ class ExpireTagsProcedureTest extends PaimonSparkTestBase { "CALL paimon.sys.create_tag(table => 'test.T', tag => 'tag-4', snapshot => 4, time_retained => '1d')") checkAnswer(spark.sql("select count(tag_name) from `T$tags`"), Row(4) :: Nil) - // tag-4 as the base expiration_time - val expirationTime = table.tagManager().tag("tag-4").getTagCreateTime + // tag-4 as the base older_than time + val olderThanTime = table.tagManager().tag("tag-4").getTagCreateTime val timestamp = - new java.sql.Timestamp(Timestamp.fromLocalDateTime(expirationTime).getMillisecond) + new java.sql.Timestamp(Timestamp.fromLocalDateTime(olderThanTime).getMillisecond) checkAnswer( spark.sql( - s"CALL paimon.sys.expire_tags(table => 'test.T', expiration_time => '${timestamp.toString}')"), + s"CALL paimon.sys.expire_tags(table => 'test.T', older_than => '${timestamp.toString}')"), Row("tag-1") :: Row("tag-2") :: Row("tag-3") :: Nil ) }