diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index dcd934223515..84af60aee9e8 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -44,6 +44,12 @@
             <td>Integer</td>
             <td>The thread number for lookup async.</td>
+        <tr>
+            <td><h5>lookup.bootstrap-parallelism</h5></td>
+            <td style="word-wrap: break-word;">4</td>
+            <td>Integer</td>
+            <td>The parallelism for bootstrap in a single task for lookup join.</td>
+        </tr>
             <td><h5>scan.infer-parallelism</h5></td>
             <td style="word-wrap: break-word;">true</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ParallelExecution.java b/paimon-common/src/main/java/org/apache/paimon/utils/ParallelExecution.java
index ab5866e429ad..9939b70c2571 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/ParallelExecution.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ParallelExecution.java
@@ -183,7 +183,7 @@ public void releaseBatch() {
}
@Override
- public E extraMesage() {
+ public E extraMessage() {
return extraMessage;
}
};
@@ -197,6 +197,6 @@ public interface ParallelBatch {
void releaseBatch();
- E extraMesage();
+ E extraMessage();
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
index e99f47f36a73..465955e71aab 100644
--- a/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
+++ b/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
@@ -23,7 +23,6 @@
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.JoinedRow;
-import org.apache.paimon.data.serializer.InternalSerializers;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
@@ -35,26 +34,22 @@
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.ParallelExecution;
-import org.apache.paimon.utils.ParallelExecution.ParallelBatch;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.apache.paimon.utils.TypeUtils;
import java.io.IOException;
import java.io.Serializable;
-import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
-import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.paimon.CoreOptions.SCAN_MODE;
import static org.apache.paimon.CoreOptions.StartupMode.LATEST;
+import static org.apache.paimon.io.SplitsParallelReadUtil.parallelExecute;
/** Bootstrap key index from Paimon table. */
public class IndexBootstrap implements Serializable {
@@ -71,6 +66,10 @@ public IndexBootstrap(Table table) {
     public void bootstrap(int numAssigners, int assignId, Consumer<InternalRow> collector)
             throws IOException {
+        bootstrap(numAssigners, assignId).forEachRemaining(collector);
+    }
+
+    public RecordReader<InternalRow> bootstrap(int numAssigners, int assignId) throws IOException {
RowType rowType = table.rowType();
         List<String> fieldNames = rowType.getFieldNames();
int[] keyProjection =
@@ -103,58 +102,25 @@ public void bootstrap(int numAssigners, int assignId, Consumer coll
.collect(Collectors.toList());
}
-        List<Supplier<Pair<RecordReader<InternalRow>, GenericRow>>> suppliers = new ArrayList<>();
RowDataToObjectArrayConverter partBucketConverter =
new RowDataToObjectArrayConverter(
TypeUtils.concat(
TypeUtils.project(rowType, table.partitionKeys()),
RowType.of(DataTypes.INT())));
- for (Split split : splits) {
- suppliers.add(
- () -> {
- try {
-                            RecordReader<InternalRow> reader =
- readBuilder.newRead().createReader(split);
- DataSplit dataSplit = ((DataSplit) split);
- int bucket = dataSplit.bucket();
- GenericRow partAndBucket =
- partBucketConverter.toGenericRow(
- new JoinedRow(
- dataSplit.partition(), GenericRow.of(bucket)));
- return Pair.of(reader, partAndBucket);
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- });
- }
-        ParallelExecution<InternalRow, GenericRow> execution =
- new ParallelExecution<>(
- InternalSerializers.create(TypeUtils.project(rowType, keyProjection)),
- options.pageSize(),
- options.crossPartitionUpsertBootstrapParallelism(),
- suppliers);
- JoinedRow joinedRow = new JoinedRow();
- while (true) {
- try {
-                ParallelBatch<InternalRow, GenericRow> batch = execution.take();
- if (batch == null) {
- break;
- }
-
- while (true) {
- InternalRow row = batch.next();
- if (row == null) {
- batch.releaseBatch();
- break;
- }
-
- collector.accept(joinedRow.replace(row, batch.extraMesage()));
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
- }
+
+ return parallelExecute(
+ TypeUtils.project(rowType, keyProjection),
+ readBuilder,
+ splits,
+ options.pageSize(),
+ options.crossPartitionUpsertBootstrapParallelism(),
+ split -> {
+ DataSplit dataSplit = ((DataSplit) split);
+ int bucket = dataSplit.bucket();
+ return partBucketConverter.toGenericRow(
+ new JoinedRow(dataSplit.partition(), GenericRow.of(bucket)));
+ },
+ (row, extra) -> new JoinedRow().replace(row, extra));
}
@VisibleForTesting
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/SplitsParallelReadUtil.java b/paimon-core/src/main/java/org/apache/paimon/io/SplitsParallelReadUtil.java
new file mode 100644
index 000000000000..164d0f2f70fb
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/io/SplitsParallelReadUtil.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalSerializers;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.ParallelExecution;
+import org.apache.paimon.utils.ParallelExecution.ParallelBatch;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * An util class to wrap {@link ParallelExecution} to parallel execution for {@link Split} reader.
+ */
+public class SplitsParallelReadUtil {
+
+    public static RecordReader<InternalRow> parallelExecute(
+            RowType projectedType,
+            ReadBuilder readBuilder,
+            List<Split> splits,
+            int pageSize,
+            int parallelism) {
+ return parallelExecute(
+ projectedType,
+ readBuilder,
+ splits,
+ pageSize,
+ parallelism,
+ split -> null,
+ (row, unused) -> row);
+ }
+
+    public static <EXTRA> RecordReader<InternalRow> parallelExecute(
+            RowType projectedType,
+            ReadBuilder readBuilder,
+            List<Split> splits,
+            int pageSize,
+            int parallelism,
+            Function<Split, EXTRA> extraFunction,
+            BiFunction<InternalRow, EXTRA, InternalRow> addExtraToRow) {
+        List<Supplier<Pair<RecordReader<InternalRow>, EXTRA>>> suppliers = new ArrayList<>();
+ for (Split split : splits) {
+ suppliers.add(
+ () -> {
+ try {
+                            RecordReader<InternalRow> reader =
+ readBuilder.newRead().createReader(split);
+ return Pair.of(reader, extraFunction.apply(split));
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+ }
+
+        ParallelExecution<InternalRow, EXTRA> execution =
+ new ParallelExecution<>(
+ InternalSerializers.create(projectedType),
+ pageSize,
+ parallelism,
+ suppliers);
+
+        return new RecordReader<InternalRow>() {
+ @Nullable
+ @Override
+            public RecordIterator<InternalRow> readBatch() throws IOException {
+                ParallelBatch<InternalRow, EXTRA> batch;
+ try {
+ batch = execution.take();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+
+ if (batch == null) {
+ return null;
+ }
+
+                return new RecordIterator<InternalRow>() {
+ @Nullable
+ @Override
+ public InternalRow next() throws IOException {
+ InternalRow row = batch.next();
+ if (row == null) {
+ return null;
+ }
+
+ return addExtraToRow.apply(row, batch.extraMessage());
+ }
+
+ @Override
+ public void releaseBatch() {
+ batch.releaseBatch();
+ }
+ };
+ }
+
+ @Override
+ public void close() throws IOException {
+ execution.close();
+ }
+ };
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/ParallelExecutionTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/ParallelExecutionTest.java
index 01bb81f11dd1..f0ec847df668 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/ParallelExecutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/ParallelExecutionTest.java
@@ -170,7 +170,7 @@ private List> collect(ParallelExecution
break;
}
- result.add(Pair.of(record, batch.extraMesage()));
+ result.add(Pair.of(record, batch.extraMessage()));
}
} catch (InterruptedException | IOException e) {
Thread.currentThread().interrupt();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 2e3bec8141d3..f89212bc6db8 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -274,6 +274,13 @@ public class FlinkConnectorOptions {
.defaultValue(false)
.withDescription("Whether to enable async lookup join.");
+    public static final ConfigOption<Integer> LOOKUP_BOOTSTRAP_PARALLELISM =
+ ConfigOptions.key("lookup.bootstrap-parallelism")
+ .intType()
+ .defaultValue(4)
+ .withDescription(
+ "The parallelism for bootstrap in a single task for lookup join.");
+
     public static final ConfigOption<Integer> LOOKUP_ASYNC_THREAD_NUMBER =
ConfigOptions.key("lookup.async-thread-number")
.intType()
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index b854f965c997..56c866993888 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -162,7 +162,7 @@ private void bulkLoad(Options options) throws Exception {
RocksDBState.createBulkLoadSorter(
IOManager.create(path.toString()), new CoreOptions(options));
         try (RecordReaderIterator<InternalRow> batch =
- new RecordReaderIterator<>(streamingReader.nextBatch())) {
+ new RecordReaderIterator<>(streamingReader.nextBatch(true))) {
while (batch.hasNext()) {
InternalRow row = batch.next();
if (lookupTable.recordFilter().test(row)) {
@@ -252,7 +252,7 @@ private void checkRefresh() throws Exception {
private void refresh() throws Exception {
while (true) {
         try (RecordReaderIterator<InternalRow> batch =
- new RecordReaderIterator<>(streamingReader.nextBatch())) {
+ new RecordReaderIterator<>(streamingReader.nextBatch(false))) {
if (!batch.hasNext()) {
return;
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java
index 1582a384e5a2..c737458d7045 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java
@@ -20,7 +20,9 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.io.SplitsParallelReadUtil;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
+import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateFilter;
import org.apache.paimon.reader.RecordReader;
@@ -44,16 +46,21 @@
import java.util.function.IntUnaryOperator;
import java.util.stream.IntStream;
+import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_BOOTSTRAP_PARALLELISM;
import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
/** A streaming reader to read table. */
public class TableStreamingReader {
+ private final Table table;
+ private final int[] projection;
private final ReadBuilder readBuilder;
@Nullable private final PredicateFilter recordFilter;
private final StreamTableScan scan;
public TableStreamingReader(Table table, int[] projection, @Nullable Predicate predicate) {
+ this.table = table;
+ this.projection = projection;
if (CoreOptions.fromMap(table.options()).startupMode()
!= CoreOptions.StartupMode.COMPACTED_FULL) {
table =
@@ -92,23 +99,36 @@ public TableStreamingReader(Table table, int[] projection, @Nullable Predicate p
}
}
-    public RecordReader<InternalRow> nextBatch() throws Exception {
+    public RecordReader<InternalRow> nextBatch(boolean useParallelism) throws Exception {
try {
- return read(scan.plan());
+ return read(scan.plan(), useParallelism);
} catch (EndOfScanException e) {
throw new IllegalArgumentException(
"TableStreamingReader does not support finished enumerator.", e);
}
}
-    private RecordReader<InternalRow> read(TableScan.Plan plan) throws IOException {
- TableRead read = readBuilder.newRead();
-
-        List<ConcatRecordReader.ReaderSupplier<InternalRow>> readers = new ArrayList<>();
- for (Split split : plan.splits()) {
- readers.add(() -> read.createReader(split));
+    private RecordReader<InternalRow> read(TableScan.Plan plan, boolean useParallelism)
+            throws IOException {
+        RecordReader<InternalRow> reader;
+ if (useParallelism) {
+ CoreOptions options = CoreOptions.fromMap(table.options());
+ reader =
+ SplitsParallelReadUtil.parallelExecute(
+ TypeUtils.project(table.rowType(), projection),
+ readBuilder,
+ plan.splits(),
+ options.pageSize(),
+ new Options(table.options()).get(LOOKUP_BOOTSTRAP_PARALLELISM));
+ } else {
+ TableRead read = readBuilder.newRead();
+            List<ConcatRecordReader.ReaderSupplier<InternalRow>> readers = new ArrayList<>();
+ for (Split split : plan.splits()) {
+ readers.add(() -> read.createReader(split));
+ }
+ reader = ConcatRecordReader.create(readers);
}
-        RecordReader<InternalRow> reader = ConcatRecordReader.create(readers);
+
if (recordFilter != null) {
reader = reader.filter(recordFilter::test);
}