From 992a4065d30099f55b787e7d5fffc55fbb1ccd36 Mon Sep 17 00:00:00 2001
From: askwang <135721692+askwang@users.noreply.github.com>
Date: Tue, 5 Nov 2024 16:15:51 +0800
Subject: [PATCH] [core] Support timestamp field type in record level expire
(#4417)
---
.../generated/core_configuration.html | 2 +-
.../java/org/apache/paimon/CoreOptions.java | 6 +-
.../apache/paimon/io/RecordLevelExpire.java | 67 ++++++++++++++++---
...ecordLevelExpireWithTimestampBaseTest.java | 66 ++++++++++++++++++
...RecordLevelExpireWithTimestampLTZTest.java | 58 ++++++++++++++++
.../RecordLevelExpireWithTimestampTest.java | 58 ++++++++++++++++
6 files changed, 243 insertions(+), 14 deletions(-)
create mode 100644 paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampBaseTest.java
create mode 100644 paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampLTZTest.java
create mode 100644 paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampTest.java
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index d9ac0b99bf65..984373a856b4 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -633,7 +633,7 @@
record-level.time-field-type |
seconds-int |
Enum |
- Time field type for record level expire, it can be seconds-int,seconds-long or millis-long.
Possible values:- "seconds-int": Timestamps in seconds with INT field type.
- "seconds-long": Timestamps in seconds with BIGINT field type.
- "millis-long": Timestamps in milliseconds with BIGINT field type.
|
+ Time field type for record level expire, it can be seconds-int,seconds-long, millis-long or timestamp.
Possible values:- "seconds-int": Timestamps in seconds with INT field type.
- "seconds-long": Timestamps in seconds with BIGINT field type.
- "millis-long": Timestamps in milliseconds with BIGINT field type.
- "timestamp": Timestamp field type.
|
rowkind.field |
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 6ba8d70e09a5..c20f91e36618 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1358,7 +1358,7 @@ public class CoreOptions implements Serializable {
.enumType(TimeFieldType.class)
.defaultValue(TimeFieldType.SECONDS_INT)
.withDescription(
- "Time field type for record level expire, it can be seconds-int,seconds-long or millis-long.");
+ "Time field type for record level expire, it can be seconds-int,seconds-long, millis-long or timestamp.");
public static final ConfigOption FIELDS_DEFAULT_AGG_FUNC =
key(FIELDS_PREFIX + "." + DEFAULT_AGG_FUNCTION)
@@ -2926,7 +2926,9 @@ public enum TimeFieldType implements DescribedEnum {
SECONDS_LONG("seconds-long", "Timestamps in seconds with BIGINT field type."),
- MILLIS_LONG("millis-long", "Timestamps in milliseconds with BIGINT field type.");
+ MILLIS_LONG("millis-long", "Timestamps in milliseconds with BIGINT field type."),
+
+ TIMESTAMP("timestamp", "Timestamp field type.");
private final String value;
private final String description;
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
index 6a5a1f49b1ef..6083ad92a148 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
@@ -20,11 +20,15 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.Timestamp;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.TimestampType;
import javax.annotation.Nullable;
@@ -38,6 +42,7 @@ public class RecordLevelExpire {
private final int timeFieldIndex;
private final int expireTime;
private final CoreOptions.TimeFieldType timeFieldType;
+ private final DataField rawDataField;
@Nullable
public static RecordLevelExpire create(CoreOptions options, RowType rowType) {
@@ -62,27 +67,44 @@ public static RecordLevelExpire create(CoreOptions options, RowType rowType) {
CoreOptions.TimeFieldType timeFieldType = options.recordLevelTimeFieldType();
DataField field = rowType.getField(timeFieldName);
- if (!((timeFieldType == CoreOptions.TimeFieldType.SECONDS_INT
- && field.type() instanceof IntType)
- || (timeFieldType == CoreOptions.TimeFieldType.SECONDS_LONG
- && field.type() instanceof BigIntType)
- || (timeFieldType == CoreOptions.TimeFieldType.MILLIS_LONG
- && field.type() instanceof BigIntType))) {
+ if (!isValidateFieldType(timeFieldType, field)) {
throw new IllegalArgumentException(
String.format(
- "The record level time field type should be one of SECONDS_INT,SECONDS_LONG or MILLIS_LONG, "
- + "but time field type is %s, field type is %s.",
- timeFieldType, field.type()));
+ "The record level time field type should be one of SECONDS_INT, SECONDS_LONG, MILLIS_LONG or TIMESTAMP, "
+ + "but time field type is %s, field type is %s. You can specify the type through the config '%s'.",
+ timeFieldType,
+ field.type(),
+ CoreOptions.RECORD_LEVEL_TIME_FIELD_TYPE.key()));
}
- return new RecordLevelExpire(fieldIndex, (int) expireTime.getSeconds(), timeFieldType);
+ return new RecordLevelExpire(
+ fieldIndex, (int) expireTime.getSeconds(), timeFieldType, field);
+ }
+
+ private static boolean isValidateFieldType(
+ CoreOptions.TimeFieldType timeFieldType, DataField field) {
+ DataType dataType = field.type();
+ return ((timeFieldType == CoreOptions.TimeFieldType.SECONDS_INT
+ && dataType instanceof IntType)
+ || (timeFieldType == CoreOptions.TimeFieldType.SECONDS_LONG
+ && dataType instanceof BigIntType)
+ || (timeFieldType == CoreOptions.TimeFieldType.MILLIS_LONG
+ && dataType instanceof BigIntType)
+ || (timeFieldType == CoreOptions.TimeFieldType.TIMESTAMP
+ && dataType instanceof TimestampType)
+ || (timeFieldType == CoreOptions.TimeFieldType.TIMESTAMP
+ && dataType instanceof LocalZonedTimestampType));
}
private RecordLevelExpire(
- int timeFieldIndex, int expireTime, CoreOptions.TimeFieldType timeFieldType) {
+ int timeFieldIndex,
+ int expireTime,
+ CoreOptions.TimeFieldType timeFieldType,
+ DataField rawDataField) {
this.timeFieldIndex = timeFieldIndex;
this.expireTime = expireTime;
this.timeFieldType = timeFieldType;
+ this.rawDataField = rawDataField;
}
public FileReaderFactory wrap(FileReaderFactory readerFactory) {
@@ -107,6 +129,29 @@ private RecordReader wrap(RecordReader reader) {
case MILLIS_LONG:
recordTime = (int) (kv.value().getLong(timeFieldIndex) / 1000);
break;
+ case TIMESTAMP:
+ Timestamp timestamp;
+ if (rawDataField.type() instanceof TimestampType) {
+ TimestampType timestampType = (TimestampType) rawDataField.type();
+ timestamp =
+ kv.value()
+ .getTimestamp(
+ timeFieldIndex,
+ timestampType.getPrecision());
+ } else if (rawDataField.type() instanceof LocalZonedTimestampType) {
+ LocalZonedTimestampType timestampType =
+ (LocalZonedTimestampType) rawDataField.type();
+ timestamp =
+ kv.value()
+ .getTimestamp(
+ timeFieldIndex,
+ timestampType.getPrecision());
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported timestamp type: " + rawDataField.type());
+ }
+ recordTime = (int) (timestamp.getMillisecond() / 1000);
+ break;
default:
String msg =
String.format(
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampBaseTest.java b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampBaseTest.java
new file mode 100644
index 000000000000..dcc8d246da06
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampBaseTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.options.Options;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+abstract class RecordLevelExpireWithTimestampBaseTest extends PrimaryKeyTableTestBase {
+
+ @Override
+ protected Options tableOptions() {
+ Options options = new Options();
+ options.set(CoreOptions.BUCKET, 1);
+ options.set(CoreOptions.RECORD_LEVEL_EXPIRE_TIME, Duration.ofSeconds(1));
+ options.set(CoreOptions.RECORD_LEVEL_TIME_FIELD, "col1");
+ options.set(CoreOptions.RECORD_LEVEL_TIME_FIELD_TYPE, CoreOptions.TimeFieldType.TIMESTAMP);
+ return options;
+ }
+
+ @Test
+ public void testTimestampTypeExpire() throws Exception {
+ long millis = System.currentTimeMillis();
+ Timestamp timestamp1 = Timestamp.fromEpochMillis(millis - 60 * 1000);
+ Timestamp timestamp2 = Timestamp.fromEpochMillis(millis);
+ Timestamp timestamp3 = Timestamp.fromEpochMillis(millis + 60 * 1000);
+
+ // create at least two files in one bucket
+ writeCommit(GenericRow.of(1, 1, timestamp1), GenericRow.of(1, 2, timestamp2));
+ writeCommit(GenericRow.of(1, 3, timestamp3));
+
+ // no compaction, can be queried
+ assertThat(query(new int[] {0, 1}))
+ .containsExactlyInAnyOrder(
+ GenericRow.of(1, 1), GenericRow.of(1, 2), GenericRow.of(1, 3));
+ Thread.sleep(2000);
+
+ // compact, expired
+ compact(1);
+ assertThat(query(new int[] {0, 1})).containsExactlyInAnyOrder(GenericRow.of(1, 3));
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampLTZTest.java b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampLTZTest.java
new file mode 100644
index 000000000000..af834af276c4
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampLTZTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.UUID;
+
+class RecordLevelExpireWithTimestampLTZTest extends RecordLevelExpireWithTimestampBaseTest {
+
+ @Override
+ @BeforeEach
+ public void beforeEachBase() throws Exception {
+ CatalogContext context =
+ CatalogContext.create(
+ new Path(TraceableFileIO.SCHEME + "://" + tempPath.toString()));
+ Catalog catalog = CatalogFactory.createCatalog(context);
+ Identifier identifier = new Identifier("default", "T");
+ catalog.createDatabase(identifier.getDatabaseName(), true);
+ Schema schema =
+ Schema.newBuilder()
+ .column("pt", DataTypes.INT())
+ .column("pk", DataTypes.INT())
+ .column("col1", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+ .partitionKeys("pt")
+ .primaryKey("pk", "pt")
+ .options(tableOptions().toMap())
+ .build();
+ catalog.createTable(identifier, schema, true);
+ table = (FileStoreTable) catalog.getTable(identifier);
+ commitUser = UUID.randomUUID().toString();
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampTest.java b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampTest.java
new file mode 100644
index 000000000000..3c4add8914f8
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.UUID;
+
+class RecordLevelExpireWithTimestampTest extends RecordLevelExpireWithTimestampBaseTest {
+
+ @Override
+ @BeforeEach
+ public void beforeEachBase() throws Exception {
+ CatalogContext context =
+ CatalogContext.create(
+ new Path(TraceableFileIO.SCHEME + "://" + tempPath.toString()));
+ Catalog catalog = CatalogFactory.createCatalog(context);
+ Identifier identifier = new Identifier("default", "T");
+ catalog.createDatabase(identifier.getDatabaseName(), true);
+ Schema schema =
+ Schema.newBuilder()
+ .column("pt", DataTypes.INT())
+ .column("pk", DataTypes.INT())
+ .column("col1", DataTypes.TIMESTAMP())
+ .partitionKeys("pt")
+ .primaryKey("pk", "pt")
+ .options(tableOptions().toMap())
+ .build();
+ catalog.createTable(identifier, schema, true);
+ table = (FileStoreTable) catalog.getTable(identifier);
+ commitUser = UUID.randomUUID().toString();
+ }
+}