
Commit

[spark] SparkHilbertUDF hilbertCurvePosBytes classnotfound fix (#3415)
xuzifu666 authored May 29, 2024
1 parent: fd6a287 · commit: 46cfa66
Showing 3 changed files with 5 additions and 16 deletions.
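
In short, this commit moves the Hilbert-curve computation behind a single core helper: hilbertCurvePosBytes on HilbertIndexer becomes public static, SparkHilbertUDF delegates to it instead of building a HilbertCurve itself, and paimon-spark-common drops its direct com.github.davidmoten:hilbert-curve dependency. That direct use in the Spark module is presumably what left the HilbertCurve class missing from the runtime classpath and triggered the ClassNotFoundException named in the title. Below is a minimal sketch of the new entry point; the wrapper class and the sample points are illustrative, not part of the commit, and it assumes the module providing HilbertIndexer is on the caller's classpath.

import org.apache.paimon.sort.hilbert.HilbertIndexer;

// Hypothetical sketch, not from the commit: call the now-public-static helper directly.
public class HilbertPosSketch {
    public static void main(String[] args) {
        // Illustrative sort-key values already mapped to longs, one per dimension.
        Long[] points = {17L, 42L, 99L};

        // After this commit, Spark-side code reaches the curve through this shared
        // helper instead of constructing a HilbertCurve of its own.
        byte[] position = HilbertIndexer.hilbertCurvePosBytes(points);

        System.out.println("Hilbert position encoded in " + position.length + " bytes");
    }
}
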
HilbertIndexer.java (full path not shown in this view)

@@ -295,7 +295,7 @@ public Long hilbertValue(InternalRow o) {
         }
     }
 
-    private byte[] hilbertCurvePosBytes(Long[] points) {
+    public static byte[] hilbertCurvePosBytes(Long[] points) {
         long[] data = Arrays.stream(points).mapToLong(Long::longValue).toArray();
         HilbertCurve hilbertCurve = HilbertCurve.bits(BITS_NUM).dimensions(points.length);
         BigInteger index = hilbertCurve.index(data);
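
The helper's body is unchanged: it flattens the boxed points into a long[], asks the davidmoten hilbert-curve library for the index of that point on a curve with BITS_NUM bits per dimension, and pads the resulting BigInteger bytes to a fixed width (ConvertBinaryUtil.paddingToNByte). A standalone sketch of that underlying computation using the library API visible above; the sample points are illustrative, and 63 mirrors the BITS_NUM constant shown in the Spark module before this change rather than a value read from this file.

import org.davidmoten.hilbert.HilbertCurve;

import java.math.BigInteger;

// Hypothetical sketch of the computation hilbertCurvePosBytes performs.
public class HilbertCurveSketch {
    public static void main(String[] args) {
        long[] data = {17L, 42L, 99L}; // illustrative point, one value per dimension

        // 63 bits per dimension, matching the BITS_NUM value visible in this commit.
        HilbertCurve curve = HilbertCurve.bits(63).dimensions(data.length);
        BigInteger index = curve.index(data); // distance along the space-filling curve

        // The real helper then pads index.toByteArray() to a fixed number of bytes.
        System.out.println("curve index bit length: " + index.bitLength());
    }
}
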
paimon-spark/paimon-spark-common/pom.xml (5 changes: 0 additions & 5 deletions)

@@ -53,11 +53,6 @@ under the License.
             <artifactId>scala-compiler</artifactId>
             <version>${scala.version}</version>
         </dependency>
-        <dependency>
-            <groupId>com.github.davidmoten</groupId>
-            <artifactId>hilbert-curve</artifactId>
-            <version>0.2.2</version>
-        </dependency>
 
         <dependency>
             <groupId>org.apache.spark</groupId>

SparkHilbertUDF.java (full path not shown in this view)

@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark.sort;
 
+import org.apache.paimon.sort.hilbert.HilbertIndexer;
 import org.apache.paimon.utils.ConvertBinaryUtil;
 
 import org.apache.spark.sql.Column;
@@ -37,14 +38,11 @@
 import org.apache.spark.sql.types.ShortType;
 import org.apache.spark.sql.types.StringType;
 import org.apache.spark.sql.types.TimestampType;
-import org.davidmoten.hilbert.HilbertCurve;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.Serializable;
 import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.List;
 
 import scala.collection.JavaConverters;
 import scala.collection.Seq;
@@ -53,20 +51,16 @@
 public class SparkHilbertUDF implements Serializable {
     private static final long PRIMITIVE_EMPTY = Long.MAX_VALUE;
 
-    private static final int BITS_NUM = 63;
-
     SparkHilbertUDF() {}
 
     private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
         in.defaultReadObject();
     }
 
     byte[] hilbertCurvePosBytes(Seq<Long> points) {
-        List<Long> longs = JavaConverters.seqAsJavaList(points);
-        long[] data = longs.stream().mapToLong(Long::longValue).toArray();
-        HilbertCurve hilbertCurve = HilbertCurve.bits(BITS_NUM).dimensions(points.size());
-        BigInteger index = hilbertCurve.index(data);
-        return ConvertBinaryUtil.paddingToNByte(index.toByteArray(), BITS_NUM);
+        Long[] longs = JavaConverters.seqAsJavaList(points).stream().toArray(Long[]::new);
+        byte[] bytes = HilbertIndexer.hilbertCurvePosBytes(longs);
+        return bytes;
     }
 
     private UserDefinedFunction tinyToOrderedLongUDF() {
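
For completeness, here is a sketch of how a same-package caller (for example a unit test) could exercise the refactored Seq-based helper. The caller class and sample values are assumptions, and the Java-to-Scala conversion uses the same JavaConverters API the file already relies on.

package org.apache.paimon.spark.sort;

import java.util.Arrays;

import scala.collection.JavaConverters;
import scala.collection.Seq;

// Hypothetical caller; it must live in the same package because the helper and the
// constructor are package-private.
public class SparkHilbertUDFSketch {
    public static void main(String[] args) {
        // Build the scala.collection.Seq the helper expects from a Java list.
        Seq<Long> points =
                JavaConverters.asScalaBufferConverter(Arrays.asList(17L, 42L, 99L)).asScala();

        byte[] bytes = new SparkHilbertUDF().hilbertCurvePosBytes(points);
        System.out.println("position encoded in " + bytes.length + " bytes");
    }
}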
