Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
apply 1958.patch
Browse files Browse the repository at this point in the history
[flink][source] Supports disable infer scan parallelism by flink job dynamic option
wxplovecc committed Sep 11, 2023
1 parent e3cba29 commit d04e496
Showing 3 changed files with 15 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -32,6 +32,8 @@
/** Utility class for {@link Options} related helper functions. */
public class OptionsUtils {

public static final String PAIMON_PREFIX = "paimon.";

// --------------------------------------------------------------------------------------------
// Type conversion
// --------------------------------------------------------------------------------------------
Original file line number Diff line number Diff line change
@@ -39,6 +39,7 @@
import org.apache.paimon.utils.Projection;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -68,6 +69,7 @@
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL;
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_EMIT_STRATEGY;
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT;
import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX;

/**
* Table source to create {@link StaticFileStoreSource} or {@link ContinuousFileStoreSource} under
@@ -78,6 +80,9 @@
* LogSourceProvider}.
*/
public class DataTableSource extends FlinkTableSource {
private static final String FLINK_INFER_SCAN_PARALLELISM =
String.format(
"%s%s", PAIMON_PREFIX, FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key());

private final ObjectIdentifier tableIdentifier;
protected final boolean streaming;
@@ -220,6 +225,12 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
private DataStream<RowData> configureSource(
FlinkSourceBuilder sourceBuilder, StreamExecutionEnvironment env) {
Options options = Options.fromMap(this.table.options());
Configuration envConfig = (Configuration) env.getConfiguration();
if (envConfig.containsKey(FLINK_INFER_SCAN_PARALLELISM)) {
options.set(
FlinkConnectorOptions.INFER_SCAN_PARALLELISM,
Boolean.parseBoolean(envConfig.toMap().get(FLINK_INFER_SCAN_PARALLELISM)));
}
Integer parallelism = options.get(FlinkConnectorOptions.SCAN_PARALLELISM);
if (parallelism == null && options.get(FlinkConnectorOptions.INFER_SCAN_PARALLELISM)) {
if (streaming) {
Original file line number Diff line number Diff line change
@@ -40,11 +40,11 @@
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX;

/** Utils for create {@link FileStoreTable} and {@link Predicate}. */
public class HiveUtils {

private static final String PAIMON_PREFIX = "paimon.";

public static FileStoreTable createFileStoreTable(JobConf jobConf) {
Options options = extractCatalogConfig(jobConf);
options.set(CoreOptions.PATH, LocationKeyExtractor.getPaimonLocation(jobConf));

0 comments on commit d04e496

Please sign in to comment.