Skip to content

Commit

Permalink
[core] Add schema validation for record-level.time-field (apache#4758)
Browse files Browse the repository at this point in the history
  • Loading branch information
herefree authored Dec 30, 2024
1 parent e1c5d2d commit 7badfbb
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,15 @@
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.MultisetType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.StringUtils;

Expand All @@ -44,6 +48,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -176,6 +181,36 @@ public static void validateTableSchema(TableSchema schema) {
}
}

// Validate the column named by the record-level time-field option, when that option is set.
String recordLevelTimeField = options.recordLevelTimeField();
if (recordLevelTimeField != null) {
    // The configured name must match one of the fields declared in the schema.
    DataField timeField =
            schema.fields().stream()
                    .filter(candidate -> candidate.name().equals(recordLevelTimeField))
                    .findFirst()
                    .orElseThrow(
                            () ->
                                    new IllegalArgumentException(
                                            String.format(
                                                    "Can not find time field %s for record level expire.",
                                                    recordLevelTimeField)));

    // Only INT, BIGINT and the two timestamp types are accepted for the time field.
    DataType timeFieldType = timeField.type();
    boolean supportedType =
            timeFieldType instanceof IntType
                    || timeFieldType instanceof BigIntType
                    || timeFieldType instanceof TimestampType
                    || timeFieldType instanceof LocalZonedTimestampType;
    if (!supportedType) {
        throw new IllegalArgumentException(
                String.format(
                        "The record level time field type should be one of INT, BIGINT, or TIMESTAMP, but field type is %s.",
                        timeFieldType));
    }

    // A nullable time field is rejected; this check runs only after the type check passes.
    if (timeFieldType.isNullable()) {
        throw new IllegalArgumentException(
                String.format(
                        "Time field %s for record-level expire should be not null.",
                        recordLevelTimeField));
    }
}

if (options.mergeEngine() == MergeEngine.FIRST_ROW) {
if (options.changelogProducer() != ChangelogProducer.LOOKUP
&& options.changelogProducer() != ChangelogProducer.NONE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ private void validateTableSchemaExec(Map<String, String> options) {
Arrays.asList(
new DataField(0, "f0", DataTypes.INT()),
new DataField(1, "f1", DataTypes.INT()),
new DataField(2, "f2", DataTypes.INT()));
new DataField(2, "f2", DataTypes.INT()),
new DataField(3, "f3", DataTypes.STRING()));
List<String> partitionKeys = Collections.singletonList("f0");
List<String> primaryKeys = Collections.singletonList("f1");
options.put(BUCKET.key(), String.valueOf(-1));
Expand Down Expand Up @@ -103,4 +104,22 @@ public void testFromSnapshotConflict() {
.hasMessageContaining(
"[scan.snapshot-id] must be null when you set [scan.timestamp-millis,scan.timestamp]");
}

/**
 * Checks that schema validation rejects an unusable record-level time field: a nullable
 * column, a column missing from the schema, and a column of an unsupported type.
 */
@Test
public void testRecordLevelTimeField() {
    final String timeFieldKey = CoreOptions.RECORD_LEVEL_TIME_FIELD.key();

    Map<String, String> conf = new HashMap<>(2);
    conf.put(CoreOptions.RECORD_LEVEL_EXPIRE_TIME.key(), "1 m");

    // f0 is declared with DataTypes.INT() (no notNull()), so the not-null check is expected.
    conf.put(timeFieldKey, "f0");
    assertThatThrownBy(() -> validateTableSchemaExec(conf))
            .hasMessageContaining("Time field f0 for record-level expire should be not null");

    // f10 is not among the fields of the test schema.
    conf.put(timeFieldKey, "f10");
    assertThatThrownBy(() -> validateTableSchemaExec(conf))
            .hasMessageContaining("Can not find time field f10 for record level expire.");

    // f3 is a STRING column, which is not an accepted time-field type.
    conf.put(timeFieldKey, "f3");
    assertThatThrownBy(() -> validateTableSchemaExec(conf))
            .hasMessageContaining(
                    "The record level time field type should be one of INT, BIGINT, or TIMESTAMP, but field type is STRING.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,51 @@
package org.apache.paimon.table;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.TraceableFileIO;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

class RecordLevelExpireTest extends PrimaryKeyTableTestBase {

/**
 * Creates table {@code default.T} in a catalog rooted at the temporary path, with {@code col1}
 * declared as a NOT NULL INT column, then stores the table handle and a fresh random commit
 * user for the test.
 */
@Override
@BeforeEach
public void beforeEachBase() throws Exception {
    // Warehouse location uses the TraceableFileIO scheme on top of the temp path.
    Path warehouse = new Path(TraceableFileIO.SCHEME + "://" + tempPath.toString());
    Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(warehouse));

    Identifier tableId = new Identifier("default", "T");
    catalog.createDatabase(tableId.getDatabaseName(), true);

    // col1 is declared NOT NULL here.
    Schema tableSchema =
            Schema.newBuilder()
                    .column("pt", DataTypes.INT())
                    .column("pk", DataTypes.INT())
                    .column("col1", DataTypes.INT().notNull())
                    .partitionKeys("pt")
                    .primaryKey("pk", "pt")
                    .options(tableOptions().toMap())
                    .build();
    catalog.createTable(tableId, tableSchema, true);

    table = (FileStoreTable) catalog.getTable(tableId);
    commitUser = UUID.randomUUID().toString();
}

@Override
protected Options tableOptions() {
Options options = new Options();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void beforeEachBase() throws Exception {
Schema.newBuilder()
.column("pt", DataTypes.INT())
.column("pk", DataTypes.INT())
.column("col1", DataTypes.BIGINT())
.column("col1", DataTypes.BIGINT().notNull())
.partitionKeys("pt")
.primaryKey("pk", "pt")
.options(tableOptions().toMap())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void beforeEachBase() throws Exception {
Schema.newBuilder()
.column("pt", DataTypes.INT())
.column("pk", DataTypes.INT())
.column("col1", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
.column("col1", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE().notNull())
.partitionKeys("pt")
.primaryKey("pk", "pt")
.options(tableOptions().toMap())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void beforeEachBase() throws Exception {
Schema.newBuilder()
.column("pt", DataTypes.INT())
.column("pk", DataTypes.INT())
.column("col1", DataTypes.TIMESTAMP())
.column("col1", DataTypes.TIMESTAMP().notNull())
.partitionKeys("pt")
.primaryKey("pk", "pt")
.options(tableOptions().toMap())
Expand Down

0 comments on commit 7badfbb

Please sign in to comment.