[flink] Introduce parallelism execution for lookup join (apache#2614)
JingsongLi authored Jan 3, 2024
1 parent b3fd931 commit d5afd09
Showing 8 changed files with 198 additions and 67 deletions.
@@ -44,6 +44,12 @@
        <td>Integer</td>
        <td>The thread number for lookup async.</td>
    </tr>
+   <tr>
+       <td><h5>lookup.bootstrap-parallelism</h5></td>
+       <td style="word-wrap: break-word;">4</td>
+       <td>Integer</td>
+       <td>The parallelism for bootstrap in a single task for lookup join.</td>
+   </tr>
    <tr>
        <td><h5>scan.infer-parallelism</h5></td>
        <td style="word-wrap: break-word;">true</td>
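The generated-docs row above documents the new option end to end. As a hedged illustration of how it could be set (only the option key and its default of 4 come from this commit; everything else in the snippet is assumed scaffolding):

    import java.util.HashMap;
    import java.util.Map;

    class LookupBootstrapOptionExample {
        // Sketch only: raise the per-task bootstrap parallelism above its default of 4.
        // How this map is handed to the Paimon table source is not shown in this commit.
        static Map<String, String> dynamicOptions() {
            Map<String, String> options = new HashMap<>();
            options.put("lookup.bootstrap-parallelism", "8");
            return options;
        }
    }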
@@ -183,7 +183,7 @@ public void releaseBatch() {
            }

            @Override
-           public E extraMesage() {
+           public E extraMessage() {
                return extraMessage;
            }
        };
@@ -197,6 +197,6 @@ public interface ParallelBatch<T, E> {

        void releaseBatch();

-       E extraMesage();
+       E extraMessage();
    }
}
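The rename above (extraMesage to extraMessage) is mechanical, but the surrounding lines show the ParallelBatch contract: take() returns null once all suppliers are drained, next() returns null when a batch is exhausted, and releaseBatch() hands the page back. A minimal consumption sketch modeled on the loops elsewhere in this commit; the three parameters stand in for surrounding context:

    // Drains a ParallelExecution; assumes the paimon types imported as in this commit.
    void drain(
            ParallelExecution<InternalRow, GenericRow> execution,
            JoinedRow reuse,
            Consumer<InternalRow> collector) throws Exception {
        ParallelBatch<InternalRow, GenericRow> batch;
        while ((batch = execution.take()) != null) { // null: all suppliers drained
            InternalRow row;
            while ((row = batch.next()) != null) { // null: this batch is exhausted
                collector.accept(reuse.replace(row, batch.extraMessage()));
            }
            batch.releaseBatch(); // return the page for reuse
        }
    }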
@@ -23,7 +23,6 @@
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.JoinedRow;
-import org.apache.paimon.data.serializer.InternalSerializers;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
@@ -35,26 +34,22 @@
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.ParallelExecution;
-import org.apache.paimon.utils.ParallelExecution.ParallelBatch;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.apache.paimon.utils.TypeUtils;

import java.io.IOException;
import java.io.Serializable;
-import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
-import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.paimon.CoreOptions.SCAN_MODE;
import static org.apache.paimon.CoreOptions.StartupMode.LATEST;
+import static org.apache.paimon.io.SplitsParallelReadUtil.parallelExecute;

/** Bootstrap key index from Paimon table. */
public class IndexBootstrap implements Serializable {
@@ -71,6 +66,10 @@ public IndexBootstrap(Table table) {

    public void bootstrap(int numAssigners, int assignId, Consumer<InternalRow> collector)
            throws IOException {
+       bootstrap(numAssigners, assignId).forEachRemaining(collector);
+   }
+
+   public RecordReader<InternalRow> bootstrap(int numAssigners, int assignId) throws IOException {
        RowType rowType = table.rowType();
        List<String> fieldNames = rowType.getFieldNames();
        int[] keyProjection =
@@ -103,58 +102,25 @@ public void bootstrap(int numAssigners, int assignId, Consumer<InternalRow> coll
                            .collect(Collectors.toList());
        }

-       List<Supplier<Pair<RecordReader<InternalRow>, GenericRow>>> suppliers = new ArrayList<>();
        RowDataToObjectArrayConverter partBucketConverter =
                new RowDataToObjectArrayConverter(
                        TypeUtils.concat(
                                TypeUtils.project(rowType, table.partitionKeys()),
                                RowType.of(DataTypes.INT())));
-       for (Split split : splits) {
-           suppliers.add(
-                   () -> {
-                       try {
-                           RecordReader<InternalRow> reader =
-                                   readBuilder.newRead().createReader(split);
-                           DataSplit dataSplit = ((DataSplit) split);
-                           int bucket = dataSplit.bucket();
-                           GenericRow partAndBucket =
-                                   partBucketConverter.toGenericRow(
-                                           new JoinedRow(
-                                                   dataSplit.partition(), GenericRow.of(bucket)));
-                           return Pair.of(reader, partAndBucket);
-                       } catch (IOException e) {
-                           throw new UncheckedIOException(e);
-                       }
-                   });
-       }
-       ParallelExecution<InternalRow, GenericRow> execution =
-               new ParallelExecution<>(
-                       InternalSerializers.create(TypeUtils.project(rowType, keyProjection)),
-                       options.pageSize(),
-                       options.crossPartitionUpsertBootstrapParallelism(),
-                       suppliers);
-       JoinedRow joinedRow = new JoinedRow();
-       while (true) {
-           try {
-               ParallelBatch<InternalRow, GenericRow> batch = execution.take();
-               if (batch == null) {
-                   break;
-               }
-
-               while (true) {
-                   InternalRow row = batch.next();
-                   if (row == null) {
-                       batch.releaseBatch();
-                       break;
-                   }
-
-                   collector.accept(joinedRow.replace(row, batch.extraMesage()));
-               }
-           } catch (InterruptedException e) {
-               Thread.currentThread().interrupt();
-               throw new RuntimeException(e);
-           }
-       }
-
+       return parallelExecute(
+               TypeUtils.project(rowType, keyProjection),
+               readBuilder,
+               splits,
+               options.pageSize(),
+               options.crossPartitionUpsertBootstrapParallelism(),
+               split -> {
+                   DataSplit dataSplit = ((DataSplit) split);
+                   int bucket = dataSplit.bucket();
+                   return partBucketConverter.toGenericRow(
+                           new JoinedRow(dataSplit.partition(), GenericRow.of(bucket)));
+               },
+               (row, extra) -> new JoinedRow().replace(row, extra));
    }

@VisibleForTesting
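The refactor keeps the Consumer-based bootstrap as a thin wrapper and exposes the RecordReader directly. A hedged usage sketch of the new overload; indexBootstrap, numAssigners, assignId and collector are assumed context, while RecordReaderIterator is used the same way later in this commit:

    // Sketch: pull-based consumption of the new reader-returning overload.
    static void consumeBootstrap(
            IndexBootstrap indexBootstrap, int numAssigners, int assignId,
            Consumer<InternalRow> collector) throws Exception {
        try (RecordReaderIterator<InternalRow> iterator =
                new RecordReaderIterator<>(indexBootstrap.bootstrap(numAssigners, assignId))) {
            while (iterator.hasNext()) {
                collector.accept(iterator.next());
            }
        }
    }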
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.io;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalSerializers;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.ParallelExecution;
import org.apache.paimon.utils.ParallelExecution.ParallelBatch;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * A util class that wraps {@link ParallelExecution} to read {@link Split}s in parallel.
 */
public class SplitsParallelReadUtil {

    public static RecordReader<InternalRow> parallelExecute(
            RowType projectedType,
            ReadBuilder readBuilder,
            List<Split> splits,
            int pageSize,
            int parallelism) {
        return parallelExecute(
                projectedType,
                readBuilder,
                splits,
                pageSize,
                parallelism,
                split -> null,
                (row, unused) -> row);
    }

    public static <EXTRA> RecordReader<InternalRow> parallelExecute(
            RowType projectedType,
            ReadBuilder readBuilder,
            List<Split> splits,
            int pageSize,
            int parallelism,
            Function<Split, EXTRA> extraFunction,
            BiFunction<InternalRow, EXTRA, InternalRow> addExtraToRow) {
        List<Supplier<Pair<RecordReader<InternalRow>, EXTRA>>> suppliers = new ArrayList<>();
        for (Split split : splits) {
            suppliers.add(
                    () -> {
                        try {
                            RecordReader<InternalRow> reader =
                                    readBuilder.newRead().createReader(split);
                            return Pair.of(reader, extraFunction.apply(split));
                        } catch (IOException e) {
                            throw new UncheckedIOException(e);
                        }
                    });
        }

        ParallelExecution<InternalRow, EXTRA> execution =
                new ParallelExecution<>(
                        InternalSerializers.create(projectedType),
                        pageSize,
                        parallelism,
                        suppliers);

        return new RecordReader<InternalRow>() {
            @Nullable
            @Override
            public RecordIterator<InternalRow> readBatch() throws IOException {
                ParallelBatch<InternalRow, EXTRA> batch;
                try {
                    batch = execution.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }

                if (batch == null) {
                    return null;
                }

                return new RecordIterator<InternalRow>() {
                    @Nullable
                    @Override
                    public InternalRow next() throws IOException {
                        InternalRow row = batch.next();
                        if (row == null) {
                            return null;
                        }

                        return addExtraToRow.apply(row, batch.extraMessage());
                    }

                    @Override
                    public void releaseBatch() {
                        batch.releaseBatch();
                    }
                };
            }

            @Override
            public void close() throws IOException {
                execution.close();
            }
        };
    }
}
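A short, hedged example of the simple overload above; projectedType, readBuilder, splits and pageSize are assumed to come from the usual scan planning, and the consumer is a stand-in:

    // Sketch: merge all splits into one reader backed by 4 parallel read threads.
    static void readAllInParallel(
            RowType projectedType, ReadBuilder readBuilder, List<Split> splits, int pageSize)
            throws IOException {
        RecordReader<InternalRow> reader =
                parallelExecute(projectedType, readBuilder, splits, pageSize, 4);
        try {
            reader.forEachRemaining(row -> System.out.println(row)); // stand-in consumer
        } finally {
            reader.close();
        }
    }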
@@ -170,7 +170,7 @@ private List<Pair<Integer, Integer>> collect(ParallelExecution<Integer, Integer>
                    break;
                }

-               result.add(Pair.of(record, batch.extraMesage()));
+               result.add(Pair.of(record, batch.extraMessage()));
            }
        } catch (InterruptedException | IOException e) {
            Thread.currentThread().interrupt();
@@ -274,6 +274,13 @@ public class FlinkConnectorOptions {
                    .defaultValue(false)
                    .withDescription("Whether to enable async lookup join.");

+   public static final ConfigOption<Integer> LOOKUP_BOOTSTRAP_PARALLELISM =
+           ConfigOptions.key("lookup.bootstrap-parallelism")
+                   .intType()
+                   .defaultValue(4)
+                   .withDescription(
+                           "The parallelism for bootstrap in a single task for lookup join.");
+
    public static final ConfigOption<Integer> LOOKUP_ASYNC_THREAD_NUMBER =
            ConfigOptions.key("lookup.async-thread-number")
                    .intType()
@@ -162,7 +162,7 @@ private void bulkLoad(Options options) throws Exception {
                RocksDBState.createBulkLoadSorter(
                        IOManager.create(path.toString()), new CoreOptions(options));
        try (RecordReaderIterator<InternalRow> batch =
-               new RecordReaderIterator<>(streamingReader.nextBatch())) {
+               new RecordReaderIterator<>(streamingReader.nextBatch(true))) {
            while (batch.hasNext()) {
                InternalRow row = batch.next();
                if (lookupTable.recordFilter().test(row)) {
@@ -252,7 +252,7 @@ private void checkRefresh() throws Exception {
    private void refresh() throws Exception {
        while (true) {
            try (RecordReaderIterator<InternalRow> batch =
-                   new RecordReaderIterator<>(streamingReader.nextBatch())) {
+                   new RecordReaderIterator<>(streamingReader.nextBatch(false))) {
                if (!batch.hasNext()) {
                    return;
                }
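Judging only from the two call sites above, the new boolean argument to nextBatch appears to separate the initial bulk-load pass (true) from the incremental refresh loop (false); its declaration is in one of the changed files not expanded in this view, so the parameter's actual name and semantics are not shown here.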
