Skip to content

Commit

Permalink
[flink] Fix BucketUnawareCompactSource sleep only when empty
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub committed Dec 10, 2024
1 parent 540a2b1 commit 536857a
Showing 1 changed file with 7 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ public static class BucketUnawareCompactSourceReader
extends AbstractNonCoordinatedSourceReader<UnawareAppendCompactionTask> {
private final UnawareAppendTableCompactionCoordinator compactionCoordinator;
private final long scanInterval;
private long lastFetchTimeMillis = 0L;

public BucketUnawareCompactSourceReader(
FileStoreTable table, boolean streaming, Predicate filter, long scanInterval) {
Expand All @@ -103,22 +102,21 @@ public BucketUnawareCompactSourceReader(
@Override
public InputStatus pollNext(ReaderOutput<UnawareAppendCompactionTask> readerOutput)
throws Exception {
long sleepTimeMillis = scanInterval - System.currentTimeMillis() + lastFetchTimeMillis;
if (sleepTimeMillis > 0) {
Thread.sleep(sleepTimeMillis);
}

boolean isEmpty;
try {
// do scan and plan action, emit append-only compaction tasks.
List<UnawareAppendCompactionTask> tasks = compactionCoordinator.run();
isEmpty = tasks.isEmpty();
tasks.forEach(readerOutput::collect);
return InputStatus.MORE_AVAILABLE;
} catch (EndOfScanException esf) {
LOG.info("Catching EndOfStreamException, the stream is finished.");
return InputStatus.END_OF_INPUT;
} finally {
lastFetchTimeMillis = System.currentTimeMillis();
}

if (isEmpty) {
Thread.sleep(scanInterval);
}
return InputStatus.MORE_AVAILABLE;
}
}

Expand Down

0 comments on commit 536857a

Please sign in to comment.