[flink] support to infer parallelism for system table (#3201)
Aitozi authored Apr 12, 2024
1 parent 8bc4fae commit dc45524
Showing 5 changed files with 211 additions and 129 deletions.
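In short: parallelism inference previously lived only in DataTableSource, so a query over a system table always ran at the environment's default parallelism. This commit moves the inference logic up into the shared FlinkTableSource base class and lets SystemTableSource build its own DataStream so the inferred value can actually be applied. A minimal usage sketch (the warehouse path and table name are illustrative, assuming a Paimon table T exists in the default database):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SystemTableParallelismExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Register a Paimon catalog (warehouse location is illustrative).
        tEnv.executeSql(
                "CREATE CATALOG paimon_catalog WITH ("
                        + "'type' = 'paimon', 'warehouse' = 'file:///tmp/warehouse')");
        tEnv.useCatalog("paimon_catalog");

        // With this commit, a batch query on the snapshots system table gets its
        // source parallelism inferred from the split count, subject to
        // scan.parallelism, scan.infer-parallelism and scan.infer-parallelism.max,
        // instead of always running at the environment default.
        tEnv.executeSql("SELECT * FROM `T$snapshots`").print();
    }
}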
File 1/5: the table factory

@@ -100,7 +100,10 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                 == RuntimeExecutionMode.STREAMING;
         if (origin instanceof SystemCatalogTable) {
             return new PushedTableSource(
-                    new SystemTableSource(((SystemCatalogTable) origin).table(), isStreamingMode));
+                    new SystemTableSource(
+                            ((SystemCatalogTable) origin).table(),
+                            isStreamingMode,
+                            context.getObjectIdentifier()));
         } else {
             Table table = buildPaimonTable(context);
             if (table instanceof FileStoreTable) {
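The new third argument comes from DynamicTableFactory.Context#getObjectIdentifier(), the catalog-qualified identifier of the table being created. As the SystemTableSource hunk below shows, its asSummaryString() becomes the operator name of the generated Flink source.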
File 2/5: DataTableSource.java

@@ -22,7 +22,6 @@
 import org.apache.paimon.CoreOptions.ChangelogProducer;
 import org.apache.paimon.CoreOptions.LogChangelogMode;
 import org.apache.paimon.CoreOptions.LogConsistency;
-import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.FlinkConnectorOptions.WatermarkEmitStrategy;
 import org.apache.paimon.flink.PaimonDataStreamScanProvider;
 import org.apache.paimon.flink.log.LogSourceProvider;
@@ -32,13 +31,9 @@
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.Split;
 import org.apache.paimon.utils.Projection;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.LookupTableSource.LookupContext;
@@ -67,7 +62,6 @@
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_EMIT_STRATEGY;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT;
-import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX;
 import static org.apache.paimon.utils.Preconditions.checkState;
 
 /**
@@ -79,9 +73,6 @@
  * LogSourceProvider}.
  */
 public class DataTableSource extends FlinkTableSource {
-    private static final String FLINK_INFER_SCAN_PARALLELISM =
-            String.format(
-                    "%s%s", PAIMON_PREFIX, FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key());
 
     private final ObjectIdentifier tableIdentifier;
     private final boolean streaming;
@@ -90,8 +81,6 @@ public class DataTableSource extends FlinkTableSource {

     @Nullable private WatermarkStrategy<RowData> watermarkStrategy;
 
-    private SplitStatistics splitStatistics;
-
     @Nullable private List<String> dynamicPartitionFilteringFields;
 
     public DataTableSource(
@@ -211,48 +200,12 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
                 .withDynamicPartitionFilteringFields(dynamicPartitionFilteringFields);
 
         return new PaimonDataStreamScanProvider(
-                !streaming, env -> configureSource(sourceBuilder, env));
-    }
-
-    private DataStream<RowData> configureSource(
-            FlinkSourceBuilder sourceBuilder, StreamExecutionEnvironment env) {
-        Options options = Options.fromMap(this.table.options());
-        Configuration envConfig = (Configuration) env.getConfiguration();
-        if (envConfig.containsKey(FLINK_INFER_SCAN_PARALLELISM)) {
-            options.set(
-                    FlinkConnectorOptions.INFER_SCAN_PARALLELISM,
-                    Boolean.parseBoolean(envConfig.toMap().get(FLINK_INFER_SCAN_PARALLELISM)));
-        }
-        Integer parallelism = options.get(FlinkConnectorOptions.SCAN_PARALLELISM);
-        if (parallelism == null && options.get(FlinkConnectorOptions.INFER_SCAN_PARALLELISM)) {
-            if (streaming) {
-                parallelism = options.get(CoreOptions.BUCKET);
-            } else {
-                scanSplitsForInference();
-                parallelism = splitStatistics.splitNumber();
-                if (null != limit && limit > 0) {
-                    int limitCount =
-                            limit >= Integer.MAX_VALUE ? Integer.MAX_VALUE : limit.intValue();
-                    parallelism = Math.min(parallelism, limitCount);
-                }
-
-                parallelism = Math.max(1, parallelism);
-            }
-            parallelism =
-                    Math.min(
-                            parallelism,
-                            options.get(FlinkConnectorOptions.INFER_SCAN_MAX_PARALLELISM));
-        }
-
-        return sourceBuilder.withParallelism(parallelism).withEnv(env).build();
-    }
-
-    private void scanSplitsForInference() {
-        if (splitStatistics == null) {
-            List<Split> splits =
-                    table.newReadBuilder().withFilter(predicate).newScan().plan().splits();
-            splitStatistics = new SplitStatistics(splits);
-        }
+                !streaming,
+                env ->
+                        sourceBuilder
+                                .withParallelism(inferSourceParallelism(env))
+                                .withEnv(env)
+                                .build());
     }
 
     @Override
@@ -335,24 +288,4 @@ public void applyDynamicFiltering(List<String> candidateFilterFields) {
     public boolean isStreaming() {
         return streaming;
     }
-
-    /** Split statistics for inferring row count and parallelism size. */
-    protected static class SplitStatistics {
-
-        private final int splitNumber;
-        private final long totalRowCount;
-
-        private SplitStatistics(List<Split> splits) {
-            this.splitNumber = splits.size();
-            this.totalRowCount = splits.stream().mapToLong(Split::rowCount).sum();
-        }
-
-        public int splitNumber() {
-            return splitNumber;
-        }
-
-        public long totalRowCount() {
-            return totalRowCount;
-        }
-    }
 }
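Net effect for DataTableSource: configureSource, scanSplitsForInference, and SplitStatistics are not deleted but promoted into the FlinkTableSource base class (next file), with the streaming field generalized to the isStreaming() method so SystemTableSource can reuse the same logic.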
File 3/5: FlinkTableSource.java

@@ -18,15 +18,21 @@

 package org.apache.paimon.flink.source;
 
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.LogicalTypeConversion;
 import org.apache.paimon.flink.PredicateConverter;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.PartitionPredicateVisitor;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.predicate.PredicateVisitor;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.Split;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.LookupTableSource.LookupContext;
 import org.apache.flink.table.connector.source.LookupTableSource.LookupRuntimeProvider;
@@ -46,16 +52,23 @@
 import java.util.List;
 import java.util.Optional;
 
+import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX;
+
 /** A Flink {@link ScanTableSource} for paimon. */
 public abstract class FlinkTableSource {
 
     private static final Logger LOG = LoggerFactory.getLogger(FlinkTableSource.class);
 
+    protected static final String FLINK_INFER_SCAN_PARALLELISM =
+            String.format(
+                    "%s%s", PAIMON_PREFIX, FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key());
+
     protected final Table table;
 
     @Nullable protected Predicate predicate;
     @Nullable protected int[][] projectFields;
     @Nullable protected Long limit;
+    protected SplitStatistics splitStatistics;
 
     public FlinkTableSource(Table table) {
         this(table, null, null, null);
@@ -132,4 +145,68 @@ public void pushLimit(long limit) {
     public abstract void applyDynamicFiltering(List<String> candidateFilterFields);
 
     public abstract boolean isStreaming();
+
+    @Nullable
+    protected Integer inferSourceParallelism(StreamExecutionEnvironment env) {
+        Options options = Options.fromMap(this.table.options());
+        Configuration envConfig = (Configuration) env.getConfiguration();
+        if (envConfig.containsKey(FLINK_INFER_SCAN_PARALLELISM)) {
+            options.set(
+                    FlinkConnectorOptions.INFER_SCAN_PARALLELISM,
+                    Boolean.parseBoolean(envConfig.toMap().get(FLINK_INFER_SCAN_PARALLELISM)));
+        }
+        Integer parallelism = options.get(FlinkConnectorOptions.SCAN_PARALLELISM);
+        if (parallelism == null && options.get(FlinkConnectorOptions.INFER_SCAN_PARALLELISM)) {
+            if (isStreaming()) {
+                parallelism = Math.max(1, options.get(CoreOptions.BUCKET));
+            } else {
+                scanSplitsForInference();
+                parallelism = splitStatistics.splitNumber();
+                if (null != limit && limit > 0) {
+                    int limitCount =
+                            limit >= Integer.MAX_VALUE ? Integer.MAX_VALUE : limit.intValue();
+                    parallelism = Math.min(parallelism, limitCount);
+                }
+
+                parallelism = Math.max(1, parallelism);
+                parallelism =
+                        Math.min(
+                                parallelism,
+                                options.get(FlinkConnectorOptions.INFER_SCAN_MAX_PARALLELISM));
+            }
+        }
+        return parallelism;
+    }
+
+    protected void scanSplitsForInference() {
+        if (splitStatistics == null) {
+            List<Split> splits =
+                    table.newReadBuilder().withFilter(predicate).newScan().plan().splits();
+            splitStatistics = new SplitStatistics(splits);
+        }
+    }
+
+    /** Split statistics for inferring row count and parallelism size. */
+    protected static class SplitStatistics {
+
+        private final int splitNumber;
+        private final long totalRowCount;
+
+        protected SplitStatistics(List<Split> splits) {
+            this.splitNumber = splits.size();
+            this.totalRowCount = splits.stream().mapToLong(Split::rowCount).sum();
+        }
+
+        public int splitNumber() {
+            return splitNumber;
+        }
+
+        public long totalRowCount() {
+            return totalRowCount;
+        }
+    }
+
+    public Table getTable() {
+        return table;
+    }
 }
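Restated as a standalone helper for readability (an illustrative sketch, not code from this commit): an explicit scan.parallelism always wins; otherwise, with scan.infer-parallelism enabled, streaming mode uses the bucket count floored at 1, while batch mode starts from the split count, caps it by a positive pushed-down limit, floors it at 1, and finally caps it by scan.infer-parallelism.max:

// Illustrative restatement of the batch branch of inferSourceParallelism.
static int inferBatchParallelism(int splitCount, Long pushedLimit, int inferMaxParallelism) {
    int parallelism = splitCount;
    if (pushedLimit != null && pushedLimit > 0) {
        // Clamp the long limit into int range before comparing.
        int limitCount =
                pushedLimit >= Integer.MAX_VALUE ? Integer.MAX_VALUE : pushedLimit.intValue();
        parallelism = Math.min(parallelism, limitCount);
    }
    parallelism = Math.max(1, parallelism); // never schedule zero subtasks
    return Math.min(parallelism, inferMaxParallelism);
}

For example, 40 splits with LIMIT 8 yield a parallelism of 8, and an empty table (0 splits) still yields 1. Two behavioral details of the move are worth noting: the streaming branch now floors the bucket count at 1 (covering configurations where the bucket count is not positive, such as dynamic bucket mode's -1), and the scan.infer-parallelism.max cap now sits inside the batch branch, so a streaming source keeps the full bucket count.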
File 4/5: SystemTableSource.java

@@ -19,19 +19,22 @@
 package org.apache.paimon.flink.source;
 
 import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.PaimonDataStreamScanProvider;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.ReadBuilder;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource.ScanContext;
 import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
-import org.apache.flink.table.connector.source.SourceProvider;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.plan.stats.TableStats;
 
@@ -46,13 +49,16 @@ public class SystemTableSource extends FlinkTableSource {
     private final boolean isStreamingMode;
     private final int splitBatchSize;
     private final FlinkConnectorOptions.SplitAssignMode splitAssignMode;
+    private final ObjectIdentifier tableIdentifier;
 
-    public SystemTableSource(Table table, boolean isStreamingMode) {
+    public SystemTableSource(
+            Table table, boolean isStreamingMode, ObjectIdentifier tableIdentifier) {
         super(table);
         this.isStreamingMode = isStreamingMode;
         Options options = Options.fromMap(table.options());
         this.splitBatchSize = options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE);
         this.splitAssignMode = options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_ASSIGN_MODE);
+        this.tableIdentifier = tableIdentifier;
     }
 
     public SystemTableSource(
@@ -62,11 +68,13 @@ public SystemTableSource(
             @Nullable int[][] projectFields,
             @Nullable Long limit,
             int splitBatchSize,
-            FlinkConnectorOptions.SplitAssignMode splitAssignMode) {
+            FlinkConnectorOptions.SplitAssignMode splitAssignMode,
+            ObjectIdentifier tableIdentifier) {
         super(table, predicate, projectFields, limit);
         this.isStreamingMode = isStreamingMode;
         this.splitBatchSize = splitBatchSize;
         this.splitAssignMode = splitAssignMode;
+        this.tableIdentifier = tableIdentifier;
     }

     @Override
@@ -85,7 +93,20 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
         } else {
             source = new StaticFileStoreSource(readBuilder, limit, splitBatchSize, splitAssignMode);
         }
-        return SourceProvider.of(source);
+        return new PaimonDataStreamScanProvider(
+                source.getBoundedness() == Boundedness.BOUNDED,
+                env -> {
+                    Integer parallelism = inferSourceParallelism(env);
+                    DataStreamSource<RowData> dataStreamSource =
+                            env.fromSource(
+                                    source,
+                                    WatermarkStrategy.noWatermarks(),
+                                    tableIdentifier.asSummaryString());
+                    if (parallelism != null) {
+                        dataStreamSource.setParallelism(parallelism);
+                    }
+                    return dataStreamSource;
+                });
     }

     @Override
@@ -97,7 +118,8 @@ public SystemTableSource copy() {
                 projectFields,
                 limit,
                 splitBatchSize,
-                splitAssignMode);
+                splitAssignMode,
+                tableIdentifier);
     }

     @Override
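Design note: the switch from SourceProvider.of(source) to PaimonDataStreamScanProvider is what makes inference effective here. A plain SourceProvider leaves parallelism to the planner, whereas a DataStreamScanProvider builds the DataStream itself, so the connector can call setParallelism on the source it creates; boundedness, previously implied by the Source, must now be declared explicitly, hence source.getBoundedness() == Boundedness.BOUNDED. When inferSourceParallelism returns null (inference disabled and no explicit scan.parallelism), no parallelism is set and the environment default still applies.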
