From d5afd0917d138c2881a2a0c5caf3d693689d4674 Mon Sep 17 00:00:00 2001 From: Jingsong Lee Date: Wed, 3 Jan 2024 13:39:21 +0800 Subject: [PATCH] [flink] Introduce parallelism execution for lookup join (#2614) --- .../flink_connector_configuration.html | 6 + .../paimon/utils/ParallelExecution.java | 4 +- .../paimon/crosspartition/IndexBootstrap.java | 72 +++------- .../paimon/io/SplitsParallelReadUtil.java | 132 ++++++++++++++++++ .../paimon/utils/ParallelExecutionTest.java | 2 +- .../paimon/flink/FlinkConnectorOptions.java | 7 + .../flink/lookup/FileStoreLookupFunction.java | 4 +- .../flink/lookup/TableStreamingReader.java | 38 +++-- 8 files changed, 198 insertions(+), 67 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/io/SplitsParallelReadUtil.java diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html index dcd934223515..84af60aee9e8 100644 --- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html +++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html @@ -44,6 +44,12 @@ Integer The thread number for lookup async. + +
lookup.bootstrap-parallelism
+ 4 + Integer + The parallelism for bootstrap in a single task for lookup join. +
scan.infer-parallelism
true diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ParallelExecution.java b/paimon-common/src/main/java/org/apache/paimon/utils/ParallelExecution.java index ab5866e429ad..9939b70c2571 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/ParallelExecution.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/ParallelExecution.java @@ -183,7 +183,7 @@ public void releaseBatch() { } @Override - public E extraMesage() { + public E extraMessage() { return extraMessage; } }; @@ -197,6 +197,6 @@ public interface ParallelBatch { void releaseBatch(); - E extraMesage(); + E extraMessage(); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java index e99f47f36a73..465955e71aab 100644 --- a/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java +++ b/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java @@ -23,7 +23,6 @@ import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.JoinedRow; -import org.apache.paimon.data.serializer.InternalSerializers; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.TableSchema; @@ -35,26 +34,22 @@ import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; -import org.apache.paimon.utils.Pair; -import org.apache.paimon.utils.ParallelExecution; -import org.apache.paimon.utils.ParallelExecution.ParallelBatch; import org.apache.paimon.utils.RowDataToObjectArrayConverter; import org.apache.paimon.utils.TypeUtils; import java.io.IOException; import java.io.Serializable; -import java.io.UncheckedIOException; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.function.Consumer; -import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.paimon.CoreOptions.SCAN_MODE; import static org.apache.paimon.CoreOptions.StartupMode.LATEST; +import static org.apache.paimon.io.SplitsParallelReadUtil.parallelExecute; /** Bootstrap key index from Paimon table. */ public class IndexBootstrap implements Serializable { @@ -71,6 +66,10 @@ public IndexBootstrap(Table table) { public void bootstrap(int numAssigners, int assignId, Consumer collector) throws IOException { + bootstrap(numAssigners, assignId).forEachRemaining(collector); + } + + public RecordReader bootstrap(int numAssigners, int assignId) throws IOException { RowType rowType = table.rowType(); List fieldNames = rowType.getFieldNames(); int[] keyProjection = @@ -103,58 +102,25 @@ public void bootstrap(int numAssigners, int assignId, Consumer coll .collect(Collectors.toList()); } - List, GenericRow>>> suppliers = new ArrayList<>(); RowDataToObjectArrayConverter partBucketConverter = new RowDataToObjectArrayConverter( TypeUtils.concat( TypeUtils.project(rowType, table.partitionKeys()), RowType.of(DataTypes.INT()))); - for (Split split : splits) { - suppliers.add( - () -> { - try { - RecordReader reader = - readBuilder.newRead().createReader(split); - DataSplit dataSplit = ((DataSplit) split); - int bucket = dataSplit.bucket(); - GenericRow partAndBucket = - partBucketConverter.toGenericRow( - new JoinedRow( - dataSplit.partition(), GenericRow.of(bucket))); - return Pair.of(reader, partAndBucket); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }); - } - ParallelExecution execution = - new ParallelExecution<>( - InternalSerializers.create(TypeUtils.project(rowType, keyProjection)), - options.pageSize(), - options.crossPartitionUpsertBootstrapParallelism(), - suppliers); - JoinedRow joinedRow = new JoinedRow(); - while (true) { - try { - ParallelBatch batch = execution.take(); - if (batch == null) { - break; - } - - while (true) { - InternalRow row = batch.next(); - if (row == null) { - batch.releaseBatch(); - break; - } - - collector.accept(joinedRow.replace(row, batch.extraMesage())); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - } + + return parallelExecute( + TypeUtils.project(rowType, keyProjection), + readBuilder, + splits, + options.pageSize(), + options.crossPartitionUpsertBootstrapParallelism(), + split -> { + DataSplit dataSplit = ((DataSplit) split); + int bucket = dataSplit.bucket(); + return partBucketConverter.toGenericRow( + new JoinedRow(dataSplit.partition(), GenericRow.of(bucket))); + }, + (row, extra) -> new JoinedRow().replace(row, extra)); } @VisibleForTesting diff --git a/paimon-core/src/main/java/org/apache/paimon/io/SplitsParallelReadUtil.java b/paimon-core/src/main/java/org/apache/paimon/io/SplitsParallelReadUtil.java new file mode 100644 index 000000000000..164d0f2f70fb --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/io/SplitsParallelReadUtil.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.io; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.serializer.InternalSerializers; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.ParallelExecution; +import org.apache.paimon.utils.ParallelExecution.ParallelBatch; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.List; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * An util class to wrap {@link ParallelExecution} to parallel execution for {@link Split} reader. + */ +public class SplitsParallelReadUtil { + + public static RecordReader parallelExecute( + RowType projectedType, + ReadBuilder readBuilder, + List splits, + int pageSize, + int parallelism) { + return parallelExecute( + projectedType, + readBuilder, + splits, + pageSize, + parallelism, + split -> null, + (row, unused) -> row); + } + + public static RecordReader parallelExecute( + RowType projectedType, + ReadBuilder readBuilder, + List splits, + int pageSize, + int parallelism, + Function extraFunction, + BiFunction addExtraToRow) { + List, EXTRA>>> suppliers = new ArrayList<>(); + for (Split split : splits) { + suppliers.add( + () -> { + try { + RecordReader reader = + readBuilder.newRead().createReader(split); + return Pair.of(reader, extraFunction.apply(split)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + } + + ParallelExecution execution = + new ParallelExecution<>( + InternalSerializers.create(projectedType), + pageSize, + parallelism, + suppliers); + + return new RecordReader() { + @Nullable + @Override + public RecordIterator readBatch() throws IOException { + ParallelBatch batch; + try { + batch = execution.take(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + + if (batch == null) { + return null; + } + + return new RecordIterator() { + @Nullable + @Override + public InternalRow next() throws IOException { + InternalRow row = batch.next(); + if (row == null) { + return null; + } + + return addExtraToRow.apply(row, batch.extraMessage()); + } + + @Override + public void releaseBatch() { + batch.releaseBatch(); + } + }; + } + + @Override + public void close() throws IOException { + execution.close(); + } + }; + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/ParallelExecutionTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/ParallelExecutionTest.java index 01bb81f11dd1..f0ec847df668 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/ParallelExecutionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/ParallelExecutionTest.java @@ -170,7 +170,7 @@ private List> collect(ParallelExecution break; } - result.add(Pair.of(record, batch.extraMesage())); + result.add(Pair.of(record, batch.extraMessage())); } } catch (InterruptedException | IOException e) { Thread.currentThread().interrupt(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java index 2e3bec8141d3..f89212bc6db8 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java @@ -274,6 +274,13 @@ public class FlinkConnectorOptions { .defaultValue(false) .withDescription("Whether to enable async lookup join."); + public static final ConfigOption LOOKUP_BOOTSTRAP_PARALLELISM = + ConfigOptions.key("lookup.bootstrap-parallelism") + .intType() + .defaultValue(4) + .withDescription( + "The parallelism for bootstrap in a single task for lookup join."); + public static final ConfigOption LOOKUP_ASYNC_THREAD_NUMBER = ConfigOptions.key("lookup.async-thread-number") .intType() diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index b854f965c997..56c866993888 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -162,7 +162,7 @@ private void bulkLoad(Options options) throws Exception { RocksDBState.createBulkLoadSorter( IOManager.create(path.toString()), new CoreOptions(options)); try (RecordReaderIterator batch = - new RecordReaderIterator<>(streamingReader.nextBatch())) { + new RecordReaderIterator<>(streamingReader.nextBatch(true))) { while (batch.hasNext()) { InternalRow row = batch.next(); if (lookupTable.recordFilter().test(row)) { @@ -252,7 +252,7 @@ private void checkRefresh() throws Exception { private void refresh() throws Exception { while (true) { try (RecordReaderIterator batch = - new RecordReaderIterator<>(streamingReader.nextBatch())) { + new RecordReaderIterator<>(streamingReader.nextBatch(false))) { if (!batch.hasNext()) { return; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java index 1582a384e5a2..c737458d7045 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java @@ -20,7 +20,9 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.io.SplitsParallelReadUtil; import org.apache.paimon.mergetree.compact.ConcatRecordReader; +import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateFilter; import org.apache.paimon.reader.RecordReader; @@ -44,16 +46,21 @@ import java.util.function.IntUnaryOperator; import java.util.stream.IntStream; +import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_BOOTSTRAP_PARALLELISM; import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping; /** A streaming reader to read table. */ public class TableStreamingReader { + private final Table table; + private final int[] projection; private final ReadBuilder readBuilder; @Nullable private final PredicateFilter recordFilter; private final StreamTableScan scan; public TableStreamingReader(Table table, int[] projection, @Nullable Predicate predicate) { + this.table = table; + this.projection = projection; if (CoreOptions.fromMap(table.options()).startupMode() != CoreOptions.StartupMode.COMPACTED_FULL) { table = @@ -92,23 +99,36 @@ public TableStreamingReader(Table table, int[] projection, @Nullable Predicate p } } - public RecordReader nextBatch() throws Exception { + public RecordReader nextBatch(boolean useParallelism) throws Exception { try { - return read(scan.plan()); + return read(scan.plan(), useParallelism); } catch (EndOfScanException e) { throw new IllegalArgumentException( "TableStreamingReader does not support finished enumerator.", e); } } - private RecordReader read(TableScan.Plan plan) throws IOException { - TableRead read = readBuilder.newRead(); - - List> readers = new ArrayList<>(); - for (Split split : plan.splits()) { - readers.add(() -> read.createReader(split)); + private RecordReader read(TableScan.Plan plan, boolean useParallelism) + throws IOException { + RecordReader reader; + if (useParallelism) { + CoreOptions options = CoreOptions.fromMap(table.options()); + reader = + SplitsParallelReadUtil.parallelExecute( + TypeUtils.project(table.rowType(), projection), + readBuilder, + plan.splits(), + options.pageSize(), + new Options(table.options()).get(LOOKUP_BOOTSTRAP_PARALLELISM)); + } else { + TableRead read = readBuilder.newRead(); + List> readers = new ArrayList<>(); + for (Split split : plan.splits()) { + readers.add(() -> read.createReader(split)); + } + reader = ConcatRecordReader.create(readers); } - RecordReader reader = ConcatRecordReader.create(readers); + if (recordFilter != null) { reader = reader.filter(recordFilter::test); }