Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
herefree committed Dec 23, 2024
1 parent a48fb4e commit 9d987d7
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,16 +181,17 @@ public static void validateTableSchema(TableSchema schema) {
}
}

if (schema.options().containsKey(CoreOptions.RECORD_LEVEL_TIME_FIELD.key())) {
String fieldName = options.recordLevelTimeField();
String recordLevelTimeField = options.recordLevelTimeField();
if (recordLevelTimeField != null) {
Optional<DataField> field =
schema.fields().stream()
.filter(dataField -> dataField.name().equals(fieldName))
.filter(dataField -> dataField.name().equals(recordLevelTimeField))
.findFirst();
if (!field.isPresent()) {
throw new IllegalArgumentException(
String.format(
"Can not find time field %s for record level expire.", fieldName));
"Can not find time field %s for record level expire.",
recordLevelTimeField));
}
DataType dataType = field.get().type();
if (!(dataType instanceof IntType
Expand All @@ -206,7 +207,7 @@ public static void validateTableSchema(TableSchema schema) {
throw new IllegalArgumentException(
String.format(
"Time field %s for record-level expire should be not null.",
fieldName));
recordLevelTimeField));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void testFromSnapshotConflict() {

@Test
public void testRecordLevelTimeField() {
Map<String, String> options = new HashMap<>();
Map<String, String> options = new HashMap<>(2);
options.put(CoreOptions.RECORD_LEVEL_TIME_FIELD.key(), "f0");
options.put(CoreOptions.RECORD_LEVEL_EXPIRE_TIME.key(), "1 m");
assertThatThrownBy(() -> validateTableSchemaExec(options))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import static org.assertj.core.api.Assertions.assertThat;

class RecordLevelExpireTest extends PrimaryKeyTableTestBase {

@Override
@BeforeEach
public void beforeEachBase() throws Exception {
Expand Down

0 comments on commit 9d987d7

Please sign in to comment.