Skip to content

Commit

Permalink
reformat code
Browse files Browse the repository at this point in the history
  • Loading branch information
liming30 committed Nov 13, 2023
1 parent 4352c0f commit 983173d
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 78 deletions.
13 changes: 7 additions & 6 deletions docs/content/how-to/querying-tables.md
Original file line number Diff line number Diff line change
Expand Up @@ -332,15 +332,16 @@ NOTE: The consumer will prevent expiration of the snapshot. You can specify 'con
lifetime of consumers.
{{< /hint >}}

You can set `consumer.use-legacy-mode` to `false` to use the consumer function implemented based on
[flip-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface), which can provide
more capabilities, such as watermark alignment.
By default, the consumer uses `exactly-once` mode to record consumption progress, which strictly ensures that what is
recorded in the consumer is the snapshot-id + 1 that all readers have exactly consumed. You can set `consumer.mode` to
`at-least-once` to allow readers consume snapshots at different rates and record the slowest snapshot-id among all
readers into the consumer. This mode can provide more capabilities, such as watermark alignment.

{{< hint warning >}}
1. When there is no watermark definition, the consumer in non-legacy mode cannot provide the ability to pass the
1. When there is no watermark definition, the consumer in `at-least-once` mode cannot provide the ability to pass the
watermark in the snapshot to the downstream.
2. Since the implementation of legacy mode and non-legacy mode are completely different, the state of flink is
incompatible and cannot be restored from the state when switching modes.
2. Since the implementation of `exactly-once` mode and `at-least-once` mode are completely different, the state of
flink is incompatible and cannot be restored from the state when switching modes.
{{< /hint >}}

You can reset a consumer with a given consumer ID and next snapshot ID and delete a consumer with a given consumer ID.
Expand Down
6 changes: 3 additions & 3 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@
<td>The expiration interval of consumer files. A consumer file will be expired if it's lifetime after last modification is over this value.</td>
</tr>
<tr>
<td><h5>consumer.use-legacy-mode</h5></td>
<td style="word-wrap: break-word;">true</td>
<td><h5>consumer.mode</h5></td>
<td style="word-wrap: break-word;">exactly-once</td>
<td>Boolean</td>
<td>Whether to use legacy mode when setting consumer-id. Legacy mode is implemented through the flink legacy source interface and will not have flip-27 related features. To maintain compatibility with the previous state, the default value is set to true.</td>
<td>Specify the consumer consistency mode for table.<br /><br />Possible values:<ul><li>"exactly-once": Readers consume data at snapshot granularity, and strictly ensure that the snapshot-id recorded in the consumer is the snapshot-id + 1 that all readers have exactly consumed.</li><li>"at-least-once": Each reader consumes snapshots at a different rate, and the snapshot with the slowest consumption progress among all readers will be recorded in the consumer.</li></ul></td>
</tr>
<tr>
<td><h5>continuous.discovery-interval</h5></td>
Expand Down
44 changes: 36 additions & 8 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -705,12 +705,11 @@ public class CoreOptions implements Serializable {
"The expiration interval of consumer files. A consumer file will be expired if "
+ "it's lifetime after last modification is over this value.");

public static final ConfigOption<Boolean> CONSUMER_WITH_LEGACY_MODE =
key("consumer.use-legacy-mode")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether to use legacy mode when setting consumer-id. Legacy mode is implemented through the flink legacy source interface and will not have flip-27 related features. To maintain compatibility with the previous state, the default value is set to true.");
public static final ConfigOption<ConsumerMode> CONSUMER_CONSISTENCY_MODE =
key("consumer.mode")
.enumType(ConsumerMode.class)
.defaultValue(ConsumerMode.EXACTLY_ONCE)
.withDescription("Specify the consumer consistency mode for table.");

public static final ConfigOption<Long> DYNAMIC_BUCKET_TARGET_ROW_NUM =
key("dynamic-bucket.target-row-num")
Expand Down Expand Up @@ -1310,8 +1309,8 @@ public Duration consumerExpireTime() {
return options.get(CONSUMER_EXPIRATION_TIME);
}

