Skip to content

Commit

Permalink
[core] Support timestamp field type in record level expire (apache#4417)
Browse files Browse the repository at this point in the history
  • Loading branch information
askwang authored and guanshi committed Nov 7, 2024
1 parent d6b14ad commit 0b4631f
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 14 deletions.
2 changes: 1 addition & 1 deletion docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@
<td><h5>record-level.time-field-type</h5></td>
<td style="word-wrap: break-word;">seconds-int</td>
<td><p>Enum</p></td>
<td>Time field type for record level expire, it can be seconds-int,seconds-long or millis-long.<br /><br />Possible values:<ul><li>"seconds-int": Timestamps in seconds with INT field type.</li><li>"seconds-long": Timestamps in seconds with BIGINT field type.</li><li>"millis-long": Timestamps in milliseconds with BIGINT field type.</li></ul></td>
<td>Time field type for record level expire, it can be seconds-int,seconds-long, millis-long or timestamp.<br /><br />Possible values:<ul><li>"seconds-int": Timestamps in seconds with INT field type.</li><li>"seconds-long": Timestamps in seconds with BIGINT field type.</li><li>"millis-long": Timestamps in milliseconds with BIGINT field type.</li><li>"timestamp": Timestamp field type.</li></ul></td>
</tr>
<tr>
<td><h5>rowkind.field</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1358,7 +1358,7 @@ public class CoreOptions implements Serializable {
.enumType(TimeFieldType.class)
.defaultValue(TimeFieldType.SECONDS_INT)
.withDescription(
"Time field type for record level expire, it can be seconds-int,seconds-long or millis-long.");
"Time field type for record level expire, it can be seconds-int,seconds-long, millis-long or timestamp.");

public static final ConfigOption<String> FIELDS_DEFAULT_AGG_FUNC =
key(FIELDS_PREFIX + "." + DEFAULT_AGG_FUNCTION)
Expand Down Expand Up @@ -2926,7 +2926,9 @@ public enum TimeFieldType implements DescribedEnum {

SECONDS_LONG("seconds-long", "Timestamps in seconds with BIGINT field type."),

MILLIS_LONG("millis-long", "Timestamps in milliseconds with BIGINT field type.");
MILLIS_LONG("millis-long", "Timestamps in milliseconds with BIGINT field type."),

TIMESTAMP("timestamp", "Timestamp field type.");

private final String value;
private final String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;

import javax.annotation.Nullable;

Expand All @@ -38,6 +42,7 @@ public class RecordLevelExpire {
private final int timeFieldIndex;
private final int expireTime;
private final CoreOptions.TimeFieldType timeFieldType;
private final DataField rawDataField;

@Nullable
public static RecordLevelExpire create(CoreOptions options, RowType rowType) {
Expand All @@ -62,27 +67,44 @@ public static RecordLevelExpire create(CoreOptions options, RowType rowType) {

CoreOptions.TimeFieldType timeFieldType = options.recordLevelTimeFieldType();
DataField field = rowType.getField(timeFieldName);
if (!((timeFieldType == CoreOptions.TimeFieldType.SECONDS_INT
&& field.type() instanceof IntType)
|| (timeFieldType == CoreOptions.TimeFieldType.SECONDS_LONG
&& field.type() instanceof BigIntType)
|| (timeFieldType == CoreOptions.TimeFieldType.MILLIS_LONG
&& field.type() instanceof BigIntType))) {
if (!isValidateFieldType(timeFieldType, field)) {
throw new IllegalArgumentException(
String.format(
"The record level time field type should be one of SECONDS_INT,SECONDS_LONG or MILLIS_LONG, "
+ "but time field type is %s, field type is %s.",
timeFieldType, field.type()));
"The record level time field type should be one of SECONDS_INT, SECONDS_LONG, MILLIS_LONG or TIMESTAMP, "
+ "but time field type is %s, field type is %s. You can specify the type through the config '%s'.",
timeFieldType,
field.type(),
CoreOptions.RECORD_LEVEL_TIME_FIELD_TYPE.key()));
}

return new RecordLevelExpire(fieldIndex, (int) expireTime.getSeconds(), timeFieldType);
return new RecordLevelExpire(
fieldIndex, (int) expireTime.getSeconds(), timeFieldType, field);
}

/**
 * Checks whether the configured time field type is compatible with the data type of the
 * given field.
 *
 * <p>Accepted combinations:
 *
 * <ul>
 *   <li>{@code SECONDS_INT} with {@link IntType}
 *   <li>{@code SECONDS_LONG} or {@code MILLIS_LONG} with {@link BigIntType}
 *   <li>{@code TIMESTAMP} with {@link TimestampType} or {@link LocalZonedTimestampType}
 * </ul>
 *
 * @param timeFieldType the configured record level time field type
 * @param field the field holding the record time
 * @return {@code true} if the combination is one of the accepted ones, {@code false} otherwise
 */
private static boolean isValidateFieldType(
        CoreOptions.TimeFieldType timeFieldType, DataField field) {
    DataType fieldType = field.type();

    if (timeFieldType == CoreOptions.TimeFieldType.SECONDS_INT) {
        return fieldType instanceof IntType;
    }

    // Both long-based representations are stored in a BIGINT column.
    if (timeFieldType == CoreOptions.TimeFieldType.SECONDS_LONG
            || timeFieldType == CoreOptions.TimeFieldType.MILLIS_LONG) {
        return fieldType instanceof BigIntType;
    }

    if (timeFieldType == CoreOptions.TimeFieldType.TIMESTAMP) {
        return fieldType instanceof TimestampType
                || fieldType instanceof LocalZonedTimestampType;
    }

    // Any other time field type (including null) matches nothing.
    return false;
}

private RecordLevelExpire(
int timeFieldIndex, int expireTime, CoreOptions.TimeFieldType timeFieldType) {
int timeFieldIndex,
int expireTime,
CoreOptions.TimeFieldType timeFieldType,
DataField rawDataField) {
this.timeFieldIndex = timeFieldIndex;
this.expireTime = expireTime;
this.timeFieldType = timeFieldType;
this.rawDataField = rawDataField;
}

public FileReaderFactory<KeyValue> wrap(FileReaderFactory<KeyValue> readerFactory) {
Expand All @@ -107,6 +129,29 @@ private RecordReader<KeyValue> wrap(RecordReader<KeyValue> reader) {
case MILLIS_LONG:
recordTime = (int) (kv.value().getLong(timeFieldIndex) / 1000);
break;
case TIMESTAMP:
Timestamp timestamp;
if (rawDataField.type() instanceof TimestampType) {
TimestampType timestampType = (TimestampType) rawDataField.type();
timestamp =
kv.value()
.getTimestamp(
timeFieldIndex,
timestampType.getPrecision());
} else if (rawDataField.type() instanceof LocalZonedTimestampType) {
LocalZonedTimestampType timestampType =
(LocalZonedTimestampType) rawDataField.type();
timestamp =
kv.value()
.getTimestamp(
timeFieldIndex,
timestampType.getPrecision());
} else {
throw new UnsupportedOperationException(
"Unsupported timestamp type: " + rawDataField.type());
}
recordTime = (int) (timestamp.getMillisecond() / 1000);
break;
default:
String msg =
String.format(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.table;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.options.Options;

import org.junit.jupiter.api.Test;

import java.time.Duration;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * Shared record level expire scenario for tables whose time field {@code col1} is configured
 * with {@link CoreOptions.TimeFieldType#TIMESTAMP}. Subclasses are expected to provide the
 * table under test.
 */
abstract class RecordLevelExpireWithTimestampBaseTest extends PrimaryKeyTableTestBase {

    /**
     * Builds the table options used by this scenario: a single bucket, a one second record
     * level expire time and {@code col1} as the TIMESTAMP typed time field.
     */
    @Override
    protected Options tableOptions() {
        Options conf = new Options();
        conf.set(CoreOptions.BUCKET, 1);
        conf.set(CoreOptions.RECORD_LEVEL_EXPIRE_TIME, Duration.ofSeconds(1));
        conf.set(CoreOptions.RECORD_LEVEL_TIME_FIELD, "col1");
        conf.set(CoreOptions.RECORD_LEVEL_TIME_FIELD_TYPE, CoreOptions.TimeFieldType.TIMESTAMP);
        return conf;
    }

    /**
     * Writes three rows whose timestamps lie one minute in the past, at the current time and
     * one minute in the future, then verifies that all of them are readable before compaction
     * and that only the future one is left after waiting and compacting.
     */
    @Test
    public void testTimestampTypeExpire() throws Exception {
        long now = System.currentTimeMillis();
        int oneMinuteMillis = 60 * 1000;

        Timestamp pastTs = Timestamp.fromEpochMillis(now - oneMinuteMillis);
        Timestamp currentTs = Timestamp.fromEpochMillis(now);
        Timestamp futureTs = Timestamp.fromEpochMillis(now + oneMinuteMillis);

        // two separate commits so that the bucket ends up with at least two files
        writeCommit(GenericRow.of(1, 1, pastTs), GenericRow.of(1, 2, currentTs));
        writeCommit(GenericRow.of(1, 3, futureTs));

        // nothing has been compacted yet, so every row is still visible
        assertThat(query(new int[] {0, 1}))
                .containsExactlyInAnyOrder(
                        GenericRow.of(1, 1), GenericRow.of(1, 2), GenericRow.of(1, 3));

        // wait longer than the configured one second expire time
        Thread.sleep(2000);

        // after compaction only the row with the future timestamp remains
        compact(1);
        assertThat(query(new int[] {0, 1})).containsExactlyInAnyOrder(GenericRow.of(1, 3));
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.table;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.TraceableFileIO;

import org.junit.jupiter.api.BeforeEach;

import java.util.UUID;

/**
 * Runs the inherited record level expire scenario against a table whose {@code col1} column is
 * declared as {@code TIMESTAMP WITH LOCAL TIME ZONE}.
 */
class RecordLevelExpireWithTimestampLTZTest extends RecordLevelExpireWithTimestampBaseTest {

    /**
     * Creates table {@code default.T} with columns {@code pt}, {@code pk} and {@code col1},
     * partitioned by {@code pt} with primary key ({@code pk}, {@code pt}), and assigns a fresh
     * random commit user.
     */
    @Override
    @BeforeEach
    public void beforeEachBase() throws Exception {
        Path rootPath = new Path(TraceableFileIO.SCHEME + "://" + tempPath.toString());
        CatalogContext catalogContext = CatalogContext.create(rootPath);
        Catalog catalog = CatalogFactory.createCatalog(catalogContext);

        Identifier tableId = new Identifier("default", "T");
        catalog.createDatabase(tableId.getDatabaseName(), true);

        // col1 is the time field referenced by the table options of the base test.
        Schema tableSchema =
                Schema.newBuilder()
                        .column("pt", DataTypes.INT())
                        .column("pk", DataTypes.INT())
                        .column("col1", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
                        .partitionKeys("pt")
                        .primaryKey("pk", "pt")
                        .options(tableOptions().toMap())
                        .build();
        catalog.createTable(tableId, tableSchema, true);

        table = (FileStoreTable) catalog.getTable(tableId);
        commitUser = UUID.randomUUID().toString();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.table;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.TraceableFileIO;

import org.junit.jupiter.api.BeforeEach;

import java.util.UUID;

/**
 * Runs the inherited record level expire scenario against a table whose {@code col1} column is
 * declared as {@code TIMESTAMP}.
 */
class RecordLevelExpireWithTimestampTest extends RecordLevelExpireWithTimestampBaseTest {

    /**
     * Sets up the table under test: creates {@code default.T} with an INT partition column
     * {@code pt}, an INT key column {@code pk} and a TIMESTAMP column {@code col1}, then picks
     * a random commit user.
     */
    @Override
    @BeforeEach
    public void beforeEachBase() throws Exception {
        String catalogUri = TraceableFileIO.SCHEME + "://" + tempPath.toString();
        Catalog testCatalog =
                CatalogFactory.createCatalog(CatalogContext.create(new Path(catalogUri)));

        Identifier target = new Identifier("default", "T");
        // second argument: ignore the case where the database already exists
        testCatalog.createDatabase(target.getDatabaseName(), true);

        Schema definition =
                Schema.newBuilder()
                        .column("pt", DataTypes.INT())
                        .column("pk", DataTypes.INT())
                        .column("col1", DataTypes.TIMESTAMP())
                        .partitionKeys("pt")
                        .primaryKey("pk", "pt")
                        .options(tableOptions().toMap())
                        .build();
        testCatalog.createTable(target, definition, true);

        table = (FileStoreTable) testCatalog.getTable(target);
        commitUser = UUID.randomUUID().toString();
    }
}

0 comments on commit 0b4631f

Please sign in to comment.