Skip to content

Commit

Permalink
change name
Browse files Browse the repository at this point in the history
  • Loading branch information
askwang committed Oct 30, 2024
1 parent 97b9b33 commit b106fff
Showing 1 changed file with 12 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
/** A factory to create {@link RecordReader} expires records by time. */
public class RecordLevelExpire {

private final int timeField;
private final int timeFieldIndex;
private final int expireTime;
private final CoreOptions.TimeFieldType timeFieldType;

Expand All @@ -46,22 +46,22 @@ public static RecordLevelExpire create(CoreOptions options, RowType rowType) {
return null;
}

String timeField = options.recordLevelTimeField();
if (timeField == null) {
String timeFieldName = options.recordLevelTimeField();
if (timeFieldName == null) {
throw new IllegalArgumentException(
"You should set time field for record-level expire.");
}

// should no project here, record level expire only works in compaction
int fieldIndex = rowType.getFieldIndex(timeField);
int fieldIndex = rowType.getFieldIndex(timeFieldName);
if (fieldIndex == -1) {
throw new IllegalArgumentException(
String.format(
"Can not find time field %s for record level expire.", timeField));
"Can not find time field %s for record level expire.", timeFieldName));
}

CoreOptions.TimeFieldType timeFieldType = options.recordLevelTimeFieldType();
DataField field = rowType.getField(timeField);
DataField field = rowType.getField(timeFieldName);
if (!((timeFieldType == CoreOptions.TimeFieldType.SECONDS_INT
&& field.type() instanceof IntType)
|| (timeFieldType == CoreOptions.TimeFieldType.SECONDS_LONG
Expand All @@ -79,8 +79,8 @@ public static RecordLevelExpire create(CoreOptions options, RowType rowType) {
}

private RecordLevelExpire(
int timeField, int expireTime, CoreOptions.TimeFieldType timeFieldType) {
this.timeField = timeField;
int timeFieldIndex, int expireTime, CoreOptions.TimeFieldType timeFieldType) {
this.timeFieldIndex = timeFieldIndex;
this.expireTime = expireTime;
this.timeFieldType = timeFieldType;
}
Expand All @@ -94,18 +94,18 @@ private RecordReader<KeyValue> wrap(RecordReader<KeyValue> reader) {
return reader.filter(
kv -> {
checkArgument(
!kv.value().isNullAt(timeField),
!kv.value().isNullAt(timeFieldIndex),
"Time field for record-level expire should not be null.");
final int recordTime;
switch (timeFieldType) {
case SECONDS_INT:
recordTime = kv.value().getInt(timeField);
recordTime = kv.value().getInt(timeFieldIndex);
break;
case SECONDS_LONG:
recordTime = (int) kv.value().getLong(timeField);
recordTime = (int) kv.value().getLong(timeFieldIndex);
break;
case MILLIS_LONG:
recordTime = (int) (kv.value().getLong(timeField) / 1000);
recordTime = (int) (kv.value().getLong(timeFieldIndex) / 1000);
break;
default:
String msg =
Expand Down

0 comments on commit b106fff

Please sign in to comment.