From 76b7f9c0841d1edfefd07bb52dad186fbe0e9c95 Mon Sep 17 00:00:00 2001
From: chenxinwei <2507217662@qq.com>
Date: Mon, 19 Aug 2024 21:12:39 +0800
Subject: [PATCH] [core] Support setting the time field type for record level
expire (#3992)
---
docs/content/primary-key-table/compaction.md | 3 +-
.../generated/core_configuration.html | 8 +-
.../java/org/apache/paimon/CoreOptions.java | 38 ++++++-
.../apache/paimon/io/RecordLevelExpire.java | 31 +++++-
.../RecordLevelExpireWithMillisecondTest.java | 102 ++++++++++++++++++
5 files changed, 175 insertions(+), 7 deletions(-)
create mode 100644 paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithMillisecondTest.java
diff --git a/docs/content/primary-key-table/compaction.md b/docs/content/primary-key-table/compaction.md
index 0ba607bca049..11b9e1d984e4 100644
--- a/docs/content/primary-key-table/compaction.md
+++ b/docs/content/primary-key-table/compaction.md
@@ -75,7 +75,8 @@ use [dedicated compaction job]({{< ref "maintenance/dedicated-compaction#dedicat
In compaction, you can configure record-Level expire time to expire records, you should configure:
1. `'record-level.expire-time'`: time retain for records.
-2. `'record-level.time-field'`: time field for record level expire, it should be a seconds INT.
+2. `'record-level.time-field'`: time field for record level expire.
+3. `'record-level.time-field-type'`: time field type for record level expire, it can be `seconds-int` (INT column, the default) or `millis-long` (BIGINT column).
Expiration happens in compaction, and there is no strong guarantee to expire records in time.
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index ee36a01de8f6..1ed1d4f6c4cb 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -585,7 +585,13 @@
record-level.time-field |
(none) |
String |
- Time field for record level expire, it should be a seconds INT. |
+ Time field for record level expire. |
+
+
+ record-level.time-field-type |
+ seconds-int |
+ Enum |
+ Time field type for record level expire, it can be seconds-int or millis-long.
Possible values:- "seconds-int": Timestamps in seconds should be INT type.
- "millis-long": Timestamps in milliseconds should be BIGINT type.
|
rowkind.field |
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 961062209a72..dfdc0375d4d6 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1251,8 +1251,14 @@ public class CoreOptions implements Serializable {
key("record-level.time-field")
.stringType()
.noDefaultValue()
+ .withDescription("Time field for record level expire.");
+
+    public static final ConfigOption<TimeFieldType> RECORD_LEVEL_TIME_FIELD_TYPE =
+            key("record-level.time-field-type")
+                    .enumType(TimeFieldType.class)
+                    .defaultValue(TimeFieldType.SECONDS_INT)
                     .withDescription(
-                            "Time field for record level expire, it should be a seconds INT.");
+                            "Time field type for record level expire, it can be seconds-int or millis-long.");
public static final ConfigOption FIELDS_DEFAULT_AGG_FUNC =
key(FIELDS_PREFIX + "." + DEFAULT_AGG_FUNCTION)
@@ -2058,6 +2064,11 @@ public String recordLevelTimeField() {
return options.get(RECORD_LEVEL_TIME_FIELD);
}
+    /** Time field type for record level expire; the option defaults to {@code seconds-int}. */
+    public TimeFieldType recordLevelTimeFieldType() {
+        return options.get(RECORD_LEVEL_TIME_FIELD_TYPE);
+    }
+
public boolean prepareCommitWaitCompaction() {
if (!needLookup()) {
return false;
@@ -2682,4 +2693,29 @@ public InlineElement getDescription() {
return text(description);
}
}
+
+    /** How the {@code record-level.time-field} column encodes its timestamp. */
+    public enum TimeFieldType implements DescribedEnum {
+        SECONDS_INT("seconds-int", "Timestamps in seconds should be INT type."),
+
+        MILLIS_LONG("millis-long", "Timestamps in milliseconds should be BIGINT type.");
+
+        private final String optionValue;
+        private final String help;
+
+        TimeFieldType(String optionValue, String help) {
+            this.optionValue = optionValue;
+            this.help = help;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return text(help);
+        }
+
+        @Override
+        public String toString() {
+            return optionValue;
+        }
+    }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
index 4a61b66a798a..e1955c1cd427 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
@@ -21,6 +21,7 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
@@ -36,6 +37,7 @@ public class RecordLevelExpire {
private final int timeField;
private final int expireTime;
+ private final CoreOptions.TimeFieldType timeFieldType;
@Nullable
public static RecordLevelExpire create(CoreOptions options, RowType rowType) {
@@ -58,20 +60,26 @@ public static RecordLevelExpire create(CoreOptions options, RowType rowType) {
"Can not find time field %s for record level expire.", timeField));
}
+        CoreOptions.TimeFieldType timeFieldType = options.recordLevelTimeFieldType();
         DataField field = rowType.getField(timeField);
-        if (!(field.type() instanceof IntType)) {
+        if (!((timeFieldType == CoreOptions.TimeFieldType.SECONDS_INT
+                        && field.type() instanceof IntType)
+                || (timeFieldType == CoreOptions.TimeFieldType.MILLIS_LONG
+                        && field.type() instanceof BigIntType))) {
             throw new IllegalArgumentException(
                     String.format(
-                            "Record level time field should be INT type, but is %s.",
-                            field.type()));
+                            "Record level time field type is '%s', but field type is %s.",
+                            timeFieldType, field.type()));
         }
- return new RecordLevelExpire(fieldIndex, (int) expireTime.getSeconds());
+ return new RecordLevelExpire(fieldIndex, (int) expireTime.getSeconds(), timeFieldType);
}
- private RecordLevelExpire(int timeField, int expireTime) {
+ private RecordLevelExpire(
+ int timeField, int expireTime, CoreOptions.TimeFieldType timeFieldType) {
this.timeField = timeField;
this.expireTime = expireTime;
+ this.timeFieldType = timeFieldType;
}
public FileReaderFactory wrap(FileReaderFactory readerFactory) {
@@ -85,7 +93,22 @@ private RecordReader wrap(RecordReader reader) {
checkArgument(
!kv.value().isNullAt(timeField),
"Time field for record-level expire should not be null.");
-                    int recordTime = kv.value().getInt(timeField);
+                    // Seconds as long: an int would wrap past 2038 or when expireTime is added.
+                    final long recordTime;
+                    switch (timeFieldType) {
+                        case SECONDS_INT:
+                            recordTime = kv.value().getInt(timeField);
+                            break;
+                        case MILLIS_LONG:
+                            recordTime = kv.value().getLong(timeField) / 1000;
+                            break;
+                        default:
+                            throw new IllegalArgumentException(
+                                    String.format(
+                                            "type %s not support in %s",
+                                            timeFieldType,
+                                            CoreOptions.TimeFieldType.class.getName()));
+                    }
return currentTime <= recordTime + expireTime;
});
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithMillisecondTest.java b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithMillisecondTest.java
new file mode 100644
index 000000000000..14ec6885c608
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithMillisecondTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests record level expire when the time field is a BIGINT holding epoch milliseconds. */
+class RecordLevelExpireWithMillisecondTest extends PrimaryKeyTableTestBase {
+    @Override
+    @BeforeEach
+    public void beforeEachBase() throws Exception {
+        Path warehouse = new Path(TraceableFileIO.SCHEME + "://" + tempPath.toString());
+        Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(warehouse));
+        Identifier tableId = new Identifier("default", "T");
+        catalog.createDatabase(tableId.getDatabaseName(), true);
+        // col1 is the BIGINT column that tableOptions() names as the record level time field.
+        Schema schema =
+                Schema.newBuilder()
+                        .column("pt", DataTypes.INT())
+                        .column("pk", DataTypes.INT())
+                        .column("col1", DataTypes.BIGINT())
+                        .partitionKeys("pt")
+                        .primaryKey("pk", "pt")
+                        .options(tableOptions().toMap())
+                        .build();
+        catalog.createTable(tableId, schema, true);
+        table = (FileStoreTable) catalog.getTable(tableId);
+        commitUser = UUID.randomUUID().toString();
+    }
+
+    @Override
+    protected Options tableOptions() {
+        Options expireOptions = new Options();
+        expireOptions.set(CoreOptions.BUCKET, 1);
+        expireOptions.set(CoreOptions.RECORD_LEVEL_EXPIRE_TIME, Duration.ofSeconds(1));
+        expireOptions.set(CoreOptions.RECORD_LEVEL_TIME_FIELD, "col1");
+        expireOptions.set(
+                CoreOptions.RECORD_LEVEL_TIME_FIELD_TYPE, CoreOptions.TimeFieldType.MILLIS_LONG);
+        return expireOptions;
+    }
+
+    @Test
+    public void test() throws Exception {
+        writeCommit(GenericRow.of(1, 1, 1L), GenericRow.of(1, 2, 2L));
+
+        // rows with very old timestamps are still returned while nothing has been compacted
+        assertThat(query(new int[] {0, 1}))
+                .containsExactlyInAnyOrder(GenericRow.of(1, 1), GenericRow.of(1, 2));
+
+        long nowMillis = System.currentTimeMillis();
+        writeCommit(GenericRow.of(1, 3, nowMillis));
+        writeCommit(GenericRow.of(1, 4, nowMillis + 60 * 60 * 1000));
+        Thread.sleep(2000);
+
+        // the 1 second expire time has passed, but without compaction all rows are returned
+        assertThat(query(new int[] {0, 1}))
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 1),
+                        GenericRow.of(1, 2),
+                        GenericRow.of(1, 3),
+                        GenericRow.of(1, 4));
+
+        // after compaction only the row stamped one hour in the future is left
+        compact(1);
+        assertThat(query(new int[] {0, 1})).containsExactlyInAnyOrder(GenericRow.of(1, 4));
+    }
+}