Skip to content

Commit

Permalink
resolve comment
Browse files Browse the repository at this point in the history
  • Loading branch information
liming30 committed Nov 13, 2023
1 parent 7179e4a commit f59978e
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 228 deletions.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,35 @@
package org.apache.paimon.flink.source;

import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.flink.source.assigners.DynamicPartitionPruningAssigner;
import org.apache.paimon.flink.source.assigners.SplitAssigner;

import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.table.connector.source.DynamicFilteringData;
import org.apache.flink.table.connector.source.DynamicFilteringEvent;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.List;

import static org.apache.paimon.utils.Preconditions.checkNotNull;

/** A {@link SplitEnumerator} implementation for {@link StaticFileStoreSource} input. */
public class StaticFileStoreSplitEnumerator extends StaticFileStoreSplitEnumeratorBase {
public class StaticFileStoreSplitEnumerator
implements SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> {

private static final Logger LOG = LoggerFactory.getLogger(StaticFileStoreSplitEnumerator.class);

private final SplitEnumeratorContext<FileStoreSourceSplit> context;

@Nullable private final Snapshot snapshot;

private SplitAssigner splitAssigner;

@Nullable private final DynamicPartitionFilteringInfo dynamicPartitionFilteringInfo;

Expand All @@ -49,33 +63,79 @@ public StaticFileStoreSplitEnumerator(
@Nullable Snapshot snapshot,
SplitAssigner splitAssigner,
@Nullable DynamicPartitionFilteringInfo dynamicPartitionFilteringInfo) {
super(context, snapshot, splitAssigner);
this.context = context;
this.snapshot = snapshot;
this.splitAssigner = splitAssigner;
this.dynamicPartitionFilteringInfo = dynamicPartitionFilteringInfo;
}

@Override
public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
if (sourceEvent instanceof DynamicFilteringEvent) {
DynamicFilteringData dynamicFilteringData =
((DynamicFilteringEvent) sourceEvent).getData();
LOG.info(
"Received DynamicFilteringEvent: {}, is filtering: {}.",
subtaskId,
dynamicFilteringData.isFiltering());
// Lifecycle hook from the SplitEnumerator contract; this enumerator holds no
// resources that need initialization, so this is intentionally a no-op.
public void start() {
    // no resources to start
}

/**
 * Handles a split request from the given subtask.
 *
 * <p>If the requesting reader is no longer registered (it failed between sending the request
 * and this call), the request is dropped. Otherwise the next batch of splits from the assigner
 * is pushed to the reader, or {@code signalNoMoreSplits} is sent when the assigner is drained.
 *
 * @param subtask the id of the requesting subtask
 * @param hostname the hostname of the requesting reader, may be null; forwarded to the
 *     assigner for locality-aware assignment
 */
@Override
public void handleSplitRequest(int subtask, @Nullable String hostname) {
    if (!context.registeredReaders().containsKey(subtask)) {
        // reader failed between sending the request and now. skip this request.
        return;
    }

    List<FileStoreSourceSplit> assignment = splitAssigner.getNext(subtask, hostname);
    // isEmpty() is the idiomatic emptiness check (was: assignment.size() > 0).
    if (!assignment.isEmpty()) {
        context.assignSplits(
                new SplitsAssignment<>(Collections.singletonMap(subtask, assignment)));
    } else {
        context.signalNoMoreSplits(subtask);
    }
}

@Override
public void addSplitsBack(List<FileStoreSourceSplit> backSplits, int subtaskId) {
    // Return the splits to the assigner so they can be handed out again.
    // Note the argument order differs from this method's signature: (subtask, splits).
    splitAssigner.addSplitsBack(subtaskId, backSplits);
}

@Override
public void addReader(int subtaskId) {
    // this source is purely lazy-pull-based, nothing to do upon registration
}

/**
 * Captures the enumerator state for the given checkpoint: the splits that have not yet been
 * handed out, plus the id of the snapshot being read (or {@code null} if there is none).
 */
@Override
public PendingSplitsCheckpoint snapshotState(long checkpointId) {
    Long snapshotId;
    if (snapshot == null) {
        snapshotId = null;
    } else {
        snapshotId = snapshot.id();
    }
    return new PendingSplitsCheckpoint(splitAssigner.remainingSplits(), snapshotId);
}

@Override
public void close() {
    // no resources to close
}

/** Returns the snapshot this enumerator was created with, or {@code null} if none was set. */
@Nullable
public Snapshot snapshot() {
    return snapshot;
}

/**
 * Handles a custom source event sent to this enumerator.
 *
 * <p>Only Flink's {@code DynamicFilteringEvent} is recognized: when received, the current
 * split assigner is (possibly) wrapped in a {@link DynamicPartitionPruningAssigner} so that
 * splits of pruned partitions are filtered out. Any other event is logged as an error and
 * otherwise ignored.
 *
 * @param subtaskId the subtask that sent the event
 * @param sourceEvent the received event
 */
@Override
public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
    // Matched by simple class name — presumably to avoid a hard reference to the
    // flink-table event class here; TODO confirm this is the intended reason.
    if (sourceEvent.getClass().getSimpleName().equals("DynamicFilteringEvent")) {
        checkNotNull(
                dynamicPartitionFilteringInfo,
                "Cannot apply dynamic filtering because dynamicPartitionFilteringInfo hasn't been set.");

        this.splitAssigner =
                DynamicPartitionPruningAssigner.createDynamicPartitionPruningAssignerIfNeeded(
                        subtaskId,
                        splitAssigner,
                        dynamicPartitionFilteringInfo.getPartitionRowProjection(),
                        sourceEvent,
                        LOG);
    } else {
        LOG.error("Received unrecognized event: {}", sourceEvent);
    }
}

/** Exposes the currently active split assigner; intended for tests only. */
@VisibleForTesting
public SplitAssigner getSplitAssigner() {
    return splitAssigner;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.table.source.DataSplit;

import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.table.connector.source.DynamicFilteringData;
import org.apache.flink.table.connector.source.DynamicFilteringEvent;
import org.slf4j.Logger;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -83,6 +86,23 @@ public Collection<FileStoreSourceSplit> remainingSplits() {
.collect(Collectors.toList());
}

/**
 * Wraps {@code oriAssigner} in a {@link DynamicPartitionPruningAssigner} when the received
 * {@code DynamicFilteringEvent} carries filtering data; otherwise the original assigner is
 * returned unchanged.
 *
 * @param subtaskId the subtask that sent the event (used for logging only)
 * @param oriAssigner the assigner to wrap
 * @param partitionRowProjection projection extracting the partition row used for pruning
 * @param sourceEvent the event, which must be a {@code DynamicFilteringEvent}
 * @param logger the logger to report the received event on
 * @return a pruning assigner when filtering is active, else {@code oriAssigner}
 */
public static SplitAssigner createDynamicPartitionPruningAssignerIfNeeded(
        int subtaskId,
        SplitAssigner oriAssigner,
        Projection partitionRowProjection,
        SourceEvent sourceEvent,
        Logger logger) {
    DynamicFilteringData filteringData = ((DynamicFilteringEvent) sourceEvent).getData();
    logger.info(
            "Received DynamicFilteringEvent: {}, is filtering: {}.",
            subtaskId,
            filteringData.isFiltering());
    if (!filteringData.isFiltering()) {
        return oriAssigner;
    }
    return new DynamicPartitionPruningAssigner(
            oriAssigner, partitionRowProjection, filteringData);
}

private boolean filter(FileStoreSourceSplit sourceSplit) {
DataSplit dataSplit = (DataSplit) sourceSplit.split();
BinaryRow partition = dataSplit.partition();
Expand Down

0 comments on commit f59978e

Please sign in to comment.