Skip to content

Commit

Permalink
[core] Support setting the time field type for record level expire (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
chenxinwei authored Aug 19, 2024
1 parent 14d82f9 commit 76b7f9c
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 7 deletions.
3 changes: 2 additions & 1 deletion docs/content/primary-key-table/compaction.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ use [dedicated compaction job]({{< ref "maintenance/dedicated-compaction#dedicat
In compaction, you can configure record-Level expire time to expire records, you should configure:

1. `'record-level.expire-time'`: time retain for records.
2. `'record-level.time-field'`: time field for record level expire, it should be a seconds INT.
2. `'record-level.time-field'`: time field for record level expire.
3. `'record-level.time-field-type'`: time field type for record level expire, it can be seconds-int or millis-long.

Expiration happens in compaction, and there is no strong guarantee to expire records in time.

Expand Down
8 changes: 7 additions & 1 deletion docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,13 @@
<td><h5>record-level.time-field</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Time field for record level expire, it should be a seconds INT.</td>
<td>Time field for record level expire.</td>
</tr>
<tr>
<td><h5>record-level.time-field-type</h5></td>
<td style="word-wrap: break-word;">seconds-int</td>
<td><p>Enum</p></td>
<td>Time field type for record level expire, it can be seconds-int or millis-long.<br /><br />Possible values:<ul><li>"seconds-int": Timestamps in seconds should be INT type.</li><li>"millis-long": Timestamps in milliseconds should be BIGINT type.</li></ul></td>
</tr>
<tr>
<td><h5>rowkind.field</h5></td>
Expand Down
38 changes: 37 additions & 1 deletion paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1251,8 +1251,14 @@ public class CoreOptions implements Serializable {
key("record-level.time-field")
.stringType()
.noDefaultValue()
.withDescription("Time field for record level expire.");

public static final ConfigOption<TimeFieldType> RECORD_LEVEL_TIME_FIELD_TYPE =
key("record-level.time-field-type")
.enumType(TimeFieldType.class)
.defaultValue(TimeFieldType.SECONDS_INT)
.withDescription(
"Time field for record level expire, it should be a seconds INT.");
"Time field type for record level expire, it can be seconds-int or millis-long.");

public static final ConfigOption<String> FIELDS_DEFAULT_AGG_FUNC =
key(FIELDS_PREFIX + "." + DEFAULT_AGG_FUNCTION)
Expand Down Expand Up @@ -2058,6 +2064,11 @@ public String recordLevelTimeField() {
return options.get(RECORD_LEVEL_TIME_FIELD);
}

@Nullable
public TimeFieldType recordLevelTimeFieldType() {
return options.get(RECORD_LEVEL_TIME_FIELD_TYPE);
}

public boolean prepareCommitWaitCompaction() {
if (!needLookup()) {
return false;
Expand Down Expand Up @@ -2682,4 +2693,29 @@ public InlineElement getDescription() {
return text(description);
}
}

/** Time field type for record level expire. */
public enum TimeFieldType implements DescribedEnum {
SECONDS_INT("seconds-int", "Timestamps in seconds should be INT type."),

MILLIS_LONG("millis-long", "Timestamps in milliseconds should be BIGINT type.");

private final String value;
private final String description;

TimeFieldType(String value, String description) {
this.value = value;
this.description = description;
}

@Override
public String toString() {
return value;
}

@Override
public InlineElement getDescription() {
return text(description);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
Expand All @@ -36,6 +37,7 @@ public class RecordLevelExpire {

private final int timeField;
private final int expireTime;
private final CoreOptions.TimeFieldType timeFieldType;

@Nullable
public static RecordLevelExpire create(CoreOptions options, RowType rowType) {
Expand All @@ -58,20 +60,26 @@ public static RecordLevelExpire create(CoreOptions options, RowType rowType) {
"Can not find time field %s for record level expire.", timeField));
}

CoreOptions.TimeFieldType timeFieldType = options.recordLevelTimeFieldType();
DataField field = rowType.getField(timeField);
if (!(field.type() instanceof IntType)) {
if (!((timeFieldType == CoreOptions.TimeFieldType.SECONDS_INT
&& field.type() instanceof IntType)
|| (timeFieldType == CoreOptions.TimeFieldType.MILLIS_LONG
&& field.type() instanceof BigIntType))) {
throw new IllegalArgumentException(
String.format(
"Record level time field should be INT type, but is %s.",
field.type()));
}

return new RecordLevelExpire(fieldIndex, (int) expireTime.getSeconds());
return new RecordLevelExpire(fieldIndex, (int) expireTime.getSeconds(), timeFieldType);
}

private RecordLevelExpire(int timeField, int expireTime) {
private RecordLevelExpire(
int timeField, int expireTime, CoreOptions.TimeFieldType timeFieldType) {
this.timeField = timeField;
this.expireTime = expireTime;
this.timeFieldType = timeFieldType;
}

public FileReaderFactory<KeyValue> wrap(FileReaderFactory<KeyValue> readerFactory) {
Expand All @@ -85,7 +93,22 @@ private RecordReader<KeyValue> wrap(RecordReader<KeyValue> reader) {
checkArgument(
!kv.value().isNullAt(timeField),
"Time field for record-level expire should not be null.");
int recordTime = kv.value().getInt(timeField);
final int recordTime;
switch (timeFieldType) {
case SECONDS_INT:
recordTime = kv.value().getInt(timeField);
break;
case MILLIS_LONG:
recordTime = (int) (kv.value().getLong(timeField) / 1000);
break;
default:
String msg =
String.format(
"type %s not support in %s",
timeFieldType,
CoreOptions.TimeFieldType.class.getName());
throw new IllegalArgumentException(msg);
}
return currentTime <= recordTime + expireTime;
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.table;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.TraceableFileIO;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

class RecordLevelExpireWithMillisecondTest extends PrimaryKeyTableTestBase {
@Override
@BeforeEach
public void beforeEachBase() throws Exception {
CatalogContext context =
CatalogContext.create(
new Path(TraceableFileIO.SCHEME + "://" + tempPath.toString()));
Catalog catalog = CatalogFactory.createCatalog(context);
Identifier identifier = new Identifier("default", "T");
catalog.createDatabase(identifier.getDatabaseName(), true);
Schema schema =
Schema.newBuilder()
.column("pt", DataTypes.INT())
.column("pk", DataTypes.INT())
.column("col1", DataTypes.BIGINT())
.partitionKeys("pt")
.primaryKey("pk", "pt")
.options(tableOptions().toMap())
.build();
catalog.createTable(identifier, schema, true);
table = (FileStoreTable) catalog.getTable(identifier);
commitUser = UUID.randomUUID().toString();
}

@Override
protected Options tableOptions() {
Options options = new Options();
options.set(CoreOptions.BUCKET, 1);
options.set(CoreOptions.RECORD_LEVEL_EXPIRE_TIME, Duration.ofSeconds(1));
options.set(CoreOptions.RECORD_LEVEL_TIME_FIELD, "col1");
options.set(
CoreOptions.RECORD_LEVEL_TIME_FIELD_TYPE, CoreOptions.TimeFieldType.MILLIS_LONG);
return options;
}

@Test
public void test() throws Exception {
writeCommit(GenericRow.of(1, 1, 1L), GenericRow.of(1, 2, 2L));

// can be queried
assertThat(query(new int[] {0, 1}))
.containsExactlyInAnyOrder(GenericRow.of(1, 1), GenericRow.of(1, 2));

long currentSecs = System.currentTimeMillis();
writeCommit(GenericRow.of(1, 3, currentSecs));
writeCommit(GenericRow.of(1, 4, currentSecs + 60 * 60 * 1000));
Thread.sleep(2000);

// no compaction, can be queried
assertThat(query(new int[] {0, 1}))
.containsExactlyInAnyOrder(
GenericRow.of(1, 1),
GenericRow.of(1, 2),
GenericRow.of(1, 3),
GenericRow.of(1, 4));

// compact, expired
compact(1);
assertThat(query(new int[] {0, 1})).containsExactlyInAnyOrder(GenericRow.of(1, 4));
}
}

0 comments on commit 76b7f9c

Please sign in to comment.