Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
herefree committed Dec 6, 2024
1 parent dc8823a commit cf45da4
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class FileStoreSourceReaderMetrics {
private long lastSplitUpdateTime = UNDEFINED;

public static final long UNDEFINED = -1L;
public static final long ACTIVE = Long.MAX_VALUE;

public FileStoreSourceReaderMetrics(MetricGroup sourceReaderMetricGroup) {
sourceReaderMetricGroup.gauge(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class ReadOperator extends AbstractStreamOperator<RowData>
// is not a FLIP-27
// source and Flink can't automatically calculate this metric
private transient long emitEventTimeLag = FileStoreSourceReaderMetrics.UNDEFINED;
private transient long idleStartTime = FileStoreSourceReaderMetrics.UNDEFINED;
private transient long idleStartTime = FileStoreSourceReaderMetrics.ACTIVE;
private transient Counter numRecordsIn;

public ReadOperator(ReadBuilder readBuilder) {
Expand Down Expand Up @@ -99,7 +99,7 @@ public void processElement(StreamRecord<Split> record) throws Exception {
.orElse(FileStoreSourceReaderMetrics.UNDEFINED);
sourceReaderMetrics.recordSnapshotUpdate(eventTime);
// update idleStartTime when reading a new split
idleStartTime = FileStoreSourceReaderMetrics.UNDEFINED;
idleStartTime = FileStoreSourceReaderMetrics.ACTIVE;

boolean firstRecord = true;
try (CloseableIterator<InternalRow> iterator =
Expand Down Expand Up @@ -138,7 +138,7 @@ private void idlingStarted() {
}

private boolean isIdling() {
return idleStartTime != FileStoreSourceReaderMetrics.UNDEFINED;
return idleStartTime != FileStoreSourceReaderMetrics.ACTIVE;
}

private long getIdleTime() {
Expand Down

0 comments on commit cf45da4

Please sign in to comment.