Skip to content

Commit

Permalink
[flink] When reading DataSplit, calculate fetch event time lag with e…
Browse files Browse the repository at this point in the history
…arliest file creation time instead of latest (#3952)
  • Loading branch information
tsreaper authored Aug 13, 2024
1 parent c4762cc commit 7dd2edb
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ public OptionalLong latestFileCreationEpochMillis() {
return this.dataFiles.stream().mapToLong(DataFileMeta::creationTimeEpochMillis).max();
}

public OptionalLong earliestFileCreationEpochMillis() {
return this.dataFiles.stream().mapToLong(DataFileMeta::creationTimeEpochMillis).min();
}

@Override
public long rowCount() {
long rowCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ private void checkSplitOrStartNext() throws IOException {
if (nextSplit.split() instanceof DataSplit) {
long eventTime =
((DataSplit) nextSplit.split())
.latestFileCreationEpochMillis()
.earliestFileCreationEpochMillis()
.orElse(FileStoreSourceReaderMetrics.UNDEFINED);
metrics.recordSnapshotUpdate(eventTime);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void processElement(StreamRecord<Split> record) throws Exception {
// update metric when reading a new split
long eventTime =
((DataSplit) split)
.latestFileCreationEpochMillis()
.earliestFileCreationEpochMillis()
.orElse(FileStoreSourceReaderMetrics.UNDEFINED);
sourceReaderMetrics.recordSnapshotUpdate(eventTime);

Expand Down

0 comments on commit 7dd2edb

Please sign in to comment.