Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Change variable name in RecordLevelExpire #4411

Merged
merged 1 commit into from
Oct 31, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
/** A factory to create {@link RecordReader} expires records by time. */
public class RecordLevelExpire {

private final int timeField;
private final int timeFieldIndex;
private final int expireTime;
private final CoreOptions.TimeFieldType timeFieldType;

Expand All @@ -46,22 +46,22 @@ public static RecordLevelExpire create(CoreOptions options, RowType rowType) {
return null;
}

String timeField = options.recordLevelTimeField();
if (timeField == null) {
String timeFieldName = options.recordLevelTimeField();
if (timeFieldName == null) {
throw new IllegalArgumentException(
"You should set time field for record-level expire.");
}

// should no project here, record level expire only works in compaction
int fieldIndex = rowType.getFieldIndex(timeField);
int fieldIndex = rowType.getFieldIndex(timeFieldName);
if (fieldIndex == -1) {
throw new IllegalArgumentException(
String.format(
"Can not find time field %s for record level expire.", timeField));
"Can not find time field %s for record level expire.", timeFieldName));
}

CoreOptions.TimeFieldType timeFieldType = options.recordLevelTimeFieldType();
DataField field = rowType.getField(timeField);
DataField field = rowType.getField(timeFieldName);
if (!((timeFieldType == CoreOptions.TimeFieldType.SECONDS_INT
&& field.type() instanceof IntType)
|| (timeFieldType == CoreOptions.TimeFieldType.SECONDS_LONG
Expand All @@ -79,8 +79,8 @@ public static RecordLevelExpire create(CoreOptions options, RowType rowType) {
}

private RecordLevelExpire(
int timeField, int expireTime, CoreOptions.TimeFieldType timeFieldType) {
this.timeField = timeField;
int timeFieldIndex, int expireTime, CoreOptions.TimeFieldType timeFieldType) {
this.timeFieldIndex = timeFieldIndex;
this.expireTime = expireTime;
this.timeFieldType = timeFieldType;
}
Expand All @@ -94,18 +94,18 @@ private RecordReader<KeyValue> wrap(RecordReader<KeyValue> reader) {
return reader.filter(
kv -> {
checkArgument(
!kv.value().isNullAt(timeField),
!kv.value().isNullAt(timeFieldIndex),
"Time field for record-level expire should not be null.");
final int recordTime;
switch (timeFieldType) {
case SECONDS_INT:
recordTime = kv.value().getInt(timeField);
recordTime = kv.value().getInt(timeFieldIndex);
break;
case SECONDS_LONG:
recordTime = (int) kv.value().getLong(timeField);
recordTime = (int) kv.value().getLong(timeFieldIndex);
break;
case MILLIS_LONG:
recordTime = (int) (kv.value().getLong(timeField) / 1000);
recordTime = (int) (kv.value().getLong(timeFieldIndex) / 1000);
break;
default:
String msg =
Expand Down
Loading