public boolean consumerWithLegacyMode() {
return options.get(CONSUMER_WITH_LEGACY_MODE);
public ConsumerMode consumerWithLegacyMode() {
return options.get(CONSUMER_CONSISTENCY_MODE);
}

public boolean partitionedTableInMetastore() {
Expand Down Expand Up @@ -1958,4 +1957,33 @@ public InlineElement getDescription() {
return text(description);
}
}

/** Specifies the log consistency mode for table. */
public enum ConsumerMode implements DescribedEnum {
EXACTLY_ONCE(
"exactly-once",
"Readers consume data at snapshot granularity, and strictly ensure that the snapshot-id recorded in the consumer is the snapshot-id + 1 that all readers have exactly consumed."),

AT_LEAST_ONCE(
"at-least-once",
"Each reader consumes snapshots at a different rate, and the snapshot with the slowest consumption progress among all readers will be recorded in the consumer.");

private final String value;
private final String description;

ConsumerMode(String value, String description) {
this.value = value;
this.description = description;
}

@Override
public String toString() {
return value;
}

@Override
public InlineElement getDescription() {
return text(description);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.source;

import org.apache.paimon.flink.utils.TableScanUtils;

import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;

/** Calculator for calculating consumer consumption progress. */
public class ConsumerProgressCalculator {
private final TreeMap<Long, Long> minNextSnapshotPerCheckpoint;

private final Map<Integer, Long> assignedSnapshotPerReader;

private final Map<Integer, Long> consumingSnapshotPerReader;

public ConsumerProgressCalculator(int parallelism) {
this.minNextSnapshotPerCheckpoint = new TreeMap<>();
this.assignedSnapshotPerReader = new HashMap<>(parallelism);
this.consumingSnapshotPerReader = new HashMap<>(parallelism);
}

public void updateConsumeProgress(int subtaskId, ReaderConsumeProgressEvent event) {
consumingSnapshotPerReader.put(subtaskId, event.lastConsumeSnapshotId());
}

public void updateAssignInformation(int subtaskId, FileStoreSourceSplit split) {
TableScanUtils.getSnapshotId(split)
.ifPresent(snapshotId -> assignedSnapshotPerReader.put(subtaskId, snapshotId));
}

public void notifySnapshotState(
long checkpointId,
Set<Integer> readersAwaitingSplit,
Function<Integer, Long> unassignedCalculationFunction,
int parallelism) {
computeMinNextSnapshotId(readersAwaitingSplit, unassignedCalculationFunction, parallelism)
.ifPresent(
minNextSnapshotId ->
minNextSnapshotPerCheckpoint.put(checkpointId, minNextSnapshotId));
}

public OptionalLong notifyCheckpointComplete(long checkpointId) {
NavigableMap<Long, Long> nextSnapshots =
minNextSnapshotPerCheckpoint.headMap(checkpointId, true);
OptionalLong max = nextSnapshots.values().stream().mapToLong(Long::longValue).max();
nextSnapshots.clear();
return max;
}

/** Calculate the minimum snapshot currently being consumed by all readers. */
private Optional<Long> computeMinNextSnapshotId(
Set<Integer> readersAwaitingSplit,
Function<Integer, Long> unassignedCalculationFunction,
int parallelism) {
long globalMinSnapshotId = Long.MAX_VALUE;
for (int subtask = 0; subtask < parallelism; subtask++) {
// 1. if the reader is in the waiting list, it means that all allocated splits have
// been
// consumed, and the next snapshotId is calculated from splitAssigner.
//
// 2. if the reader is not in the waiting list, the larger value between the
// consumption
// progress reported by the reader and the most recently assigned snapshot id is
// used.
Long snapshotIdForSubtask;
if (readersAwaitingSplit.contains(subtask)) {
snapshotIdForSubtask = unassignedCalculationFunction.apply(subtask);
} else {
Long consumingSnapshotId = consumingSnapshotPerReader.get(subtask);
Long assignedSnapshotId = assignedSnapshotPerReader.get(subtask);
if (consumingSnapshotId != null && assignedSnapshotId != null) {
snapshotIdForSubtask = Math.max(consumingSnapshotId, assignedSnapshotId);
} else {
snapshotIdForSubtask =
consumingSnapshotId != null ? consumingSnapshotId : assignedSnapshotId;
}
}

if (snapshotIdForSubtask != null) {
globalMinSnapshotId = Math.min(globalMinSnapshotId, snapshotIdForSubtask);
} else {
return Optional.empty();
}
}
return Optional.of(globalMinSnapshotId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.paimon.flink.source.assigners.FIFOSplitAssigner;
import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner;
import org.apache.paimon.flink.source.assigners.SplitAssigner;
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
Expand All @@ -48,11 +47,7 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.TreeMap;

import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
Expand All @@ -76,11 +71,7 @@ public class ContinuousFileSplitEnumerator

protected final SplitAssigner splitAssigner;

private final TreeMap<Long, Long> minNextSnapshotPerCheckpoint;

private final Map<Integer, Long> assignedSnapshotPerReader;

private final Map<Integer, Long> consumingSnapshotPerReader;
private final ConsumerProgressCalculator consumerProgressCalculator;

@Nullable protected Long nextSnapshotId;

Expand All @@ -105,9 +96,8 @@ public ContinuousFileSplitEnumerator(
this.splitAssigner = createSplitAssigner(bucketMode);
addSplits(remainSplits);

this.minNextSnapshotPerCheckpoint = new TreeMap<>();
this.assignedSnapshotPerReader = new HashMap<>(context.currentParallelism());
this.consumingSnapshotPerReader = new HashMap<>(context.currentParallelism());
this.consumerProgressCalculator =
new ConsumerProgressCalculator(context.currentParallelism());
}

@VisibleForTesting
Expand Down Expand Up @@ -156,9 +146,8 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
@Override
public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
if (sourceEvent instanceof ReaderConsumeProgressEvent) {
long currentSnapshotId =
((ReaderConsumeProgressEvent) sourceEvent).lastConsumeSnapshotId();
consumingSnapshotPerReader.put(subtaskId, currentSnapshotId);
consumerProgressCalculator.updateConsumeProgress(
subtaskId, (ReaderConsumeProgressEvent) sourceEvent);
} else {
LOG.error("Received unrecognized event: {}", sourceEvent);
}
Expand All @@ -176,22 +165,21 @@ public PendingSplitsCheckpoint snapshotState(long checkpointId) throws Exception
final PendingSplitsCheckpoint checkpoint =
new PendingSplitsCheckpoint(splits, nextSnapshotId);

LOG.debug("Source Checkpoint is {}", checkpoint);
consumerProgressCalculator.notifySnapshotState(
checkpointId,
readersAwaitingSplit,
subtask -> splitAssigner.getNextSnapshotId(subtask).orElse(nextSnapshotId),
context.currentParallelism());

computeMinNextSnapshotId()
.ifPresent(
minNextSnapshotId ->
minNextSnapshotPerCheckpoint.put(checkpointId, minNextSnapshotId));
LOG.debug("Source Checkpoint is {}", checkpoint);
return checkpoint;
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
NavigableMap<Long, Long> nextSnapshots =
minNextSnapshotPerCheckpoint.headMap(checkpointId, true);
OptionalLong max = nextSnapshots.values().stream().mapToLong(Long::longValue).max();
max.ifPresent(scan::notifyCheckpointComplete);
nextSnapshots.clear();
consumerProgressCalculator
.notifyCheckpointComplete(checkpointId)
.ifPresent(scan::notifyCheckpointComplete);
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -253,8 +241,7 @@ protected synchronized void assignSplits() {
List<FileStoreSourceSplit> splits = splitAssigner.getNext(task, null);
if (splits.size() > 0) {
assignment.put(task, splits);
TableScanUtils.getSnapshotId(splits.get(0))
.ifPresent(snapshotId -> assignedSnapshotPerReader.put(task, snapshotId));
consumerProgressCalculator.updateAssignInformation(task, splits.get(0));
}
}

Expand Down Expand Up @@ -287,38 +274,6 @@ protected boolean noMoreSplits() {
return finished;
}

/** Calculate the minimum snapshot currently being consumed by all readers. */
private Optional<Long> computeMinNextSnapshotId() {
long globalMinSnapshotId = Long.MAX_VALUE;
for (int subtask = 0; subtask < context.currentParallelism(); subtask++) {
// 1. if the reader is in the waiting list, it means that all allocated splits have been
// consumed, and the next snapshotId is calculated from splitAssigner.
//
// 2. if the reader is not in the waiting list, the larger value between the consumption
// progress reported by the reader and the most recently assigned snapshot id is used.
Long snapshotIdForTask;
if (readersAwaitingSplit.contains(subtask)) {
snapshotIdForTask = splitAssigner.getNextSnapshotId(subtask).orElse(nextSnapshotId);
} else {
Long consumingSnapshotId = consumingSnapshotPerReader.get(subtask);
Long assignedSnapshotId = assignedSnapshotPerReader.get(subtask);
if (consumingSnapshotId != null && assignedSnapshotId != null) {
snapshotIdForTask = Math.max(consumingSnapshotId, assignedSnapshotId);
} else {
snapshotIdForTask =
consumingSnapshotId != null ? consumingSnapshotId : assignedSnapshotId;
}
}

if (snapshotIdForTask != null) {
globalMinSnapshotId = Math.min(globalMinSnapshotId, snapshotIdForTask);
} else {
return Optional.empty();
}
}
return Optional.of(globalMinSnapshotId);
}

/** The result of scan. */
protected static class PlanWithNextSnapshotId {
private final TableScan.Plan plan;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,8 @@ public DataStream<RowData> build() {
if (conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_ENABLED)) {
return buildAlignedContinuousFileSource();
} else if (conf.contains(CoreOptions.CONSUMER_ID)
&& conf.get(CoreOptions.CONSUMER_WITH_LEGACY_MODE)) {
&& conf.get(CoreOptions.CONSUMER_CONSISTENCY_MODE)
== CoreOptions.ConsumerMode.EXACTLY_ONCE) {
return buildContinuousStreamOperator();
} else {
return buildContinuousFileSource();
Expand Down

0 comments on commit 983173d

Please sign in to comment.