Skip to content

Commit

Permalink
[flink] support the feature of ContinuousFileStoreSource using consum…
Browse files Browse the repository at this point in the history
…er-id.
  • Loading branch information
liming30 committed Oct 31, 2023
1 parent 0210f64 commit f345103
Show file tree
Hide file tree
Showing 16 changed files with 328 additions and 9 deletions.
11 changes: 11 additions & 0 deletions docs/content/how-to/querying-tables.md
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,17 @@ NOTE: The consumer will prevent expiration of the snapshot. You can specify 'con
lifetime of consumers.
{{< /hint >}}

You can set `consumer.use-legacy-mode` to `false` to use the consumer implementation based on
[FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface), which provides
more capabilities, such as watermark alignment.

{{< hint warning >}}
1. When no watermark is defined, the consumer in non-legacy mode cannot pass the watermark recorded in the
snapshot to the downstream.
2. Since the implementations of legacy mode and non-legacy mode are completely different, their Flink state is
incompatible, and a job cannot be restored from its previous state after switching modes.
{{< /hint >}}

You can reset a consumer with a given consumer ID and next snapshot ID and delete a consumer with a given consumer ID.

{{< hint info >}}
Expand Down
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@
<td>Duration</td>
<td>The expiration interval of consumer files. A consumer file will be expired if it's lifetime after last modification is over this value.</td>
</tr>
<tr>
<td><h5>consumer.use-legacy-mode</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to use legacy mode when setting consumer-id. Legacy mode is implemented through the flink legacy source interface and will not have flip-27 related features. To maintain compatibility with the previous state, the default value is set to true.</td>
</tr>
<tr>
<td><h5>continuous.discovery-interval</h5></td>
<td style="word-wrap: break-word;">10 s</td>
Expand Down
11 changes: 11 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,13 @@ public class CoreOptions implements Serializable {
"The expiration interval of consumer files. A consumer file will be expired if "
+ "it's lifetime after last modification is over this value.");

/**
 * Boolean option stored under the key {@code consumer.use-legacy-mode}; defaults to {@code true}.
 * See the description text below for the user-facing explanation.
 */
public static final ConfigOption<Boolean> CONSUMER_WITH_LEGACY_MODE =
key("consumer.use-legacy-mode")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether to use legacy mode when setting consumer-id. Legacy mode is implemented through the flink legacy source interface and will not have flip-27 related features. To maintain compatibility with the previous state, the default value is set to true.");

public static final ConfigOption<Long> DYNAMIC_BUCKET_TARGET_ROW_NUM =
key("dynamic-bucket.target-row-num")
.longType()
Expand Down Expand Up @@ -1303,6 +1310,10 @@ public Duration consumerExpireTime() {
return options.get(CONSUMER_EXPIRATION_TIME);
}

/** Returns the value configured for {@link #CONSUMER_WITH_LEGACY_MODE}. */
public boolean consumerWithLegacyMode() {
    final boolean legacyMode = options.get(CONSUMER_WITH_LEGACY_MODE);
    return legacyMode;
}

