Skip to content

Commit

Permalink
[flink] Improve readability of MonitorSourceFunction state
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub committed Dec 10, 2024
1 parent 536857a commit b31c891
Showing 1 changed file with 76 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.NavigableMap;
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.paimon.table.BucketMode.BUCKET_UNAWARE;
Expand Down Expand Up @@ -106,10 +107,17 @@ public SourceReader<Split, SimpleSourceSplit> createReader(
}

private class Reader extends AbstractNonCoordinatedSourceReader<Split> {
private static final String CHECKPOINT_STATE = "CS";
private static final String NEXT_SNAPSHOT_STATE = "NSS";

private final StreamTableScan scan = readBuilder.newStreamScan();
private final SplitState<Long> checkpointState =
new SplitState<>("next-snapshot", x -> Long.toString(x), Long::parseLong);
private final SplitState<Tuple2<Long, Long>> nextSnapshotState =
new SplitState<>(
"next-snapshot-per-checkpoint",
x -> x.f0 + ":" + x.f1,
x ->
Tuple2.of(
Long.parseLong(x.split(":")[0]),
Long.parseLong(x.split(":")[1])));
private final TreeMap<Long, Long> nextSnapshotPerCheckpoint = new TreeMap<>();

@Override
Expand All @@ -123,34 +131,34 @@ public void notifyCheckpointComplete(long checkpointId) {

@Override
public List<SimpleSourceSplit> snapshotState(long checkpointId) {
List<SimpleSourceSplit> results = new ArrayList<>();

this.checkpointState.clear();
Long nextSnapshot = this.scan.checkpoint();
if (nextSnapshot != null) {
results.add(new SimpleSourceSplit(CHECKPOINT_STATE + nextSnapshot));
this.checkpointState.add(nextSnapshot);
this.nextSnapshotPerCheckpoint.put(checkpointId, nextSnapshot);
}

this.nextSnapshotPerCheckpoint.forEach(
(k, v) ->
results.add(new SimpleSourceSplit(NEXT_SNAPSHOT_STATE + k + ":" + v)));
List<Tuple2<Long, Long>> nextSnapshots = new ArrayList<>();
this.nextSnapshotPerCheckpoint.forEach((k, v) -> nextSnapshots.add(new Tuple2<>(k, v)));
this.nextSnapshotState.update(nextSnapshots);

if (LOG.isDebugEnabled()) {
LOG.debug("{} checkpoint {}.", getClass().getSimpleName(), nextSnapshot);
}

List<SimpleSourceSplit> results = new ArrayList<>();
results.addAll(checkpointState.snapshotState());
results.addAll(nextSnapshotState.snapshotState());
return results;
}

@Override
public void addSplits(List<SimpleSourceSplit> list) {
LOG.info("Restoring state for the {}.", getClass().getSimpleName());
checkpointState.restoreState(list);
nextSnapshotState.restoreState(list);

List<Long> retrievedStates =
list.stream()
.map(SimpleSourceSplit::value)
.filter(x -> x.startsWith(CHECKPOINT_STATE))
.map(x -> Long.parseLong(x.substring(CHECKPOINT_STATE.length())))
.collect(Collectors.toList());
List<Long> retrievedStates = checkpointState.get();

// given that the parallelism of the source is 1, we can only have 1 retrieved item.
Preconditions.checkArgument(
Expand All @@ -161,14 +169,9 @@ public void addSplits(List<SimpleSourceSplit> list) {
this.scan.restore(retrievedStates.get(0));
}

list.stream()
.map(SimpleSourceSplit::value)
.filter(x -> x.startsWith(NEXT_SNAPSHOT_STATE))
.map(x -> x.substring(NEXT_SNAPSHOT_STATE.length()).split(":"))
.forEach(
x ->
nextSnapshotPerCheckpoint.put(
Long.parseLong(x[0]), Long.parseLong(x[1])));
for (Tuple2<Long, Long> tuple2 : nextSnapshotState.get()) {
nextSnapshotPerCheckpoint.put(tuple2.f0, tuple2.f1);
}
}

@Override
Expand Down Expand Up @@ -197,6 +200,56 @@ public InputStatus pollNext(ReaderOutput<Split> readerOutput) throws Exception {
}
}

/**
 * A list of values that is persisted as a group of {@link SimpleSourceSplit}s.
 *
 * <p>Each value is written as {@code identifier + DELIMITER + serializer.apply(value)}. On
 * restore, only splits whose value starts with {@code identifier + DELIMITER} are claimed by
 * this state.
 *
 * <p>The delimiter is required for correctness: several states share one flat list of splits,
 * and without it a state whose identifier is a prefix of another identifier (for example
 * {@code "next-snapshot"} and {@code "next-snapshot-per-checkpoint"}) would also claim the other
 * state's splits and hand a malformed payload to its deserializer.
 *
 * <p>NOTE(review): splits written without the delimiter are not recognized by {@link
 * #restoreState(List)}; confirm no checkpoints in that format need to be restored.
 *
 * @param <T> type of the values held by this state
 */
private static class SplitState<T> {
    /** Separates the identifier from the serialized value; identifiers must not contain it. */
    private static final String DELIMITER = "=";

    /** {@code identifier + DELIMITER}; the unambiguous tag put in front of every payload. */
    private final String prefix;

    private final List<T> values;
    private final Function<T, String> serializer;
    private final Function<String, T> deserializer;

    /**
     * @param identifier non-empty tag that distinguishes this state's splits from those of
     *     other states; must not contain {@link #DELIMITER}
     * @param serializer converts a value to the string stored in a split
     * @param deserializer converts the stored string back to a value
     * @throws IllegalArgumentException if the identifier is null, empty, or contains the
     *     delimiter
     */
    private SplitState(
            String identifier,
            Function<T, String> serializer,
            Function<String, T> deserializer) {
        if (identifier == null || identifier.isEmpty() || identifier.contains(DELIMITER)) {
            // An identifier containing the delimiter could again become a prefix of another
            // state's tag, which is exactly the ambiguity the delimiter exists to prevent.
            throw new IllegalArgumentException(
                    "State identifier must be non-empty and must not contain '"
                            + DELIMITER
                            + "': "
                            + identifier);
        }
        this.prefix = identifier + DELIMITER;
        this.serializer = serializer;
        this.deserializer = deserializer;
        this.values = new ArrayList<>();
    }

    /** Appends a value to this state. */
    private void add(T value) {
        values.add(value);
    }

    /** Returns a copy of the current values; changes to the copy do not affect this state. */
    private List<T> get() {
        return new ArrayList<>(values);
    }

    /** Replaces the current values with the given ones. */
    private void update(List<T> values) {
        // Copy first so that passing this state's own backing list does not wipe the input
        // before it is read.
        List<T> replacement = new ArrayList<>(values);
        this.values.clear();
        this.values.addAll(replacement);
    }

    /** Removes all values from this state. */
    private void clear() {
        values.clear();
    }

    /** Serializes every value into a split whose value is tagged with this state's prefix. */
    private List<SimpleSourceSplit> snapshotState() {
        return values.stream()
                .map(x -> new SimpleSourceSplit(prefix + serializer.apply(x)))
                .collect(Collectors.toList());
    }

    /**
     * Replaces the current values with those decoded from the given splits. Splits that do not
     * carry this state's prefix are ignored.
     */
    private void restoreState(List<SimpleSourceSplit> splits) {
        values.clear();
        for (SimpleSourceSplit split : splits) {
            String raw = split.value();
            if (raw.startsWith(prefix)) {
                values.add(deserializer.apply(raw.substring(prefix.length())));
            }
        }
    }
}

public static DataStream<RowData> buildSource(
StreamExecutionEnvironment env,
String name,
Expand Down

0 comments on commit b31c891

Please sign in to comment.