Skip to content

Commit

Permalink
[flink] Support inferring scan parallelism for system tables
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi committed Apr 11, 2024
1 parent 5fbf727 commit afc6bc7
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ public DynamicTableSource createDynamicTableSource(Context context) {
== RuntimeExecutionMode.STREAMING;
if (origin instanceof SystemCatalogTable) {
return new PushedTableSource(
new SystemTableSource(((SystemCatalogTable) origin).table(), isStreamingMode));
new SystemTableSource(
((SystemCatalogTable) origin).table(),
isStreamingMode,
context.getObjectIdentifier()));
} else {
Table table = buildPaimonTable(context);
if (table instanceof FileStoreTable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.utils.Projection;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
Expand Down Expand Up @@ -67,7 +66,6 @@
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL;
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_EMIT_STRATEGY;
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT;
import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX;
import static org.apache.paimon.utils.Preconditions.checkState;

/**
Expand All @@ -79,9 +77,6 @@
* LogSourceProvider}.
*/
public class DataTableSource extends FlinkTableSource {
private static final String FLINK_INFER_SCAN_PARALLELISM =
String.format(
"%s%s", PAIMON_PREFIX, FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key());

private final ObjectIdentifier tableIdentifier;
private final boolean streaming;
Expand All @@ -90,8 +85,6 @@ public class DataTableSource extends FlinkTableSource {

@Nullable private WatermarkStrategy<RowData> watermarkStrategy;

private SplitStatistics splitStatistics;

@Nullable private List<String> dynamicPartitionFilteringFields;

public DataTableSource(
Expand Down Expand Up @@ -247,14 +240,6 @@ private DataStream<RowData> configureSource(
return sourceBuilder.withParallelism(parallelism).withEnv(env).build();
}

private void scanSplitsForInference() {
if (splitStatistics == null) {
List<Split> splits =
table.newReadBuilder().withFilter(predicate).newScan().plan().splits();
splitStatistics = new SplitStatistics(splits);
}
}

@Override
public DataTableSource copy() {
return new DataTableSource(
Expand Down Expand Up @@ -335,24 +320,4 @@ public void applyDynamicFiltering(List<String> candidateFilterFields) {
/** Returns whether this source runs in streaming mode (fixed at construction time). */
public boolean isStreaming() {
return streaming;
}

/** Split statistics for inferring row count and parallelism size. */
protected static class SplitStatistics {

private final int splitNumber;
private final long totalRowCount;

private SplitStatistics(List<Split> splits) {
this.splitNumber = splits.size();
this.totalRowCount = splits.stream().mapToLong(Split::rowCount).sum();
}

public int splitNumber() {
return splitNumber;
}

public long totalRowCount() {
return totalRowCount;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

package org.apache.paimon.flink.source;

import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.PredicateConverter;
import org.apache.paimon.predicate.PartitionPredicateVisitor;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.predicate.PredicateVisitor;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.table.connector.ChangelogMode;
Expand All @@ -46,16 +48,23 @@
import java.util.List;
import java.util.Optional;

import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX;

/** A Flink {@link ScanTableSource} for paimon. */
public abstract class FlinkTableSource {

private static final Logger LOG = LoggerFactory.getLogger(FlinkTableSource.class);

protected static final String FLINK_INFER_SCAN_PARALLELISM =
String.format(
"%s%s", PAIMON_PREFIX, FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key());

protected final Table table;

@Nullable protected Predicate predicate;
@Nullable protected int[][] projectFields;
@Nullable protected Long limit;
protected SplitStatistics splitStatistics;

public FlinkTableSource(Table table) {
this(table, null, null, null);
Expand Down Expand Up @@ -132,4 +141,36 @@ public void pushLimit(long limit) {
public abstract void applyDynamicFiltering(List<String> candidateFilterFields);

public abstract boolean isStreaming();

/**
 * Lazily plans the table scan (honoring any pushed-down {@code predicate}) and caches the
 * resulting {@link SplitStatistics}. Subsequent calls are no-ops once statistics exist.
 */
protected void scanSplitsForInference() {
    if (splitStatistics != null) {
        return;
    }
    List<Split> plannedSplits =
            table.newReadBuilder().withFilter(predicate).newScan().plan().splits();
    splitStatistics = new SplitStatistics(plannedSplits);
}

/**
 * Immutable statistics over a set of scan splits, used to infer source parallelism and row
 * counts.
 */
protected static class SplitStatistics {

    // Number of splits in the scan plan; upper bound for inferred parallelism.
    private final int splitNumber;
    // Sum of the per-split row counts.
    private final long totalRowCount;

    protected SplitStatistics(List<Split> splits) {
        long rowCount = 0;
        for (Split split : splits) {
            rowCount += split.rowCount();
        }
        this.splitNumber = splits.size();
        this.totalRowCount = rowCount;
    }

    public int splitNumber() {
        return splitNumber;
    }

    public long totalRowCount() {
        return totalRowCount;
    }
}

/** Returns the underlying Paimon {@link Table} this source reads from. */
public Table getTable() {
return table;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,24 @@
package org.apache.paimon.flink.source;

import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.PaimonDataStreamScanProvider;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource.ScanContext;
import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.plan.stats.TableStats;

Expand All @@ -46,13 +51,16 @@ public class SystemTableSource extends FlinkTableSource {
private final boolean isStreamingMode;
private final int splitBatchSize;
private final FlinkConnectorOptions.SplitAssignMode splitAssignMode;
private final ObjectIdentifier tableIdentifier;

public SystemTableSource(Table table, boolean isStreamingMode) {
/**
 * Creates a source for a Paimon system table.
 *
 * @param table the system table to read
 * @param isStreamingMode whether the job runs in streaming execution mode
 * @param tableIdentifier the Flink catalog identifier, used to name the source operator
 */
public SystemTableSource(
Table table, boolean isStreamingMode, ObjectIdentifier tableIdentifier) {
super(table);
this.isStreamingMode = isStreamingMode;
// Split enumerator tuning is taken from the table options, not from the Flink config.
Options options = Options.fromMap(table.options());
this.splitBatchSize = options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE);
this.splitAssignMode = options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_ASSIGN_MODE);
this.tableIdentifier = tableIdentifier;
}

public SystemTableSource(
Expand All @@ -62,11 +70,13 @@ public SystemTableSource(
@Nullable int[][] projectFields,
@Nullable Long limit,
int splitBatchSize,
FlinkConnectorOptions.SplitAssignMode splitAssignMode) {
FlinkConnectorOptions.SplitAssignMode splitAssignMode,
ObjectIdentifier tableIdentifier) {
super(table, predicate, projectFields, limit);
this.isStreamingMode = isStreamingMode;
this.splitBatchSize = splitBatchSize;
this.splitAssignMode = splitAssignMode;
this.tableIdentifier = tableIdentifier;
}

@Override
Expand All @@ -85,7 +95,15 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
} else {
source = new StaticFileStoreSource(readBuilder, limit, splitBatchSize, splitAssignMode);
}
return SourceProvider.of(source);
return new PaimonDataStreamScanProvider(
source.getBoundedness() == Boundedness.BOUNDED,
env ->
configSourceParallelism(
env,
env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
tableIdentifier.asSummaryString())));
}

@Override
Expand All @@ -97,7 +115,8 @@ public SystemTableSource copy() {
projectFields,
limit,
splitBatchSize,
splitAssignMode);
splitAssignMode,
tableIdentifier);
}

@Override
Expand Down Expand Up @@ -139,4 +158,34 @@ public void applyDynamicFiltering(List<String> candidateFilterFields) {
/** Returns whether this source runs in streaming mode (fixed at construction time). */
public boolean isStreaming() {
return isStreamingMode;
}

/**
 * Applies an explicit or inferred parallelism to the given source stream.
 *
 * <p>Resolution order: an explicit {@code scan.parallelism} table option wins; otherwise, if
 * parallelism inference is enabled (table option, overridable by the job-level
 * {@code paimon.scan.infer-parallelism} entry in the Flink configuration), the parallelism is
 * derived from the number of scan splits, capped by any pushed-down limit and by
 * {@code scan.infer-parallelism.max}, and floored at 1. If neither applies, the stream keeps
 * the environment default.
 *
 * @param env the execution environment whose configuration may override inference
 * @param source the stream to configure; returned for chaining
 * @return the same {@code source}, possibly with its parallelism set
 */
private DataStreamSource<RowData> configSourceParallelism(
        StreamExecutionEnvironment env, DataStreamSource<RowData> source) {
    Options options = Options.fromMap(this.table.options());
    Configuration envConfig = (Configuration) env.getConfiguration();
    // Single lookup instead of containsKey + toMap().get: avoids materializing the full
    // configuration map twice for the same key. Absent keys yield null, matching the old
    // containsKey guard.
    String inferFromEnv = envConfig.toMap().get(FLINK_INFER_SCAN_PARALLELISM);
    if (inferFromEnv != null) {
        options.set(
                FlinkConnectorOptions.INFER_SCAN_PARALLELISM,
                Boolean.parseBoolean(inferFromEnv));
    }
    Integer parallelism = options.get(FlinkConnectorOptions.SCAN_PARALLELISM);
    if (parallelism == null && options.get(FlinkConnectorOptions.INFER_SCAN_PARALLELISM)) {
        // Infer from the scan plan: one task per split is the natural upper bound.
        scanSplitsForInference();
        parallelism = splitStatistics.splitNumber();
        if (limit != null && limit > 0) {
            // No reason to run more tasks than rows the limit allows.
            int limitCount = limit >= Integer.MAX_VALUE ? Integer.MAX_VALUE : limit.intValue();
            parallelism = Math.min(parallelism, limitCount);
        }

        // Clamp to [1, scan.infer-parallelism.max].
        parallelism = Math.max(1, parallelism);
        parallelism =
                Math.min(
                        parallelism,
                        options.get(FlinkConnectorOptions.INFER_SCAN_MAX_PARALLELISM));
    }
    if (parallelism != null) {
        source.setParallelism(parallelism);
    }
    return source;
}
}

0 comments on commit afc6bc7

Please sign in to comment.