diff --git a/.github/workflows/scala-test.yml b/.github/workflows/scala-test.yml
new file mode 100644
index 0000000..b301d2e
--- /dev/null
+++ b/.github/workflows/scala-test.yml
@@ -0,0 +1,24 @@
+name: Space Filling Curve CI
+
+on:
+  push:
+    branches:
+      - main
+      - feature/*
+      - dev/*
+      - release/*
+  pull_request:
+    branches:
+      - main
+
+jobs:
+  test:
+
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v2
+      - name: Run tests
+        run: sbt -mem 2048 test
+        env:
+          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
\ No newline at end of file
diff --git a/src/main/scala/io/dustinsmith/spacefillingcurves/Morton.scala b/src/main/scala/io/dustinsmith/spacefillingcurves/Morton.scala
index 08b2a51..ae0da83 100644
--- a/src/main/scala/io/dustinsmith/spacefillingcurves/Morton.scala
+++ b/src/main/scala/io/dustinsmith/spacefillingcurves/Morton.scala
@@ -34,6 +34,10 @@ import org.apache.spark.sql.functions._
 class Morton(val df: DataFrame, val cols: Array[String]) extends LazyLogging with SparkSessionWrapper {
   import spark.implicits._
 
+  if (cols.length < 2) {
+    throw new Exception("You need at least 2 columns to morton order your data.")
+  }
+
   private val columnTypes: Seq[(String, String)] = matchColumnWithType()
   private val nonString = columnTypes.filter(t => t._2 != "StringType")
   private val stringType = columnTypes.filter(t => t._2 == "StringType")
@@ -132,7 +136,10 @@ class Morton(val df: DataFrame, val cols: Array[String]) extends LazyLogging with SparkSessionWrapper {
    */
   private def getNonStringBinaryDF: DataFrame = {
-    df
-      .select($"*" +: nonString.map(tup => getBinaryFunc(tup._2)(col(tup._1)).alias(tup._1 + "_binary")): _*)
+    if (nonString.nonEmpty) {
+      df
+        .select($"*" +: nonString.map(tup => getBinaryFunc(tup._2)(col(tup._1)).alias(tup._1 + "_binary")): _*)
+    }
+    else df
   }
 }
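Note on the guard above: a Morton (Z-order) index interleaves the bits of two or more column values so that rows close together in the multi-column space tend to land close together in the one-dimensional sort order, which is why fewer than two columns is rejected. A minimal standalone sketch of the interleaving idea for two ints (illustrative only; `interleave` is a hypothetical helper, not the private UDF in Morton.scala):

    // Illustrative 2-D Morton (Z-order) interleave; NOT the library's
    // private interleaveBits UDF, just the idea it is built on.
    object InterleaveSketch {

      // Interleave the low 16 bits of x and y: bit i of x goes to output
      // position 2*i, bit i of y to position 2*i + 1.
      def interleave(x: Int, y: Int): Long =
        (0 until 16).foldLeft(0L) { (acc, i) =>
          acc |
            (((x >> i) & 1L) << (2 * i)) |
            (((y >> i) & 1L) << (2 * i + 1))
        }

      def main(args: Array[String]): Unit = {
        println(interleave(1, 1)) // 3   (binary 01 and 01 interleave to 0011)
        println(interleave(4, 9)) // 146 (binary 100 and 1001 interleave to 10010010)
      }
    }
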
diff --git a/src/test/resources/mixed_binary/._SUCCESS.crc b/src/test/resources/mixed_binary/._SUCCESS.crc
new file mode 100644
index 0000000..3b7b044
Binary files /dev/null and b/src/test/resources/mixed_binary/._SUCCESS.crc differ
diff --git a/src/test/resources/mixed_binary/.part-00000-9fd8ef85-048d-4d38-b167-7b9ee0f5d076-c000.snappy.parquet.crc b/src/test/resources/mixed_binary/.part-00000-9fd8ef85-048d-4d38-b167-7b9ee0f5d076-c000.snappy.parquet.crc
new file mode 100644
index 0000000..8c0f047
Binary files /dev/null and b/src/test/resources/mixed_binary/.part-00000-9fd8ef85-048d-4d38-b167-7b9ee0f5d076-c000.snappy.parquet.crc differ
diff --git a/src/test/resources/mixed_binary/_SUCCESS b/src/test/resources/mixed_binary/_SUCCESS
new file mode 100644
index 0000000..e69de29
diff --git a/src/test/resources/mixed_binary/part-00000-9fd8ef85-048d-4d38-b167-7b9ee0f5d076-c000.snappy.parquet b/src/test/resources/mixed_binary/part-00000-9fd8ef85-048d-4d38-b167-7b9ee0f5d076-c000.snappy.parquet
new file mode 100644
index 0000000..188529e
Binary files /dev/null and b/src/test/resources/mixed_binary/part-00000-9fd8ef85-048d-4d38-b167-7b9ee0f5d076-c000.snappy.parquet differ
diff --git a/src/test/resources/numeric_binary/._SUCCESS.crc b/src/test/resources/numeric_binary/._SUCCESS.crc
new file mode 100644
index 0000000..3b7b044
Binary files /dev/null and b/src/test/resources/numeric_binary/._SUCCESS.crc differ
diff --git a/src/test/resources/numeric_binary/.part-00000-23538a43-13e8-4916-be8b-42a275799d26-c000.snappy.parquet.crc b/src/test/resources/numeric_binary/.part-00000-23538a43-13e8-4916-be8b-42a275799d26-c000.snappy.parquet.crc
new file mode 100644
index 0000000..b9ea8b9
Binary files /dev/null and b/src/test/resources/numeric_binary/.part-00000-23538a43-13e8-4916-be8b-42a275799d26-c000.snappy.parquet.crc differ
diff --git a/src/test/resources/numeric_binary/_SUCCESS b/src/test/resources/numeric_binary/_SUCCESS
new file mode 100644
index 0000000..e69de29
diff --git a/src/test/resources/numeric_binary/part-00000-23538a43-13e8-4916-be8b-42a275799d26-c000.snappy.parquet b/src/test/resources/numeric_binary/part-00000-23538a43-13e8-4916-be8b-42a275799d26-c000.snappy.parquet
new file mode 100644
index 0000000..bd8fd28
Binary files /dev/null and b/src/test/resources/numeric_binary/part-00000-23538a43-13e8-4916-be8b-42a275799d26-c000.snappy.parquet differ
diff --git a/src/test/resources/str_binary/._SUCCESS.crc b/src/test/resources/str_binary/._SUCCESS.crc
new file mode 100644
index 0000000..3b7b044
Binary files /dev/null and b/src/test/resources/str_binary/._SUCCESS.crc differ
diff --git a/src/test/resources/str_binary/.part-00000-9c5d0d00-b2d6-452b-92a3-4a50f747eecb-c000.snappy.parquet.crc b/src/test/resources/str_binary/.part-00000-9c5d0d00-b2d6-452b-92a3-4a50f747eecb-c000.snappy.parquet.crc
new file mode 100644
index 0000000..5b4c759
Binary files /dev/null and b/src/test/resources/str_binary/.part-00000-9c5d0d00-b2d6-452b-92a3-4a50f747eecb-c000.snappy.parquet.crc differ
diff --git a/src/test/resources/str_binary/_SUCCESS b/src/test/resources/str_binary/_SUCCESS
new file mode 100644
index 0000000..e69de29
diff --git a/src/test/resources/str_binary/part-00000-9c5d0d00-b2d6-452b-92a3-4a50f747eecb-c000.snappy.parquet b/src/test/resources/str_binary/part-00000-9c5d0d00-b2d6-452b-92a3-4a50f747eecb-c000.snappy.parquet
new file mode 100644
index 0000000..f5d6962
Binary files /dev/null and b/src/test/resources/str_binary/part-00000-9c5d0d00-b2d6-452b-92a3-4a50f747eecb-c000.snappy.parquet differ
diff --git a/src/test/resources/z_index/._SUCCESS.crc b/src/test/resources/z_index/._SUCCESS.crc
new file mode 100644
index 0000000..3b7b044
Binary files /dev/null and b/src/test/resources/z_index/._SUCCESS.crc differ
diff --git a/src/test/resources/z_index/.part-00000-ab3411a5-be7b-4049-a358-3347a356908b-c000.snappy.parquet.crc b/src/test/resources/z_index/.part-00000-ab3411a5-be7b-4049-a358-3347a356908b-c000.snappy.parquet.crc
new file mode 100644
index 0000000..2601197
Binary files /dev/null and b/src/test/resources/z_index/.part-00000-ab3411a5-be7b-4049-a358-3347a356908b-c000.snappy.parquet.crc differ
diff --git a/src/test/resources/z_index/_SUCCESS b/src/test/resources/z_index/_SUCCESS
new file mode 100644
index 0000000..e69de29
diff --git a/src/test/resources/z_index/part-00000-ab3411a5-be7b-4049-a358-3347a356908b-c000.snappy.parquet b/src/test/resources/z_index/part-00000-ab3411a5-be7b-4049-a358-3347a356908b-c000.snappy.parquet
new file mode 100644
index 0000000..41c0115
Binary files /dev/null and b/src/test/resources/z_index/part-00000-ab3411a5-be7b-4049-a358-3347a356908b-c000.snappy.parquet differ
diff --git a/src/test/scala/io/dustinsmith/spacefillingcurves/BinarySpec.scala b/src/test/scala/io/dustinsmith/spacefillingcurves/BinarySpec.scala
index 329674c..2983b71 100644
--- a/src/test/scala/io/dustinsmith/spacefillingcurves/BinarySpec.scala
+++ b/src/test/scala/io/dustinsmith/spacefillingcurves/BinarySpec.scala
@@ -29,10 +29,7 @@ import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.types.{DecimalType, DoubleType, FloatType, IntegerType, LongType, ShortType}
 
-class BinarySpec extends AnyWordSpec
-  with Matchers
-  with PrivateMethodTester
-  with BeforeAndAfterAll
-{
+class BinarySpec extends AnyWordSpec with Matchers with PrivateMethodTester with BeforeAndAfterAll {
 
   val spark: SparkSession = SparkSession
     .builder()
diff --git a/src/test/scala/io/dustinsmith/spacefillingcurves/HashDataFrame.scala b/src/test/scala/io/dustinsmith/spacefillingcurves/HashDataFrame.scala
new file mode 100644
index 0000000..2c6042e
--- /dev/null
+++ b/src/test/scala/io/dustinsmith/spacefillingcurves/HashDataFrame.scala
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2021 DustinSmith.Io. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"). You may not
+ * use this file except in compliance with the License. A copy of the License
+ * is located at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ */
+package io.dustinsmith.spacefillingcurves
+
+import scala.util.hashing.MurmurHash3
+
+import org.apache.spark.sql.functions.{col, hash}
+
+/**
+ * Helper object to compare dataframes without using Holdenkarau (has some problems with Spark 3).
+ */
+object HashDataFrame extends SparkSessionWrapper {
+  import spark.implicits._
+
+  /**
+   * Computes a checksum on the entire contents of the supplied DataFrame. Checksum values can be used to confirm
+   * that dataframe contents are unchanged after operations that MUST NOT alter actual data
+   * (e.g. HDFS leaf file compaction, etc.).
+   *
+   * This method builds hierarchical hashes (row hashes -> RDD partition hashes -> a final DataFrame hash), which
+   * makes it relatively inexpensive compared to other ways of comparing dataframes (e.g. joins, minus, etc.).
+   * It can be used even for very large data/paths.
+   * Credit to https://github.com/beljun for this method.
+   *
+   * @param df Dataframe to compute the checksum for.
+   * @param numParts Level of parallelism. Note that the checksum value changes with a different numParts value, so
+   *                 it should remain the same across comparisons.
+   * @return Checksum for the dataframe.
+   */
+  def checksumDataFrame(df: org.apache.spark.sql.DataFrame, numParts: Int): Int = {
+
+    MurmurHash3.orderedHash(
+      df
+        .select(hash(df.columns.map(col): _*).as("row_hash"))
+        .repartition(numParts, $"row_hash")
+        .sortWithinPartitions("row_hash")
+        .mapPartitions(p => Array(MurmurHash3.orderedHash(p)).toIterator)
+        .orderBy($"value")
+        .collect()
+    )
+  }
+}
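A minimal usage sketch for the helper above (hypothetical, not part of the patch; it assumes the same package and a local Spark session). Because rows are hashed and sorted before being combined, two dataframes with the same rows should checksum identically as long as numParts matches:

    package io.dustinsmith.spacefillingcurves

    import org.apache.spark.sql.SparkSession

    object ChecksumExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("ChecksumExample")
          .master("local[2]")
          .getOrCreate()
        import spark.implicits._

        val a = Seq((1, "a"), (2, "b")).toDF("x", "id")
        // Same rows in a different input order: row hashes are sorted before
        // combining, so the checksum should come out identical.
        val b = Seq((2, "b"), (1, "a")).toDF("x", "id")

        // numParts must be held constant across comparisons, since the
        // checksum value depends on it.
        assert(HashDataFrame.checksumDataFrame(a, 1) == HashDataFrame.checksumDataFrame(b, 1))
      }
    }
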
diff --git a/src/test/scala/io/dustinsmith/spacefillingcurves/MortonSpec.scala b/src/test/scala/io/dustinsmith/spacefillingcurves/MortonSpec.scala
index f0d1791..999ac37 100644
--- a/src/test/scala/io/dustinsmith/spacefillingcurves/MortonSpec.scala
+++ b/src/test/scala/io/dustinsmith/spacefillingcurves/MortonSpec.scala
@@ -15,6 +15,174 @@
  */
 package io.dustinsmith.spacefillingcurves
 
-class MortonSpec {
+import java.io.File
+
+import scala.reflect.io.Directory
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.PrivateMethodTester
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+
+class MortonSpec extends AnyWordSpec with Matchers with PrivateMethodTester with BeforeAndAfterAll {
+
+  val spark: SparkSession = SparkSession
+    .builder()
+    .appName("MortonIndexTesting")
+    .master("local[2]")
+    .getOrCreate()
+
+  import spark.implicits._
+
+  override def afterAll(): Unit = {
+    new Directory(new File("spark-warehouse")).deleteRecursively
+    super.afterAll()
+  }
+
+  val df: DataFrame = Seq(
+    (1, 1, 12.23, "a", "m"),
+    (4, 9, 5.05, "b", "m"),
+    (3, 0, 1.23, "c", "f"),
+    (2, 2, 100.4, "d", "f"),
+    (1, 25, 3.25, "a", "m")
+  ).toDF("x", "y", "amnt", "id", "sex")
+  val mortonNum: Morton = new Morton(df, Array("x", "y"))
+  val mortonStr: Morton = new Morton(df, Array("id", "sex"))
+  val mortonMixed: Morton = new Morton(df, Array("x", "id", "amnt"))
+
+  "matchColumnWithType numeric columns" should {
+
+    "return a tuple with column and data type" in {
+      val privateMethod: PrivateMethod[Seq[(String, String)]] =
+        PrivateMethod[Seq[(String, String)]]('matchColumnWithType)
+      val resultArray: Seq[(String, String)] = mortonNum invokePrivate privateMethod()
+      val expectedArray: Seq[(String, String)] = Seq(("x", "IntegerType"), ("y", "IntegerType"))
+
+      assert(resultArray == expectedArray)
+    }
+  }
+
+  "matchColumnWithType mixed columns" should {
+
+    "return a tuple with column and data type" in {
+      val privateMethod: PrivateMethod[Seq[(String, String)]] =
+        PrivateMethod[Seq[(String, String)]]('matchColumnWithType)
+      val resultArray: Seq[(String, String)] = mortonMixed invokePrivate privateMethod()
+      val expectedArray: Seq[(String, String)] = Seq(("x", "IntegerType"), ("amnt", "DoubleType"), ("id", "StringType"))
+
+      assert(resultArray == expectedArray)
+    }
+  }
+
+  "getNonStringBinaryDF str columns" should {
+
+    "return the original dataframe" in {
+      val privateMethod: PrivateMethod[DataFrame] = PrivateMethod[DataFrame]('getNonStringBinaryDF)
+      val resultDF: DataFrame = mortonStr invokePrivate privateMethod()
+      val resultChecksum: Int = HashDataFrame.checksumDataFrame(resultDF, 1)
+      val expectedChecksum: Int = HashDataFrame.checksumDataFrame(df, 1)
+
+      assert(resultChecksum == expectedChecksum)
+    }
+  }
+
+  "getNonStringBinaryDF num columns" should {
+
+    "return the original dataframe with the binary columns for numeric" in {
+      val privateMethod: PrivateMethod[DataFrame] = PrivateMethod[DataFrame]('getNonStringBinaryDF)
+      val resultDF: DataFrame = mortonNum invokePrivate privateMethod()
+      val resultChecksum: Int = HashDataFrame.checksumDataFrame(resultDF, 1)
+      val expectedDF: DataFrame = spark.read
+        .format("parquet")
+        .load(getClass.getResource("/numeric_binary").getPath)
+      val expectedChecksum: Int = HashDataFrame.checksumDataFrame(expectedDF, 1)
+
+      assert(resultChecksum == expectedChecksum)
+    }
+  }
+
+  "getBinaryDF str columns" should {
+
+    "return the original dataframe with the binary columns for strings" in {
+      val privateMethod: PrivateMethod[DataFrame] = PrivateMethod[DataFrame]('getBinaryDF)
+      val resultDF: DataFrame = mortonStr invokePrivate privateMethod()
+      val resultChecksum: Int = HashDataFrame.checksumDataFrame(resultDF, 1)
+      val expectedDF: DataFrame = spark.read
+        .format("parquet")
+        .load(getClass.getResource("/str_binary").getPath)
+      val expectedChecksum: Int = HashDataFrame.checksumDataFrame(expectedDF, 1)
+
+      assert(resultChecksum == expectedChecksum)
+    }
+  }
+
+  "getBinaryDF num columns" should {
+
+    "return the original dataframe with the binary columns for numeric" in {
+      val privateMethod: PrivateMethod[DataFrame] = PrivateMethod[DataFrame]('getBinaryDF)
+      val resultDF: DataFrame = mortonNum invokePrivate privateMethod()
+      val resultChecksum: Int = HashDataFrame.checksumDataFrame(resultDF, 1)
+      val expectedDF: DataFrame = spark.read
+        .format("parquet")
+        .load(getClass.getResource("/numeric_binary").getPath)
+      val expectedChecksum: Int = HashDataFrame.checksumDataFrame(expectedDF, 1)
+
+      assert(resultChecksum == expectedChecksum)
+    }
+  }
+
+  "getBinaryDF mixed columns" should {
+
+    "return the original dataframe with the binary columns for numeric and string" in {
+      val privateMethod: PrivateMethod[DataFrame] = PrivateMethod[DataFrame]('getBinaryDF)
+      val resultDF: DataFrame = mortonMixed invokePrivate privateMethod()
+      val resultChecksum: Int = HashDataFrame.checksumDataFrame(resultDF, 1)
+      val expectedDF: DataFrame = spark.read
+        .format("parquet")
+        .load(getClass.getResource("/mixed_binary").getPath)
+      val expectedChecksum: Int = HashDataFrame.checksumDataFrame(expectedDF, 1)
+
+      assert(resultChecksum == expectedChecksum)
+    }
+  }
+
+  // TODO: Figure out testing private UDFs
+  "interleaveBits" ignore {
+
+    "interleave the binary bit columns" in {
+      val numDF: DataFrame = spark.read
+        .format("parquet")
+        .load(getClass.getResource("/numeric_binary").getPath)
+      val privateMethod: PrivateMethod[DataFrame] = PrivateMethod[DataFrame]('interleaveBits)
+    }
+  }
+
+  "Morton class" should {
+
+    "throw an exception if the supplied column array is not greater than 1" in {
+      val thrown: Exception = the [Exception] thrownBy new Morton(df, Array("x"))
+
+      thrown.getMessage should equal(
+        "You need at least 2 columns to morton order your data."
+      )
+    }
+  }
+
+  "mortonIndex" should {
+
+    "return a dataframe with the column z_index" in {
+      val privateMethod: PrivateMethod[DataFrame] = PrivateMethod[DataFrame]('mortonIndex)
+      val resultDF: DataFrame = mortonNum invokePrivate privateMethod()
+      val resultChecksum: Int = HashDataFrame.checksumDataFrame(resultDF, 1)
+      val expectedDF: DataFrame = spark.read
+        .format("parquet")
+        .load(getClass.getResource("/z_index").getPath)
+      val expectedChecksum: Int = HashDataFrame.checksumDataFrame(expectedDF, 1)
+
+      assert(resultChecksum == expectedChecksum)
+    }
+  }
 }
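For readers unfamiliar with the `invokePrivate` pattern the specs above lean on: ScalaTest's PrivateMethodTester resolves a private method by symbol and calls it reflectively, which is why these tests can exercise Morton's private helpers directly. A self-contained sketch (`Counter` and `bump` are hypothetical stand-ins for Morton and its private methods):

    import org.scalatest.PrivateMethodTester
    import org.scalatest.matchers.should.Matchers
    import org.scalatest.wordspec.AnyWordSpec

    // Hypothetical class with a private method, standing in for Morton.
    class Counter {
      private def bump(n: Int): Int = n + 1
    }

    class CounterSpec extends AnyWordSpec with Matchers with PrivateMethodTester {

      "bump" should {

        "increment its argument" in {
          // Resolve the private method by name, then invoke it reflectively.
          val bump = PrivateMethod[Int]('bump)
          val result = new Counter invokePrivate bump(41)

          result shouldEqual 42
        }
      }
    }

Note that this pattern only reaches private methods, not private UDFs captured inside them, which is why the interleaveBits block above remains ignored with a TODO.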