Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
askwang committed Oct 31, 2024
1 parent 58cbfa5 commit f39c983
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 48 deletions.
2 changes: 1 addition & 1 deletion docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@
<td><h5>record-level.time-field-type</h5></td>
<td style="word-wrap: break-word;">seconds-int</td>
<td><p>Enum</p></td>
<td>Time field type for record level expire, it can be seconds-int,seconds-long or millis-long.<br /><br />Possible values:<ul><li>"seconds-int": Timestamps in seconds with INT field type.</li><li>"seconds-long": Timestamps in seconds with BIGINT field type.</li><li>"millis-long": Timestamps in milliseconds with BIGINT field type.</li><li>"timestamp": Timestamp field type.</li></ul></td>
<td>Time field type for record level expire, it can be seconds-int,seconds-long, millis-long or timestamp.<br /><br />Possible values:<ul><li>"seconds-int": Timestamps in seconds with INT field type.</li><li>"seconds-long": Timestamps in seconds with BIGINT field type.</li><li>"millis-long": Timestamps in milliseconds with BIGINT field type.</li><li>"timestamp": Timestamp field type.</li></ul></td>
</tr>
<tr>
<td><h5>rowkind.field</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.RowType;
Expand Down Expand Up @@ -69,7 +70,7 @@ public static RecordLevelExpire create(CoreOptions options, RowType rowType) {
if (!isValidateFieldType(timeFieldType, field)) {
throw new IllegalArgumentException(
String.format(
"The record level time field type should be one of SECONDS_INT,SECONDS_LONG or MILLIS_LONG, "
"The record level time field type should be one of SECONDS_INT, SECONDS_LONG, MILLIS_LONG or TIMESTAMP, "
+ "but time field type is %s, field type is %s.",
timeFieldType, field.type()));
}
Expand All @@ -80,14 +81,17 @@ public static RecordLevelExpire create(CoreOptions options, RowType rowType) {

private static boolean isValidateFieldType(
CoreOptions.TimeFieldType timeFieldType, DataField field) {
DataType dataType = field.type();
return ((timeFieldType == CoreOptions.TimeFieldType.SECONDS_INT
&& field.type() instanceof IntType)
&& dataType instanceof IntType)
|| (timeFieldType == CoreOptions.TimeFieldType.SECONDS_LONG
&& field.type() instanceof BigIntType)
&& dataType instanceof BigIntType)
|| (timeFieldType == CoreOptions.TimeFieldType.MILLIS_LONG
&& field.type() instanceof BigIntType)
&& dataType instanceof BigIntType)
|| (timeFieldType == CoreOptions.TimeFieldType.TIMESTAMP
&& field.type() instanceof TimestampType));
&& dataType instanceof TimestampType)
|| (timeFieldType == CoreOptions.TimeFieldType.TIMESTAMP
&& dataType instanceof LocalZonedTimestampType));
}

private RecordLevelExpire(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.table;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.options.Options;

import org.junit.jupiter.api.Test;

import java.time.Duration;

import static org.assertj.core.api.Assertions.assertThat;

abstract class RecordLevelExpireWithTimestampBaseTest extends PrimaryKeyTableTestBase {

@Override
protected Options tableOptions() {
Options options = new Options();
options.set(CoreOptions.BUCKET, 1);
options.set(CoreOptions.RECORD_LEVEL_EXPIRE_TIME, Duration.ofSeconds(1));
options.set(CoreOptions.RECORD_LEVEL_TIME_FIELD, "col1");
options.set(CoreOptions.RECORD_LEVEL_TIME_FIELD_TYPE, CoreOptions.TimeFieldType.TIMESTAMP);
return options;
}

@Test
public void testTimestampTypeExpire() throws Exception {
long millis = System.currentTimeMillis();
Timestamp timestamp1 = Timestamp.fromEpochMillis(millis - 60 * 1000);
Timestamp timestamp2 = Timestamp.fromEpochMillis(millis);
Timestamp timestamp3 = Timestamp.fromEpochMillis(millis + 60 * 1000);

// create at least two files in one bucket
writeCommit(GenericRow.of(1, 1, timestamp1), GenericRow.of(1, 2, timestamp2));
writeCommit(GenericRow.of(1, 3, timestamp3));

// no compaction, can be queried
assertThat(query(new int[] {0, 1}))
.containsExactlyInAnyOrder(
GenericRow.of(1, 1), GenericRow.of(1, 2), GenericRow.of(1, 3));
Thread.sleep(2000);

// compact, expired
compact(1);
assertThat(query(new int[] {0, 1})).containsExactlyInAnyOrder(GenericRow.of(1, 3));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.table;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.TraceableFileIO;

import org.junit.jupiter.api.BeforeEach;

import java.util.UUID;

class RecordLevelExpireWithTimestampLTZTest extends RecordLevelExpireWithTimestampBaseTest {

@Override
@BeforeEach
public void beforeEachBase() throws Exception {
CatalogContext context =
CatalogContext.create(
new Path(TraceableFileIO.SCHEME + "://" + tempPath.toString()));
Catalog catalog = CatalogFactory.createCatalog(context);
Identifier identifier = new Identifier("default", "T");
catalog.createDatabase(identifier.getDatabaseName(), true);
Schema schema =
Schema.newBuilder()
.column("pt", DataTypes.INT())
.column("pk", DataTypes.INT())
.column("col1", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
.partitionKeys("pt")
.primaryKey("pk", "pt")
.options(tableOptions().toMap())
.build();
catalog.createTable(identifier, schema, true);
table = (FileStoreTable) catalog.getTable(identifier);
commitUser = UUID.randomUUID().toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,20 @@

package org.apache.paimon.table;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.TraceableFileIO;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

class RecordLevelExpireWithTimestampTest extends PrimaryKeyTableTestBase {
class RecordLevelExpireWithTimestampTest extends RecordLevelExpireWithTimestampBaseTest {

@Override
@BeforeEach
Expand All @@ -64,36 +55,4 @@ public void beforeEachBase() throws Exception {
table = (FileStoreTable) catalog.getTable(identifier);
commitUser = UUID.randomUUID().toString();
}

@Override
protected Options tableOptions() {
Options options = new Options();
options.set(CoreOptions.BUCKET, 1);
options.set(CoreOptions.RECORD_LEVEL_EXPIRE_TIME, Duration.ofSeconds(1));
options.set(CoreOptions.RECORD_LEVEL_TIME_FIELD, "col1");
options.set(CoreOptions.RECORD_LEVEL_TIME_FIELD_TYPE, CoreOptions.TimeFieldType.TIMESTAMP);
return options;
}

@Test
public void test() throws Exception {
long millis = System.currentTimeMillis();
Timestamp timestamp1 = Timestamp.fromEpochMillis(millis - 60 * 1000);
Timestamp timestamp2 = Timestamp.fromEpochMillis(millis);
Timestamp timestamp3 = Timestamp.fromEpochMillis(millis + 60 * 1000);

// create at least two files in one bucket
writeCommit(GenericRow.of(1, 1, timestamp1), GenericRow.of(1, 2, timestamp2));
writeCommit(GenericRow.of(1, 3, timestamp3));

// no compaction, can be queried
assertThat(query(new int[] {0, 1}))
.containsExactlyInAnyOrder(
GenericRow.of(1, 1), GenericRow.of(1, 2), GenericRow.of(1, 3));
Thread.sleep(2000);

// compact, expired
compact(1);
assertThat(query(new int[] {0, 1})).containsExactlyInAnyOrder(GenericRow.of(1, 3));
}
}

0 comments on commit f39c983

Please sign in to comment.