Skip to content

Commit

Permalink
support flink sourceIdleTime metric
Browse files Browse the repository at this point in the history
  • Loading branch information
herefree committed Nov 26, 2024
1 parent dc9849f commit cb9f852
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,22 @@ public class FileStoreSourceReaderMetrics {

private long latestFileCreationTime = UNDEFINED;
private long lastSplitUpdateTime = UNDEFINED;
private long idleStartTime = ACTIVE;

public static final long UNDEFINED = -1L;
private static final long ACTIVE = Long.MAX_VALUE;

public FileStoreSourceReaderMetrics(MetricGroup sourceReaderMetricGroup) {
sourceReaderMetricGroup.gauge(
MetricNames.CURRENT_FETCH_EVENT_TIME_LAG, this::getFetchTimeLag);
sourceReaderMetricGroup.gauge(MetricNames.SOURCE_IDLE_TIME, this::getIdleTime);
}

/** Called when consumed snapshot changes. */
public void recordSnapshotUpdate(long fileCreationTime) {
this.latestFileCreationTime = fileCreationTime;
lastSplitUpdateTime = System.currentTimeMillis();
idleStartTime = ACTIVE;
}

@VisibleForTesting
Expand All @@ -57,4 +61,19 @@ public long getLatestFileCreationTime() {
long getLastSplitUpdateTime() {
return lastSplitUpdateTime;
}

public void idlingStarted() {
if (!isIdling()) {
idleStartTime = System.currentTimeMillis();
}
}

boolean isIdling() {
return idleStartTime != ACTIVE;
}

@VisibleForTesting
long getIdleTime() {
return isIdling() ? System.currentTimeMillis() - idleStartTime : 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public void open() throws Exception {
this.read = readBuilder.newRead().withIOManager(ioManager);
this.reuseRow = new FlinkRowData(null);
this.reuseRecord = new StreamRecord<>(reuseRow);
this.sourceReaderMetrics.idlingStarted();
}

@Override
Expand Down Expand Up @@ -113,6 +114,7 @@ public void processElement(StreamRecord<Split> record) throws Exception {
output.collect(reuseRecord);
}
}
sourceReaderMetrics.idlingStarted();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,28 @@ public void testCurrentFetchLagUpdated() {
assertThat(sourceReaderMetrics.getFetchTimeLag())
.isNotEqualTo(FileStoreSourceReaderMetrics.UNDEFINED);
}

@Test
public void testSourceIdleTimeUpdated() throws InterruptedException {
MetricListener metricListener = new MetricListener();
final FileStoreSourceReaderMetrics sourceReaderMetrics =
new FileStoreSourceReaderMetrics(metricListener.getMetricGroup());

assertThat(sourceReaderMetrics.getIdleTime()).isEqualTo(0L);

// idle start
sourceReaderMetrics.idlingStarted();
Thread.sleep(10L);
assertThat(sourceReaderMetrics.getIdleTime()).isGreaterThan(9L);

//non-idle
sourceReaderMetrics.recordSnapshotUpdate(123);
Thread.sleep(10L);
assertThat(sourceReaderMetrics.getIdleTime()).isEqualTo(0L);

// idle start
sourceReaderMetrics.idlingStarted();
Thread.sleep(10L);
assertThat(sourceReaderMetrics.getIdleTime()).isGreaterThan(9L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,11 @@ public void testReadOperatorMetricsRegisterAndUpdate() throws Exception {
.getValue())
.isEqualTo(-1L);

Thread.sleep(300L);
assertThat((Long)TestingMetricUtils.getGauge(
readerOperatorMetricGroup, "sourceIdleTime")
.getValue()).isGreaterThan(299L);

harness.processElement(new StreamRecord<>(splits.get(0)));
assertThat(
(Long)
Expand All @@ -228,6 +233,11 @@ public void testReadOperatorMetricsRegisterAndUpdate() throws Exception {
"currentEmitEventTimeLag")
.getValue())
.isEqualTo(emitEventTimeLag);

assertThat((Long)TestingMetricUtils.getGauge(
readerOperatorMetricGroup, "sourceIdleTime")
.getValue()).isGreaterThan(99L).isLessThan(300L);

}

private <T> T testReadSplit(
Expand Down

0 comments on commit cb9f852

Please sign in to comment.