[flink] Emit event time lag should not increase when no new records are read (#4281)

tsreaper authored Oct 3, 2024
1 parent 6c75a5b commit bd156e9
Showing 2 changed files with 17 additions and 14 deletions.
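The gist of the change, as a hedged illustration rather than the actual Paimon/Flink code: before this commit the currentEmitEventTimeLag gauge recomputed System.currentTimeMillis() minus the latest file creation time on every read, so the reported lag kept growing even while the operator read no new records; after this commit, ReadOperator keeps its own emitEventTimeLag field, updates it only when a record is emitted in processElement, and the gauge simply returns that stored value. The minimal, self-contained sketch below mimics that difference. LagGauge, EmitLagSketch, and the variable names are stand-ins invented for the sketch; Flink's real interface is org.apache.flink.metrics.Gauge, registered via MetricGroup#gauge as shown in the diff below.

import java.util.concurrent.TimeUnit;

/** Stand-in for org.apache.flink.metrics.Gauge; defined here only to keep the sketch self-contained. */
interface LagGauge {
    long getValue();
}

public class EmitLagSketch {

    public static void main(String[] args) throws InterruptedException {
        // Pretend the latest file consumed by the reader was created "now".
        final long latestFileCreationTime = System.currentTimeMillis();

        // Old behaviour: the lag is recomputed on every gauge read,
        // so it keeps growing even while no new records are read.
        LagGauge recomputedOnRead = () -> System.currentTimeMillis() - latestFileCreationTime;

        // New behaviour: the lag is captured once per emitted record (as in processElement)
        // and the gauge only reports the stored value, so it stays flat while the operator is idle.
        final long[] storedLag = {System.currentTimeMillis() - latestFileCreationTime};
        LagGauge updatedOnEmit = () -> storedLag[0];

        long lazyBefore = recomputedOnRead.getValue();
        long storedBefore = updatedOnEmit.getValue();

        TimeUnit.MILLISECONDS.sleep(100); // no records are emitted during this pause

        System.out.println("recomputed-on-read lag grew:   " + (recomputedOnRead.getValue() > lazyBefore));  // true
        System.out.println("updated-on-emit lag unchanged: " + (updatedOnEmit.getValue() == storedBefore));  // true
    }
}

The test change follows the same idea: it reads the gauge once, sleeps for a while without feeding new splits, then reads it again and expects the same value.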
@@ -54,6 +54,9 @@ public class ReadOperator extends AbstractStreamOperator<RowData>
     private transient IOManager ioManager;
 
     private transient FileStoreSourceReaderMetrics sourceReaderMetrics;
+    // we create our own gauge for currentEmitEventTimeLag, because this operator is not a FLIP-27
+    // source and Flink can't automatically calculate this metric
+    private transient long emitEventTimeLag = FileStoreSourceReaderMetrics.UNDEFINED;
    private transient Counter numRecordsIn;
 
     public ReadOperator(ReadBuilder readBuilder) {
@@ -65,19 +68,7 @@ public void open() throws Exception {
         super.open();
 
         this.sourceReaderMetrics = new FileStoreSourceReaderMetrics(getMetricGroup());
-        // we create our own gauge for currentEmitEventTimeLag, because this operator is not a
-        // FLIP-27 source and Flink can't automatically calculate this metric
-        getMetricGroup()
-                .gauge(
-                        MetricNames.CURRENT_EMIT_EVENT_TIME_LAG,
-                        () -> {
-                            long eventTime = sourceReaderMetrics.getLatestFileCreationTime();
-                            if (eventTime == FileStoreSourceReaderMetrics.UNDEFINED) {
-                                return FileStoreSourceReaderMetrics.UNDEFINED;
-                            } else {
-                                return System.currentTimeMillis() - eventTime;
-                            }
-                        });
+        getMetricGroup().gauge(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG, () -> emitEventTimeLag);
         this.numRecordsIn =
                 InternalSourceReaderMetricGroup.wrap(getMetricGroup())
                         .getIOMetricGroup()
@@ -108,6 +99,8 @@ public void processElement(StreamRecord<Split> record) throws Exception {
         try (CloseableIterator<InternalRow> iterator =
                 read.createReader(split).toCloseableIterator()) {
             while (iterator.hasNext()) {
+                emitEventTimeLag = System.currentTimeMillis() - eventTime;
+
                 // each Split is already counted as one input record,
                 // so we don't need to count the first record
                 if (firstRecord) {
@@ -203,6 +203,7 @@ public void testReadOperatorMetricsRegisterAndUpdate() throws Exception {
                                 readerOperatorMetricGroup, "currentEmitEventTimeLag")
                         .getValue())
                 .isEqualTo(-1L);
+
         harness.processElement(new StreamRecord<>(splits.get(0)));
         assertThat(
                 (Long)
@@ -211,13 +212,22 @@
                                 "currentFetchEventTimeLag")
                         .getValue())
                 .isGreaterThan(0);
+        long emitEventTimeLag =
+                (Long)
+                        TestingMetricUtils.getGauge(
+                                        readerOperatorMetricGroup, "currentEmitEventTimeLag")
+                                .getValue();
+        assertThat(emitEventTimeLag).isGreaterThan(0);
+
+        // wait for a while and read metrics again, metrics should not change
+        Thread.sleep(100);
         assertThat(
                 (Long)
                         TestingMetricUtils.getGauge(
                                         readerOperatorMetricGroup,
                                         "currentEmitEventTimeLag")
                                 .getValue())
-                .isGreaterThan(0);
+                .isEqualTo(emitEventTimeLag);
     }
 
     private <T> T testReadSplit(
