Skip to content

Commit

Permalink
[flink] Random choose task while sorting to avoid data skew (apache#2705
Browse files Browse the repository at this point in the history
)
  • Loading branch information
leaves12138 authored Jan 15, 2024
1 parent c8d8b8b commit ef05590
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.flink.shuffle;

import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SerializableSupplier;

import org.apache.flink.annotation.Internal;
Expand Down Expand Up @@ -276,7 +277,7 @@ private static class AssignRangeIndexOperator<T>

private final SerializableSupplier<Comparator<T>> keyComparatorSupplier;

private transient List<T> boundaries;
private transient List<Pair<T, RandomList>> keyIndex;
private transient Collector<Tuple2<Integer, Tuple2<T, RowData>>> collector;
private transient Comparator<T> keyComparator;

Expand All @@ -293,42 +294,62 @@ public void open() throws Exception {

/**
 * Handles the record from the first input, which carries a list of boundary values.
 *
 * <p>Builds {@code keyIndex}: one entry per run of adjacent boundaries that compare equal under
 * {@code keyComparator}, each entry holding the positions of that run within the incoming list.
 * Only neighbouring elements are compared, so equal values are merged only when they are
 * adjacent (presumably the list arrives sorted; confirm against the producer).
 */
@Override
public void processElement1(StreamRecord<List<T>> streamRecord) {
// NOTE(review): this assignment to `boundaries` sits next to the new keyIndex logic and looks
// like the pre-change line of the diff; confirm which of the two is live.
this.boundaries = streamRecord.getValue();
keyIndex = new ArrayList<>();

// Key of the most recently started entry; null until the first entry has been created.
T last = null;
// Position of the current element within the incoming list.
int index = 0;
for (T t : streamRecord.getValue()) {
if (last != null && keyComparator.compare(last, t) == 0) {
// Same key as the current entry: record this position on it as well.
keyIndex.get(keyIndex.size() - 1).getRight().add(index++);
} else {
// Different key: start a new entry that holds this position.
Pair<T, RandomList> pair = Pair.of(t, new RandomList());
pair.getRight().add(index++);
keyIndex.add(pair);

last = t;
}
}
}

/**
 * Handles a record from the second input: emits a tuple made of the integer that
 * {@code binarySearch} returns for the record's key ({@code f0}) and the record itself.
 *
 * <p>Throws a {@link RuntimeException} when the lookup structure has not been populated yet.
 */
@Override
public void processElement2(StreamRecord<Tuple2<T, RowData>> streamRecord) {
// NOTE(review): the `boundaries == null` guard below has no closing brace in this view and
// looks like the pre-change lines of the diff; confirm that only the keyIndex guard is live.
if (boundaries == null) {
throw new RuntimeException("There should be one data from the first input.");
// Fails when keyIndex was never built, or was built from an empty boundary list.
if (keyIndex == null || keyIndex.isEmpty()) {
throw new RuntimeException(
"There should be one data from the first input. And boundaries should not be empty.");
}
Tuple2<T, RowData> row = streamRecord.getValue();
collector.collect(new Tuple2<>(binarySearch(row.f0), row));
}

/**
 * Chooses which input to read next: {@code InputSelection.FIRST} while the lookup structure is
 * still {@code null}, {@code InputSelection.ALL} once it has been set.
 */
@Override
public InputSelection nextSelection() {
// NOTE(review): two return statements appear back to back; the `boundaries` one looks like the
// pre-change line of the diff, so confirm that the `keyIndex` one is the live statement.
return boundaries == null ? InputSelection.FIRST : InputSelection.ALL;
return keyIndex == null ? InputSelection.FIRST : InputSelection.ALL;
}

/**
 * Binary-searches {@code keyIndex} for {@code key} using {@code keyComparator} and returns one of
 * the positions stored on the selected entry, picked by {@code RandomList.get()}.
 *
 * <p>The selected entry is the one whose key compares equal to {@code key}; if there is none, the
 * entry at the position where the search stopped ({@code low}); and if that position is past the
 * end, the last entry.
 *
 * <p>NOTE(review): lines that still read {@code boundaries}, {@code return mid} and
 * {@code return low} are interleaved with the keyIndex-based ones and look like the pre-change
 * side of the diff; confirm which are live.
 */
private int binarySearch(T key) {
int lastIndex = this.keyIndex.size() - 1;
int low = 0;
int high = this.boundaries.size() - 1;
int high = lastIndex;

while (low <= high) {
// Unsigned shift keeps the midpoint correct even if low + high overflows int.
final int mid = (low + high) >>> 1;
final int result = keyComparator.compare(key, this.boundaries.get(mid));
final Pair<T, RandomList> indexPair = keyIndex.get(mid);
final int result = keyComparator.compare(key, indexPair.getLeft());

if (result > 0) {
low = mid + 1;
} else if (result < 0) {
high = mid - 1;
} else {
return mid;
// Exact match: any position recorded for this key is acceptable, so pick one at random.
return indexPair.getRight().get();
}
}

// key not found, but the low index is the target
// bucket, since the boundaries are the upper bound
return low;
// low can run one past the last entry when key is greater than every stored key; use the last
// entry in that case.
return low > lastIndex
? keyIndex.get(lastIndex).getRight().get()
: keyIndex.get(low).getRight().get();
}

/** A {@link KeySelector} to select by f0 of tuple2. */
Expand Down Expand Up @@ -449,4 +470,20 @@ Iterator<Tuple2<Double, T>> sample() {
return queue.iterator();
}
}

/** Contains integers and randomly get one. */
private static class RandomList {

private static final Random RANDOM = new Random();

private final List<Integer> list = new ArrayList<>();

public void add(int i) {
list.add(i);
}

public int get() {
return list.get(RANDOM.nextInt(list.size()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ private void prepareData(int size, int loop) throws Exception {
commit(commitMessages);
}

/**
 * Creates the table and writes {@code size} rows, every one built from {@code data(0, 0, 0)},
 * then commits them as a single batch.
 */
private void prepareSameData(int size) throws Exception {
    createTable();
    try (BatchTableWrite writer = getTable().newBatchWriteBuilder().newWrite()) {
        int remaining = size;
        while (remaining-- > 0) {
            writer.write(data(0, 0, 0));
        }
        // Commit while the writer is still open; try-with-resources closes it afterwards.
        commit(writer.prepareCommit());
    }
}

@Test
public void testOrderBy() throws Exception {
prepareData(300, 1);
Expand Down Expand Up @@ -230,6 +241,23 @@ public void testTableConf() throws Exception {
.isEqualTo("20");
}

/**
 * Writes 200 identical rows and sorts them, first with {@code order} and then, on a dropped and
 * recreated table, with {@code zorder}. Each round must finish without throwing and leave
 * exactly three files in the scan plan.
 */
@Test
public void testRandomSuffixWorks() throws Exception {
    // Round one: order by a single column.
    prepareSameData(200);
    List<String> orderColumns = Collections.singletonList("f1");
    Assertions.assertThatCode(() -> order(orderColumns)).doesNotThrowAnyException();
    AppendOnlyFileStoreTable orderedTable = (AppendOnlyFileStoreTable) getTable();
    List<ManifestEntry> orderedFiles = orderedTable.store().newScan().plan().files();
    Assertions.assertThat(orderedFiles.size()).isEqualTo(3);

    // Round two: drop the table, reload the same rows and z-order by two columns.
    dropTable();
    prepareSameData(200);
    List<String> zorderColumns = Arrays.asList("f1", "f2");
    Assertions.assertThatCode(() -> zorder(zorderColumns)).doesNotThrowAnyException();
    AppendOnlyFileStoreTable zorderedTable = (AppendOnlyFileStoreTable) getTable();
    List<ManifestEntry> zorderedFiles = zorderedTable.store().newScan().plan().files();
    Assertions.assertThat(zorderedFiles.size()).isEqualTo(3);
}

private void zorder(List<String> columns) throws Exception {
if (RANDOM.nextBoolean()) {
createAction("zorder", columns).run();
Expand Down Expand Up @@ -276,6 +304,10 @@ private void createTable() throws Exception {
catalog.createTable(identifier(), schema(), true);
}

/** Drops the table named by {@link #identifier()} from the catalog. */
private void dropTable() throws Exception {
    Identifier target = identifier();
    // NOTE(review): the `true` flag presumably means "ignore if the table does not exist";
    // confirm against the catalog's dropTable contract.
    catalog.dropTable(target, true);
}

private Identifier identifier() {
return Identifier.create(database, tableName);
}
Expand Down

0 comments on commit ef05590

Please sign in to comment.