Skip to content

Commit

Permalink
[core] Introduce cross-partition-upsert.bootstrap-parallelism
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Oct 19, 2023
1 parent 1c790d3 commit f9de43f
Show file tree
Hide file tree
Showing 13 changed files with 543 additions and 13 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@
<td>Duration</td>
<td>The discovery interval of continuous reading.</td>
</tr>
<tr>
<td><h5>cross-partition-upsert.bootstrap-parallelism</h5></td>
<td style="word-wrap: break-word;">10</td>
<td>Integer</td>
<td>The parallelism for bootstrap in a single task for cross partition upsert.</td>
</tr>
<tr>
<td><h5>cross-partition-upsert.index-ttl</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
11 changes: 11 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,13 @@ public class CoreOptions implements Serializable {
+ "this can avoid maintaining too many indexes and lead to worse and worse performance, "
+ "but please note that this may also cause data duplication.");

public static final ConfigOption<Integer> CROSS_PARTITION_UPSERT_BOOTSTRAP_PARALLELISM =
key("cross-partition-upsert.bootstrap-parallelism")
.intType()
.defaultValue(10)
.withDescription(
"The parallelism for bootstrap in a single task for cross partition upsert.");

public static final ConfigOption<Integer> ZORDER_VAR_LENGTH_CONTRIBUTION =
key("zorder.var-length-contribution")
.intType()
Expand Down Expand Up @@ -1327,6 +1334,10 @@ public Duration crossPartitionUpsertIndexTtl() {
return options.get(CROSS_PARTITION_UPSERT_INDEX_TTL);
}

public int crossPartitionUpsertBootstrapParallelism() {
return options.get(CROSS_PARTITION_UPSERT_BOOTSTRAP_PARALLELISM);
}

