From f39c983bfe6e44674870595094269e78d55c7dc2 Mon Sep 17 00:00:00 2001
From: Askwang <135721692+Askwang@users.noreply.github.com>
Date: Thu, 31 Oct 2024 22:50:35 +0800
Subject: [PATCH] fix comments
---
.../generated/core_configuration.html | 2 +-
.../apache/paimon/io/RecordLevelExpire.java | 14 ++--
...ecordLevelExpireWithTimestampBaseTest.java | 66 +++++++++++++++++++
...RecordLevelExpireWithTimestampLTZTest.java | 58 ++++++++++++++++
.../RecordLevelExpireWithTimestampTest.java | 43 +-----------
5 files changed, 135 insertions(+), 48 deletions(-)
create mode 100644 paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampBaseTest.java
create mode 100644 paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampLTZTest.java
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 6b28ed29cae2..bd30b6794dc7 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -627,7 +627,7 @@
record-level.time-field-type |
seconds-int |
Enum |
- Time field type for record level expire, it can be seconds-int,seconds-long or millis-long.
Possible values:- "seconds-int": Timestamps in seconds with INT field type.
- "seconds-long": Timestamps in seconds with BIGINT field type.
- "millis-long": Timestamps in milliseconds with BIGINT field type.
- "timestamp": Timestamp field type.
|
+ Time field type for record level expire, it can be seconds-int,seconds-long, millis-long or timestamp.
Possible values:- "seconds-int": Timestamps in seconds with INT field type.
- "seconds-long": Timestamps in seconds with BIGINT field type.
- "millis-long": Timestamps in milliseconds with BIGINT field type.
- "timestamp": Timestamp field type.
|
rowkind.field |
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
index 24ab8e298423..e548ae44c422 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
@@ -24,6 +24,7 @@
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.RowType;
@@ -69,7 +70,7 @@ public static RecordLevelExpire create(CoreOptions options, RowType rowType) {
if (!isValidateFieldType(timeFieldType, field)) {
throw new IllegalArgumentException(
String.format(
- "The record level time field type should be one of SECONDS_INT,SECONDS_LONG or MILLIS_LONG, "
+ "The record level time field type should be one of SECONDS_INT, SECONDS_LONG, MILLIS_LONG or TIMESTAMP, "
+ "but time field type is %s, field type is %s.",
timeFieldType, field.type()));
}
@@ -80,14 +81,17 @@ public static RecordLevelExpire create(CoreOptions options, RowType rowType) {
private static boolean isValidateFieldType(
CoreOptions.TimeFieldType timeFieldType, DataField field) {
+ DataType dataType = field.type();
return ((timeFieldType == CoreOptions.TimeFieldType.SECONDS_INT
- && field.type() instanceof IntType)
+ && dataType instanceof IntType)
|| (timeFieldType == CoreOptions.TimeFieldType.SECONDS_LONG
- && field.type() instanceof BigIntType)
+ && dataType instanceof BigIntType)
|| (timeFieldType == CoreOptions.TimeFieldType.MILLIS_LONG
- && field.type() instanceof BigIntType)
+ && dataType instanceof BigIntType)
|| (timeFieldType == CoreOptions.TimeFieldType.TIMESTAMP
- && field.type() instanceof TimestampType));
+ && dataType instanceof TimestampType)
+ || (timeFieldType == CoreOptions.TimeFieldType.TIMESTAMP
+ && dataType instanceof LocalZonedTimestampType));
}
private RecordLevelExpire(
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampBaseTest.java b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampBaseTest.java
new file mode 100644
index 000000000000..dcc8d246da06
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampBaseTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.options.Options;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+abstract class RecordLevelExpireWithTimestampBaseTest extends PrimaryKeyTableTestBase {
+
+ @Override
+ protected Options tableOptions() {
+ Options options = new Options();
+ options.set(CoreOptions.BUCKET, 1);
+ options.set(CoreOptions.RECORD_LEVEL_EXPIRE_TIME, Duration.ofSeconds(1));
+ options.set(CoreOptions.RECORD_LEVEL_TIME_FIELD, "col1");
+ options.set(CoreOptions.RECORD_LEVEL_TIME_FIELD_TYPE, CoreOptions.TimeFieldType.TIMESTAMP);
+ return options;
+ }
+
+ @Test
+ public void testTimestampTypeExpire() throws Exception {
+ long millis = System.currentTimeMillis();
+ Timestamp timestamp1 = Timestamp.fromEpochMillis(millis - 60 * 1000);
+ Timestamp timestamp2 = Timestamp.fromEpochMillis(millis);
+ Timestamp timestamp3 = Timestamp.fromEpochMillis(millis + 60 * 1000);
+
+ // create at least two files in one bucket
+ writeCommit(GenericRow.of(1, 1, timestamp1), GenericRow.of(1, 2, timestamp2));
+ writeCommit(GenericRow.of(1, 3, timestamp3));
+
+ // no compaction, can be queried
+ assertThat(query(new int[] {0, 1}))
+ .containsExactlyInAnyOrder(
+ GenericRow.of(1, 1), GenericRow.of(1, 2), GenericRow.of(1, 3));
+ Thread.sleep(2000);
+
+ // compact, expired
+ compact(1);
+ assertThat(query(new int[] {0, 1})).containsExactlyInAnyOrder(GenericRow.of(1, 3));
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampLTZTest.java b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampLTZTest.java
new file mode 100644
index 000000000000..af834af276c4
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampLTZTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.UUID;
+
+class RecordLevelExpireWithTimestampLTZTest extends RecordLevelExpireWithTimestampBaseTest {
+
+ @Override
+ @BeforeEach
+ public void beforeEachBase() throws Exception {
+ CatalogContext context =
+ CatalogContext.create(
+ new Path(TraceableFileIO.SCHEME + "://" + tempPath.toString()));
+ Catalog catalog = CatalogFactory.createCatalog(context);
+ Identifier identifier = new Identifier("default", "T");
+ catalog.createDatabase(identifier.getDatabaseName(), true);
+ Schema schema =
+ Schema.newBuilder()
+ .column("pt", DataTypes.INT())
+ .column("pk", DataTypes.INT())
+ .column("col1", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+ .partitionKeys("pt")
+ .primaryKey("pk", "pt")
+ .options(tableOptions().toMap())
+ .build();
+ catalog.createTable(identifier, schema, true);
+ table = (FileStoreTable) catalog.getTable(identifier);
+ commitUser = UUID.randomUUID().toString();
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampTest.java b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampTest.java
index 0a64b33df19f..3c4add8914f8 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/RecordLevelExpireWithTimestampTest.java
@@ -18,29 +18,20 @@
package org.apache.paimon.table;
-import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.Timestamp;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.TraceableFileIO;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import java.time.Duration;
import java.util.UUID;
-import static org.assertj.core.api.Assertions.assertThat;
-
-class RecordLevelExpireWithTimestampTest extends PrimaryKeyTableTestBase {
+class RecordLevelExpireWithTimestampTest extends RecordLevelExpireWithTimestampBaseTest {
@Override
@BeforeEach
@@ -64,36 +55,4 @@ public void beforeEachBase() throws Exception {
table = (FileStoreTable) catalog.getTable(identifier);
commitUser = UUID.randomUUID().toString();
}
-
- @Override
- protected Options tableOptions() {
- Options options = new Options();
- options.set(CoreOptions.BUCKET, 1);
- options.set(CoreOptions.RECORD_LEVEL_EXPIRE_TIME, Duration.ofSeconds(1));
- options.set(CoreOptions.RECORD_LEVEL_TIME_FIELD, "col1");
- options.set(CoreOptions.RECORD_LEVEL_TIME_FIELD_TYPE, CoreOptions.TimeFieldType.TIMESTAMP);
- return options;
- }
-
- @Test
- public void test() throws Exception {
- long millis = System.currentTimeMillis();
- Timestamp timestamp1 = Timestamp.fromEpochMillis(millis - 60 * 1000);
- Timestamp timestamp2 = Timestamp.fromEpochMillis(millis);
- Timestamp timestamp3 = Timestamp.fromEpochMillis(millis + 60 * 1000);
-
- // create at least two files in one bucket
- writeCommit(GenericRow.of(1, 1, timestamp1), GenericRow.of(1, 2, timestamp2));
- writeCommit(GenericRow.of(1, 3, timestamp3));
-
- // no compaction, can be queried
- assertThat(query(new int[] {0, 1}))
- .containsExactlyInAnyOrder(
- GenericRow.of(1, 1), GenericRow.of(1, 2), GenericRow.of(1, 3));
- Thread.sleep(2000);
-
- // compact, expired
- compact(1);
- assertThat(query(new int[] {0, 1})).containsExactlyInAnyOrder(GenericRow.of(1, 3));
- }
}