diff --git a/docs/content/engines/flink.md b/docs/content/engines/flink.md
index 822cbd7d7c25..bbad1c66f504 100644
--- a/docs/content/engines/flink.md
+++ b/docs/content/engines/flink.md
@@ -357,7 +357,7 @@ table options syntax: we use string to represent table options. The format is 'k
  To compact a table. Arguments:
  • identifier: the target table identifier. Cannot be empty.
  • partitions: partition filter.
- • order_strategy: 'order' or 'zorder' or 'none'. Left empty for 'none'.
+ • order_strategy: 'order' or 'zorder' or 'hilbert' or 'none'. Left empty for 'none'.
  • order_columns: the columns that need to be sorted. Left empty if 'order_strategy' is 'none'.
  • table_options: additional dynamic options of the table.
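For orientation, here is a minimal sketch of invoking this procedure from Java. It assumes a Flink version with CALL statement support, a Paimon catalog already registered and selected (setup omitted), and made-up table, partition and column names; the positional arguments follow the list above.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HilbertCompactExample {
    public static void main(String[] args) {
        // Batch mode is the usual choice for a one-shot sort compact.
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Catalog setup omitted: a Paimon catalog must be registered and selected first.
        // Arguments follow the order documented above: identifier, partitions,
        // order_strategy, order_columns, table_options.
        tEnv.executeSql("CALL sys.compact('default.T', 'dt=20240101', 'hilbert', 'f0,f1', '')");
    }
}
```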
diff --git a/docs/layouts/shortcodes/generated/sort-compact.html b/docs/layouts/shortcodes/generated/sort-compact.html
index 5a7a36721fc4..c876ba730f89 100644
--- a/docs/layouts/shortcodes/generated/sort-compact.html
+++ b/docs/layouts/shortcodes/generated/sort-compact.html
@@ -45,7 +45,7 @@
    --order_strategy
-    the order strategy now only support "zorder" and "order". For example: --order_strategy zorder
+    the order strategy now supports "zorder", "hilbert" and "order". For example: --order_strategy zorder
    --order_by
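The difference between the strategies comes down to the space-filling curve used to derive the sort key: z-order interleaves bits, while the Hilbert curve walks the space with better locality. A small standalone sketch against the `com.github.davidmoten:hilbert-curve` library (the dependency added below) shows the index computation; the bit width and points are arbitrary.

```java
import java.math.BigInteger;

import org.davidmoten.hilbert.HilbertCurve;

public class HilbertCurveDemo {
    public static void main(String[] args) {
        // A 2-dimensional curve with 8 bits per dimension (coordinates 0..255).
        HilbertCurve curve = HilbertCurve.bits(8).dimensions(2);
        // Neighboring points map to nearby positions along the curve, which is
        // the locality property the "hilbert" strategy exploits for file skipping.
        BigInteger a = curve.index(3L, 5L);
        BigInteger b = curve.index(4L, 5L);
        System.out.println(a + " vs " + b);
    }
}
```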
diff --git a/paimon-core/pom.xml b/paimon-core/pom.xml
index 2652286b038d..965f2de672ea 100644
--- a/paimon-core/pom.xml
+++ b/paimon-core/pom.xml
@@ -190,6 +190,12 @@ under the License.
             <version>3.6.1</version>
         </dependency>
 
+        <dependency>
+            <groupId>com.github.davidmoten</groupId>
+            <artifactId>hilbert-curve</artifactId>
+            <version>0.2.2</version>
+        </dependency>
+
         <dependency>
             <groupId>org.xerial</groupId>
             <artifactId>sqlite-jdbc</artifactId>
diff --git a/paimon-core/src/main/java/org/apache/paimon/sort/hilbert/HilbertIndexer.java b/paimon-core/src/main/java/org/apache/paimon/sort/hilbert/HilbertIndexer.java
new file mode 100644
index 000000000000..e0e55795fda3
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/sort/hilbert/HilbertIndexer.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.sort.hilbert;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeVisitor;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimeType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.ConvertBinaryUtil;
+
+import org.davidmoten.hilbert.HilbertCurve;
+
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Hilbert indexer responsible for generating the hilbert index. */
+public class HilbertIndexer implements Serializable {
+
+    private static final long PRIMITIVE_EMPTY = Long.MAX_VALUE;
+    private static final int BITS_NUM = 63;
+
+    private final Set<RowProcessor> functionSet;
+    private final int[] fieldsIndex;
+
+    public HilbertIndexer(RowType rowType, List<String> orderColumns) {
+        checkArgument(orderColumns.size() > 1, "Hilbert sort needs at least two columns.");
+        List<String> fields = rowType.getFieldNames();
+        fieldsIndex = new int[orderColumns.size()];
+        for (int i = 0; i < fieldsIndex.length; i++) {
+            int index = fields.indexOf(orderColumns.get(i));
+            if (index == -1) {
+                throw new IllegalArgumentException(
+                        "Can't find column: "
+                                + orderColumns.get(i)
+                                + " in row type fields: "
+                                + fields);
+            }
+            fieldsIndex[i] = index;
+        }
+        this.functionSet = constructFunctionMap(rowType.getFields());
+    }
+
+    public void open() {
+        functionSet.forEach(RowProcessor::open);
+    }
+
+    public byte[] index(InternalRow row) {
+        Long[] columnLongs = new Long[fieldsIndex.length];
+
+        int index = 0;
+        for (RowProcessor f : functionSet) {
+            columnLongs[index++] = f.hilbertValue(row);
+        }
+        return hilbertCurvePosBytes(columnLongs);
+    }
+
+    public Set<RowProcessor> constructFunctionMap(List<DataField> fields) {
+        Set<RowProcessor> hilbertFunctionSet = new LinkedHashSet<>();
+
+        // Construct one process function per sort column, in sort-column order.
+        for (int index : fieldsIndex) {
+            DataField field = fields.get(index);
+            hilbertFunctionSet.add(hmapColumnToCalculator(field, index));
+        }
+        return hilbertFunctionSet;
+    }
+
+    public static RowProcessor hmapColumnToCalculator(DataField field, int index) {
+        DataType type = field.type();
+        return new RowProcessor(type.accept(new TypeVisitor(index)));
+    }
+
+    /** Type visitor to generate the function mapping a row column to its hilbert value. */
+    public static class TypeVisitor implements DataTypeVisitor<HProcessFunction>, Serializable {
+
+        private final int fieldIndex;
+
+        public TypeVisitor(int index) {
+            this.fieldIndex = index;
+        }
+
+        @Override
+        public HProcessFunction visit(CharType charType) {
+            return (row) -> {
+                if (row.isNullAt(fieldIndex)) {
+                    return PRIMITIVE_EMPTY;
+                } else {
+                    BinaryString binaryString = row.getString(fieldIndex);
+                    return ConvertBinaryUtil.convertBytesToLong(binaryString.toBytes());
+                }
+            };
+        }
+
+        @Override
+        public HProcessFunction visit(VarCharType varCharType) {
+            return (row) -> {
+                if (row.isNullAt(fieldIndex)) {
+                    return PRIMITIVE_EMPTY;
+                } else {
+                    BinaryString binaryString = row.getString(fieldIndex);
+                    return ConvertBinaryUtil.convertBytesToLong(binaryString.toBytes());
+                }
+            };
+        }
+
+        @Override
+        public HProcessFunction visit(BooleanType booleanType) {
+            return (row) -> {
+                if (row.isNullAt(fieldIndex)) {
+                    return PRIMITIVE_EMPTY;
+                }
+                return row.getBoolean(fieldIndex) ? PRIMITIVE_EMPTY : 0;
+            };
+        }
+
+        @Override
+        public HProcessFunction visit(BinaryType binaryType) {
+            return (row) ->
+                    row.isNullAt(fieldIndex)
+                            ? PRIMITIVE_EMPTY
+                            : ConvertBinaryUtil.convertBytesToLong(row.getBinary(fieldIndex));
+        }
+
+        @Override
+        public HProcessFunction visit(VarBinaryType varBinaryType) {
+            return (row) ->
+                    row.isNullAt(fieldIndex)
+                            ? PRIMITIVE_EMPTY
+                            : ConvertBinaryUtil.convertBytesToLong(row.getBinary(fieldIndex));
+        }
+
+        @Override
+        public HProcessFunction visit(DecimalType decimalType) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(decimalType, fieldIndex);
+            return (row) -> {
+                Object o = fieldGetter.getFieldOrNull(row);
+                return o == null ? PRIMITIVE_EMPTY : ((Decimal) o).toBigDecimal().longValue();
+            };
+        }
+
+        @Override
+        public HProcessFunction visit(TinyIntType tinyIntType) {
+            return (row) ->
+                    row.isNullAt(fieldIndex)
+                            ? PRIMITIVE_EMPTY
+                            : ConvertBinaryUtil.convertBytesToLong(
+                                    new byte[] {row.getByte(fieldIndex)});
+        }
+
+        @Override
+        public HProcessFunction visit(SmallIntType smallIntType) {
+            return (row) ->
+                    row.isNullAt(fieldIndex) ? PRIMITIVE_EMPTY : (long) row.getShort(fieldIndex);
+        }
+
+        @Override
+        public HProcessFunction visit(IntType intType) {
+            return (row) ->
+                    row.isNullAt(fieldIndex) ? PRIMITIVE_EMPTY : (long) row.getInt(fieldIndex);
+        }
+
+        @Override
+        public HProcessFunction visit(BigIntType bigIntType) {
+            return (row) -> row.isNullAt(fieldIndex) ? PRIMITIVE_EMPTY : row.getLong(fieldIndex);
+        }
+
+        @Override
+        public HProcessFunction visit(FloatType floatType) {
+            return (row) ->
+                    row.isNullAt(fieldIndex)
+                            ? PRIMITIVE_EMPTY
+                            : Double.doubleToLongBits(row.getFloat(fieldIndex));
+        }
+
+        @Override
+        public HProcessFunction visit(DoubleType doubleType) {
+            return (row) ->
+                    row.isNullAt(fieldIndex)
+                            ? PRIMITIVE_EMPTY
+                            : Double.doubleToLongBits(row.getDouble(fieldIndex));
+        }
+
+        @Override
+        public HProcessFunction visit(DateType dateType) {
+            // DATE is stored as an int (days since epoch), so it must be read with getInt.
+            return (row) ->
+                    row.isNullAt(fieldIndex) ? PRIMITIVE_EMPTY : (long) row.getInt(fieldIndex);
+        }
+
+        @Override
+        public HProcessFunction visit(TimeType timeType) {
+            // TIME is stored as an int (milliseconds of the day), so it must be read with getInt.
+            return (row) ->
+                    row.isNullAt(fieldIndex) ? PRIMITIVE_EMPTY : (long) row.getInt(fieldIndex);
+        }
+
+        @Override
+        public HProcessFunction visit(TimestampType timestampType) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(timestampType, fieldIndex);
+            return (row) -> {
+                Object o = fieldGetter.getFieldOrNull(row);
+                return o == null ? PRIMITIVE_EMPTY : ((Timestamp) o).getMillisecond();
+            };
+        }
+
+        @Override
+        public HProcessFunction visit(LocalZonedTimestampType localZonedTimestampType) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(localZonedTimestampType, fieldIndex);
+            return (row) -> {
+                Object o = fieldGetter.getFieldOrNull(row);
+                return o == null ? PRIMITIVE_EMPTY : ((Timestamp) o).getMillisecond();
+            };
+        }
+
+        @Override
+        public HProcessFunction visit(ArrayType arrayType) {
+            throw new RuntimeException("Unsupported type");
+        }
+
+        @Override
+        public HProcessFunction visit(MultisetType multisetType) {
+            throw new RuntimeException("Unsupported type");
+        }
+
+        @Override
+        public HProcessFunction visit(MapType mapType) {
+            throw new RuntimeException("Unsupported type");
+        }
+
+        @Override
+        public HProcessFunction visit(RowType rowType) {
+            throw new RuntimeException("Unsupported type");
+        }
+    }
+
+    /** Used to convert a row field into the value fed to the hilbert curve. */
+    public static class RowProcessor implements Serializable {
+
+        private final HProcessFunction process;
+
+        public RowProcessor(HProcessFunction process) {
+            this.process = process;
+        }
+
+        public void open() {}
+
+        public Long hilbertValue(InternalRow o) {
+            return process.apply(o);
+        }
+    }
+
+    private byte[] hilbertCurvePosBytes(Long[] points) {
+        long[] data = Arrays.stream(points).mapToLong(Long::longValue).toArray();
+        HilbertCurve hilbertCurve = HilbertCurve.bits(BITS_NUM).dimensions(points.length);
+        BigInteger index = hilbertCurve.index(data);
+        return ConvertBinaryUtil.paddingToNByte(index.toByteArray(), BITS_NUM);
+    }
+
+    /** Process function interface. */
+    public interface HProcessFunction extends Function<InternalRow, Long>, Serializable {}
+}
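A hedged usage sketch of the class above (not part of the patch): the indexer turns each sort column into a 63-bit value through the type-specific process functions, feeds those values to the Hilbert curve, and returns the curve position as a fixed-length byte array whose lexicographic order matches curve order. The two-column schema and row values are invented, and `RowType.of` with default field names f0/f1 is assumed.

```java
import java.util.Arrays;

import org.apache.paimon.data.GenericRow;
import org.apache.paimon.sort.hilbert.HilbertIndexer;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

public class HilbertIndexerSketch {
    public static void main(String[] args) {
        // Invented two-column schema; RowType.of assigns the default names f0, f1.
        RowType rowType = RowType.of(DataTypes.INT(), DataTypes.BIGINT());
        HilbertIndexer indexer = new HilbertIndexer(rowType, Arrays.asList("f0", "f1"));
        indexer.open();
        // The result is the row's position along the curve, padded to a fixed
        // length so that byte-wise comparison matches curve order.
        byte[] key = indexer.index(GenericRow.of(42, 4242L));
        System.out.println(key.length);
    }
}
```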
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/ConvertBinaryUtil.java b/paimon-core/src/main/java/org/apache/paimon/utils/ConvertBinaryUtil.java
similarity index 98%
rename from paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/ConvertBinaryUtil.java
rename to paimon-core/src/main/java/org/apache/paimon/utils/ConvertBinaryUtil.java
index d2acbf42294d..000815c29420 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/ConvertBinaryUtil.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ConvertBinaryUtil.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark.utils;
+package org.apache.paimon.utils;
 
 import java.nio.charset.StandardCharsets;
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/ConvertBinaryUtilTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/ConvertBinaryUtilTest.java
similarity index 88%
rename from paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/ConvertBinaryUtilTest.java
rename to paimon-core/src/test/java/org/apache/paimon/utils/ConvertBinaryUtilTest.java
index c97a2fbfb889..0a5efcfec6ae 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/ConvertBinaryUtilTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/ConvertBinaryUtilTest.java
@@ -16,9 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark;
-
-import org.apache.paimon.spark.utils.ConvertBinaryUtil;
+package org.apache.paimon.utils;
 
 import org.junit.Assert;
 import org.junit.jupiter.api.Test;
@@ -26,8 +24,8 @@
 import java.nio.charset.StandardCharsets;
 import java.util.Random;
 
-import static org.apache.paimon.spark.utils.ConvertBinaryUtil.convertBytesToLong;
-import static org.apache.paimon.spark.utils.ConvertBinaryUtil.convertStringToLong;
+import static org.apache.paimon.utils.ConvertBinaryUtil.convertBytesToLong;
+import static org.apache.paimon.utils.ConvertBinaryUtil.convertStringToLong;
 
 /** Test for {@link ConvertBinaryUtil}. */
 public class ConvertBinaryUtilTest {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/HilbertSorter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/HilbertSorter.java
new file mode 100644
index 000000000000..944e0fc430b0
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/HilbertSorter.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sorter;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.flink.action.SortCompactAction;
+import org.apache.paimon.sort.hilbert.HilbertIndexer;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.paimon.shade.guava30.com.google.common.primitives.UnsignedBytes;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A table sorter which sorts the records by the hilbert curve of the specified columns. It works
+ * on the stream API and computes the hilbert index with {@link HilbertIndexer}. After adding the
+ * hilbert column, it does the range shuffle and sort. Finally, {@link SortCompactAction} removes
+ * the "hilbert" column and inserts the sorted records to overwrite the original table.
+ */
+public class HilbertSorter extends TableSorter {
+
+    private static final RowType KEY_TYPE =
+            new RowType(Collections.singletonList(new DataField(0, "H_INDEX", DataTypes.BYTES())));
+
+    public HilbertSorter(
+            StreamExecutionEnvironment batchTEnv,
+            DataStream<RowData> origin,
+            FileStoreTable table,
+            List<String> colNames) {
+        super(batchTEnv, origin, table, colNames);
+    }
+
+    @Override
+    public DataStream<RowData> sort() {
+        return sortStreamByHilbert(origin, table);
+    }
+
+    /**
+     * Sort the input stream by the given order columns with the hilbert curve.
+     *
+     * @param inputStream the stream waiting to be sorted
+     * @return the sorted data stream
+     */
+    private DataStream<RowData> sortStreamByHilbert(
+            DataStream<RowData> inputStream, FileStoreTable table) {
+        final HilbertIndexer hilbertIndexer = new HilbertIndexer(table.rowType(), orderColNames);
+        return SortUtils.sortStreamByKey(
+                inputStream,
+                table,
+                KEY_TYPE,
+                TypeInformation.of(byte[].class),
+                () ->
+                        (b1, b2) -> {
+                            assert b1.length == b2.length;
+                            for (int i = 0; i < b1.length; i++) {
+                                int ret = UnsignedBytes.compare(b1[i], b2[i]);
+                                if (ret != 0) {
+                                    return ret;
+                                }
+                            }
+                            return 0;
+                        },
+                new SortUtils.KeyAbstract<byte[]>() {
+                    @Override
+                    public void open() {
+                        hilbertIndexer.open();
+                    }
+
+                    @Override
+                    public byte[] apply(RowData value) {
+                        byte[] hilbert = hilbertIndexer.index(new FlinkRowWrapper(value));
+                        return Arrays.copyOf(hilbert, hilbert.length);
+                    }
+                },
+                GenericRow::of);
+    }
+}
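The comparator supplied above compares the padded Hilbert positions as unsigned bytes, so lexicographic order agrees with the numeric order of the underlying BigInteger. A tiny standalone illustration of why the comparison must be unsigned (values arbitrary):

```java
public class UnsignedCompareDemo {
    public static void main(String[] args) {
        // (byte) 0x80 is -128 as a signed byte, so a signed comparison would put it
        // before 0x7F and corrupt the curve order; unsigned comparison keeps 0x80 > 0x7F.
        int signedCmp = Byte.compare((byte) 0x80, (byte) 0x7F); // negative: wrong order
        int unsignedCmp = Integer.compare(0x80, 0x7F); // positive: the order the sorter needs
        System.out.println(signedCmp + " " + unsignedCmp);
    }
}
```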
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
index e5f1db0c228b..e4f85ea9d706 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
@@ -78,8 +78,7 @@ public static TableSorter getSorter(
             case ZORDER:
                 return new ZorderSorter(batchTEnv, origin, fileStoreTable, orderColumns);
             case HILBERT:
-                // todo support hilbert curve
-                throw new IllegalArgumentException("Not supported yet.");
+                return new HilbertSorter(batchTEnv, origin, fileStoreTable, orderColumns);
             default:
                 throw new IllegalArgumentException("cannot match order type: " + sortStrategy);
         }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
index ff3616e65bbb..a5195c2f3bc9 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
@@ -107,6 +107,36 @@ public void testDynamicBucketSortWithOrderAndZorder() throws Exception {
                 .isLessThan(filesFilter.size() / (double) files.size());
     }
 
+    @Test
+    public void testDynamicBucketSortWithOrderAndHilbert() throws Exception {
+        createTable();
+
+        commit(writeData(100));
+        PredicateBuilder predicateBuilder = new PredicateBuilder(getTable().rowType());
+        Predicate predicate = predicateBuilder.between(1, 100L, 200L);
+
+        // ordering by f2,f1 makes the predicate on f1 perform worse
+        order(Arrays.asList("f2", "f1"));
+        List<ManifestEntry> files = getTable().store().newScan().plan().files();
+        List<ManifestEntry> filesFilter =
+                ((KeyValueFileStoreScan) getTable().store().newScan())
+                        .withValueFilter(predicate)
+                        .plan()
+                        .files();
+
+        hilbert(Arrays.asList("f2", "f1"));
+
+        List<ManifestEntry> filesHilbert = getTable().store().newScan().plan().files();
+        List<ManifestEntry> filesFilterHilbert =
+                ((KeyValueFileStoreScan) getTable().store().newScan())
+                        .withValueFilter(predicate)
+                        .plan()
+                        .files();
+
+        Assertions.assertThat(filesFilterHilbert.size() / (double) filesHilbert.size())
+                .isLessThan(filesFilter.size() / (double) files.size());
+    }
+
     @Test
     public void testDynamicBucketSortWithStringType() throws Exception {
         createTable();
@@ -146,6 +176,14 @@ private void zorder(List<String> columns) throws Exception {
         }
     }
 
+    private void hilbert(List<String> columns) throws Exception {
+        if (RANDOM.nextBoolean()) {
+            createAction("hilbert", columns).run();
+        } else {
+            callProcedure("hilbert", columns);
+        }
+    }
+
     private void order(List<String> columns) throws Exception {
         if (RANDOM.nextBoolean()) {
             createAction("order", columns).run();
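The assertions in this test class and in the append-only one that follows reduce to a single metric: the fraction of data files that still match a predicate after planning, where lower means better clustering and more file skipping. Expressed as a hypothetical helper (the generic element type stands in for the entries returned by `plan().files()`):

```java
import java.util.List;

public final class SkippingRatio {
    /** Fraction of files a predicate fails to prune; lower means better clustering. */
    public static <T> double of(List<T> filtered, List<T> all) {
        return filtered.size() / (double) all.size();
    }
}
```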
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
index cadd70e89d35..ee43ca58e8a3 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
@@ -155,6 +155,20 @@ public void testAllBasicTypeWorksWithZorder() throws Exception {
                 .doesNotThrowAnyException();
     }
 
+    @Test
+    public void testAllBasicTypeWorksWithHilbert() throws Exception {
+        prepareData(300, 1);
+        // All the basic types should support hilbert
+        Assertions.assertThatCode(
+                        () ->
+                                hilbert(
+                                        Arrays.asList(
+                                                "f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7",
+                                                "f8", "f9", "f10", "f11", "f12", "f13", "f14",
+                                                "f15")))
+                .doesNotThrowAnyException();
+    }
+
     @Test
     public void testZorderActionWorks() throws Exception {
         prepareData(300, 2);
@@ -181,6 +195,33 @@ public void testZorderActionWorks() throws Exception {
         Assertions.assertThat(files.size()).isGreaterThan(filesFilter.size());
     }
 
+    @Test
+    public void testHilbertActionWorks() throws Exception {
+        prepareData(300, 2);
+        PredicateBuilder predicateBuilder = new PredicateBuilder(getTable().rowType());
+        Predicate predicate = predicateBuilder.between(1, 100, 200);
+
+        List<ManifestEntry> files = getTable().store().newScan().plan().files();
+        List<ManifestEntry> filesFilter =
+                ((AppendOnlyFileStoreScan) getTable().store().newScan())
+                        .withFilter(predicate)
+                        .plan()
+                        .files();
+
+        // before the hilbert sort, no file is filtered out
+        Assertions.assertThat(files.size()).isEqualTo(filesFilter.size());
+
+        hilbert(Arrays.asList("f2", "f1"));
+
+        files = getTable().store().newScan().plan().files();
+        filesFilter =
+                ((AppendOnlyFileStoreScan) getTable().store().newScan())
+                        .withFilter(predicate)
+                        .plan()
+                        .files();
+        Assertions.assertThat(files.size()).isGreaterThan(filesFilter.size());
+    }
+
     @Test
     public void testCompareZorderAndOrder() throws Exception {
         prepareData(300, 10);
@@ -208,6 +249,33 @@ public void testCompareZorderAndOrder() throws Exception {
                 .isLessThan(filesFilterOrder.size() / (double) filesOrder.size());
     }
 
+    @Test
+    public void testCompareHilbertAndOrder() throws Exception {
+        prepareData(300, 10);
+
+        hilbert(Arrays.asList("f2", "f1"));
+        PredicateBuilder predicateBuilder = new PredicateBuilder(getTable().rowType());
+        Predicate predicate = predicateBuilder.between(1, 10, 20);
+
+        List<ManifestEntry> filesHilbert = getTable().store().newScan().plan().files();
+        List<ManifestEntry> filesFilterHilbert =
+                ((AppendOnlyFileStoreScan) getTable().store().newScan())
+                        .withFilter(predicate)
+                        .plan()
+                        .files();
+
+        order(Arrays.asList("f2", "f1"));
+        List<ManifestEntry> filesOrder = getTable().store().newScan().plan().files();
+        List<ManifestEntry> filesFilterOrder =
+                ((AppendOnlyFileStoreScan) getTable().store().newScan())
+                        .withFilter(predicate)
+                        .plan()
+                        .files();
+
+        Assertions.assertThat(filesFilterHilbert.size() / (double) filesHilbert.size())
+                .isLessThan(filesFilterOrder.size() / (double) filesOrder.size());
+    }
+
     @Test
     public void testTableConf() throws Exception {
         createTable();
@@ -270,6 +338,14 @@ private void zorder(List<String> columns) throws Exception {
         }
     }
 
+    private void hilbert(List<String> columns) throws Exception {
+        if (RANDOM.nextBoolean()) {
+            createAction("hilbert", columns).run();
+        } else {
+            callProcedure("hilbert", columns);
+        }
+    }
+
     private void order(List<String> columns) throws Exception {
         if (RANDOM.nextBoolean()) {
             createAction("order", columns).run();
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/SparkHilbertUDF.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/SparkHilbertUDF.java
index 115974ac08c8..ed2a1bab04bb 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/SparkHilbertUDF.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/SparkHilbertUDF.java
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.spark.sort;
 
-import org.apache.paimon.spark.utils.ConvertBinaryUtil;
+import org.apache.paimon.utils.ConvertBinaryUtil;
 
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.expressions.UserDefinedFunction;