public int varTypeSize() {
return options.get(ZORDER_VAR_LENGTH_CONTRIBUTION);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
Expand All @@ -40,6 +41,10 @@ public class SimpleCollectingOutputView extends AbstractPagedOutputView {

private int segmentNum;

public SimpleCollectingOutputView(MemorySegmentSource memSource, int segmentSize) {
this(new ArrayList<>(), memSource, segmentSize);
}

public SimpleCollectingOutputView(
List<MemorySegment> fullSegmentTarget, MemorySegmentSource memSource, int segmentSize) {
super(memSource.nextSegment(), segmentSize);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.memory;

import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

/** A {@link MemorySegmentPool} for allocated segments. */
public class ArraySegmentPool implements MemorySegmentPool {

private final Queue<MemorySegment> segments;
private final int pageSize;

public ArraySegmentPool(List<MemorySegment> segments) {
this.segments = new LinkedList<>(segments);
this.pageSize = segments.get(0).size();
}

@Override
public int pageSize() {
return pageSize;
}

@Override
public void returnAll(List<MemorySegment> memory) {
segments.addAll(memory);
}

@Override
public int freePages() {
return segments.size();
}

@Override
public MemorySegment nextSegment() {
return segments.poll();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.paimon.annotation.Public;

import javax.annotation.Nullable;

/**
* Interface describing entities that can provide memory segments.
*
Expand All @@ -33,5 +35,6 @@ public interface MemorySegmentSource {
*
* @return The next memory segment, or null, if none is available.
*/
@Nullable
MemorySegment nextSegment();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.utils;

import org.apache.paimon.data.RandomAccessInputView;
import org.apache.paimon.data.SimpleCollectingOutputView;
import org.apache.paimon.data.serializer.Serializer;
import org.apache.paimon.memory.ArraySegmentPool;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.reader.RecordReader;

import javax.annotation.Nullable;

import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
* A class to help parallel execution.
*
* @param <T> Record Type.
* @param <E> Extra message of one {@link RecordReader}.
*/
public class ParallelExecution<T, E> implements Closeable {

private final Serializer<T> serializer;
private final BlockingQueue<MemorySegment> idlePages;
private final BlockingQueue<ParallelBatch<T, E>> results;
private final ExecutorService executorService;

private final AtomicReference<Throwable> exception;

private final CountDownLatch latch;

public ParallelExecution(
Serializer<T> serializer,
int pageSize,
int parallelism,
List<Supplier<Pair<RecordReader<T>, E>>> readers) {
this.serializer = serializer;
int totalPages = parallelism * 2;
this.idlePages = new ArrayBlockingQueue<>(totalPages);
for (int i = 0; i < totalPages; i++) {
idlePages.add(MemorySegment.allocateHeapMemory(pageSize));
}
this.executorService =
new ThreadPoolExecutor(
parallelism,
parallelism,
1,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ExecutorThreadFactory(Thread.currentThread().getName() + "-parallel"));
this.results = new LinkedBlockingQueue<>();
this.exception = new AtomicReference<>();
this.latch = new CountDownLatch(readers.size());

for (Supplier<Pair<RecordReader<T>, E>> readerSupplier : readers) {
Serializer<T> duplicate = this.serializer.duplicate();
executorService.submit(() -> asyncRead(readerSupplier, duplicate));
}
}

@Nullable
public ParallelBatch<T, E> take() throws InterruptedException, IOException {
ParallelBatch<T, E> element;
do {
if (latch.getCount() == 0 && results.isEmpty()) {
return null;
}

element = results.poll(2, TimeUnit.SECONDS);

if (exception.get() != null) {
throw new IOException(exception.get());
}
} while (element == null);
return element;
}

private void asyncRead(
Supplier<Pair<RecordReader<T>, E>> readerSupplier, Serializer<T> serializer) {
Pair<RecordReader<T>, E> pair = readerSupplier.get();
try (CloseableIterator<T> iterator = pair.getLeft().toCloseableIterator()) {
int count = 0;
SimpleCollectingOutputView outputView = null;

while (iterator.hasNext()) {
T next = iterator.next();

while (true) {
if (outputView == null) {
outputView = newOutputView();
count = 0;
}

try {
serializer.serialize(next, outputView);
count++;
break;
} catch (EOFException e) {
sendToResults(outputView, count, pair.getRight());
outputView = null;
}
}
}

if (outputView != null) {
sendToResults(outputView, count, pair.getRight());
}

latch.countDown();
} catch (Throwable e) {
this.exception.set(e);
}
}

private SimpleCollectingOutputView newOutputView() throws InterruptedException {
MemorySegment page = idlePages.take();
return new SimpleCollectingOutputView(
new ArraySegmentPool(Collections.singletonList(page)), page.size());
}

private void sendToResults(SimpleCollectingOutputView outputView, int count, E extraMessage) {
results.add(iterator(outputView.getCurrentSegment(), count, extraMessage));
}

@Override
public void close() throws IOException {
this.executorService.shutdownNow();
}

private ParallelBatch<T, E> iterator(MemorySegment page, int numRecords, E extraMessage) {
RandomAccessInputView inputView =
new RandomAccessInputView(
new ArrayList<>(Collections.singletonList(page)), page.size());
return new ParallelBatch<T, E>() {

int numReturn = 0;

@Nullable
@Override
public T next() throws IOException {
if (numReturn >= numRecords) {
return null;
}

numReturn++;
return serializer.deserialize(inputView);
}

@Override
public void releaseBatch() {
idlePages.add(page);
}

@Override
public E extraMesage() {
return extraMessage;
}
};
}

/** A batch provides next and extra message. */
public interface ParallelBatch<T, E> {

@Nullable
T next() throws IOException;

void releaseBatch();

E extraMesage();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.utils;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
Expand Down Expand Up @@ -56,6 +57,10 @@ public int getArity() {
return fieldGetters.length;
}

public GenericRow toGenericRow(InternalRow rowData) {
return GenericRow.of(convert(rowData));
}

public Object[] convert(InternalRow rowData) {
Object[] result = new Object[fieldGetters.length];
for (int i = 0; i < fieldGetters.length; i++) {
Expand Down
11 changes: 11 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
Expand All @@ -45,6 +46,16 @@
/** Type related helper functions. */
public class TypeUtils {

public static RowType concat(RowType left, RowType right) {
RowType.Builder builder = RowType.builder();
List<DataField> fields = new ArrayList<>(left.getFields());
fields.addAll(right.getFields());
fields.forEach(
dataField ->
builder.field(dataField.name(), dataField.type(), dataField.description()));
return builder.build();
}

public static RowType project(RowType inputType, int[] mapping) {
List<DataField> fields = inputType.getFields();
return new RowType(
Expand Down
Loading

0 comments on commit f9de43f

Please sign in to comment.