/** Returns the value configured for {@code METASTORE_PARTITIONED_TABLE}. */
public boolean partitionedTableInMetastore() {
    final boolean partitioned = options.get(METASTORE_PARTITIONED_TABLE);
    return partitioned;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.flink.source.assigners.FIFOSplitAssigner;
import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner;
import org.apache.paimon.flink.source.assigners.SplitAssigner;
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
Expand All @@ -47,7 +48,11 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.TreeMap;

import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
Expand All @@ -71,6 +76,12 @@ public class ContinuousFileSplitEnumerator

protected final SplitAssigner splitAssigner;

/**
 * Checkpoint id -> minimum next snapshot id computed when that checkpoint's state was
 * snapshotted. Entries up to a completed checkpoint are consumed and cleared in
 * {@code notifyCheckpointComplete}.
 */
private final TreeMap<Long, Long> minNextSnapshotPerCheckpoint;

/**
 * Reader subtask -> snapshot id of the first split in the most recent batch of splits assigned
 * to that subtask.
 */
private final Map<Integer, Long> assignedSnapshotPerReader;

/**
 * Reader subtask -> last consumed snapshot id most recently reported by that reader through a
 * {@code ReaderConsumeProgressEvent}.
 */
private final Map<Integer, Long> consumingSnapshotPerReader;

@Nullable protected Long nextSnapshotId;

protected boolean finished = false;
Expand All @@ -93,6 +104,10 @@ public ContinuousFileSplitEnumerator(
this.scan = scan;
this.splitAssigner = createSplitAssigner(bucketMode);
addSplits(remainSplits);

this.minNextSnapshotPerCheckpoint = new TreeMap<>();
this.assignedSnapshotPerReader = new HashMap<>(context.currentParallelism());
this.consumingSnapshotPerReader = new HashMap<>(context.currentParallelism());
}

@VisibleForTesting
Expand Down Expand Up @@ -140,7 +155,13 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname

/**
 * Handles a custom event sent by a reader.
 *
 * <p>A {@link ReaderConsumeProgressEvent} records the reader's reported consumption progress
 * for the sending subtask. Any other event type is logged as unrecognized.
 *
 * @param subtaskId the subtask that sent the event
 * @param sourceEvent the event received from the reader
 */
@Override
public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
    // Only log an error for events that are genuinely unknown; a progress event is a normal,
    // expected message and must not be reported as unrecognized.
    if (sourceEvent instanceof ReaderConsumeProgressEvent) {
        long currentSnapshotId =
                ((ReaderConsumeProgressEvent) sourceEvent).lastConsumeSnapshotId();
        consumingSnapshotPerReader.put(subtaskId, currentSnapshotId);
    } else {
        LOG.error("Received unrecognized event: {}", sourceEvent);
    }
}

@Override
Expand All @@ -156,9 +177,23 @@ public PendingSplitsCheckpoint snapshotState(long checkpointId) throws Exception
new PendingSplitsCheckpoint(splits, nextSnapshotId);

LOG.debug("Source Checkpoint is {}", checkpoint);

computeMinNextSnapshotId()
.ifPresent(
minNextSnapshotId ->
minNextSnapshotPerCheckpoint.put(checkpointId, minNextSnapshotId));
return checkpoint;
}

/**
 * Called when a checkpoint has completed. Takes every recorded entry whose checkpoint id is
 * less than or equal to {@code checkpointId}, forwards the largest recorded snapshot id (if
 * any) to the scan, and then drops those entries.
 *
 * @param checkpointId id of the checkpoint that completed
 */
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    NavigableMap<Long, Long> completedEntries =
            minNextSnapshotPerCheckpoint.headMap(checkpointId, true);

    boolean anyRecorded = false;
    long largestSnapshotId = Long.MIN_VALUE;
    for (long recordedSnapshotId : completedEntries.values()) {
        if (!anyRecorded || recordedSnapshotId > largestSnapshotId) {
            largestSnapshotId = recordedSnapshotId;
        }
        anyRecorded = true;
    }

    if (anyRecorded) {
        scan.notifyCheckpointComplete(largestSnapshotId);
    }

    // headMap is a view, so clearing it removes the entries from the backing map.
    completedEntries.clear();
}

// ------------------------------------------------------------------------

// this need to be synchronized because scan object is not thread safe. handleSplitRequest and
Expand Down Expand Up @@ -218,6 +253,8 @@ protected synchronized void assignSplits() {
List<FileStoreSourceSplit> splits = splitAssigner.getNext(task, null);
if (splits.size() > 0) {
assignment.put(task, splits);
TableScanUtils.getSnapshotId(splits.get(0))
.ifPresent(snapshotId -> assignedSnapshotPerReader.put(task, snapshotId));
}
}

Expand Down Expand Up @@ -250,6 +287,38 @@ protected boolean noMoreSplits() {
return finished;
}

/**
 * Computes the smallest snapshot id still being worked on across all reader subtasks.
 *
 * <p>Per subtask the candidate is chosen as follows:
 *
 * <ul>
 *   <li>A reader waiting for splits has consumed everything it was given, so the candidate is
 *       the next snapshot id known to the split assigner, falling back to {@code
 *       nextSnapshotId}.
 *   <li>Otherwise the candidate is the larger of the progress reported by the reader and the
 *       most recently assigned snapshot id, or whichever of the two is known.
 * </ul>
 *
 * @return the minimum candidate over all subtasks, or empty as soon as one subtask has no
 *     candidate
 */
