From cb9f8523b72636d7aca1e7e8fecfec5a803e6caf Mon Sep 17 00:00:00 2001 From: herefree <841043203@qq.com> Date: Tue, 26 Nov 2024 17:37:49 +0800 Subject: [PATCH] support flink sourceIdleTime metric --- .../metrics/FileStoreSourceReaderMetrics.java | 19 +++++++++++++++ .../flink/source/operator/ReadOperator.java | 2 ++ .../FileStoreSourceReaderMetricsTest.java | 24 +++++++++++++++++++ .../source/operator/OperatorSourceTest.java | 10 ++++++++ 4 files changed, 55 insertions(+) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java index 2e1e94777949..6955bfef1827 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java @@ -27,18 +27,22 @@ public class FileStoreSourceReaderMetrics { private long latestFileCreationTime = UNDEFINED; private long lastSplitUpdateTime = UNDEFINED; + private long idleStartTime = ACTIVE; public static final long UNDEFINED = -1L; + private static final long ACTIVE = Long.MAX_VALUE; public FileStoreSourceReaderMetrics(MetricGroup sourceReaderMetricGroup) { sourceReaderMetricGroup.gauge( MetricNames.CURRENT_FETCH_EVENT_TIME_LAG, this::getFetchTimeLag); + sourceReaderMetricGroup.gauge(MetricNames.SOURCE_IDLE_TIME, this::getIdleTime); } /** Called when consumed snapshot changes. */ public void recordSnapshotUpdate(long fileCreationTime) { this.latestFileCreationTime = fileCreationTime; lastSplitUpdateTime = System.currentTimeMillis(); + idleStartTime = ACTIVE; } @VisibleForTesting @@ -57,4 +61,19 @@ public long getLatestFileCreationTime() { long getLastSplitUpdateTime() { return lastSplitUpdateTime; } + + public void idlingStarted() { + if (!isIdling()) { + idleStartTime = System.currentTimeMillis(); + } + } + + boolean isIdling() { + return idleStartTime != ACTIVE; + } + + @VisibleForTesting + long getIdleTime() { + return isIdling() ? System.currentTimeMillis() - idleStartTime : 0; + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java index 80c85f7cdb35..89f1f8754251 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java @@ -83,6 +83,7 @@ public void open() throws Exception { this.read = readBuilder.newRead().withIOManager(ioManager); this.reuseRow = new FlinkRowData(null); this.reuseRecord = new StreamRecord<>(reuseRow); + this.sourceReaderMetrics.idlingStarted(); } @Override @@ -113,6 +114,7 @@ public void processElement(StreamRecord record) throws Exception { output.collect(reuseRecord); } } + sourceReaderMetrics.idlingStarted(); } @Override diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetricsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetricsTest.java index 2012e7a8956c..2eb34dcc7bd7 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetricsTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetricsTest.java @@ -52,4 +52,28 @@ public void testCurrentFetchLagUpdated() { assertThat(sourceReaderMetrics.getFetchTimeLag()) .isNotEqualTo(FileStoreSourceReaderMetrics.UNDEFINED); } + + @Test + public void testSourceIdleTimeUpdated() throws InterruptedException { + MetricListener metricListener = new MetricListener(); + final FileStoreSourceReaderMetrics sourceReaderMetrics = + new FileStoreSourceReaderMetrics(metricListener.getMetricGroup()); + + assertThat(sourceReaderMetrics.getIdleTime()).isEqualTo(0L); + + // idle start + sourceReaderMetrics.idlingStarted(); + Thread.sleep(10L); + assertThat(sourceReaderMetrics.getIdleTime()).isGreaterThan(9L); + + //non-idle + sourceReaderMetrics.recordSnapshotUpdate(123); + Thread.sleep(10L); + assertThat(sourceReaderMetrics.getIdleTime()).isEqualTo(0L); + + // idle start + sourceReaderMetrics.idlingStarted(); + Thread.sleep(10L); + assertThat(sourceReaderMetrics.getIdleTime()).isGreaterThan(9L); + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java index 61a03a29a21b..c5abe5eb32a4 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java @@ -204,6 +204,11 @@ public void testReadOperatorMetricsRegisterAndUpdate() throws Exception { .getValue()) .isEqualTo(-1L); + Thread.sleep(300L); + assertThat((Long)TestingMetricUtils.getGauge( + readerOperatorMetricGroup, "sourceIdleTime") + .getValue()).isGreaterThan(299L); + harness.processElement(new StreamRecord<>(splits.get(0))); assertThat( (Long) @@ -228,6 +233,11 @@ public void testReadOperatorMetricsRegisterAndUpdate() throws Exception { "currentEmitEventTimeLag") .getValue()) .isEqualTo(emitEventTimeLag); + + assertThat((Long)TestingMetricUtils.getGauge( + readerOperatorMetricGroup, "sourceIdleTime") + .getValue()).isGreaterThan(99L).isLessThan(300L); + } private T testReadSplit(