[flink] Set default parallelism for sink in cases where AQE not supported
yunfengzhou-hub committed Sep 19, 2024
1 parent 3ac06dd commit bc85244
Showing 3 changed files with 47 additions and 47 deletions.
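
In short: when a batch job runs on Flink's adaptive batch scheduler (AQE) and the Paimon sink cannot accept a runtime-decided parallelism (write manifest cache enabled, or dynamic bucket mode), the sink previously failed fast; after this commit it logs a warning and pins itself to Flink's `parallelism.default`. A condensed, hypothetical sketch of that decision follows; the names are illustrative, and the real logic is `checkAndUpdateParallelism` in FlinkSinkBuilder below.

import javax.annotation.Nullable;

final class SinkParallelismFallback {

    /** Resolve the sink parallelism; null keeps whatever Flink decides. */
    @Nullable
    static Integer resolve(
            @Nullable Integer configured, // Paimon `sink.parallelism`, if set
            boolean isStreaming, // streaming vs. batch execution
            boolean adaptiveParallelismEnabled, // Flink adaptive batch scheduler on?
            boolean writeManifestCacheEnabled, // `write-manifest-cache` > 0 bytes
            boolean hashDynamicBucketMode, // table bucket mode is HASH_DYNAMIC
            int defaultParallelism) { // Flink `parallelism.default`
        boolean undefined = configured == null || configured == -1;
        if (undefined
                && !isStreaming
                && adaptiveParallelismEnabled
                && (writeManifestCacheEnabled || hashDynamicBucketMode)) {
            // Before this commit: throw. Now: warn and use the default.
            return defaultParallelism;
        }
        return configured;
    }
}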
FlinkSink.java
@@ -24,7 +24,6 @@
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SerializableRunnable;
@@ -48,10 +47,8 @@
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Queue;
 import java.util.Set;
 
@@ -225,13 +222,6 @@ public DataStream<Committable> doWrite(
                                 commitUser))
                 .setParallelism(parallelism == null ? input.getParallelism() : parallelism);
 
-        boolean writeMCacheEnabled = table.coreOptions().writeManifestCache().getBytes() > 0;
-        boolean hashDynamicMode = table.bucketMode() == BucketMode.HASH_DYNAMIC;
-        if (!isStreaming && (writeMCacheEnabled || hashDynamicMode)) {
-            assertBatchAdaptiveParallelism(
-                    env, written.getParallelism(), writeMCacheEnabled, hashDynamicMode);
-        }
-
         Options options = Options.fromMap(table.options());
         if (options.get(SINK_USE_MANAGED_MEMORY)) {
             declareManagedMemory(written, options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY));
@@ -328,26 +318,6 @@ public static void assertBatchAdaptiveParallelism(
         assertBatchAdaptiveParallelism(env, sinkParallelism, msg);
     }
 
-    public static void assertBatchAdaptiveParallelism(
-            StreamExecutionEnvironment env,
-            int sinkParallelism,
-            boolean writeMCacheEnabled,
-            boolean hashDynamicMode) {
-        List<String> messages = new ArrayList<>();
-        if (writeMCacheEnabled) {
-            messages.add("Write Manifest Cache");
-        }
-        if (hashDynamicMode) {
-            messages.add("Dynamic Bucket Mode");
-        }
-        String msg =
-                String.format(
-                        "Paimon Sink with %s does not support Flink's Adaptive Parallelism mode. "
-                                + "Please manually turn it off or set Paimon `sink.parallelism` manually.",
-                        messages);
-        assertBatchAdaptiveParallelism(env, sinkParallelism, msg);
-    }
-
     public static void assertBatchAdaptiveParallelism(
             StreamExecutionEnvironment env, int sinkParallelism, String exceptionMsg) {
         try {
FlinkSinkBuilder.java
@@ -44,12 +44,14 @@
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.configuration.CoreOptions.DEFAULT_PARALLELISM;
 import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR;
 import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_STRATEGY;
 import static org.apache.paimon.flink.FlinkConnectorOptions.MIN_CLUSTERING_SAMPLE_FACTOR;
@@ -208,6 +210,7 @@ public FlinkSinkBuilder clusteringIfPossible(
 
     /** Build {@link DataStreamSink}. */
     public DataStreamSink<?> build() {
+        parallelism = checkAndUpdateParallelism(input, parallelism);
         input = trySortInput(input);
         DataStream<InternalRow> input = mapToInternalRow(this.input, table.rowType());
         if (table.coreOptions().localMergeEnabled() && table.schema().primaryKeys().size() > 0) {
@@ -282,4 +285,39 @@ private DataStream<RowData> trySortInput(DataStream<RowData> input) {
         }
         return input;
     }
+
+    private Integer checkAndUpdateParallelism(DataStream<?> input, Integer parallelism) {
+        try {
+            boolean parallelismUndefined = parallelism == null || parallelism == -1;
+            boolean isStreaming = FlinkSink.isStreaming(input);
+            boolean isAdaptiveParallelismEnabled =
+                    AdaptiveParallelism.isEnabled(input.getExecutionEnvironment());
+            boolean writeMCacheEnabled = table.coreOptions().writeManifestCache().getBytes() > 0;
+            boolean hashDynamicMode = table.bucketMode() == BucketMode.HASH_DYNAMIC;
+            if (parallelismUndefined
+                    && !isStreaming
+                    && isAdaptiveParallelismEnabled
+                    && (writeMCacheEnabled || hashDynamicMode)) {
+                List<String> messages = new ArrayList<>();
+                if (writeMCacheEnabled) {
+                    messages.add("Write Manifest Cache");
+                }
+                if (hashDynamicMode) {
+                    messages.add("Dynamic Bucket Mode");
+                }
+                String msg =
+                        String.format(
+                                "Paimon Sink with %s does not support Flink's Adaptive Parallelism mode. "
+                                        + "Configuring sink parallelism to `%s` instead. You can also set Paimon "
+                                        + "`sink.parallelism` manually to override this configuration.",
+                                messages, DEFAULT_PARALLELISM.key());
+                LOG.warn(msg);
+                return input.getExecutionEnvironment().getConfiguration().get(DEFAULT_PARALLELISM);
+            }
+            return parallelism;
+        } catch (NoClassDefFoundError ignored) {
+            // before 1.17, there is no adaptive parallelism
+            return parallelism;
+        }
+    }
 }
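
For the user-visible effect, here is a hypothetical usage sketch; it assumes a Paimon catalog is already registered and that tables T and D_T exist with the schema used by the test file below, and `SinkParallelismExample` is an illustrative name.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SinkParallelismExample {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // With the adaptive batch scheduler enabled, this batch insert into the
        // dynamic-bucket table D_T used to throw; it now logs a warning and
        // runs the sink with `parallelism.default`.
        tEnv.getConfig().set("parallelism.default", "4");
        tEnv.executeSql("INSERT INTO D_T SELECT a, b, c FROM T GROUP BY a, b, c").await();

        // An explicit `sink.parallelism` still overrides the fallback.
        tEnv.executeSql(
                        "INSERT INTO D_T /*+ OPTIONS('sink.parallelism'='2') */ "
                                + "SELECT a, b, c FROM T GROUP BY a, b, c")
                .await();
    }
}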
Test file
@@ -53,30 +53,22 @@ protected List<String> ddl() {
     public void testAQEWithWriteManifest() {
         batchSql("ALTER TABLE T SET ('write-manifest-cache' = '1 mb')");
         batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
-        assertThatThrownBy(() -> batchSql("INSERT INTO T SELECT a, b, c FROM T GROUP BY a,b,c"))
-                .hasMessageContaining(
-                        "Paimon Sink with [Write Manifest Cache] does not support Flink's Adaptive Parallelism mode.");
-
-        // work fine
-        batchSql(
-                "INSERT INTO T /*+ OPTIONS('sink.parallelism'='1') */ SELECT a, b, c FROM T GROUP BY a,b,c");
-
-        // work fine too
-        batchSql("ALTER TABLE T SET ('write-manifest-cache' = '0 b')");
         batchSql("INSERT INTO T SELECT a, b, c FROM T GROUP BY a,b,c");
+        assertThat(batchSql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111),
+                        Row.of(2, 22, 222),
+                        Row.of(1, 11, 111),
+                        Row.of(2, 22, 222));
     }
 
     @Test
     public void testAQEWithDynamicBucket() {
         batchSql("CREATE TABLE IF NOT EXISTS D_T (a INT PRIMARY KEY NOT ENFORCED, b INT, c INT)");
         batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
-        assertThatThrownBy(() -> batchSql("INSERT INTO D_T SELECT a, b, c FROM T GROUP BY a,b,c"))
-                .hasMessageContaining(
-                        "Paimon Sink with [Dynamic Bucket Mode] does not support Flink's Adaptive Parallelism mode.");
-
-        // work fine
-        batchSql(
-                "INSERT INTO D_T /*+ OPTIONS('sink.parallelism'='1') */ SELECT a, b, c FROM T GROUP BY a,b,c");
+        batchSql("INSERT INTO D_T SELECT a, b, c FROM T GROUP BY a,b,c");
+        assertThat(batchSql("SELECT * FROM D_T"))
+                .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 222));
     }
 
     @Test
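
Substituting the dynamic-bucket case into the message template of `checkAndUpdateParallelism` above, a batch insert that previously failed should now proceed after a warning along these lines:

Paimon Sink with [Dynamic Bucket Mode] does not support Flink's Adaptive Parallelism mode. Configuring sink parallelism to `parallelism.default` instead. You can also set Paimon `sink.parallelism` manually to override this configuration.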