private Optional<Long> computeMinNextSnapshotId() {
    long minAcrossReaders = Long.MAX_VALUE;
    for (int reader = 0; reader < context.currentParallelism(); reader++) {
        Long candidate;
        if (!readersAwaitingSplit.contains(reader)) {
            Long reported = consumingSnapshotPerReader.get(reader);
            Long assigned = assignedSnapshotPerReader.get(reader);
            if (reported == null) {
                candidate = assigned;
            } else if (assigned == null) {
                candidate = reported;
            } else {
                candidate = Math.max(reported, assigned);
            }
        } else {
            candidate = splitAssigner.getNextSnapshotId(reader).orElse(nextSnapshotId);
        }

        if (candidate == null) {
            // Progress of this reader is unknown, so no global minimum can be stated.
            return Optional.empty();
        }
        minAcrossReaders = Math.min(minAcrossReaders, candidate);
    }
    return Optional.of(minAcrossReaders);
}

/** The result of scan. */
protected static class PlanWithNextSnapshotId {
private final TableScan.Plan plan;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.disk.IOManager;
import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.table.source.TableRead;

import org.apache.flink.api.connector.source.SourceReader;
Expand All @@ -33,13 +34,15 @@
import javax.annotation.Nullable;

import java.util.Map;
import java.util.Optional;

/** A {@link SourceReader} that read records from {@link FileStoreSourceSplit}. */
public class FileStoreSourceReader
extends SingleThreadMultiplexSourceReaderBase<
RecordIterator<RowData>, RowData, FileStoreSourceSplit, FileStoreSourceSplitState> {

@Nullable private final IOManager ioManager;
// Largest snapshot id seen among finished splits so far; Long.MIN_VALUE until a finished split
// yields a snapshot id. It is only ever raised, and each increase is reported to the coordinator.
private long lastConsumeSnapshotId = Long.MIN_VALUE;

public FileStoreSourceReader(
SourceReaderContext readerContext,
Expand Down Expand Up @@ -93,6 +96,20 @@ protected void onSplitFinished(Map<String, FileStoreSourceSplitState> finishedSp
if (getNumberOfCurrentlyAssignedSplits() == 0) {
context.sendSplitRequest();
}

long maxFinishedSplits =
finishedSplitIds.values().stream()
.map(splitState -> TableScanUtils.getSnapshotId(splitState.toSourceSplit()))
.filter(Optional::isPresent)
.mapToLong(Optional::get)
.max()
.orElse(Long.MIN_VALUE);

if (lastConsumeSnapshotId < maxFinishedSplits) {
lastConsumeSnapshotId = maxFinishedSplits;
context.sendSourceEventToCoordinator(
new ReaderConsumeProgressEvent(lastConsumeSnapshotId));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ public DataStream<RowData> build() {
} else {
if (conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_ENABLED)) {
return buildAlignedContinuousFileSource();
} else if (conf.contains(CoreOptions.CONSUMER_ID)) {
} else if (conf.contains(CoreOptions.CONSUMER_ID)
&& conf.get(CoreOptions.CONSUMER_WITH_LEGACY_MODE)) {
return buildContinuousStreamOperator();
} else {
return buildContinuousFileSource();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.source;

import org.apache.flink.api.connector.source.SourceEvent;

/**
 * Progress report that a {@link FileStoreSourceReader} sends to the {@link
 * ContinuousFileSplitEnumerator}, carrying the id of the last snapshot the reader has consumed.
 */
public class ReaderConsumeProgressEvent implements SourceEvent {

    private static final long serialVersionUID = -1L;

    /** Id of the last snapshot consumed by the reporting reader. */
    private final long lastConsumeSnapshotId;

    /**
     * Creates a progress event.
     *
     * @param lastConsumeSnapshotId id of the last snapshot consumed by the reader
     */
    public ReaderConsumeProgressEvent(long lastConsumeSnapshotId) {
        this.lastConsumeSnapshotId = lastConsumeSnapshotId;
    }

    /** Returns the id of the last snapshot consumed by the reporting reader. */
    public long lastConsumeSnapshotId() {
        return this.lastConsumeSnapshotId;
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("ReaderConsumeProgressEvent{");
        text.append("lastConsumeSnapshotId=").append(lastConsumeSnapshotId).append('}');
        return text.toString();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public PendingSplitsCheckpoint snapshotState(long checkpointId) throws Exception
assignSplits();
}
Preconditions.checkArgument(alignedAssigner.isAligned());
lastConsumedSnapshotId = alignedAssigner.minRemainingSnapshotId();
lastConsumedSnapshotId = alignedAssigner.getNextSnapshotId(0).orElse(null);
alignedAssigner.removeFirst();
currentCheckpointId = checkpointId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* Splits are allocated at the granularity of snapshots. When the splits of the current snapshot are
Expand Down Expand Up @@ -98,6 +99,12 @@ public Collection<FileStoreSourceSplit> remainingSplits() {
return remainingSplits;
}

/**
 * Returns the snapshot id at the head of the pending queue, or empty when nothing is pending.
 * The {@code subtask} argument is not used by this assigner.
 */
@Override
public Optional<Long> getNextSnapshotId(int subtask) {
    PendingSnapshot first = pendingSplitAssignment.peek();
    if (first == null) {
        return Optional.empty();
    }
    return Optional.of(first.snapshotId);
}

public boolean isAligned() {
PendingSnapshot head = pendingSplitAssignment.peek();
return head != null && head.empty();
Expand All @@ -114,11 +121,6 @@ public void removeFirst() {
"The head pending splits is not empty. This is a bug, please file an issue.");
}

/** Returns the snapshot id at the head of the pending queue, or {@code null} if it is empty. */
public Long minRemainingSnapshotId() {
    PendingSnapshot first = pendingSplitAssignment.peek();
    if (first == null) {
        return null;
    }
    return first.snapshotId;
}

private static class PendingSnapshot {
private final long snapshotId;
private final boolean isPlaceHolder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Optional;

import static org.apache.paimon.flink.utils.TableScanUtils.getSnapshotId;

/**
* Splits are assigned preemptively in the order requested by the task. Only one split is assigned
Expand Down Expand Up @@ -65,4 +68,11 @@ public void addSplitsBack(int subtask, List<FileStoreSourceSplit> splits) {
public Collection<FileStoreSourceSplit> remainingSplits() {
return new ArrayList<>(pendingSplitAssignment);
}

/**
 * Returns the snapshot id of the first pending split, or empty when no split is pending. The
 * {@code subtask} argument is not used by this assigner.
 */
@Override
public Optional<Long> getNextSnapshotId(int subtask) {
    if (pendingSplitAssignment.isEmpty()) {
        return Optional.empty();
    }
    return getSnapshotId(pendingSplitAssignment.peekFirst());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;

import static org.apache.paimon.flink.utils.TableScanUtils.getSnapshotId;

/**
* Pre-calculate which splits each task should process according to the weight, and then distribute
* the splits fairly.
Expand Down Expand Up @@ -104,4 +107,12 @@ private static Map<Integer, LinkedList<FileStoreSourceSplit>> createBatchFairSpl
}
return assignment;
}

/**
 * Returns the snapshot id of the first split pre-assigned to {@code subtask}, or empty when
 * that subtask has no pending splits.
 */
@Override
public Optional<Long> getNextSnapshotId(int subtask) {
    LinkedList<FileStoreSourceSplit> queued = pendingSplitAssignment.get(subtask);
    if (queued != null && !queued.isEmpty()) {
        return getSnapshotId(queued.peekFirst());
    }
    return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.util.Collection;
import java.util.List;
import java.util.Optional;

/**
* The {@code SplitAssigner} is responsible for deciding what splits should be processed next by
Expand All @@ -50,4 +51,7 @@ public interface SplitAssigner {

/** Gets the remaining splits that this assigner has pending. */
Collection<FileStoreSourceSplit> remainingSplits();

/**
 * Gets the snapshot id of the next split that this assigner has pending.
 *
 * @param subtask the reader subtask being asked about; an implementation may ignore it when its
 *     pending splits are not tracked per subtask
 * @return the snapshot id of the next pending split, or an empty {@link Optional} when there is
 *     none
 */
Optional<Long> getNextSnapshotId(int subtask);
}
Loading

0 comments on commit f345103

Please sign in to comment.