From cef2fc18d3ceab949ee05ee56422a6bc0781190c Mon Sep 17 00:00:00 2001 From: YeJunHao <41894543+leaves12138@users.noreply.github.com> Date: Tue, 10 Oct 2023 11:03:36 +0800 Subject: [PATCH] [flink] Introduce zorder/order sort compact for dynamic bucket table (#2100) --- .../apache/paimon/index/BucketAssigner.java | 29 +++ .../paimon/index/HashBucketAssigner.java | 4 +- .../paimon/index/HashIndexMaintainer.java | 15 +- .../apache/paimon/index/IndexMaintainer.java | 5 +- .../index/SimpleHashBucketAssigner.java | 93 ++++++++ .../operation/AbstractFileStoreWrite.java | 3 +- .../index/SimpleHashBucketAssignerTest.java | 69 ++++++ .../paimon/table/DynamicBucketTableTest.java | 109 +++++++++ .../apache/paimon/table/TableTestBase.java | 1 - .../flink/action/SortCompactAction.java | 16 +- .../flink/sink/DynamicBucketCompactSink.java | 61 +++++ .../paimon/flink/sink/DynamicBucketSink.java | 8 +- .../paimon/flink/sink/FlinkSinkBuilder.java | 17 +- .../sink/HashBucketAssignerOperator.java | 28 ++- .../flink/DynamicBucketTableITCase.java | 40 ++++ ...rtCompactActionForDynamicBucketITCase.java | 215 ++++++++++++++++++ ...tCompactActionForUnawareBucketITCase.java} | 18 +- 17 files changed, 689 insertions(+), 42 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/index/BucketAssigner.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/index/SimpleHashBucketAssignerTest.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java rename paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/{SortCompactActionITCase.java => SortCompactActionForUnawareBucketITCase.java} (96%) diff --git a/paimon-core/src/main/java/org/apache/paimon/index/BucketAssigner.java b/paimon-core/src/main/java/org/apache/paimon/index/BucketAssigner.java new file mode 100644 index 000000000000..f87d6a16f918 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/index/BucketAssigner.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.index; + +import org.apache.paimon.data.BinaryRow; + +/** Assigner a bucket for a record, just used in dynamic bucket table. */ +public interface BucketAssigner { + + int assign(BinaryRow partition, int hash); + + void prepareCommit(long commitIdentifier); +} diff --git a/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java b/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java index 329d3b9712ed..519a495b0a2e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java @@ -34,7 +34,7 @@ import static org.apache.paimon.utils.Preconditions.checkArgument; /** Assign bucket for key hashcode. */ -public class HashBucketAssigner { +public class HashBucketAssigner implements BucketAssigner { private static final Logger LOG = LoggerFactory.getLogger(HashBucketAssigner.class); @@ -64,6 +64,7 @@ public HashBucketAssigner( } /** Assign a bucket for key hash of a record. */ + @Override public int assign(BinaryRow partition, int hash) { int recordAssignId = computeAssignId(hash); checkArgument( @@ -88,6 +89,7 @@ public int assign(BinaryRow partition, int hash) { } /** Prepare commit to clear outdated partition index. */ + @Override public void prepareCommit(long commitIdentifier) { long latestCommittedIdentifier; if (partitionIndex.values().stream() diff --git a/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java index 998c72399d93..66a8d6409a9c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java @@ -19,11 +19,14 @@ package org.apache.paimon.index; import org.apache.paimon.KeyValue; +import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.utils.IntHashSet; import org.apache.paimon.utils.IntIterator; +import javax.annotation.Nullable; + import java.io.EOFException; import java.io.IOException; import java.io.UncheckedIOException; @@ -40,7 +43,10 @@ public class HashIndexMaintainer implements IndexMaintainer { private boolean modified; private HashIndexMaintainer( - IndexFileHandler fileHandler, Long snapshotId, BinaryRow partition, int bucket) { + IndexFileHandler fileHandler, + @Nullable Long snapshotId, + BinaryRow partition, + int bucket) { this.fileHandler = fileHandler; IntHashSet hashcode = new IntHashSet(); if (snapshotId != null) { @@ -93,6 +99,11 @@ public List prepareCommit() { return Collections.emptyList(); } + @VisibleForTesting + public boolean isEmpty() { + return hashcode.size() == 0; + } + /** Factory to restore {@link HashIndexMaintainer}. */ public static class Factory implements IndexMaintainer.Factory { @@ -104,7 +115,7 @@ public Factory(IndexFileHandler handler) { @Override public IndexMaintainer createOrRestore( - Long snapshotId, BinaryRow partition, int bucket) { + @Nullable Long snapshotId, BinaryRow partition, int bucket) { return new HashIndexMaintainer(handler, snapshotId, partition, bucket); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexMaintainer.java index 8d61d46f88bf..ba881d6140b6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/IndexMaintainer.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexMaintainer.java @@ -20,6 +20,8 @@ import org.apache.paimon.data.BinaryRow; +import javax.annotation.Nullable; + import java.util.List; /** Maintainer to maintain index. */ @@ -31,6 +33,7 @@ public interface IndexMaintainer { /** Factory to restore {@link IndexMaintainer}. */ interface Factory { - IndexMaintainer createOrRestore(Long snapshotId, BinaryRow partition, int bucket); + IndexMaintainer createOrRestore( + @Nullable Long snapshotId, BinaryRow partition, int bucket); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java b/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java new file mode 100644 index 000000000000..7094684fcd35 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.index; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.utils.Int2ShortHashMap; + +import java.util.HashMap; +import java.util.Map; + +/** When we need to overwrite the table, we should use this to avoid loading index. */ +public class SimpleHashBucketAssigner implements BucketAssigner { + + private final int numAssigners; + private final int assignId; + private final long targetBucketRowNumber; + + private final Map partitionIndex; + + public SimpleHashBucketAssigner(int numAssigners, int assignId, long targetBucketRowNumber) { + this.numAssigners = numAssigners; + this.assignId = assignId; + this.targetBucketRowNumber = targetBucketRowNumber; + this.partitionIndex = new HashMap<>(); + } + + @Override + public int assign(BinaryRow partition, int hash) { + SimplePartitionIndex index = + this.partitionIndex.computeIfAbsent(partition, p -> new SimplePartitionIndex()); + return index.assign(hash); + } + + @Override + public void prepareCommit(long commitIdentifier) { + // do nothing + } + + /** Simple partition bucket hash assigner. */ + private class SimplePartitionIndex { + + public final Int2ShortHashMap hash2Bucket = new Int2ShortHashMap(); + private final Map bucketInformation; + private int currentBucket; + + private SimplePartitionIndex() { + bucketInformation = new HashMap<>(); + loadNewBucket(); + } + + public int assign(int hash) { + // the same hash should go into the same bucket + if (hash2Bucket.containsKey(hash)) { + return hash2Bucket.get(hash); + } + + Long num = bucketInformation.computeIfAbsent(currentBucket, i -> 0L); + if (num >= targetBucketRowNumber) { + loadNewBucket(); + } + bucketInformation.compute(currentBucket, (i, l) -> l == null ? 1L : l + 1); + hash2Bucket.put(hash, (short) currentBucket); + return currentBucket; + } + + private void loadNewBucket() { + for (int i = 0; i < Short.MAX_VALUE; i++) { + if (i % numAssigners == assignId && !bucketInformation.containsKey(i)) { + currentBucket = i; + return; + } + } + throw new RuntimeException( + "Can't find a suitable bucket to assign, all the bucket are assigned?"); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java index ad2fbb720d97..125756a01105 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java @@ -330,7 +330,8 @@ public WriterContainer createWriterContainer( IndexMaintainer indexMaintainer = indexFactory == null ? null - : indexFactory.createOrRestore(latestSnapshotId, partition, bucket); + : indexFactory.createOrRestore( + ignorePreviousFiles ? null : latestSnapshotId, partition, bucket); RecordWriter writer = createWriter(partition.copy(), bucket, restoreFiles, null, compactExecutor()); notifyNewWriter(writer); diff --git a/paimon-core/src/test/java/org/apache/paimon/index/SimpleHashBucketAssignerTest.java b/paimon-core/src/test/java/org/apache/paimon/index/SimpleHashBucketAssignerTest.java new file mode 100644 index 000000000000..d795b89263f0 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/index/SimpleHashBucketAssignerTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.index; + +import org.apache.paimon.data.BinaryRow; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +/** Tests for {@link SimpleHashBucketAssigner}. */ +public class SimpleHashBucketAssignerTest { + + @Test + public void testAssign() { + SimpleHashBucketAssigner simpleHashBucketAssigner = new SimpleHashBucketAssigner(2, 0, 100); + + BinaryRow binaryRow = BinaryRow.EMPTY_ROW; + int hash = 0; + + for (int i = 0; i < 100; i++) { + int bucket = simpleHashBucketAssigner.assign(binaryRow, hash++); + Assertions.assertThat(bucket).isEqualTo(0); + } + + for (int i = 0; i < 100; i++) { + int bucket = simpleHashBucketAssigner.assign(binaryRow, hash++); + Assertions.assertThat(bucket).isEqualTo(2); + } + + int bucket = simpleHashBucketAssigner.assign(binaryRow, hash++); + Assertions.assertThat(bucket).isEqualTo(4); + } + + @Test + public void testAssignWithSameHash() { + SimpleHashBucketAssigner simpleHashBucketAssigner = new SimpleHashBucketAssigner(2, 0, 100); + + BinaryRow binaryRow = BinaryRow.EMPTY_ROW; + int hash = 0; + + for (int i = 0; i < 100; i++) { + int bucket = simpleHashBucketAssigner.assign(binaryRow, hash++); + Assertions.assertThat(bucket).isEqualTo(0); + } + + // reset hash, the record will go into bucket 0 + hash = 0; + for (int i = 0; i < 100; i++) { + int bucket = simpleHashBucketAssigner.assign(binaryRow, hash++); + Assertions.assertThat(bucket).isEqualTo(0); + } + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java new file mode 100644 index 000000000000..d8f5e11a8661 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.index.HashIndexMaintainer; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.sink.BatchTableWrite; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.sink.BatchWriteBuilderImpl; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.table.sink.DynamicBucketRow; +import org.apache.paimon.table.sink.TableWriteImpl; +import org.apache.paimon.types.DataTypes; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.List; + +/** Tests for Dynamic Bucket Table. */ +public class DynamicBucketTableTest extends TableTestBase { + + @Test + public void testOverwriteDynamicBucketTable() throws Exception { + createTableDefault(); + + commitDefault(writeDataDefault(100, 100)); + + Table table = getTableDefault(); + BatchWriteBuilderImpl builder = (BatchWriteBuilderImpl) table.newBatchWriteBuilder(); + TableWriteImpl batchTableWrite = (TableWriteImpl) builder.withOverwrite().newWrite(); + HashIndexMaintainer indexMaintainer = + (HashIndexMaintainer) + batchTableWrite + .getWrite() + .createWriterContainer(BinaryRow.EMPTY_ROW, 0, true) + .indexMaintainer; + + Assertions.assertThat(indexMaintainer.isEmpty()).isTrue(); + batchTableWrite.write(data(0)); + Assertions.assertThat( + ((CommitMessageImpl) batchTableWrite.prepareCommit().get(0)) + .indexIncrement() + .newIndexFiles() + .get(0) + .rowCount()) + .isEqualTo(1); + } + + protected List writeDataDefault(int size, int times) throws Exception { + List messages; + Table table = getTableDefault(); + BatchWriteBuilder builder = table.newBatchWriteBuilder(); + try (BatchTableWrite batchTableWrite = builder.newWrite()) { + for (int i = 0; i < times; i++) { + for (int j = 0; j < size; j++) { + batchTableWrite.write(data(i)); + } + } + messages = batchTableWrite.prepareCommit(); + } + + return messages; + } + + protected Schema schemaDefault() { + Schema.Builder schemaBuilder = Schema.newBuilder(); + schemaBuilder.column("f0", DataTypes.BIGINT()); + schemaBuilder.column("f1", DataTypes.BIGINT()); + schemaBuilder.column("f2", DataTypes.BIGINT()); + schemaBuilder.column("f3", DataTypes.BIGINT()); + schemaBuilder.option("bucket", "-1"); + schemaBuilder.option("scan.parallelism", "6"); + schemaBuilder.option("sink.parallelism", "3"); + schemaBuilder.option("dynamic-bucket.target-row-num", "100"); + schemaBuilder.primaryKey("f0"); + return schemaBuilder.build(); + } + + private static InternalRow data(int bucket) { + GenericRow row = + GenericRow.of( + RANDOM.nextLong(), + (long) RANDOM.nextInt(10000), + (long) RANDOM.nextInt(10000), + (long) RANDOM.nextInt(10000)); + return new DynamicBucketRow(row, bucket); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java index 322d0c8ad41e..8bd03ca7f76d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java @@ -180,7 +180,6 @@ private List writeOnce(Table table, int time, int size) throws Ex } } - // schema with all the basic types. protected Schema schemaDefault() { Schema.Builder schemaBuilder = Schema.newBuilder(); schemaBuilder.column("f0", DataTypes.INT()); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java index e8d54d8b27e0..b665714d09cc 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java @@ -25,7 +25,6 @@ import org.apache.paimon.flink.source.FlinkSourceBuilder; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; -import org.apache.paimon.table.AppendOnlyFileStoreTable; import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.FileStoreTable; @@ -45,8 +44,6 @@ import java.util.Map; import java.util.stream.Collectors; -import static org.apache.paimon.utils.Preconditions.checkArgument; - /** Compact with sort action. */ public class SortCompactAction extends CompactAction { @@ -63,9 +60,6 @@ public SortCompactAction( Map tableConf) { super(warehouse, database, tableName, catalogConfig, tableConf); - checkArgument( - table instanceof AppendOnlyFileStoreTable, - "Only sort compaction works with append-only table for now."); table = table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true")); } @@ -85,10 +79,9 @@ public void build(StreamExecutionEnvironment env) { env.setRuntimeMode(RuntimeExecutionMode.BATCH); } FileStoreTable fileStoreTable = (FileStoreTable) table; - if (!(fileStoreTable instanceof AppendOnlyFileStoreTable)) { - throw new IllegalArgumentException("Sort Compact only supports append-only table yet"); - } - if (fileStoreTable.bucketMode() != BucketMode.UNAWARE) { + + if (fileStoreTable.bucketMode() != BucketMode.UNAWARE + && fileStoreTable.bucketMode() != BucketMode.DYNAMIC) { throw new IllegalArgumentException("Sort Compact only supports bucket=-1 yet."); } Map tableConfig = fileStoreTable.options(); @@ -120,8 +113,7 @@ public void build(StreamExecutionEnvironment env) { new FlinkSinkBuilder(fileStoreTable) .withInput(sorter.sort()) - // This should use empty map to tag it on overwrite action, otherwise there is no - // overwrite action. + .forCompact(true) .withOverwritePartition(new HashMap<>()) .build(); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java new file mode 100644 index 000000000000..11e21eed638f --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.table.FileStoreTable; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; + +import javax.annotation.Nullable; + +import java.util.Map; +import java.util.UUID; + +/** This class is only used for generate compact sink topology for dynamic bucket table. */ +public class DynamicBucketCompactSink extends RowDynamicBucketSink { + + public DynamicBucketCompactSink( + FileStoreTable table, @Nullable Map overwritePartition) { + super(table, overwritePartition); + } + + @Override + public DataStreamSink build(DataStream input, @Nullable Integer parallelism) { + String initialCommitUser = UUID.randomUUID().toString(); + + // This input is sorted and compacted. So there is no shuffle here, we just assign bucket + // for each record, and sink them to table. + + // bucket-assigner + HashBucketAssignerOperator assignerOperator = + new HashBucketAssignerOperator<>( + initialCommitUser, table, extractorFunction(), true); + TupleTypeInfo> rowWithBucketType = + new TupleTypeInfo<>(input.getType(), BasicTypeInfo.INT_TYPE_INFO); + DataStream> bucketAssigned = + input.transform("dynamic-bucket-assigner", rowWithBucketType, assignerOperator) + .setParallelism(input.getParallelism()); + return sinkFrom(bucketAssigned, initialCommitUser); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java index 31a455f749cd..5efa87356a3f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java @@ -57,7 +57,8 @@ public DataStreamSink build(DataStream input, @Nullable Integer parallelis String initialCommitUser = UUID.randomUUID().toString(); // Topology: - // input -- shuffle by key hash --> bucket-assigner -- shuffle by bucket --> writer --> + // input -- shuffle by key hash --> bucket-assigner -- shuffle by partition & bucket --> + // writer --> // committer // 1. shuffle by key hash @@ -70,7 +71,8 @@ public DataStreamSink build(DataStream input, @Nullable Integer parallelis // 2. bucket-assigner HashBucketAssignerOperator assignerOperator = - new HashBucketAssignerOperator<>(initialCommitUser, table, extractorFunction()); + new HashBucketAssignerOperator<>( + initialCommitUser, table, extractorFunction(), false); TupleTypeInfo> rowWithBucketType = new TupleTypeInfo<>(partitionByKeyHash.getType(), BasicTypeInfo.INT_TYPE_INFO); DataStream> bucketAssigned = @@ -78,7 +80,7 @@ public DataStreamSink build(DataStream input, @Nullable Integer parallelis .transform("dynamic-bucket-assigner", rowWithBucketType, assignerOperator) .setParallelism(partitionByKeyHash.getParallelism()); - // 3. shuffle by bucket + // 3. shuffle by partition & bucket DataStream> partitionByBucket = partition(bucketAssigned, channelComputer2(), parallelism); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java index 2e3a783f050b..56ead8a8584e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java @@ -44,6 +44,7 @@ public class FlinkSinkBuilder { @Nullable private Map overwritePartition; @Nullable private LogSinkFunction logSinkFunction; @Nullable private Integer parallelism; + private boolean compactSink = false; public FlinkSinkBuilder(FileStoreTable table) { this.table = table; @@ -78,6 +79,11 @@ public FlinkSinkBuilder withParallelism(@Nullable Integer parallelism) { return this; } + public FlinkSinkBuilder forCompact(boolean compactSink) { + this.compactSink = compactSink; + return this; + } + public DataStreamSink build() { DataStream input = MapToInternalRow.map(this.input, table.rowType()); if (table.coreOptions().localMergeEnabled() && table.schema().primaryKeys().size() > 0) { @@ -108,9 +114,14 @@ public DataStreamSink build() { private DataStreamSink buildDynamicBucketSink( DataStream input, boolean globalIndex) { checkArgument(logSinkFunction == null, "Dynamic bucket mode can not work with log system."); - return globalIndex - ? new GlobalDynamicBucketSink(table, overwritePartition).build(input, parallelism) - : new RowDynamicBucketSink(table, overwritePartition).build(input, parallelism); + return compactSink && !globalIndex + // todo support global index sort compact + ? new DynamicBucketCompactSink(table, overwritePartition).build(input, parallelism) + : globalIndex + ? new GlobalDynamicBucketSink(table, overwritePartition) + .build(input, parallelism) + : new RowDynamicBucketSink(table, overwritePartition) + .build(input, parallelism); } private DataStreamSink buildForFixedBucket(DataStream input) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java index 2d256d9bf75f..eb79344183a8 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java @@ -18,7 +18,9 @@ package org.apache.paimon.flink.sink; +import org.apache.paimon.index.BucketAssigner; import org.apache.paimon.index.HashBucketAssigner; +import org.apache.paimon.index.SimpleHashBucketAssigner; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.AbstractFileStoreTable; import org.apache.paimon.table.Table; @@ -41,17 +43,20 @@ public class HashBucketAssignerOperator extends AbstractStreamOperator> extractorFunction; + private final boolean overwrite; - private transient HashBucketAssigner assigner; + private transient BucketAssigner assigner; private transient PartitionKeyExtractor extractor; public HashBucketAssignerOperator( String commitUser, Table table, - SerializableFunction> extractorFunction) { + SerializableFunction> extractorFunction, + boolean overwrite) { this.initialCommitUser = commitUser; this.table = (AbstractFileStoreTable) table; this.extractorFunction = extractorFunction; + this.overwrite = overwrite; } @Override @@ -66,13 +71,18 @@ public void initializeState(StateInitializationContext context) throws Exception context, "commit_user_state", String.class, initialCommitUser); this.assigner = - new HashBucketAssigner( - table.snapshotManager(), - commitUser, - table.store().newIndexFileHandler(), - getRuntimeContext().getNumberOfParallelSubtasks(), - getRuntimeContext().getIndexOfThisSubtask(), - table.coreOptions().dynamicBucketTargetRowNum()); + overwrite + ? new SimpleHashBucketAssigner( + getRuntimeContext().getNumberOfParallelSubtasks(), + getRuntimeContext().getIndexOfThisSubtask(), + table.coreOptions().dynamicBucketTargetRowNum()) + : new HashBucketAssigner( + table.snapshotManager(), + commitUser, + table.store().newIndexFileHandler(), + getRuntimeContext().getNumberOfParallelSubtasks(), + getRuntimeContext().getIndexOfThisSubtask(), + table.coreOptions().dynamicBucketTargetRowNum()); this.extractor = extractorFunction.apply(table.schema()); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java index 8e9ce02e6348..85e71b73132d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java @@ -18,12 +18,24 @@ package org.apache.paimon.flink; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.fs.Path; +import org.apache.paimon.index.IndexFileHandler; +import org.apache.paimon.manifest.IndexManifestEntry; +import org.apache.paimon.table.AbstractFileStoreTable; + import org.apache.flink.types.Row; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; +import java.util.ArrayList; import java.util.Collections; import java.util.List; +import static org.apache.paimon.index.HashIndexFile.HASH_INDEX; import static org.assertj.core.api.Assertions.assertThat; /** ITCase for batch file store. */ @@ -74,4 +86,32 @@ public void testWriteWithAssignerParallelism() { assertThat(sql("SELECT DISTINCT bucket FROM T$files")) .containsExactlyInAnyOrder(Row.of(0), Row.of(1), Row.of(2)); } + + @Test + public void testOverwrite() throws Exception { + sql("INSERT INTO T VALUES (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4), (1, 5, 5)"); + assertThat(sql("SELECT * FROM T")) + .containsExactlyInAnyOrder( + Row.of(1, 1, 1), + Row.of(1, 2, 2), + Row.of(1, 3, 3), + Row.of(1, 4, 4), + Row.of(1, 5, 5)); + + // overwrite the whole table, we should update the index file by this sql + sql("INSERT OVERWRITE T SELECT * FROM T LIMIT 4"); + + AbstractFileStoreTable table = + (AbstractFileStoreTable) + (CatalogFactory.createCatalog(CatalogContext.create(new Path(path)))) + .getTable(Identifier.create("default", "T")); + IndexFileHandler indexFileHandler = table.store().newIndexFileHandler(); + List partitions = table.newScan().listPartitions(); + List entries = new ArrayList<>(); + partitions.forEach(p -> entries.addAll(indexFileHandler.scan(HASH_INDEX, p))); + + Long records = + entries.stream().map(entry -> entry.indexFile().rowCount()).reduce(Long::sum).get(); + Assertions.assertThat(records).isEqualTo(4); + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java new file mode 100644 index 000000000000..4d219e0a984c --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.ChangelogWithKeyFileStoreTable; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.sink.BatchTableWrite; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.DynamicBucketRow; +import org.apache.paimon.types.DataTypes; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +/** Sort Compact Action tests for dynamic bucket table. */ +public class SortCompactActionForDynamicBucketITCase extends ActionITCaseBase { + + private static final Random RANDOM = new Random(); + + @Test + public void testDynamicBucketSort() throws Exception { + createTable(); + + commit(writeData(100)); + PredicateBuilder predicateBuilder = new PredicateBuilder(getTable().rowType()); + Predicate predicate = predicateBuilder.between(1, 100L, 200L); + + List files = ((FileStoreTable) getTable()).store().newScan().plan().files(); + List filesFilter = + ((ChangelogWithKeyFileStoreTable) getTable()) + .store() + .newScan() + .withValueFilter(predicate) + .plan() + .files(); + + zorder(Arrays.asList("f2", "f1")); + + List filesZorder = + ((FileStoreTable) getTable()).store().newScan().plan().files(); + List filesFilterZorder = + ((ChangelogWithKeyFileStoreTable) getTable()) + .store() + .newScan() + .withValueFilter(predicate) + .plan() + .files(); + Assertions.assertThat(filesFilterZorder.size() / (double) filesZorder.size()) + .isLessThan(filesFilter.size() / (double) files.size()); + } + + @Test + public void testDynamicBucketSortWithOrderAndZorder() throws Exception { + createTable(); + + commit(writeData(100)); + PredicateBuilder predicateBuilder = new PredicateBuilder(getTable().rowType()); + Predicate predicate = predicateBuilder.between(1, 100L, 200L); + + // order f2,f1 will make predicate of f1 perform worse. + order(Arrays.asList("f2", "f1")); + List files = ((FileStoreTable) getTable()).store().newScan().plan().files(); + List filesFilter = + ((ChangelogWithKeyFileStoreTable) getTable()) + .store() + .newScan() + .withValueFilter(predicate) + .plan() + .files(); + + zorder(Arrays.asList("f2", "f1")); + + List filesZorder = + ((FileStoreTable) getTable()).store().newScan().plan().files(); + List filesFilterZorder = + ((ChangelogWithKeyFileStoreTable) getTable()) + .store() + .newScan() + .withValueFilter(predicate) + .plan() + .files(); + + Assertions.assertThat(filesFilterZorder.size() / (double) filesZorder.size()) + .isLessThan(filesFilter.size() / (double) files.size()); + } + + private void zorder(List columns) throws Exception { + if (RANDOM.nextBoolean()) { + new SortCompactAction( + warehouse, + database, + tableName, + Collections.emptyMap(), + Collections.emptyMap()) + .withOrderStrategy("zorder") + .withOrderColumns(columns) + .run(); + } else { + callProcedure("zorder", columns); + } + } + + private void order(List columns) throws Exception { + if (RANDOM.nextBoolean()) { + new SortCompactAction( + warehouse, + database, + tableName, + Collections.emptyMap(), + Collections.emptyMap()) + .withOrderStrategy("order") + .withOrderColumns(columns) + .run(); + } else { + callProcedure("order", columns); + } + } + + private void callProcedure(String orderStrategy, List orderByColumns) { + callProcedure( + String.format( + "CALL compact('%s.%s', 'ALL', '%s', '%s')", + database, tableName, orderStrategy, String.join(",", orderByColumns)), + false, + true); + } + + // schema with all the basic types. + private static Schema schema() { + Schema.Builder schemaBuilder = Schema.newBuilder(); + schemaBuilder.column("f0", DataTypes.BIGINT()); + schemaBuilder.column("f1", DataTypes.BIGINT()); + schemaBuilder.column("f2", DataTypes.BIGINT()); + schemaBuilder.column("f3", DataTypes.BIGINT()); + schemaBuilder.option("bucket", "-1"); + schemaBuilder.option("scan.parallelism", "6"); + schemaBuilder.option("sink.parallelism", "3"); + schemaBuilder.option("dynamic-bucket.target-row-num", "100"); + schemaBuilder.primaryKey("f0"); + return schemaBuilder.build(); + } + + private List writeData(int size) throws Exception { + List messages; + Table table = getTable(); + BatchWriteBuilder builder = table.newBatchWriteBuilder(); + try (BatchTableWrite batchTableWrite = builder.newWrite()) { + for (int i = 0; i < size; i++) { + for (int j = 0; j < 100; j++) { + batchTableWrite.write(data(i)); + } + } + messages = batchTableWrite.prepareCommit(); + } + + return messages; + } + + private void commit(List messages) throws Exception { + getTable().newBatchWriteBuilder().newCommit().commit(messages); + } + + private void createTable() throws Exception { + catalog.createDatabase(database, true); + catalog.createTable(identifier(), schema(), true); + } + + private Table getTable() throws Exception { + return catalog.getTable(identifier()); + } + + private Identifier identifier() { + return Identifier.create(database, tableName); + } + + private static InternalRow data(int bucket) { + GenericRow row = + GenericRow.of( + RANDOM.nextLong(), + (long) RANDOM.nextInt(10000), + (long) RANDOM.nextInt(10000), + (long) RANDOM.nextInt(10000)); + return new DynamicBucketRow(row, bucket); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java similarity index 96% rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionITCase.java rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java index 152ca35e9df2..00bdbecae78d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java @@ -48,9 +48,9 @@ import java.util.concurrent.atomic.AtomicInteger; /** Order Rewrite Action tests for {@link SortCompactAction}. */ -public class SortCompactActionITCase extends ActionITCaseBase { +public class SortCompactActionForUnawareBucketITCase extends ActionITCaseBase { - private static final Random random = new Random(); + private static final Random RANDOM = new Random(); private void prepareData(int size, int loop) throws Exception { createTable(); @@ -230,7 +230,7 @@ public void testTableConf() throws Exception { } private void zorder(List columns) throws Exception { - if (random.nextBoolean()) { + if (RANDOM.nextBoolean()) { new SortCompactAction( warehouse, database, @@ -246,7 +246,7 @@ private void zorder(List columns) throws Exception { } private void order(List columns) throws Exception { - if (random.nextBoolean()) { + if (RANDOM.nextBoolean()) { new SortCompactAction( warehouse, database, @@ -270,12 +270,12 @@ private void callProcedure(String orderStrategy, List orderByColumns) { true); } - public void createTable() throws Exception { + private void createTable() throws Exception { catalog.createDatabase(database, true); catalog.createTable(identifier(), schema(), true); } - public Identifier identifier() { + private Identifier identifier() { return Identifier.create(database, tableName); } @@ -319,7 +319,7 @@ private List writeData(int size) throws Exception { return messages; } - public Table getTable() throws Exception { + private Table getTable() throws Exception { return catalog.getTable(identifier()); } @@ -356,8 +356,8 @@ private static InternalRow data(int p, int i, int j) { } private static byte[] randomBytes() { - byte[] binary = new byte[random.nextInt(10)]; - random.nextBytes(binary); + byte[] binary = new byte[RANDOM.nextInt(10)]; + RANDOM.nextBytes(binary); return binary; } }