diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java index 4735a2abbcb7..067bf055c241 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java @@ -112,6 +112,10 @@ public OptionalLong latestFileCreationEpochMillis() { return this.dataFiles.stream().mapToLong(DataFileMeta::creationTimeEpochMillis).max(); } + public OptionalLong earliestFileCreationEpochMillis() { + return this.dataFiles.stream().mapToLong(DataFileMeta::creationTimeEpochMillis).min(); + } + @Override public long rowCount() { long rowCount = 0; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java index 159d32748e5e..0a4be3a8a7f1 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java @@ -181,7 +181,7 @@ private void checkSplitOrStartNext() throws IOException { if (nextSplit.split() instanceof DataSplit) { long eventTime = ((DataSplit) nextSplit.split()) - .latestFileCreationEpochMillis() + .earliestFileCreationEpochMillis() .orElse(FileStoreSourceReaderMetrics.UNDEFINED); metrics.recordSnapshotUpdate(eventTime); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java index 1196c7a77627..97c20ed794a5 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java @@ -100,7 +100,7 @@ public void processElement(StreamRecord record) throws Exception { // update metric when reading a new split long eventTime = ((DataSplit) split) - .latestFileCreationEpochMillis() + .earliestFileCreationEpochMillis() .orElse(FileStoreSourceReaderMetrics.UNDEFINED); sourceReaderMetrics.recordSnapshotUpdate(eventTime);