diff --git a/LICENSE b/LICENSE index d3f258fa3916..ef08963ea4a7 100644 --- a/LICENSE +++ b/LICENSE @@ -234,6 +234,8 @@ from http://flink.apache.org/ version 1.17.0 paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java paimon-hive/paimon-hive-common/src/test/java/org/apache/paimon/hive/TestHiveMetastore.java +paimon-core/src/main/java/org/apache/paimon/utils/ZOrderByteUtils.java +paimon-core/src/test/java/org/apache/paimon/utils/TestZOrderByteUtil.java from http://iceberg.apache.org/ version 1.3.0 paimon-hive/paimon-hive-common/src/test/resources/hive-schema-3.1.0.derby.sql diff --git a/docs/content/concepts/append-only-table.md b/docs/content/concepts/append-only-table.md index cc469019d8f4..f81a906d8467 100644 --- a/docs/content/concepts/append-only-table.md +++ b/docs/content/concepts/append-only-table.md @@ -227,6 +227,27 @@ behavior is exactly the same as [Append For Qeueue]({{< ref "#compaction" >}}). The auto compaction is only supported in Flink engine streaming mode. You can also start a compaction job in flink by flink action in paimon and disable all the other compaction by set `write-only`. +### Sort Compact + +The data in a per-partition out of order will lead a slow select, compaction may slow down the inserting. It is a good choice for you to set +write-only for inserting job, and after per-partition data done, trigger a partition `Sort Compact` action. + +You can trigger action by shell script: +```shell +/bin/flink run \ + /path/to/paimon-flink-action-0.5-SNAPSHOT.jar \ + compact \ + --warehouse hdfs:///path/to/warehouse \ + --database test_db \ + --table \ + --order-strategy \ + --order-by +``` + +{{< generated/sort-compact >}} + +Other config is the same as [Compact Table]({{< ref "concepts/file-operations#compact-table" >}}) + ### Streaming Source Unaware-bucket mode append-only table supported streaming read and write, but no longer guarantee order anymore. You cannot regard it diff --git a/docs/layouts/shortcodes/generated/sort-compact.html b/docs/layouts/shortcodes/generated/sort-compact.html new file mode 100644 index 000000000000..654904c3e95e --- /dev/null +++ b/docs/layouts/shortcodes/generated/sort-compact.html @@ -0,0 +1,55 @@ +{{/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/}} +{{ $ref := ref . "maintenance/configurations.md" }} + + + + + + + + + + + + + + + + + + + +
Configuration | Description
--order-strategy | The order strategy. Currently only zorder is supported. For example: --order-strategy zorder
--order-by | Specify the order columns. For example: --order-by col0,col1
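For reference, the flags above slot into the compact action invocation shown in the documentation; a filled-in example might look like the following, where `<FLINK_HOME>` stands for the Flink installation directory and the warehouse path, database, table, and column names are placeholders.

```shell
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.5-SNAPSHOT.jar \
    compact \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --table test_table \
    --order-strategy zorder \
    --order-by col0,col1
```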
\ No newline at end of file diff --git a/paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java b/paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java new file mode 100644 index 000000000000..de281aae39f1 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.sort.zorder; + +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.BinaryType; +import org.apache.paimon.types.BooleanType; +import org.apache.paimon.types.CharType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeVisitor; +import org.apache.paimon.types.DateType; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.DoubleType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.LocalZonedTimestampType; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.MultisetType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.SmallIntType; +import org.apache.paimon.types.TimeType; +import org.apache.paimon.types.TimestampType; +import org.apache.paimon.types.TinyIntType; +import org.apache.paimon.types.VarBinaryType; +import org.apache.paimon.types.VarCharType; +import org.apache.paimon.utils.ZOrderByteUtils; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.function.BiFunction; + +import static org.apache.paimon.utils.ZOrderByteUtils.NULL_BYTES; +import static org.apache.paimon.utils.ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE; + +/** Z-indexer for responsibility to generate z-index. 
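+ * It maps each configured order column to a fixed-size (8-byte) ordered byte representation and interleaves the bits of all columns into a single z-value that is used as the sort key.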
*/ +public class ZIndexer implements Serializable { + + private final Set functionSet; + private final int[] fieldsIndex; + private final int totalBytes; + private transient ByteBuffer reuse; + + public ZIndexer(RowType rowType, List orderColumns) { + List fields = rowType.getFieldNames(); + fieldsIndex = new int[orderColumns.size()]; + for (int i = 0; i < fieldsIndex.length; i++) { + int index = fields.indexOf(orderColumns.get(i)); + if (index == -1) { + throw new IllegalArgumentException( + "Can't find column: " + + orderColumns.get(i) + + " in row type fields: " + + fields); + } + fieldsIndex[i] = index; + } + this.functionSet = constructFunctionMap(rowType.getFields()); + this.totalBytes = PRIMITIVE_BUFFER_SIZE * this.fieldsIndex.length; + } + + public void open() { + this.reuse = ByteBuffer.allocate(totalBytes); + functionSet.forEach(RowProcessor::open); + } + + public int size() { + return totalBytes; + } + + public byte[] index(InternalRow row) { + byte[][] columnBytes = new byte[fieldsIndex.length][]; + + int index = 0; + for (RowProcessor f : functionSet) { + columnBytes[index++] = f.zvalue(row); + } + + return ZOrderByteUtils.interleaveBits(columnBytes, totalBytes, reuse); + } + + public Set constructFunctionMap(List fields) { + Set zorderFunctionSet = new LinkedHashSet<>(); + // Construct zorderFunctionSet and fill dataTypes, rowFields + for (int fieldIndex = 0; fieldIndex < fieldsIndex.length; fieldIndex++) { + int index = fieldsIndex[fieldIndex]; + DataField field = fields.get(index); + zorderFunctionSet.add(zmapColumnToCalculator(field, index)); + } + return zorderFunctionSet; + } + + public static RowProcessor zmapColumnToCalculator(DataField field, int index) { + DataType type = field.type(); + return type.accept(new TypeVisitor(index)); + } + + /** Type Visitor to generate function map from row column to z-index. */ + public static class TypeVisitor implements DataTypeVisitor { + + private final int fieldIndex; + + public TypeVisitor(int index) { + this.fieldIndex = index; + } + + @Override + public RowProcessor visit(CharType charType) { + final InternalRow.FieldGetter fieldGetter = + InternalRow.createFieldGetter(charType, fieldIndex); + return new RowProcessor( + (row, reuse) -> { + Object o = fieldGetter.getFieldOrNull(row); + return o == null + ? NULL_BYTES + : ZOrderByteUtils.stringToOrderedBytes( + o.toString(), PRIMITIVE_BUFFER_SIZE, reuse) + .array(); + }); + } + + @Override + public RowProcessor visit(VarCharType varCharType) { + final InternalRow.FieldGetter fieldGetter = + InternalRow.createFieldGetter(varCharType, fieldIndex); + return new RowProcessor( + (row, reuse) -> { + Object o = fieldGetter.getFieldOrNull(row); + return o == null + ? NULL_BYTES + : ZOrderByteUtils.stringToOrderedBytes( + o.toString(), PRIMITIVE_BUFFER_SIZE, reuse) + .array(); + }); + } + + @Override + public RowProcessor visit(BooleanType booleanType) { + final InternalRow.FieldGetter fieldGetter = + InternalRow.createFieldGetter(booleanType, fieldIndex); + return new RowProcessor( + (row, reuse) -> { + Object o = fieldGetter.getFieldOrNull(row); + if (o == null) { + return NULL_BYTES; + } + ZOrderByteUtils.reuse(reuse, PRIMITIVE_BUFFER_SIZE); + reuse.put(0, (byte) ((boolean) o ? 
-127 : 0)); + return reuse.array(); + }); + } + + @Override + public RowProcessor visit(BinaryType binaryType) { + final InternalRow.FieldGetter fieldGetter = + InternalRow.createFieldGetter(binaryType, fieldIndex); + return new RowProcessor( + (row, reuse) -> { + Object o = fieldGetter.getFieldOrNull(row); + return o == null + ? NULL_BYTES + : ZOrderByteUtils.byteTruncateOrFill( + (byte[]) o, PRIMITIVE_BUFFER_SIZE, reuse) + .array(); + }); + } + + @Override + public RowProcessor visit(VarBinaryType varBinaryType) { + final InternalRow.FieldGetter fieldGetter = + InternalRow.createFieldGetter(varBinaryType, fieldIndex); + return new RowProcessor( + (row, reuse) -> { + Object o = fieldGetter.getFieldOrNull(row); + return o == null + ? NULL_BYTES + : ZOrderByteUtils.byteTruncateOrFill( + (byte[]) o, PRIMITIVE_BUFFER_SIZE, reuse) + .array(); + }); + } + + @Override + public RowProcessor visit(DecimalType decimalType) { + final InternalRow.FieldGetter fieldGetter = + InternalRow.createFieldGetter(decimalType, fieldIndex); + return new RowProcessor( + (row, reuse) -> { + Object o = fieldGetter.getFieldOrNull(row); + return o == null + ? NULL_BYTES + : ZOrderByteUtils.byteTruncateOrFill( + ((Decimal) o).toUnscaledBytes(), + PRIMITIVE_BUFFER_SIZE, + reuse) + .array(); + }); + } + + @Override + public RowProcessor visit(TinyIntType tinyIntType) { + final InternalRow.FieldGetter fieldGetter = + InternalRow.createFieldGetter(tinyIntType, fieldIndex); + return new RowProcessor( + (row, reuse) -> { + Object o = fieldGetter.getFieldOrNull(row); + return o == null + ? NULL_BYTES + : ZOrderByteUtils.tinyintToOrderedBytes((byte) o, reuse).array(); + }); + } + + @Override + public RowProcessor visit(SmallIntType smallIntType) { + final InternalRow.FieldGetter fieldGetter = + InternalRow.createFieldGetter(smallIntType, fieldIndex); + return new RowProcessor( + (row, reuse) -> { + Object o = fieldGetter.getFieldOrNull(row); + return o == null + ? NULL_BYTES + : ZOrderByteUtils.shortToOrderedBytes((short) o, reuse).array(); + }); + } + + @Override + public RowProcessor visit(IntType intType) { + final InternalRow.FieldGetter fieldGetter = + InternalRow.createFieldGetter(intType, fieldIndex); + return new RowProcessor( + (row, reuse) -> { + Object o = fieldGetter.getFieldOrNull(row); + return o == null + ? NULL_BYTES + : ZOrderByteUtils.intToOrderedBytes((int) o, reuse).array(); + }); + } + + @Override + public RowProcessor visit(BigIntType bigIntType) { + final InternalRow.FieldGetter fieldGetter = + InternalRow.createFieldGetter(bigIntType, fieldIndex); + return new RowProcessor( + (row, reuse) -> { + Object o = fieldGetter.getFieldOrNull(row); + return o == null + ? NULL_BYTES + : ZOrderByteUtils.longToOrderedBytes((long) o, reuse).array(); + }); + } + + @Override + public RowProcessor visit(FloatType floatType) { + final InternalRow.FieldGetter fieldGetter = + InternalRow.createFieldGetter(floatType, fieldIndex); + return new RowProcessor( + (row, reuse) -> { + Object o = fieldGetter.getFieldOrNull(row); + return o == null + ? NULL_BYTES + : ZOrderByteUtils.floatToOrderedBytes((float) o, reuse).array(); + }); + } + + @Override + public RowProcessor visit(DoubleType doubleType) { + final InternalRow.FieldGetter fieldGetter = + InternalRow.createFieldGetter(doubleType, fieldIndex); + return new RowProcessor( + (row, reuse) -> { + Object o = fieldGetter.getFieldOrNull(row); + return o == null + ? 
NULL_BYTES + : ZOrderByteUtils.doubleToOrderedBytes((double) o, reuse).array(); + }); + } + + @Override + public RowProcessor visit(DateType dateType) { + final InternalRow.FieldGetter fieldGetter = + InternalRow.createFieldGetter(dateType, fieldIndex); + return new RowProcessor( + (row, reuse) -> { + Object o = fieldGetter.getFieldOrNull(row); + return o == null + ? NULL_BYTES + : ZOrderByteUtils.intToOrderedBytes((int) o, reuse).array(); + }); + } + + @Override + public RowProcessor visit(TimeType timeType) { + final InternalRow.FieldGetter fieldGetter = + InternalRow.createFieldGetter(timeType, fieldIndex); + return new RowProcessor( + (row, reuse) -> { + Object o = fieldGetter.getFieldOrNull(row); + return o == null + ? NULL_BYTES + : ZOrderByteUtils.intToOrderedBytes((int) o, reuse).array(); + }); + } + + @Override + public RowProcessor visit(TimestampType timestampType) { + final InternalRow.FieldGetter fieldGetter = + InternalRow.createFieldGetter(timestampType, fieldIndex); + return new RowProcessor( + (row, reuse) -> { + Object o = fieldGetter.getFieldOrNull(row); + return o == null + ? NULL_BYTES + : ZOrderByteUtils.longToOrderedBytes( + ((Timestamp) o).getMillisecond(), reuse) + .array(); + }); + } + + @Override + public RowProcessor visit(LocalZonedTimestampType localZonedTimestampType) { + final InternalRow.FieldGetter fieldGetter = + InternalRow.createFieldGetter(localZonedTimestampType, fieldIndex); + return new RowProcessor( + (row, reuse) -> { + Object o = fieldGetter.getFieldOrNull(row); + return o == null + ? NULL_BYTES + : ZOrderByteUtils.longToOrderedBytes( + ((Timestamp) o).getMillisecond(), reuse) + .array(); + }); + } + + @Override + public RowProcessor visit(ArrayType arrayType) { + throw new RuntimeException("Unsupported type"); + } + + @Override + public RowProcessor visit(MultisetType multisetType) { + throw new RuntimeException("Unsupported type"); + } + + @Override + public RowProcessor visit(MapType mapType) { + throw new RuntimeException("Unsupported type"); + } + + @Override + public RowProcessor visit(RowType rowType) { + throw new RuntimeException("Unsupported type"); + } + } + + /** Be used as converting row field record to devoted bytes. */ + public static class RowProcessor implements Serializable { + + private transient ByteBuffer reuse; + private final ZProcessFunction process; + + public RowProcessor(ZProcessFunction process) { + this.process = process; + } + + public void open() { + reuse = ByteBuffer.allocate(PRIMITIVE_BUFFER_SIZE); + } + + public byte[] zvalue(InternalRow o) { + return process.apply(o, reuse); + } + } + + interface ZProcessFunction extends BiFunction, Serializable {} +} diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ZOrderByteUtils.java b/paimon-core/src/main/java/org/apache/paimon/utils/ZOrderByteUtils.java new file mode 100644 index 000000000000..1ea4194c2e11 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/utils/ZOrderByteUtils.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* This file is based on source code from the Iceberg Project (http://iceberg.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +package org.apache.paimon.utils; + +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +/** + * Within Z-Ordering the byte representations of objects being compared must be ordered, this + * requires several types to be transformed when converted to bytes. The goal is to map object's + * whose byte representation are not lexicographically ordered into representations that are + * lexicographically ordered. Bytes produced should be compared lexicographically as unsigned bytes, + * big-endian. + * + *

All types except for String are stored within an 8-byte buffer + + *

Most of these techniques are derived from + * https://aws.amazon.com/blogs/database/z-order-indexing-for-multifaceted-queries-in-amazon-dynamodb-part-2/ + */ +public class ZOrderByteUtils { + + public static final int PRIMITIVE_BUFFER_SIZE = 8; + public static final byte[] NULL_BYTES = new byte[PRIMITIVE_BUFFER_SIZE]; + private static ThreadLocal encoderThreadLocal = new ThreadLocal<>(); + + static { + Arrays.fill(NULL_BYTES, (byte) 0x00); + } + + private ZOrderByteUtils() {} + + static ByteBuffer allocatePrimitiveBuffer() { + return ByteBuffer.allocate(PRIMITIVE_BUFFER_SIZE); + } + + /** + * Signed ints do not have their bytes in magnitude order because of the sign bit. To fix this, + * flip the sign bit so that all negatives are ordered before positives. This essentially shifts + * the 0 value so that we don't break our ordering when we cross the new 0 value. + */ + public static ByteBuffer intToOrderedBytes(int val, ByteBuffer reuse) { + ByteBuffer bytes = reuse(reuse, PRIMITIVE_BUFFER_SIZE); + bytes.putLong(((long) val) ^ 0x8000000000000000L); + return bytes; + } + + /** + * Signed longs are treated the same as the signed ints in {@link #intToOrderedBytes(int, + * ByteBuffer)}. + */ + public static ByteBuffer longToOrderedBytes(long val, ByteBuffer reuse) { + ByteBuffer bytes = reuse(reuse, PRIMITIVE_BUFFER_SIZE); + bytes.putLong(val ^ 0x8000000000000000L); + return bytes; + } + + /** + * Signed shorts are treated the same as the signed ints in {@link #intToOrderedBytes(int, + * ByteBuffer)}. + */ + public static ByteBuffer shortToOrderedBytes(short val, ByteBuffer reuse) { + ByteBuffer bytes = reuse(reuse, PRIMITIVE_BUFFER_SIZE); + bytes.putLong(((long) val) ^ 0x8000000000000000L); + return bytes; + } + + /** + * Signed tiny ints are treated the same as the signed ints in {@link #intToOrderedBytes(int, + * ByteBuffer)}. + */ + public static ByteBuffer tinyintToOrderedBytes(byte val, ByteBuffer reuse) { + ByteBuffer bytes = reuse(reuse, PRIMITIVE_BUFFER_SIZE); + bytes.putLong(((long) val) ^ 0x8000000000000000L); + return bytes; + } + + /** + * IEEE 754 : “If two floating-point numbers in the same format are ordered (say, x {@literal <} + * y), they are ordered the same way when their bits are reinterpreted as sign-magnitude + * integers.” + * + *

Which means floats can be treated as sign magnitude integers which can then be converted + * into lexicographically comparable bytes. + */ + public static ByteBuffer floatToOrderedBytes(float val, ByteBuffer reuse) { + ByteBuffer bytes = reuse(reuse, PRIMITIVE_BUFFER_SIZE); + long lval = Double.doubleToLongBits(val); + lval ^= ((lval >> (Integer.SIZE - 1)) | Long.MIN_VALUE); + bytes.putLong(lval); + return bytes; + } + + /** + * Doubles are treated the same as floats in {@link #floatToOrderedBytes(float, ByteBuffer)}. + */ + public static ByteBuffer doubleToOrderedBytes(double val, ByteBuffer reuse) { + ByteBuffer bytes = reuse(reuse, PRIMITIVE_BUFFER_SIZE); + long lval = Double.doubleToLongBits(val); + lval ^= ((lval >> (Integer.SIZE - 1)) | Long.MIN_VALUE); + bytes.putLong(lval); + return bytes; + } + + /** + * Strings are lexicographically sortable BUT if different byte array lengths will ruin the + * Z-Ordering. (ZOrder requires that a given column contribute the same number of bytes every + * time). This implementation just uses a set size to for all output byte representations. + * Truncating longer strings and right padding 0 for shorter strings. + */ + @SuppressWarnings("ByteBufferBackingArray") + public static ByteBuffer stringToOrderedBytes(String val, int length, ByteBuffer reuse) { + CharsetEncoder encoder = encoderThreadLocal.get(); + if (encoder == null) { + encoder = StandardCharsets.UTF_8.newEncoder(); + encoderThreadLocal.set(encoder); + } + + ByteBuffer bytes = reuse(reuse, length); + Arrays.fill(bytes.array(), 0, length, (byte) 0x00); + if (val != null) { + CharBuffer inputBuffer = CharBuffer.wrap(val); + encoder.encode(inputBuffer, bytes, true); + } + return bytes; + } + + /** + * Return a bytebuffer with the given bytes truncated to length, or filled with 0's to length + * depending on whether the given bytes are larger or smaller than the given length. + */ + @SuppressWarnings("ByteBufferBackingArray") + public static ByteBuffer byteTruncateOrFill(byte[] val, int length, ByteBuffer reuse) { + ByteBuffer bytes = reuse(reuse, length); + if (val.length < length) { + bytes.put(val, 0, val.length); + Arrays.fill(bytes.array(), val.length, length, (byte) 0x00); + } else { + bytes.put(val, 0, length); + } + return bytes; + } + + public static byte[] interleaveBits(byte[][] columnsBinary, int interleavedSize) { + return interleaveBits(columnsBinary, interleavedSize, ByteBuffer.allocate(interleavedSize)); + } + + /** + * Interleave bits using a naive loop. Variable length inputs are allowed but to get a + * consistent ordering it is required that every column contribute the same number of bytes in + * each invocation. Bits are interleaved from all columns that have a bit available at that + * position. Once a Column has no more bits to produce it is skipped in the interleaving. + * + * @param columnsBinary an array of ordered byte representations of the columns being ZOrdered + * @param interleavedSize the number of bytes to use in the output + * @return the columnbytes interleaved + */ + // NarrowingCompoundAssignment is intended here. 
See + // https://github.com/apache/iceberg/pull/5200#issuecomment-1176226163 + @SuppressWarnings({"ByteBufferBackingArray", "NarrowingCompoundAssignment"}) + public static byte[] interleaveBits( + byte[][] columnsBinary, int interleavedSize, ByteBuffer reuse) { + byte[] interleavedBytes = reuse.array(); + Arrays.fill(interleavedBytes, 0, interleavedSize, (byte) 0x00); + + int sourceColumn = 0; + int sourceByte = 0; + int sourceBit = 7; + int interleaveByte = 0; + int interleaveBit = 7; + + while (interleaveByte < interleavedSize) { + // Take the source bit from source byte and move it to the output bit position + interleavedBytes[interleaveByte] |= + (columnsBinary[sourceColumn][sourceByte] & 1 << sourceBit) + >>> sourceBit + << interleaveBit; + --interleaveBit; + + // Check if an output byte has been completed + if (interleaveBit == -1) { + // Move to the next output byte + interleaveByte++; + // Move to the highest order bit of the new output byte + interleaveBit = 7; + } + + // Check if the last output byte has been completed + if (interleaveByte == interleavedSize) { + break; + } + + // Find the next source bit to interleave + do { + // Move to next column + ++sourceColumn; + if (sourceColumn == columnsBinary.length) { + // If the last source column was used, reset to next bit of first column + sourceColumn = 0; + --sourceBit; + if (sourceBit == -1) { + // If the last bit of the source byte was used, reset to the highest bit of + // the next + // byte + sourceByte++; + sourceBit = 7; + } + } + } while (columnsBinary[sourceColumn].length <= sourceByte); + } + return interleavedBytes; + } + + public static ByteBuffer reuse(ByteBuffer reuse, int length) { + reuse.position(0); + reuse.limit(length); + return reuse; + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/sort/zorder/ZIndexerTest.java b/paimon-core/src/test/java/org/apache/paimon/sort/zorder/ZIndexerTest.java new file mode 100644 index 000000000000..4e9c0899360d --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/sort/zorder/ZIndexerTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.sort.zorder; + +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.ZOrderByteUtils; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Random; + +/** Tests for {@link ZIndexer}. 
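+ * The expected value is built by converting each column with ZOrderByteUtils and calling interleaveBits directly, then compared byte by byte against the indexer's output.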
*/ +public class ZIndexerTest { + + private static final Random RANDOM = new Random(); + + @Test + public void testZIndexer() { + RowType rowType = RowType.of(new IntType(), new BigIntType()); + + ZIndexer zIndexer = new ZIndexer(rowType, Arrays.asList("f0", "f1")); + zIndexer.open(); + + for (int i = 0; i < 1000; i++) { + int a = RANDOM.nextInt(); + long b = RANDOM.nextLong(); + + InternalRow internalRow = GenericRow.of(a, b); + + byte[] zOrder = zIndexer.index(internalRow); + + byte[][] zCache = new byte[2][]; + ByteBuffer byteBuffer = ByteBuffer.allocate(8); + ZOrderByteUtils.intToOrderedBytes(a, byteBuffer); + zCache[0] = Arrays.copyOf(byteBuffer.array(), 8); + + ZOrderByteUtils.longToOrderedBytes(b, byteBuffer); + zCache[1] = Arrays.copyOf(byteBuffer.array(), 8); + + byte[] expectedZOrder = ZOrderByteUtils.interleaveBits(zCache, 16); + + for (int j = 0; j < 16; j++) { + Assertions.assertThat(zOrder[j]).isEqualTo(expectedZOrder[j]); + } + } + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/TestZOrderByteUtil.java b/paimon-core/src/test/java/org/apache/paimon/utils/TestZOrderByteUtil.java new file mode 100644 index 000000000000..de1bf7709cb1 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/utils/TestZOrderByteUtil.java @@ -0,0 +1,426 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* This file is based on source code from the Iceberg Project (http://iceberg.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +package org.apache.paimon.utils; + +import org.junit.Assert; +import org.junit.Test; +import org.testcontainers.shaded.com.google.common.primitives.UnsignedBytes; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Random; + +/** Tests for {@link ZOrderByteUtils}. */ +public class TestZOrderByteUtil { + private static final byte IIIIIIII = (byte) 255; + private static final byte IOIOIOIO = (byte) 170; + private static final byte OIOIOIOI = (byte) 85; + private static final byte OOOOIIII = (byte) 15; + private static final byte OOOOOOOI = (byte) 1; + private static final byte OOOOOOOO = (byte) 0; + + private static final int NUM_TESTS = 100000; + private static final int NUM_INTERLEAVE_TESTS = 1000; + + private final Random random = new Random(42); + + private String bytesToString(byte[] bytes) { + StringBuilder result = new StringBuilder(); + for (byte b : bytes) { + result.append(String.format("%8s", Integer.toBinaryString(b & 0xFF)).replace(' ', '0')); + } + return result.toString(); + } + + /** Returns a non-0 length byte array. 
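+ * The length is chosen at random between 1 and 100.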
*/ + private byte[] generateRandomBytes() { + int length = Math.abs(random.nextInt(100) + 1); + return generateRandomBytes(length); + } + + /** Returns a byte array of a specified length. */ + private byte[] generateRandomBytes(int length) { + byte[] result = new byte[length]; + random.nextBytes(result); + return result; + } + + /** Test method to ensure correctness of byte interleaving code. */ + private String interleaveStrings(String[] strings) { + StringBuilder result = new StringBuilder(); + int totalLength = Arrays.stream(strings).mapToInt(String::length).sum(); + int substringIndex = 0; + int characterIndex = 0; + while (characterIndex < totalLength) { + for (String str : strings) { + if (substringIndex < str.length()) { + result.append(str.charAt(substringIndex)); + characterIndex++; + } + } + substringIndex++; + } + return result.toString(); + } + + /** + * Compares the result of a string based interleaving algorithm implemented above versus the + * binary bit-shifting algorithm used in ZOrderByteUtils. Either both algorithms are identically + * wrong or are both identically correct. + */ + @Test + public void testInterleaveRandomExamples() { + for (int test = 0; test < NUM_INTERLEAVE_TESTS; test++) { + int numByteArrays = Math.abs(random.nextInt(6)) + 1; + byte[][] testBytes = new byte[numByteArrays][]; + String[] testStrings = new String[numByteArrays]; + for (int byteIndex = 0; byteIndex < numByteArrays; byteIndex++) { + testBytes[byteIndex] = generateRandomBytes(); + testStrings[byteIndex] = bytesToString(testBytes[byteIndex]); + } + + int zOrderSize = Arrays.stream(testBytes).mapToInt(column -> column.length).sum(); + byte[] byteResult = ZOrderByteUtils.interleaveBits(testBytes, zOrderSize); + String byteResultAsString = bytesToString(byteResult); + + String stringResult = interleaveStrings(testStrings); + + Assert.assertEquals( + "String interleave didn't match byte interleave", + stringResult, + byteResultAsString); + } + } + + @Test + public void testReuseInterleaveBuffer() { + int numByteArrays = 2; + int colLength = 16; + ByteBuffer interleaveBuffer = ByteBuffer.allocate(numByteArrays * colLength); + for (int test = 0; test < NUM_INTERLEAVE_TESTS; test++) { + byte[][] testBytes = new byte[numByteArrays][]; + String[] testStrings = new String[numByteArrays]; + for (int byteIndex = 0; byteIndex < numByteArrays; byteIndex++) { + testBytes[byteIndex] = generateRandomBytes(colLength); + testStrings[byteIndex] = bytesToString(testBytes[byteIndex]); + } + + byte[] byteResult = + ZOrderByteUtils.interleaveBits( + testBytes, numByteArrays * colLength, interleaveBuffer); + String byteResultAsString = bytesToString(byteResult); + + String stringResult = interleaveStrings(testStrings); + + Assert.assertEquals( + "String interleave didn't match byte interleave", + stringResult, + byteResultAsString); + } + } + + @Test + public void testInterleaveEmptyBits() { + byte[][] test = new byte[4][10]; + byte[] expected = new byte[40]; + + Assert.assertArrayEquals( + "Should combine empty arrays", expected, ZOrderByteUtils.interleaveBits(test, 40)); + } + + @Test + public void testInterleaveFullBits() { + byte[][] test = new byte[4][]; + test[0] = new byte[] {IIIIIIII, IIIIIIII}; + test[1] = new byte[] {IIIIIIII}; + test[2] = new byte[0]; + test[3] = new byte[] {IIIIIIII, IIIIIIII, IIIIIIII}; + byte[] expected = new byte[] {IIIIIIII, IIIIIIII, IIIIIIII, IIIIIIII, IIIIIIII, IIIIIIII}; + + Assert.assertArrayEquals( + "Should combine full arrays", expected, ZOrderByteUtils.interleaveBits(test, 
6)); + } + + @Test + public void testInterleaveMixedBits() { + byte[][] test = new byte[4][]; + test[0] = new byte[] {OOOOOOOI, IIIIIIII, OOOOOOOO, OOOOIIII}; + test[1] = new byte[] {OOOOOOOI, OOOOOOOO, IIIIIIII}; + test[2] = new byte[] {OOOOOOOI}; + test[3] = new byte[] {OOOOOOOI}; + byte[] expected = + new byte[] { + OOOOOOOO, OOOOOOOO, OOOOOOOO, OOOOIIII, IOIOIOIO, IOIOIOIO, OIOIOIOI, OIOIOIOI, + OOOOIIII + }; + Assert.assertArrayEquals( + "Should combine mixed byte arrays", + expected, + ZOrderByteUtils.interleaveBits(test, 9)); + } + + @Test + public void testIntOrdering() { + ByteBuffer aBuffer = ZOrderByteUtils.allocatePrimitiveBuffer(); + ByteBuffer bBuffer = ZOrderByteUtils.allocatePrimitiveBuffer(); + for (int i = 0; i < NUM_TESTS; i++) { + int aInt = random.nextInt(); + int bInt = random.nextInt(); + int intCompare = Integer.signum(Integer.compare(aInt, bInt)); + byte[] aBytes = ZOrderByteUtils.intToOrderedBytes(aInt, aBuffer).array(); + byte[] bBytes = ZOrderByteUtils.intToOrderedBytes(bInt, bBuffer).array(); + int byteCompare = + Integer.signum( + UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes)); + + Assert.assertEquals( + String.format( + "Ordering of ints should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ", + aInt, + bInt, + intCompare, + Arrays.toString(aBytes), + Arrays.toString(bBytes), + byteCompare), + intCompare, + byteCompare); + } + } + + @Test + public void testLongOrdering() { + ByteBuffer aBuffer = ZOrderByteUtils.allocatePrimitiveBuffer(); + ByteBuffer bBuffer = ZOrderByteUtils.allocatePrimitiveBuffer(); + for (int i = 0; i < NUM_TESTS; i++) { + long aLong = random.nextInt(); + long bLong = random.nextInt(); + int longCompare = Integer.signum(Long.compare(aLong, bLong)); + byte[] aBytes = ZOrderByteUtils.longToOrderedBytes(aLong, aBuffer).array(); + byte[] bBytes = ZOrderByteUtils.longToOrderedBytes(bLong, bBuffer).array(); + int byteCompare = + Integer.signum( + UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes)); + + Assert.assertEquals( + String.format( + "Ordering of longs should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ", + aLong, + bLong, + longCompare, + Arrays.toString(aBytes), + Arrays.toString(bBytes), + byteCompare), + longCompare, + byteCompare); + } + } + + @Test + public void testShortOrdering() { + ByteBuffer aBuffer = ZOrderByteUtils.allocatePrimitiveBuffer(); + ByteBuffer bBuffer = ZOrderByteUtils.allocatePrimitiveBuffer(); + for (int i = 0; i < NUM_TESTS; i++) { + short aShort = (short) (random.nextInt() % (Short.MAX_VALUE + 1)); + short bShort = (short) (random.nextInt() % (Short.MAX_VALUE + 1)); + int longCompare = Integer.signum(Long.compare(aShort, bShort)); + byte[] aBytes = ZOrderByteUtils.shortToOrderedBytes(aShort, aBuffer).array(); + byte[] bBytes = ZOrderByteUtils.shortToOrderedBytes(bShort, bBuffer).array(); + int byteCompare = + Integer.signum( + UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes)); + + Assert.assertEquals( + String.format( + "Ordering of longs should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ", + aShort, + bShort, + longCompare, + Arrays.toString(aBytes), + Arrays.toString(bBytes), + byteCompare), + longCompare, + byteCompare); + } + } + + @Test + public void testTinyOrdering() { + ByteBuffer aBuffer = ZOrderByteUtils.allocatePrimitiveBuffer(); + ByteBuffer bBuffer = ZOrderByteUtils.allocatePrimitiveBuffer(); + for (int i = 0; i < NUM_TESTS; i++) { + byte aByte = (byte) (random.nextInt() % (Byte.MAX_VALUE + 1)); + byte bByte = 
(byte) (random.nextInt() % (Byte.MAX_VALUE + 1)); + int longCompare = Integer.signum(Long.compare(aByte, bByte)); + byte[] aBytes = ZOrderByteUtils.tinyintToOrderedBytes(aByte, aBuffer).array(); + byte[] bBytes = ZOrderByteUtils.tinyintToOrderedBytes(bByte, bBuffer).array(); + int byteCompare = + Integer.signum( + UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes)); + + Assert.assertEquals( + String.format( + "Ordering of longs should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ", + aByte, + bByte, + longCompare, + Arrays.toString(aBytes), + Arrays.toString(bBytes), + byteCompare), + longCompare, + byteCompare); + } + } + + @Test + public void testFloatOrdering() { + ByteBuffer aBuffer = ZOrderByteUtils.allocatePrimitiveBuffer(); + ByteBuffer bBuffer = ZOrderByteUtils.allocatePrimitiveBuffer(); + for (int i = 0; i < NUM_TESTS; i++) { + float aFloat = random.nextFloat(); + float bFloat = random.nextFloat(); + int floatCompare = Integer.signum(Float.compare(aFloat, bFloat)); + byte[] aBytes = ZOrderByteUtils.floatToOrderedBytes(aFloat, aBuffer).array(); + byte[] bBytes = ZOrderByteUtils.floatToOrderedBytes(bFloat, bBuffer).array(); + int byteCompare = + Integer.signum( + UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes)); + + Assert.assertEquals( + String.format( + "Ordering of floats should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ", + aFloat, + bFloat, + floatCompare, + Arrays.toString(aBytes), + Arrays.toString(bBytes), + byteCompare), + floatCompare, + byteCompare); + } + } + + @Test + public void testDoubleOrdering() { + ByteBuffer aBuffer = ZOrderByteUtils.allocatePrimitiveBuffer(); + ByteBuffer bBuffer = ZOrderByteUtils.allocatePrimitiveBuffer(); + for (int i = 0; i < NUM_TESTS; i++) { + double aDouble = random.nextDouble(); + double bDouble = random.nextDouble(); + int doubleCompare = Integer.signum(Double.compare(aDouble, bDouble)); + byte[] aBytes = ZOrderByteUtils.doubleToOrderedBytes(aDouble, aBuffer).array(); + byte[] bBytes = ZOrderByteUtils.doubleToOrderedBytes(bDouble, bBuffer).array(); + int byteCompare = + Integer.signum( + UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes)); + + Assert.assertEquals( + String.format( + "Ordering of doubles should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ", + aDouble, + bDouble, + doubleCompare, + Arrays.toString(aBytes), + Arrays.toString(bBytes), + byteCompare), + doubleCompare, + byteCompare); + } + } + + @Test + public void testStringOrdering() { + ByteBuffer aBuffer = ByteBuffer.allocate(128); + ByteBuffer bBuffer = ByteBuffer.allocate(128); + for (int i = 0; i < NUM_TESTS; i++) { + String aString = randomString(); + String bString = randomString(); + int stringCompare = Integer.signum(aString.compareTo(bString)); + byte[] aBytes = ZOrderByteUtils.stringToOrderedBytes(aString, 128, aBuffer).array(); + byte[] bBytes = ZOrderByteUtils.stringToOrderedBytes(bString, 128, bBuffer).array(); + int byteCompare = + Integer.signum( + UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes)); + + Assert.assertEquals( + String.format( + "Ordering of strings should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ", + aString, + bString, + stringCompare, + Arrays.toString(aBytes), + Arrays.toString(bBytes), + byteCompare), + stringCompare, + byteCompare); + } + } + + @Test + public void testByteTruncateOrFill() { + ByteBuffer aBuffer = ByteBuffer.allocate(128); + ByteBuffer bBuffer = ByteBuffer.allocate(128); + for (int i = 0; i < NUM_TESTS; i++) 
{ + byte[] aBytesRaw = randomBytes(); + byte[] bBytesRaw = randomBytes(); + int stringCompare = + Integer.signum( + UnsignedBytes.lexicographicalComparator() + .compare(aBytesRaw, bBytesRaw)); + byte[] aBytes = ZOrderByteUtils.byteTruncateOrFill(aBytesRaw, 128, aBuffer).array(); + byte[] bBytes = ZOrderByteUtils.byteTruncateOrFill(bBytesRaw, 128, bBuffer).array(); + int byteCompare = + Integer.signum( + UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes)); + + Assert.assertEquals( + String.format( + "Ordering of strings should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ", + aBytesRaw, + bBytesRaw, + stringCompare, + Arrays.toString(aBytes), + Arrays.toString(bBytes), + byteCompare), + stringCompare, + byteCompare); + } + } + + private byte[] randomBytes() { + byte[] binary = new byte[random.nextInt(50)]; + random.nextBytes(binary); + return binary; + } + + private String randomString() { + int length = random.nextInt(50); + byte[] buffer = new byte[length]; + + for (int i = 0; i < length; i += 1) { + buffer[i] = (byte) ('a' + random.nextInt(26)); + } + + return new String(buffer); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java index 6242bec27331..0d4d2535465f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java @@ -21,16 +21,24 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.flink.FlinkCatalog; import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.flink.LogicalTypeConversion; import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeCasts; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.types.logical.LogicalType; +import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.stream.Collectors; /** Abstract base of {@link Action} for table. 
*/ public abstract class ActionBase implements Action { @@ -40,6 +48,8 @@ public abstract class ActionBase implements Action { protected final Catalog catalog; protected final FlinkCatalog flinkCatalog; protected final String catalogName = "paimon-" + UUID.randomUUID(); + protected final StreamExecutionEnvironment env; + protected final StreamTableEnvironment batchTEnv; public ActionBase(String warehouse, Map catalogConfig) { catalogOptions = Options.fromMap(catalogConfig); @@ -47,6 +57,12 @@ public ActionBase(String warehouse, Map catalogConfig) { catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); flinkCatalog = FlinkCatalogFactory.createCatalog(catalogName, catalog, catalogOptions); + env = StreamExecutionEnvironment.getExecutionEnvironment(); + batchTEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode()); + + // register flink catalog to table environment + batchTEnv.registerCatalog(flinkCatalog.getName(), flinkCatalog); + batchTEnv.useCatalog(flinkCatalog.getName()); } protected void execute(StreamExecutionEnvironment env, String defaultName) throws Exception { @@ -60,4 +76,34 @@ protected Catalog.Loader catalogLoader() { Options catalogOptions = this.catalogOptions; return () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions); } + + /** + * Extract {@link LogicalType}s from Flink {@link org.apache.flink.table.types.DataType}s and + * convert to Paimon {@link DataType}s. + */ + protected List toPaimonTypes( + List flinkDataTypes) { + return flinkDataTypes.stream() + .map(org.apache.flink.table.types.DataType::getLogicalType) + .map(LogicalTypeConversion::toDataType) + .collect(Collectors.toList()); + } + + /** + * Check whether each {@link DataType} of actualTypes is compatible with that of expectedTypes + * respectively. 
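+ * Returns false if the two lists differ in length or if any pair of types does not support a compatible cast.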
+ */ + protected boolean compatibleCheck(List actualTypes, List expectedTypes) { + if (actualTypes.size() != expectedTypes.size()) { + return false; + } + + for (int i = 0; i < actualTypes.size(); i++) { + if (!DataTypeCasts.supportsCompatibleCast(actualTypes.get(i), expectedTypes.get(i))) { + return false; + } + } + + return true; + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java index 27c5c9046e32..9d0b99484a59 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java @@ -119,4 +119,8 @@ public void run() throws Exception { build(env); execute(env, "Compact job"); } + + public List> getPartitions() { + return partitions; + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java index 3af89e8bfefd..b74bd8b2f714 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java @@ -21,6 +21,7 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.utils.MultipleParameterTool; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; @@ -41,8 +42,29 @@ public Optional create(MultipleParameterTool params) { Map catalogConfig = optionalConfigMap(params, "catalog-conf"); - CompactAction action = - new CompactAction(tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig); + CompactAction action; + if (params.has("order-strategy")) { + SortCompactAction sortCompactAction = + new SortCompactAction(tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig); + + String strategy = params.get("order-strategy"); + sortCompactAction.withOrderStrategy(strategy); + + if (params.has("order-by")) { + String sqlOrderBy = params.get("order-by"); + if (sqlOrderBy == null) { + throw new IllegalArgumentException("Please specify \"order-by\"."); + } + sortCompactAction.withOrderColumns(Arrays.asList(sqlOrderBy.split(","))); + } else { + throw new IllegalArgumentException( + "Please specify order columns in parameter --order-by."); + } + + action = sortCompactAction; + } else { + action = new CompactAction(tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig); + } if (params.has("partition")) { List> partitions = getPartitions(params); @@ -61,7 +83,9 @@ public void printHelp() { System.out.println("Syntax:"); System.out.println( " compact --warehouse --database " - + "--table [--partition ]"); + + "--table [--partition ]" + + "[--order-strategy ]" + + "[--order-by ]"); System.out.println( " compact --warehouse s3://path/to/warehouse --database " + "--table [--catalog-conf [--catalog-conf ...]]"); @@ -70,6 +94,12 @@ public void printHelp() { System.out.println("Partition name syntax:"); System.out.println(" key1=value1,key2=value2,..."); + + System.out.println(); + System.out.println("Note:"); + System.out.println( + " order compact now only support append-only table with bucket=-1, please don't specify --order-strategy parameter if your table does not meet the request"); + System.out.println(" 
order-strategy now only support zorder in batch mode"); System.out.println(); System.out.println("Examples:"); @@ -84,6 +114,8 @@ public void printHelp() { " compact --warehouse s3:///path/to/warehouse " + "--database test_db " + "--table test_table " + + "--order-strategy zorder " + + "--order-by a,b,c " + "--catalog-conf s3.endpoint=https://****.com " + "--catalog-conf s3.access-key=***** " + "--catalog-conf s3.secret-key=***** "); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java new file mode 100644 index 000000000000..ff5e1a90d5f8 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.flink.FlinkConnectorOptions; +import org.apache.paimon.flink.sink.FlinkSinkBuilder; +import org.apache.paimon.flink.sorter.TableSorter; +import org.apache.paimon.flink.source.FlinkSourceBuilder; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.table.AppendOnlyFileStoreTable; +import org.apache.paimon.table.BucketMode; +import org.apache.paimon.table.FileStoreTable; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.configuration.ExecutionOptions; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.data.RowData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** Compact with sort action. 
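+ * In batch mode it reads the append-only table (optionally restricted to the given partitions), sorts the rows by the configured strategy and order columns, and writes the sorted data back through an overwrite sink.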
*/ +public class SortCompactAction extends CompactAction { + + private String sortStrategy; + private List orderColumns; + + public SortCompactAction( + String warehouse, + String database, + String tableName, + Map catalogConfig) { + super(warehouse, database, tableName, catalogConfig); + + checkArgument( + table instanceof AppendOnlyFileStoreTable, + "Only sort compaction works with append-only table for now."); + table = table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true")); + } + + @Override + public void run() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + build(env); + execute(env, "Sort Compact Job"); + } + + public void build(StreamExecutionEnvironment env) { + // only support batch sort yet + if (env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) + != RuntimeExecutionMode.BATCH) { + throw new IllegalArgumentException( + "Only support batch mode yet, please set -Dexecution.runtime-mode=BATCH"); + } + FileStoreTable fileStoreTable = (FileStoreTable) table; + if (!(fileStoreTable instanceof AppendOnlyFileStoreTable)) { + throw new IllegalArgumentException("Sort Compact only supports append-only table yet"); + } + if (fileStoreTable.bucketMode() != BucketMode.UNAWARE) { + throw new IllegalArgumentException("Sort Compact only supports bucket=-1 yet."); + } + Map tableConfig = fileStoreTable.options(); + FlinkSourceBuilder sourceBuilder = + new FlinkSourceBuilder( + ObjectIdentifier.of( + catalogName, + identifier.getDatabaseName(), + identifier.getObjectName()), + fileStoreTable); + + if (getPartitions() != null) { + Predicate partitionPredicate = + PredicateBuilder.or( + getPartitions().stream() + .map(p -> PredicateBuilder.partition(p, table.rowType())) + .toArray(Predicate[]::new)); + sourceBuilder.withPredicate(partitionPredicate); + } + + String scanParallelism = tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key()); + if (scanParallelism != null) { + sourceBuilder.withParallelism(Integer.parseInt(scanParallelism)); + } + + DataStream source = sourceBuilder.withEnv(env).withContinuousMode(false).build(); + TableSorter sorter = + TableSorter.getSorter(env, source, fileStoreTable, sortStrategy, orderColumns); + DataStream sorted = sorter.sort(); + + FlinkSinkBuilder flinkSinkBuilder = new FlinkSinkBuilder(fileStoreTable); + flinkSinkBuilder.withInput(sorted).withOverwritePartition(new HashMap<>()); + String sinkParallelism = tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key()); + if (sinkParallelism != null) { + flinkSinkBuilder.withParallelism(Integer.parseInt(sinkParallelism)); + } + + flinkSinkBuilder.build(); + } + + public void withOrderStrategy(String sortStrategy) { + this.sortStrategy = sortStrategy; + } + + public void withOrderColumns(List orderColumns) { + this.orderColumns = orderColumns; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java index 84114d1c6491..3b8147a3efdc 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java @@ -21,22 +21,15 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; -import 
org.apache.paimon.flink.LogicalTypeConversion; import org.apache.paimon.flink.sink.FlinkSinkBuilder; import org.apache.paimon.flink.utils.TableEnvironmentUtils; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; -import org.apache.paimon.types.DataType; -import org.apache.paimon.types.DataTypeCasts; import org.apache.paimon.utils.Preconditions; import org.apache.flink.api.dag.Transformation; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.EnvironmentSettings; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.types.logical.LogicalType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,18 +37,14 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; /** Abstract base of {@link Action} for table. */ public abstract class TableActionBase extends ActionBase { private static final Logger LOG = LoggerFactory.getLogger(TableActionBase.class); - protected final StreamExecutionEnvironment env; - protected final StreamTableEnvironment batchTEnv; - protected final Identifier identifier; - protected Table table; + protected final Identifier identifier; TableActionBase( String warehouse, @@ -63,12 +52,6 @@ public abstract class TableActionBase extends ActionBase { String tableName, Map catalogConfig) { super(warehouse, catalogConfig); - env = StreamExecutionEnvironment.getExecutionEnvironment(); - batchTEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode()); - - // register flink catalog to table environment - batchTEnv.registerCatalog(flinkCatalog.getName(), flinkCatalog); - batchTEnv.useCatalog(flinkCatalog.getName()); identifier = new Identifier(databaseName, tableName); try { table = catalog.getTable(identifier); @@ -79,36 +62,6 @@ public abstract class TableActionBase extends ActionBase { } } - /** - * Extract {@link LogicalType}s from Flink {@link org.apache.flink.table.types.DataType}s and - * convert to Paimon {@link DataType}s. - */ - protected List toPaimonTypes( - List flinkDataTypes) { - return flinkDataTypes.stream() - .map(org.apache.flink.table.types.DataType::getLogicalType) - .map(LogicalTypeConversion::toDataType) - .collect(Collectors.toList()); - } - - /** - * Check whether each {@link DataType} of actualTypes is compatible with that of expectedTypes - * respectively. - */ - protected boolean compatibleCheck(List actualTypes, List expectedTypes) { - if (actualTypes.size() != expectedTypes.size()) { - return false; - } - - for (int i = 0; i < actualTypes.size(); i++) { - if (!DataTypeCasts.supportsCompatibleCast(actualTypes.get(i), expectedTypes.get(i))) { - return false; - } - } - - return true; - } - /** Sink {@link DataStream} dataStream to table with Flink Table API in batch environment. 
*/ protected void batchSink(DataStream dataStream) { List> transformations = diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java new file mode 100644 index 000000000000..d07615e9389c --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java @@ -0,0 +1,448 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.shuffle; + +import org.apache.paimon.utils.Pair; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.InputSelectable; +import org.apache.flink.streaming.api.operators.InputSelection; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.StreamMap; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; +import org.apache.flink.streaming.api.transformations.PartitionTransformation; +import org.apache.flink.streaming.api.transformations.StreamExchangeMode; +import org.apache.flink.streaming.api.transformations.TwoInputTransformation; +import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; +import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper; +import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.operators.TableStreamOperator; +import org.apache.flink.table.runtime.util.StreamRecordCollector; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.XORShiftRandom; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.PriorityQueue; +import java.util.Random; + +/** + * RangeShuffle Util to shuffle the input stream by the sampling range. 
See the `rangeShuffleByKey` + * method for how the topology is built. + */ +public class RangeShuffle { + + /** + * The RelNode with range-partition distribution will create the following transformations. + * + *

Explanation of the following figure: "[LSample, n]" means operator is LSample and + * parallelism is n, "LSample" means LocalSampleOperator, "GSample" means GlobalSampleOperator, + * "ARange" means AssignRangeId, "RRange" means RemoveRangeId. + * + *

{@code
+     * [IN,n]->[LSample,n]->[GSample,1]-BROADCAST
+     *    \                                    \
+     *     -----------------------------BATCH-[ARange,n]-PARTITION->[RRange,m]->
+     * }
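+     *
+     * For example (illustrative numbers): with {@code rangeNum = 4} the global sample emits three
+     * sorted boundaries b1 <= b2 <= b3. A record whose key is <= b1 is assigned range index 0, a key
+     * in (b1, b2] gets index 1, and keys greater than b3 get index 3; this is what the binary search
+     * in {@code AssignRangeIndexOperator} computes. {@code RangePartitioner} then maps the range
+     * index onto the actual number of downstream tasks.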
+ * + *

The streams except the sample and histogram process stream will been blocked, so the the + * sample and histogram process stream does not care about requiredExchangeMode. + */ + public static DataStream> rangeShuffleByKey( + DataStream> inputDataStream, + Comparator keyComparator, + Class keyClass, + int sampleSize, + int rangeNum) { + Transformation> input = inputDataStream.getTransformation(); + + OneInputTransformation, T> keyInput = + new OneInputTransformation<>( + input, + "ABSTRACT KEY", + new StreamMap<>(Pair::getLeft), + TypeInformation.of(keyClass), + input.getParallelism()); + + // 1. Fixed size sample in each partitions. + OneInputTransformation> localSample = + new OneInputTransformation<>( + keyInput, + "LOCAL SAMPLE", + new LocalSampleOperator<>(sampleSize), + new TupleTypeInfo<>( + BasicTypeInfo.DOUBLE_TYPE_INFO, TypeInformation.of(keyClass)), + keyInput.getParallelism()); + + // 2. Collect all the samples and gather them into a sorted key range. + OneInputTransformation, List> sampleAndHistogram = + new OneInputTransformation<>( + localSample, + "GLOBAL SAMPLE", + new GlobalSampleOperator<>(sampleSize, keyComparator, rangeNum), + new ListTypeInfo<>(TypeInformation.of(keyClass)), + 1); + + // 3. Take range boundaries as broadcast input and take the tuple of partition id and + // record as output. + // The shuffle mode of input edge must be BATCH to avoid dead lock. See + // DeadlockBreakupProcessor. + TwoInputTransformation, Pair, Tuple2>> + preparePartition = + new TwoInputTransformation<>( + new PartitionTransformation<>( + sampleAndHistogram, + new BroadcastPartitioner<>(), + StreamExchangeMode.BATCH), + new PartitionTransformation<>( + input, + new ForwardPartitioner<>(), + StreamExchangeMode.BATCH), + "ASSIGN RANGE INDEX", + new AssignRangeIndexOperator<>(keyComparator), + new TupleTypeInfo<>( + BasicTypeInfo.INT_TYPE_INFO, input.getOutputType()), + input.getParallelism()); + + // 4. Remove the partition id. (shuffle according range partition) + return new DataStream<>( + inputDataStream.getExecutionEnvironment(), + new OneInputTransformation<>( + new PartitionTransformation<>( + preparePartition, + new CustomPartitionerWrapper<>( + new AssignRangeIndexOperator.RangePartitioner(rangeNum), + new AssignRangeIndexOperator.Tuple2KeySelector<>()), + StreamExchangeMode.BATCH), + "REMOVE KEY", + new RemoveRangeIndexOperator<>(), + input.getOutputType(), + input.getParallelism())); + } + + /** + * LocalSampleOperator wraps the sample logic on the partition side (the first phase of + * distributed sample algorithm). Outputs sampled weight with record. + * + *

See {@link Sampler}. + */ + @Internal + public static class LocalSampleOperator extends TableStreamOperator> + implements OneInputStreamOperator>, BoundedOneInput { + + private static final long serialVersionUID = 1L; + + private final int numSample; + + private transient Collector> collector; + private transient Sampler sampler; + + public LocalSampleOperator(int numSample) { + this.numSample = numSample; + } + + @Override + public void open() throws Exception { + super.open(); + this.collector = new StreamRecordCollector<>(output); + sampler = new Sampler<>(numSample, System.nanoTime()); + } + + @Override + public void processElement(StreamRecord streamRecord) throws Exception { + sampler.collect(streamRecord.getValue()); + } + + @Override + public void endInput() throws Exception { + Iterator> sampled = sampler.sample(); + while (sampled.hasNext()) { + collector.collect(sampled.next()); + } + } + } + + /** + * Global sample for range partition. Inputs weight with record. Outputs list of sampled record. + * + *
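+ *
+ * <p>For example, with 1000 collected samples and {@code rangesNum = 4}, the samples are sorted by
+ * the key comparator and the records at positions 250, 500 and 750 become the three range
+ * boundaries that are broadcast to {@code AssignRangeIndexOperator}.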

See {@link Sampler}. + */ + @Internal + public static class GlobalSampleOperator extends TableStreamOperator> + implements OneInputStreamOperator, List>, BoundedOneInput { + + private static final long serialVersionUID = 1L; + + private final int numSample; + private final int rangesNum; + private final Comparator keyComparator; + + private transient Collector> collector; + private transient Sampler sampler; + + public GlobalSampleOperator(int numSample, Comparator comparator, int rangesNum) { + this.numSample = numSample; + this.keyComparator = comparator; + this.rangesNum = rangesNum; + } + + @Override + public void open() throws Exception { + super.open(); + //noinspection unchecked + this.sampler = new Sampler<>(numSample, 0L); + this.collector = new StreamRecordCollector<>(output); + } + + @Override + public void processElement(StreamRecord> record) throws Exception { + Tuple2 tuple = record.getValue(); + sampler.collect(tuple.f0, tuple.f1); + } + + @Override + public void endInput() throws Exception { + Iterator> sampled = sampler.sample(); + + List sampledData = new ArrayList<>(); + while (sampled.hasNext()) { + sampledData.add(sampled.next().f1); + } + + sampledData.sort(keyComparator); + + int boundarySize = rangesNum - 1; + T[] boundaries = (T[]) new Object[boundarySize]; + if (sampledData.size() > 0) { + double avgRange = sampledData.size() / (double) rangesNum; + for (int i = 1; i < rangesNum; i++) { + T record = sampledData.get((int) (i * avgRange)); + boundaries[i - 1] = record; + } + } + + collector.collect(Arrays.asList(boundaries)); + } + } + + /** + * This two-input-operator require a input with RangeBoundaries as broadcast input, and generate + * Tuple2 which includes range index and record from the other input itself as output. + */ + @Internal + public static class AssignRangeIndexOperator + extends TableStreamOperator>> + implements TwoInputStreamOperator< + List, Pair, Tuple2>>, + InputSelectable { + + private static final long serialVersionUID = 1L; + + private transient List boundaries; + private transient Collector>> collector; + + private final Comparator keyComparator; + + public AssignRangeIndexOperator(Comparator comparator) { + this.keyComparator = comparator; + } + + @Override + public void open() throws Exception { + super.open(); + this.collector = new StreamRecordCollector<>(output); + } + + @Override + public void processElement1(StreamRecord> streamRecord) throws Exception { + this.boundaries = streamRecord.getValue(); + } + + @Override + public void processElement2(StreamRecord> streamRecord) throws Exception { + if (boundaries == null) { + throw new RuntimeException("There should be one data from the first input."); + } + Pair row = streamRecord.getValue(); + collector.collect(new Tuple2<>(binarySearch(row.getLeft()), row)); + } + + @Override + public InputSelection nextSelection() { + return boundaries == null ? InputSelection.FIRST : InputSelection.ALL; + } + + private int binarySearch(T key) { + int low = 0; + int high = this.boundaries.size() - 1; + + while (low <= high) { + final int mid = (low + high) >>> 1; + final int result = keyComparator.compare(key, this.boundaries.get(mid)); + + if (result > 0) { + low = mid + 1; + } else if (result < 0) { + high = mid - 1; + } else { + return mid; + } + } + // key not found, but the low index is the target + // bucket, since the boundaries are the upper bound + return low; + } + + /** A {@link KeySelector} to select by f0 of tuple2. 
*/ + public static class Tuple2KeySelector + implements KeySelector>, Integer> { + + private static final long serialVersionUID = 1L; + + @Override + public Integer getKey(Tuple2> tuple2) throws Exception { + return tuple2.f0; + } + } + + /** A {@link Partitioner} to partition by id with range. */ + public static class RangePartitioner implements Partitioner { + + private static final long serialVersionUID = 1L; + + private final int totalRangeNum; + + public RangePartitioner(int totalRangeNum) { + this.totalRangeNum = totalRangeNum; + } + + @Override + public int partition(Integer key, int numPartitions) { + Preconditions.checkArgument( + numPartitions < totalRangeNum, + "Num of subPartitions should < totalRangeNum: " + totalRangeNum); + int partition = key / (totalRangeNum / numPartitions); + return Math.min(numPartitions - 1, partition); + } + } + } + + /** Remove the range index and return the actual record. */ + @Internal + public static class RemoveRangeIndexOperator extends TableStreamOperator> + implements OneInputStreamOperator>, Pair> { + + private static final long serialVersionUID = 1L; + + private transient Collector> collector; + + @Override + public void open() throws Exception { + super.open(); + this.collector = new StreamRecordCollector<>(output); + } + + @Override + public void processElement(StreamRecord>> streamRecord) + throws Exception { + collector.collect(streamRecord.getValue().f1); + } + } + + /** + * A simple in memory implementation Sampling, and with only one pass through the input + * iteration whose size is unpredictable. The basic idea behind this sampler implementation is + * to generate a random number for each input element as its weight, select the top K elements + * with max weight. As the weights are generated randomly, so are the selected top K elements. + * In the first phase, we generate random numbers as the weights for each element and select top + * K elements as the output of each partitions. In the second phase, we select top K elements + * from all the outputs of the first phase. + * + *

This implementation refers to the algorithm described in "Optimal Random Sampling from + * Distributed Streams Revisited". + */ + @Internal + public static class Sampler { + + private final int numSamples; + private final Random random; + private final PriorityQueue> queue; + + private int index = 0; + private Tuple2 smallest = null; + + /** + * Create a new sampler with reservoir size and a supplied random number generator. + * + * @param numSamples Maximum number of samples to retain in reservoir, must be non-negative. + */ + Sampler(int numSamples, long seed) { + Preconditions.checkArgument(numSamples >= 0, "numSamples should be non-negative."); + this.numSamples = numSamples; + this.random = new XORShiftRandom(seed); + this.queue = new PriorityQueue<>(numSamples, Comparator.comparingDouble(o -> o.f0)); + } + + void collect(T rowData) { + collect(random.nextDouble(), rowData); + } + + void collect(double weight, T key) { + if (index < numSamples) { + // Fill the queue with first K elements from input. + addQueue(weight, key); + } else { + // Remove the element with the smallest weight, + // and append current element into the queue. + if (weight > smallest.f0) { + queue.remove(); + addQueue(weight, key); + } + } + index++; + } + + private void addQueue(double weight, T row) { + queue.add(new Tuple2<>(weight, row)); + smallest = queue.peek(); + } + + Iterator> sample() { + return queue.iterator(); + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java new file mode 100644 index 000000000000..d51cf1833cfe --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.sorter; + +import org.apache.paimon.codegen.CodeGenUtils; +import org.apache.paimon.codegen.NormalizedKeyComputer; +import org.apache.paimon.codegen.RecordComparator; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.serializer.BinaryRowSerializer; +import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.data.serializer.InternalSerializers; +import org.apache.paimon.disk.IOManagerImpl; +import org.apache.paimon.memory.HeapMemorySegmentPool; +import org.apache.paimon.memory.MemorySegmentPool; +import org.apache.paimon.sort.BinaryExternalSortBuffer; +import org.apache.paimon.sort.BinaryInMemorySortBuffer; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.MutableObjectIterator; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.runtime.operators.TableStreamOperator; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.paimon.disk.IOManagerImpl.splitPaths; + +/** SortOperator to sort the `InternalRow`s by the `KeyType`. */ +public class SortOperator extends TableStreamOperator + implements OneInputStreamOperator, BoundedOneInput { + + private final RowType keyRowType; + private final RowType valueRowType; + private final long maxMemory; + private final int pageSize; + private final int arity; + private transient BinaryExternalSortBuffer buffer; + + public SortOperator(RowType keyType, RowType valueRowType, long maxMemory, int pageSize) { + this.keyRowType = keyType; + this.valueRowType = valueRowType; + this.maxMemory = maxMemory; + this.pageSize = pageSize; + this.arity = keyType.getFieldCount() + valueRowType.getFieldCount(); + } + + @Override + public void open() throws Exception { + super.open(); + + List keyFields = keyRowType.getFields(); + List dataFields = valueRowType.getFields(); + + List fields = new ArrayList<>(); + fields.addAll(keyFields); + fields.addAll(dataFields); + + RowType rowType = new RowType(fields); + + InternalRowSerializer serializer = InternalSerializers.create(rowType); + NormalizedKeyComputer normalizedKeyComputer = + CodeGenUtils.newNormalizedKeyComputer( + rowType.getFieldTypes(), "MemTableKeyComputer"); + RecordComparator keyComparator = + CodeGenUtils.newRecordComparator(rowType.getFieldTypes(), "MemTableComparator"); + + MemorySegmentPool memoryPool = new HeapMemorySegmentPool(maxMemory, pageSize); + + BinaryInMemorySortBuffer inMemorySortBuffer = + BinaryInMemorySortBuffer.createBuffer( + normalizedKeyComputer, serializer, keyComparator, memoryPool); + + Configuration jobConfig = getContainingTask().getJobConfiguration(); + + buffer = + new BinaryExternalSortBuffer( + new BinaryRowSerializer(serializer.getArity()), + keyComparator, + memoryPool.pageSize(), + inMemorySortBuffer, + new IOManagerImpl(splitPaths(jobConfig.get(CoreOptions.TMP_DIRS))), + jobConfig.getInteger( + ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES)); + } + + @Override + public void endInput() throws Exception { + if (buffer.size() > 0) { + MutableObjectIterator iterator = 
buffer.sortedIterator(); + BinaryRow binaryRow = new BinaryRow(arity); + while ((binaryRow = iterator.next(binaryRow)) != null) { + output.collect(new StreamRecord<>(binaryRow)); + } + } + } + + @Override + public void processElement(StreamRecord element) throws Exception { + buffer.write(element.getValue()); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java new file mode 100644 index 000000000000..1cf3572bca6d --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sorter; + +import org.apache.paimon.flink.action.SortCompactAction; +import org.apache.paimon.table.FileStoreTable; + +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.data.RowData; + +import java.util.List; + +/** An abstract TableSorter for {@link SortCompactAction}. */ +public abstract class TableSorter { + + protected final StreamExecutionEnvironment batchTEnv; + protected final DataStream origin; + protected final FileStoreTable table; + protected final List orderColNames; + + public TableSorter( + StreamExecutionEnvironment batchTEnv, + DataStream origin, + FileStoreTable table, + List orderColNames) { + this.batchTEnv = batchTEnv; + this.origin = origin; + this.table = table; + this.orderColNames = orderColNames; + checkColNames(); + } + + private void checkColNames() { + if (orderColNames.size() < 1) { + throw new IllegalArgumentException("order column names should not be empty."); + } + List columnNames = table.rowType().getFieldNames(); + for (String zColumn : orderColNames) { + if (!columnNames.contains(zColumn)) { + throw new RuntimeException( + "Can't find column " + + zColumn + + " in table columns. 
Possible columns are [" + + columnNames.stream().reduce((a, b) -> a + "," + b).get() + + "]"); + } + } + } + + public abstract DataStream sort(); + + public static TableSorter getSorter( + StreamExecutionEnvironment batchTEnv, + DataStream origin, + FileStoreTable fileStoreTable, + String sortStrategy, + List orderColumns) { + switch (OrderType.of(sortStrategy)) { + case ORDER: + // todo support alphabetical order + throw new IllegalArgumentException("Not supported yet."); + case ZORDER: + return new ZorderSorter(batchTEnv, origin, fileStoreTable, orderColumns); + case HILBERT: + // todo support hilbert curve + throw new IllegalArgumentException("Not supported yet."); + default: + throw new IllegalArgumentException("cannot match order type: " + sortStrategy); + } + } + + enum OrderType { + ORDER("order"), + ZORDER("zorder"), + HILBERT("hilbert"); + + private final String orderType; + + OrderType(String orderType) { + this.orderType = orderType; + } + + @Override + public String toString() { + return "order type: " + orderType; + } + + public static OrderType of(String orderType) { + if (ORDER.orderType.equals(orderType)) { + return ORDER; + } else if (ZORDER.orderType.equals(orderType)) { + return ZORDER; + } else if (HILBERT.orderType.equals(orderType)) { + return HILBERT; + } + + throw new IllegalArgumentException("cannot match type: " + orderType + " for ordering"); + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java new file mode 100644 index 000000000000..465ab89d0c3f --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sorter; + +import org.apache.paimon.flink.action.SortCompactAction; +import org.apache.paimon.sort.zorder.ZIndexer; +import org.apache.paimon.table.FileStoreTable; + +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.data.RowData; + +import java.util.List; + +/** + * This is a table sorter which will sort the records by the z-order of specified columns. It works + * on stream api. It computes the z-order-index by {@link ZIndexer}. After add the column of + * z-order, it does the range shuffle and sort. Finally, {@link SortCompactAction} will remove the + * "z-order" column and insert sorted record to overwrite the origin table. 
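+ *
+ * <p>An illustrative usage sketch (the column names here are only an example):
+ *
+ * <pre>{@code
+ * ZorderSorter sorter = new ZorderSorter(env, source, fileStoreTable, Arrays.asList("f2", "f1"));
+ * DataStream<RowData> sorted = sorter.sort();
+ * }</pre>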
+ */ +public class ZorderSorter extends TableSorter { + + public ZorderSorter( + StreamExecutionEnvironment batchTEnv, + DataStream origin, + FileStoreTable table, + List zOrderColNames) { + super(batchTEnv, origin, table, zOrderColNames); + } + + @Override + public DataStream sort() { + return sortStreamByZOrder(origin, table); + } + + /** + * Sort the input stream by the given order columns with z-order. + * + * @param inputStream the stream waited to be sorted + * @return the sorted data stream + */ + private DataStream sortStreamByZOrder( + DataStream inputStream, FileStoreTable table) { + ZIndexer zIndexer = new ZIndexer(table.rowType(), orderColNames); + return ZorderSorterUtils.sortStreamByZorder(inputStream, zIndexer, table); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorterUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorterUtils.java new file mode 100644 index 000000000000..7e6c78192f1d --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorterUtils.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sorter; + +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.JoinedRow; +import org.apache.paimon.flink.FlinkRowData; +import org.apache.paimon.flink.FlinkRowWrapper; +import org.apache.paimon.flink.shuffle.RangeShuffle; +import org.apache.paimon.sort.zorder.ZIndexer; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.KeyProjectedRow; +import org.apache.paimon.utils.Pair; + +import org.apache.paimon.shade.guava30.com.google.common.primitives.UnsignedBytes; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.table.data.RowData; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; + +/** + * This is a table sorter which will sort the records by the z-order of the z-indexer generates. It + * is a global sort method, we will shuffle the input stream through z-order. After sorted, we + * convert datastream from paimon RowData back to Flink RowData + * + *

+ *                         toPaimonDataStream                        add z-order column                             range shuffle by z-order                                 local sort                                              remove z-index
+ * DataStream[RowData] -------------------> DataStream[PaimonRowData] -------------------> DataStream[PaimonRowData] -------------------------> DataStream[PaimonRowData] -----------------------> DataStream[PaimonRowData sorted] ----------------------------> DataStream[RowData sorted]
+ *                                                                                                                                                                                                                                back to flink RowData
+ * 
+ */ +public class ZorderSorterUtils { + + private static final RowType KEY_TYPE = + new RowType(Collections.singletonList(new DataField(0, "Z_INDEX", DataTypes.BYTES()))); + + /** + * Sort the input stream by z-order. + * + * @param inputStream the stream wait to be ordered + * @param zIndexer generate z-index by the given row + * @param table the FileStoreTable + * @return + */ + public static DataStream sortStreamByZorder( + DataStream inputStream, ZIndexer zIndexer, FileStoreTable table) { + + final RowType valueRowType = table.rowType(); + final int fieldCount = valueRowType.getFieldCount(); + final int parallelism = inputStream.getParallelism(); + final int sampleSize = parallelism * 1000; + final int rangeNum = parallelism * 10; + final long maxSortMemory = table.coreOptions().writeBufferSize(); + final int pageSize = table.coreOptions().pageSize(); + + // generate the z-index as the key of Pair. + DataStream> inputWithKey = + inputStream + .map( + new RichMapFunction>() { + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + zIndexer.open(); + } + + @Override + public Pair map(RowData value) { + byte[] zorder = zIndexer.index(new FlinkRowWrapper(value)); + return Pair.of(Arrays.copyOf(zorder, zorder.length), value); + } + }) + .setParallelism(parallelism); + + // range shuffle by z-index key + return RangeShuffle.rangeShuffleByKey( + inputWithKey, + (Comparator & Serializable) + (b1, b2) -> { + assert b1.length == b2.length; + for (int i = 0; i < b1.length; i++) { + int ret = UnsignedBytes.compare(b1[i], b2[i]); + if (ret != 0) { + return ret; + } + } + return 0; + }, + byte[].class, + sampleSize, + rangeNum) + .map( + a -> + new JoinedRow( + GenericRow.of(a.getLeft()), + new FlinkRowWrapper(a.getRight())), + TypeInformation.of(InternalRow.class)) + .setParallelism(parallelism) + // sort the output locally by `SortOperator` + .transform( + "LOCAL SORT", + TypeInformation.of(InternalRow.class), + new SortOperator(KEY_TYPE, valueRowType, maxSortMemory, pageSize)) + .setParallelism(parallelism) + // remove the z-index column from every row + .map( + new RichMapFunction() { + + private transient KeyProjectedRow keyProjectedRow; + + @Override + public void open(Configuration parameters) throws Exception { + int[] map = new int[fieldCount]; + for (int i = 0; i < map.length; i++) { + map[i] = i + 1; + } + keyProjectedRow = new KeyProjectedRow(map); + } + + @Override + public InternalRow map(InternalRow value) throws Exception { + return keyProjectedRow.replaceRow(value); + } + }) + .setParallelism(parallelism) + .map(FlinkRowData::new, inputStream.getType()) + .setParallelism(parallelism); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java index 63e24026ff35..2fb4ee2e4145 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java @@ -35,7 +35,7 @@ import java.util.Queue; /** - * Pre-calculate which splits each task should process according to the weight, and then distribute + * Pre-calculate which splits each task should zvalue according to the weight, and then distribute * the splits fairly. 
*/ public class PreAssignSplitAssigner implements SplitAssigner { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/OrderRewriteActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/OrderRewriteActionITCase.java new file mode 100644 index 000000000000..256a5460fd15 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/OrderRewriteActionITCase.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.options.CatalogOptions; +import org.apache.paimon.options.Options; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.AppendOnlyFileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.sink.BatchTableWrite; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.types.DataTypes; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +/** Order Rewrite Action tests for {@link SortCompactAction}. 
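+ *
+ * <p>The tests drive the action the same way it would be used from code, e.g. (the warehouse path
+ * is illustrative):
+ *
+ * <pre>{@code
+ * SortCompactAction action =
+ *         new SortCompactAction(warehousePath, "my_db", "Orders1", Collections.emptyMap());
+ * action.withOrderStrategy("zorder");
+ * action.withOrderColumns(Arrays.asList("f2", "f1"));
+ * action.run();
+ * }</pre>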
*/ +public class OrderRewriteActionITCase extends ActionITCaseBase { + + private static final Random random = new Random(); + + private Catalog catalog; + @TempDir private java.nio.file.Path path; + + private void prepareData(int size, int loop) throws Exception { + createTable(); + List commitMessages = new ArrayList<>(); + for (int i = 0; i < loop; i++) { + commitMessages.addAll(writeData(size)); + } + commit(commitMessages); + } + + @Test + public void testAllBasicTypeWorksWithZorder() throws Exception { + new CompactActionFactory().printHelp(); + prepareData(300, 1); + // All the basic types should support zorder + Assertions.assertThatCode( + () -> + zorder( + Arrays.asList( + "f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7", + "f8", "f9", "f10", "f11", "f12", "f13", "f14", + "f15"))) + .doesNotThrowAnyException(); + } + + @Test + public void testZorderActionWorks() throws Exception { + prepareData(300, 30); + PredicateBuilder predicateBuilder = new PredicateBuilder(getTable().rowType()); + Predicate predicate = predicateBuilder.between(1, 100, 200); + + List files = + ((AppendOnlyFileStoreTable) getTable()).store().newScan().plan().files(); + List filesFilter = + ((AppendOnlyFileStoreTable) getTable()) + .store() + .newScan() + .withFilter(predicate) + .plan() + .files(); + // before zorder, we don't filter any file + Assertions.assertThat(files.size()).isEqualTo(filesFilter.size()); + + zorder(Arrays.asList("f2", "f1")); + + files = ((AppendOnlyFileStoreTable) getTable()).store().newScan().plan().files(); + filesFilter = + ((AppendOnlyFileStoreTable) getTable()) + .store() + .newScan() + .withFilter(predicate) + .plan() + .files(); + Assertions.assertThat(files.size()).isGreaterThan(filesFilter.size()); + System.out.println("before: " + files.size() + " after: " + filesFilter.size()); + } + + private void zorder(List columns) throws Exception { + SortCompactAction sortCompactAction = + new SortCompactAction( + new Path(path.toUri()).toUri().toString(), + "my_db", + "Orders1", + Collections.emptyMap()); + sortCompactAction.withOrderStrategy("zorder"); + sortCompactAction.withOrderColumns(columns); + sortCompactAction.run(); + } + + public Catalog getCatalog() { + if (catalog == null) { + Options options = new Options(); + options.set(CatalogOptions.WAREHOUSE, new Path(path.toUri()).toUri().toString()); + catalog = CatalogFactory.createCatalog(CatalogContext.create(options)); + } + return catalog; + } + + public void createTable() throws Exception { + getCatalog().createDatabase("my_db", true); + getCatalog().createTable(identifier(), schema(), true); + } + + public Identifier identifier() { + return Identifier.create("my_db", "Orders1"); + } + + private void commit(List messages) throws Exception { + getTable().newBatchWriteBuilder().newCommit().commit(messages); + } + + // schema with all the basic types. 
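+    // Note: bucket = -1 puts the table in unaware-bucket mode, which is the only mode
+    // SortCompactAction accepts; f0 doubles as the partition key, so the generated data spans two partitions.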
+ private static Schema schema() { + Schema.Builder schemaBuilder = Schema.newBuilder(); + schemaBuilder.column("f0", DataTypes.TINYINT()); + schemaBuilder.column("f1", DataTypes.INT()); + schemaBuilder.column("f2", DataTypes.SMALLINT()); + schemaBuilder.column("f3", DataTypes.STRING()); + schemaBuilder.column("f4", DataTypes.DOUBLE()); + schemaBuilder.column("f5", DataTypes.CHAR(10)); + schemaBuilder.column("f6", DataTypes.VARCHAR(10)); + schemaBuilder.column("f7", DataTypes.BOOLEAN()); + schemaBuilder.column("f8", DataTypes.DATE()); + schemaBuilder.column("f9", DataTypes.TIME()); + schemaBuilder.column("f10", DataTypes.TIMESTAMP()); + schemaBuilder.column("f11", DataTypes.DECIMAL(10, 2)); + schemaBuilder.column("f12", DataTypes.BYTES()); + schemaBuilder.column("f13", DataTypes.FLOAT()); + schemaBuilder.column("f14", DataTypes.BINARY(10)); + schemaBuilder.column("f15", DataTypes.VARBINARY(10)); + schemaBuilder.option("bucket", "-1"); + schemaBuilder.option("scan.parallelism", "6"); + schemaBuilder.option("sink.parallelism", "3"); + schemaBuilder.option("target-file-size", "1 M"); + schemaBuilder.partitionKeys("f0"); + return schemaBuilder.build(); + } + + private List writeData(int size) throws Exception { + List messages = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + messages.addAll(writeOnce(getTable(), i, size)); + } + + return messages; + } + + public Table getTable() throws Exception { + return getCatalog().getTable(identifier()); + } + + private static List writeOnce(Table table, int p, int size) throws Exception { + BatchWriteBuilder builder = table.newBatchWriteBuilder(); + try (BatchTableWrite batchTableWrite = builder.newWrite()) { + for (int i = 0; i < size; i++) { + for (int j = 0; j < size; j++) { + batchTableWrite.write(data(p, i, j)); + } + } + return batchTableWrite.prepareCommit(); + } + } + + private static InternalRow data(int p, int i, int j) { + return GenericRow.of( + (byte) p, + j, + (short) i, + BinaryString.fromString(String.valueOf(j)), + 0.1 + i, + BinaryString.fromString(String.valueOf(j)), + BinaryString.fromString(String.valueOf(i)), + j % 2 == 1, + i, + j, + Timestamp.fromEpochMillis(i), + Decimal.zero(10, 2), + String.valueOf(i).getBytes(), + (float) 0.1 + j, + randomBytes(), + randomBytes()); + } + + private static byte[] randomBytes() { + byte[] binary = new byte[random.nextInt(10)]; + random.nextBytes(binary); + return binary; + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterITCase.java similarity index 99% rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterTest.java rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterITCase.java index 4db0d41111e4..08e182464ced 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterITCase.java @@ -43,7 +43,7 @@ import static org.assertj.core.api.Fail.fail; /** Tests for {@link KafkaLogStoreRegister}. */ -public class KafkaLogStoreRegisterTest extends KafkaTableTestBase { +public class KafkaLogStoreRegisterITCase extends KafkaTableTestBase { private static final String DATABASE = "mock_db"; private static final String TABLE = "mock_table";