Skip to content

Commit

Permalink
[flink] splitAssigner adds a thread-safe interface for obtaining the …
Browse files Browse the repository at this point in the history
…number of remaining splits. (apache#3011)
  • Loading branch information
liming30 authored Mar 15, 2024
1 parent bc2e1af commit 994849c
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
// context.callAsync will invoke this. This method runs in workerExecutorThreadPool in
// parallelism.
protected synchronized Optional<PlanWithNextSnapshotId> scanNextSnapshot() {
if (splitAssigner.remainingSplits().size() >= splitMaxNum) {
if (splitAssigner.numberOfRemainingSplits() >= splitMaxNum) {
return Optional.empty();
}
TableScan.Plan plan = scan.plan();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Splits are allocated at the granularity of snapshots. When the splits of the current snapshot are
Expand All @@ -43,19 +44,24 @@ public class AlignedSplitAssigner implements SplitAssigner {

private final Deque<PendingSnapshot> pendingSplitAssignment;

private final AtomicInteger numberOfPendingSplits;

public AlignedSplitAssigner() {
this.pendingSplitAssignment = new LinkedList<>();
this.numberOfPendingSplits = new AtomicInteger(0);
}

@Override
public List<FileStoreSourceSplit> getNext(int subtask, @Nullable String hostname) {
PendingSnapshot head = pendingSplitAssignment.peek();
if (head != null && !head.isPlaceHolder) {
List<FileStoreSourceSplit> subtaskSplits = head.remove(subtask);
return subtaskSplits != null ? subtaskSplits : Collections.emptyList();
} else {
return Collections.emptyList();
if (subtaskSplits != null) {
numberOfPendingSplits.getAndAdd(-subtaskSplits.size());
return subtaskSplits;
}
}
return Collections.emptyList();
}

@Override
Expand All @@ -70,6 +76,7 @@ public void addSplit(int subtask, FileStoreSourceSplit splits) {
} else {
last.add(subtask, splits);
}
numberOfPendingSplits.incrementAndGet();
}

@Override
Expand All @@ -88,6 +95,7 @@ public void addSplitsBack(int suggestedTask, List<FileStoreSourceSplit> splits)
} else {
head.addAll(suggestedTask, splits);
}
numberOfPendingSplits.getAndAdd(splits.size());
}

@Override
Expand All @@ -105,6 +113,11 @@ public Optional<Long> getNextSnapshotId(int subtask) {
return Optional.ofNullable(head != null ? head.snapshotId : null);
}

@Override
public int numberOfRemainingSplits() {
return numberOfPendingSplits.get();
}

public boolean isAligned() {
PendingSnapshot head = pendingSplitAssignment.peek();
return head != null && head.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ public Optional<Long> getNextSnapshotId(int subtask) {
return innerAssigner.getNextSnapshotId(subtask);
}

@Override
public int numberOfRemainingSplits() {
return innerAssigner.numberOfRemainingSplits();
}

private boolean filter(FileStoreSourceSplit sourceSplit) {
DataSplit dataSplit = (DataSplit) sourceSplit.split();
BinaryRow partition = dataSplit.partition();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,9 @@ public Optional<Long> getNextSnapshotId(int subtask) {
? Optional.empty()
: getSnapshotId(pendingSplitAssignment.peekFirst());
}

@Override
public int numberOfRemainingSplits() {
return pendingSplitAssignment.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.paimon.flink.utils.TableScanUtils.getSnapshotId;

Expand All @@ -48,13 +49,16 @@ public class PreAssignSplitAssigner implements SplitAssigner {

private final Map<Integer, LinkedList<FileStoreSourceSplit>> pendingSplitAssignment;

private final AtomicInteger numberOfPendingSplits;

public PreAssignSplitAssigner(
int splitBatchSize,
SplitEnumeratorContext<FileStoreSourceSplit> context,
Collection<FileStoreSourceSplit> splits) {
this.splitBatchSize = splitBatchSize;
this.pendingSplitAssignment =
createBatchFairSplitAssignment(splits, context.currentParallelism());
this.numberOfPendingSplits = new AtomicInteger(splits.size());
}

@Override
Expand All @@ -67,12 +71,14 @@ public List<FileStoreSourceSplit> getNext(int subtask, @Nullable String hostname
while (taskSplits != null && !taskSplits.isEmpty() && assignment.size() < splitBatchSize) {
assignment.add(taskSplits.poll());
}
numberOfPendingSplits.getAndAdd(-assignment.size());
return assignment;
}

@Override
public void addSplit(int suggestedTask, FileStoreSourceSplit split) {
pendingSplitAssignment.computeIfAbsent(suggestedTask, k -> new LinkedList<>()).add(split);
numberOfPendingSplits.incrementAndGet();
}

@Override
Expand All @@ -83,6 +89,7 @@ public void addSplitsBack(int subtask, List<FileStoreSourceSplit> splits) {
while (iterator.hasPrevious()) {
remainingSplits.addFirst(iterator.previous());
}
numberOfPendingSplits.getAndAdd(splits.size());
}

@Override
Expand Down Expand Up @@ -115,4 +122,9 @@ public Optional<Long> getNextSnapshotId(int subtask) {
? Optional.empty()
: getSnapshotId(pendingSplits.peekFirst());
}

@Override
public int numberOfRemainingSplits() {
return numberOfPendingSplits.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,10 @@ public interface SplitAssigner {

/** Gets the snapshot id of the next split. */
Optional<Long> getNextSnapshotId(int subtask);

/**
* Gets the current number of remaining splits. This method should be guaranteed to be
* thread-safe.
*/
int numberOfRemainingSplits();
}
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,13 @@ public void testEnumeratorSplitMax() throws Exception {
context.triggerAllActions();

Assertions.assertThat(enumerator.splitAssigner.remainingSplits().size()).isEqualTo(16 * 2);
Assertions.assertThat(enumerator.splitAssigner.numberOfRemainingSplits()).isEqualTo(16 * 2);

enumerator.handleSplitRequest(0, "test");
enumerator.handleSplitRequest(1, "test");

Assertions.assertThat(enumerator.splitAssigner.remainingSplits().size()).isEqualTo(15 * 2);
Assertions.assertThat(enumerator.splitAssigner.numberOfRemainingSplits()).isEqualTo(15 * 2);
}

private void triggerCheckpointAndComplete(
Expand Down

0 comments on commit 994849c

Please sign in to comment.