From b31c891e72ce49c1536227785727175469827d5a Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Tue, 10 Dec 2024 17:54:36 +0800 Subject: [PATCH] [flink] Improve readability of MonitorSourceFunction state --- .../flink/source/operator/MonitorSource.java | 99 ++++++++++++++----- 1 file changed, 76 insertions(+), 23 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java index 8ca3b3d24736..753f8287e341 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java @@ -52,6 +52,7 @@ import java.util.NavigableMap; import java.util.OptionalLong; import java.util.TreeMap; +import java.util.function.Function; import java.util.stream.Collectors; import static org.apache.paimon.table.BucketMode.BUCKET_UNAWARE; @@ -106,10 +107,17 @@ public SourceReader createReader( } private class Reader extends AbstractNonCoordinatedSourceReader { - private static final String CHECKPOINT_STATE = "CS"; - private static final String NEXT_SNAPSHOT_STATE = "NSS"; - private final StreamTableScan scan = readBuilder.newStreamScan(); + private final SplitState checkpointState = + new SplitState<>("next-snapshot", x -> Long.toString(x), Long::parseLong); + private final SplitState> nextSnapshotState = + new SplitState<>( + "next-snapshot-per-checkpoint", + x -> x.f0 + ":" + x.f1, + x -> + Tuple2.of( + Long.parseLong(x.split(":")[0]), + Long.parseLong(x.split(":")[1]))); private final TreeMap nextSnapshotPerCheckpoint = new TreeMap<>(); @Override @@ -123,34 +131,34 @@ public void notifyCheckpointComplete(long checkpointId) { @Override public List snapshotState(long checkpointId) { - List results = new ArrayList<>(); - + this.checkpointState.clear(); Long nextSnapshot = this.scan.checkpoint(); if (nextSnapshot != null) { - results.add(new SimpleSourceSplit(CHECKPOINT_STATE + nextSnapshot)); + this.checkpointState.add(nextSnapshot); this.nextSnapshotPerCheckpoint.put(checkpointId, nextSnapshot); } - this.nextSnapshotPerCheckpoint.forEach( - (k, v) -> - results.add(new SimpleSourceSplit(NEXT_SNAPSHOT_STATE + k + ":" + v))); + List> nextSnapshots = new ArrayList<>(); + this.nextSnapshotPerCheckpoint.forEach((k, v) -> nextSnapshots.add(new Tuple2<>(k, v))); + this.nextSnapshotState.update(nextSnapshots); if (LOG.isDebugEnabled()) { LOG.debug("{} checkpoint {}.", getClass().getSimpleName(), nextSnapshot); } + + List results = new ArrayList<>(); + results.addAll(checkpointState.snapshotState()); + results.addAll(nextSnapshotState.snapshotState()); return results; } @Override public void addSplits(List list) { LOG.info("Restoring state for the {}.", getClass().getSimpleName()); + checkpointState.restoreState(list); + nextSnapshotState.restoreState(list); - List retrievedStates = - list.stream() - .map(SimpleSourceSplit::value) - .filter(x -> x.startsWith(CHECKPOINT_STATE)) - .map(x -> Long.parseLong(x.substring(CHECKPOINT_STATE.length()))) - .collect(Collectors.toList()); + List retrievedStates = checkpointState.get(); // given that the parallelism of the source is 1, we can only have 1 retrieved items. Preconditions.checkArgument( @@ -161,14 +169,9 @@ public void addSplits(List list) { this.scan.restore(retrievedStates.get(0)); } - list.stream() - .map(SimpleSourceSplit::value) - .filter(x -> x.startsWith(NEXT_SNAPSHOT_STATE)) - .map(x -> x.substring(NEXT_SNAPSHOT_STATE.length()).split(":")) - .forEach( - x -> - nextSnapshotPerCheckpoint.put( - Long.parseLong(x[0]), Long.parseLong(x[1]))); + for (Tuple2 tuple2 : nextSnapshotState.get()) { + nextSnapshotPerCheckpoint.put(tuple2.f0, tuple2.f1); + } } @Override @@ -197,6 +200,56 @@ public InputStatus pollNext(ReaderOutput readerOutput) throws Exception { } } + private static class SplitState { + private final String identifier; + private final List values; + private final Function serializer; + private final Function deserializer; + + private SplitState( + String identifier, + Function serializer, + Function deserializer) { + this.identifier = identifier; + this.serializer = serializer; + this.deserializer = deserializer; + this.values = new ArrayList<>(); + } + + private void add(T value) { + values.add(value); + } + + private List get() { + return new ArrayList<>(values); + } + + private void update(List values) { + this.values.clear(); + this.values.addAll(values); + } + + private void clear() { + values.clear(); + } + + private List snapshotState() { + return values.stream() + .map(x -> new SimpleSourceSplit(identifier + serializer.apply(x))) + .collect(Collectors.toList()); + } + + private void restoreState(List splits) { + values.clear(); + splits.stream() + .map(SimpleSourceSplit::value) + .filter(x -> x.startsWith(identifier)) + .map(x -> x.substring(identifier.length())) + .map(this.deserializer) + .forEach(values::add); + } + } + public static DataStream buildSource( StreamExecutionEnvironment env, String name,