Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flink] support to infer parallelism for system table #3201

Merged
merged 2 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ public DynamicTableSource createDynamicTableSource(Context context) {
== RuntimeExecutionMode.STREAMING;
if (origin instanceof SystemCatalogTable) {
return new PushedTableSource(
new SystemTableSource(((SystemCatalogTable) origin).table(), isStreamingMode));
new SystemTableSource(
((SystemCatalogTable) origin).table(),
isStreamingMode,
context.getObjectIdentifier()));
} else {
Table table = buildPaimonTable(context);
if (table instanceof FileStoreTable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.utils.Projection;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
Expand Down Expand Up @@ -67,7 +66,6 @@
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL;
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_EMIT_STRATEGY;
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT;
import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX;
import static org.apache.paimon.utils.Preconditions.checkState;

/**
Expand All @@ -79,9 +77,6 @@
* LogSourceProvider}.
*/
public class DataTableSource extends FlinkTableSource {
private static final String FLINK_INFER_SCAN_PARALLELISM =
String.format(
"%s%s", PAIMON_PREFIX, FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key());

private final ObjectIdentifier tableIdentifier;
private final boolean streaming;
Expand All @@ -90,8 +85,6 @@ public class DataTableSource extends FlinkTableSource {

@Nullable private WatermarkStrategy<RowData> watermarkStrategy;

private SplitStatistics splitStatistics;

@Nullable private List<String> dynamicPartitionFilteringFields;

public DataTableSource(
Expand Down Expand Up @@ -247,14 +240,6 @@ private DataStream<RowData> configureSource(
return sourceBuilder.withParallelism(parallelism).withEnv(env).build();
}

private void scanSplitsForInference() {
if (splitStatistics == null) {
List<Split> splits =
table.newReadBuilder().withFilter(predicate).newScan().plan().splits();
splitStatistics = new SplitStatistics(splits);
}
}

@Override
public DataTableSource copy() {
return new DataTableSource(
Expand Down Expand Up @@ -335,24 +320,4 @@ public void applyDynamicFiltering(List<String> candidateFilterFields) {
public boolean isStreaming() {
return streaming;
}

/** Split statistics for inferring row count and parallelism size. */
protected static class SplitStatistics {

private final int splitNumber;
private final long totalRowCount;

private SplitStatistics(List<Split> splits) {
this.splitNumber = splits.size();
this.totalRowCount = splits.stream().mapToLong(Split::rowCount).sum();
}

public int splitNumber() {
return splitNumber;
}

public long totalRowCount() {
return totalRowCount;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

package org.apache.paimon.flink.source;

import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.PredicateConverter;
import org.apache.paimon.predicate.PartitionPredicateVisitor;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.predicate.PredicateVisitor;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.table.connector.ChangelogMode;
Expand All @@ -46,16 +48,23 @@
import java.util.List;
import java.util.Optional;

import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX;

/** A Flink {@link ScanTableSource} for paimon. */
public abstract class FlinkTableSource {

private static final Logger LOG = LoggerFactory.getLogger(FlinkTableSource.class);

protected static final String FLINK_INFER_SCAN_PARALLELISM =
String.format(
"%s%s", PAIMON_PREFIX, FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key());

protected final Table table;

@Nullable protected Predicate predicate;
@Nullable protected int[][] projectFields;
@Nullable protected Long limit;
protected SplitStatistics splitStatistics;

public FlinkTableSource(Table table) {
this(table, null, null, null);
Expand Down Expand Up @@ -132,4 +141,36 @@ public void pushLimit(long limit) {
public abstract void applyDynamicFiltering(List<String> candidateFilterFields);

public abstract boolean isStreaming();

/**
 * Lazily scans the table and caches split statistics used for row-count and
 * parallelism inference.
 *
 * <p>The scan honors any pushed-down {@code predicate}. Once {@code splitStatistics}
 * has been populated, subsequent calls are no-ops.
 */
protected void scanSplitsForInference() {
    if (splitStatistics != null) {
        return;
    }
    splitStatistics =
            new SplitStatistics(
                    table.newReadBuilder().withFilter(predicate).newScan().plan().splits());
}

/** Split statistics for inferring row count and parallelism size. */
protected static class SplitStatistics {

    // Number of splits produced by the scan plan.
    private final int numSplits;
    // Sum of the per-split row counts over all splits.
    private final long rowCount;

    protected SplitStatistics(List<Split> splits) {
        this.numSplits = splits.size();
        long total = 0L;
        for (Split split : splits) {
            total += split.rowCount();
        }
        this.rowCount = total;
    }

    public int splitNumber() {
        return numSplits;
    }

    public long totalRowCount() {
        return rowCount;
    }
}

/** Returns the underlying paimon {@link Table} backing this source. */
public Table getTable() {
    return this.table;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,24 @@
package org.apache.paimon.flink.source;

import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.PaimonDataStreamScanProvider;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource.ScanContext;
import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.plan.stats.TableStats;

Expand All @@ -46,13 +51,16 @@ public class SystemTableSource extends FlinkTableSource {
private final boolean isStreamingMode;
private final int splitBatchSize;
private final FlinkConnectorOptions.SplitAssignMode splitAssignMode;
private final ObjectIdentifier tableIdentifier;

public SystemTableSource(Table table, boolean isStreamingMode) {
public SystemTableSource(
Table table, boolean isStreamingMode, ObjectIdentifier tableIdentifier) {
super(table);
this.isStreamingMode = isStreamingMode;
Options options = Options.fromMap(table.options());
this.splitBatchSize = options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE);
this.splitAssignMode = options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_ASSIGN_MODE);
this.tableIdentifier = tableIdentifier;
}

public SystemTableSource(
Expand All @@ -62,11 +70,13 @@ public SystemTableSource(
@Nullable int[][] projectFields,
@Nullable Long limit,
int splitBatchSize,
FlinkConnectorOptions.SplitAssignMode splitAssignMode) {
FlinkConnectorOptions.SplitAssignMode splitAssignMode,
ObjectIdentifier tableIdentifier) {
super(table, predicate, projectFields, limit);
this.isStreamingMode = isStreamingMode;
this.splitBatchSize = splitBatchSize;
this.splitAssignMode = splitAssignMode;
this.tableIdentifier = tableIdentifier;
}

@Override
Expand All @@ -85,7 +95,15 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
} else {
source = new StaticFileStoreSource(readBuilder, limit, splitBatchSize, splitAssignMode);
}
return SourceProvider.of(source);
return new PaimonDataStreamScanProvider(
source.getBoundedness() == Boundedness.BOUNDED,
env ->
configSourceParallelism(
env,
env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
tableIdentifier.asSummaryString())));
}

@Override
Expand All @@ -97,7 +115,8 @@ public SystemTableSource copy() {
projectFields,
limit,
splitBatchSize,
splitAssignMode);
splitAssignMode,
tableIdentifier);
}

@Override
Expand Down Expand Up @@ -139,4 +158,34 @@ public void applyDynamicFiltering(List<String> candidateFilterFields) {
/** Returns whether this source runs under streaming execution mode. */
public boolean isStreaming() {
    return this.isStreamingMode;
}

private DataStreamSource<RowData> configSourceParallelism(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not reuse DataTableSource.configureSource?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just think that for a system table, the streaming mode's parallelism should not be `options.get(CoreOptions.BUCKET)`. It's better to reuse this method; I will refactor it.

StreamExecutionEnvironment env, DataStreamSource<RowData> source) {
Options options = Options.fromMap(this.table.options());
Configuration envConfig = (Configuration) env.getConfiguration();
if (envConfig.containsKey(FLINK_INFER_SCAN_PARALLELISM)) {
options.set(
FlinkConnectorOptions.INFER_SCAN_PARALLELISM,
Boolean.parseBoolean(envConfig.toMap().get(FLINK_INFER_SCAN_PARALLELISM)));
}
Integer parallelism = options.get(FlinkConnectorOptions.SCAN_PARALLELISM);
if (parallelism == null && options.get(FlinkConnectorOptions.INFER_SCAN_PARALLELISM)) {
scanSplitsForInference();
parallelism = splitStatistics.splitNumber();
if (null != limit && limit > 0) {
int limitCount = limit >= Integer.MAX_VALUE ? Integer.MAX_VALUE : limit.intValue();
parallelism = Math.min(parallelism, limitCount);
}

parallelism = Math.max(1, parallelism);
parallelism =
Math.min(
parallelism,
options.get(FlinkConnectorOptions.INFER_SCAN_MAX_PARALLELISM));
}
if (parallelism != null) {
source.setParallelism(parallelism);
}
return source;
}
}
Loading