
Dev/morton ordering (#1)
* started development on the tests for morton ordering

* added 2 tests for matchColumnWithType covering numerical types and mixed types

* added a branch to getNonStringBinaryDF in case the user is only Morton ordering string columns

* added another string column to the test data; added a HashDataFrame helper object to the tests for checksumming DataFrames; created a mortonStr instance to test the case with no numerical columns

* created numeric test for getNonStringBinaryDF

* created test for getBinaryDF with str columns only

* created test for getBinaryDF with only numeric columns

* created test for getBinaryDF with mixed data types

* added a column length check in Morton

* added a test for Morton class instantiation with one column

* added test for mortonIndex

* removed a variable that shouldn't have been in the test

* created GitHub workflow
dwsmith1983 authored May 16, 2021
1 parent 3af3710 commit 72ccae1
Showing 21 changed files with 258 additions and 7 deletions.
24 changes: 24 additions & 0 deletions .github/workflows/scala-test.yml
@@ -0,0 +1,24 @@
name: Space Filling Curve CI

on:
  push:
    branches:
      - main
      - feature/*
      - dev/*
      - release/*
  pull_request:
    branches:
      - main

jobs:
  test:

    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v2
      - name: Run tests
        run: sbt -mem 2048 test
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
11 changes: 9 additions & 2 deletions src/main/scala/io/dustinsmith/spacefillingcurves/Morton.scala
@@ -34,6 +34,10 @@ import org.apache.spark.sql.functions._
class Morton(val df: DataFrame, val cols: Array[String]) extends LazyLogging with SparkSessionWrapper {
  import spark.implicits._

  if (cols.length == 1) {
    throw new Exception("You need at least 2 columns to morton order your data.")
  }

  private val columnTypes: Seq[(String, String)] = matchColumnWithType()
  private val nonString = columnTypes.filter(t => t._2 != "StringType")
  private val stringType = columnTypes.filter(t => t._2 == "StringType")
@@ -132,7 +136,10 @@ class Morton(val df: DataFrame, val cols: Array[String]) extends LazyLogging wit
   */
  private def getNonStringBinaryDF: DataFrame = {

    df
      .select($"*" +: nonString.map(tup => getBinaryFunc(tup._2)(col(tup._1)).alias(tup._1 + "_binary")): _*)
    if (nonString.nonEmpty) {
      df
        .select($"*" +: nonString.map(tup => getBinaryFunc(tup._2)(col(tup._1)).alias(tup._1 + "_binary")): _*)
    }
    else df
  }
}
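The hunk above adds the guard and the string-only branch, but getBinaryFunc itself is outside the hunk. For context only, here is a minimal standalone sketch (not the project's code) of the same "append derived binary columns via select" pattern; the object name BinaryColumnsSketch and the toy data are made up, and Spark's built-in bin() stands in for the unshown getBinaryFunc UDF.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{bin, col}

object BinaryColumnsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("binary-columns-sketch").master("local[2]").getOrCreate()
    import spark.implicits._

    // Toy data shaped like the numeric columns used in MortonSpec below.
    val df: DataFrame = Seq((1, 1), (4, 9), (3, 0)).toDF("x", "y")
    val numericCols = Seq("x", "y")

    // Keep every original column and append one "<name>_binary" column per numeric column;
    // with no numeric columns, return the DataFrame unchanged, mirroring the branch above.
    // bin() is only a stand-in here, not the commit's getBinaryFunc.
    val withBinary: DataFrame =
      if (numericCols.nonEmpty)
        df.select($"*" +: numericCols.map(c => bin(col(c)).alias(c + "_binary")): _*)
      else df

    withBinary.show(false)
    spark.stop()
  }
}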
Binary file added src/test/resources/mixed_binary/._SUCCESS.crc
Binary file added src/test/resources/numeric_binary/._SUCCESS.crc
Binary file added src/test/resources/str_binary/._SUCCESS.crc
Binary file added src/test/resources/z_index/._SUCCESS.crc
(additional binary and empty files under these test resource directories are not shown)
@@ -29,10 +29,7 @@ import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.types.{DecimalType, DoubleType, FloatType, IntegerType, LongType, ShortType}


class BinarySpec extends AnyWordSpec
  with Matchers
  with PrivateMethodTester
  with BeforeAndAfterAll {
class BinarySpec extends AnyWordSpec with Matchers with PrivateMethodTester with BeforeAndAfterAll {

  val spark: SparkSession = SparkSession
    .builder()
@@ -0,0 +1,55 @@
/*
* Copyright 2021 DustinSmith.Io. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License
* is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/
package io.dustinsmith.spacefillingcurves

import scala.util.hashing.MurmurHash3

import org.apache.spark.sql.functions.{col, hash}

/**
* Helper object to compare dataframes without using Holdenkarau (has some problems with Spark 3)
*/
object HashDataFrame extends SparkSessionWrapper {
  import spark.implicits._

  /**
   * Computes a checksum on the entire contents of the supplied DataFrame. Checksum values can be used to confirm that
   * dataframe contents are unchanged after operations that MUST NOT alter actual data
   * (e.g. HDFS leaf file compaction, etc.).
   *
   * This method builds hierarchical hashes (row hashes -> RDD partition hashes -> a final DF hash), which
   * makes it relatively inexpensive compared to other ways of comparing dataframes (e.g. joins, minus, etc.).
   * It can be used even for very large data/paths.
   * Credit to https://github.com/beljun for this method.
   *
   * @param df       Dataframe to compute the checksum for.
   * @param numParts Level of parallelism. Note that the checksum value changes with different numParts values, so it
   *                 should remain the same across comparisons.
   * @return Checksum for the dataframe.
   */
  def checksumDataFrame(df: org.apache.spark.sql.DataFrame, numParts: Int): Int = {

    MurmurHash3.orderedHash(
      df
        .select(hash(df.columns.map(col): _*).as("row_hash"))
        .repartition(numParts, $"row_hash")
        .sortWithinPartitions("row_hash")
        .mapPartitions(p => Array(MurmurHash3.orderedHash(p)).toIterator)
        .orderBy($"value")
        .collect()
    )
  }
}
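As a usage illustration only (not part of the commit), the checksum defined above can be exercised as below; the object name and toy data are hypothetical. Because rows are re-hashed and re-sorted deterministically, the input row order should not affect the result as long as numParts is held constant across both calls.

import org.apache.spark.sql.SparkSession

object HashDataFrameUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("checksum-sketch").master("local[2]").getOrCreate()
    import spark.implicits._

    val a = Seq((1, "x"), (2, "y"), (3, "z")).toDF("id", "label")
    val b = a.orderBy($"label".desc) // same rows, different physical order

    // Same content should yield the same checksum when numParts is identical in both calls.
    assert(HashDataFrame.checksumDataFrame(a, 1) == HashDataFrame.checksumDataFrame(b, 1))
    spark.stop()
  }
}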
170 changes: 169 additions & 1 deletion src/test/scala/io/dustinsmith/spacefillingcurves/MortonSpec.scala
@@ -15,6 +15,174 @@
*/
package io.dustinsmith.spacefillingcurves

class MortonSpec {
import java.io.File

import scala.reflect.io.Directory

import org.scalatest.BeforeAndAfterAll
import org.scalatest.PrivateMethodTester
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

import org.apache.spark.sql.{DataFrame, SparkSession}


class MortonSpec extends AnyWordSpec with Matchers with PrivateMethodTester with BeforeAndAfterAll {

  val spark: SparkSession = SparkSession
    .builder()
    .appName("MortonIndexTesting")
    .master("local[2]")
    .getOrCreate()

  import spark.implicits._

  override def afterAll(): Unit = {
    new Directory(new File("spark-warehouse")).deleteRecursively
    super.afterAll()
  }

  val df: DataFrame = Seq(
    (1, 1, 12.23, "a", "m"),
    (4, 9, 5.05, "b", "m"),
    (3, 0, 1.23, "c", "f"),
    (2, 2, 100.4, "d", "f"),
    (1, 25, 3.25, "a", "m")
  ).toDF("x", "y", "amnt", "id", "sex")
  val mortonNum: Morton = new Morton(df, Array("x", "y"))
  val mortonStr: Morton = new Morton(df, Array("id", "sex"))
  val mortonMixed: Morton = new Morton(df, Array("x", "id", "amnt"))

  "matchColumnWithType numeric columns" should {

    "return a tuple with column and data type" in {
      val privateMethod: PrivateMethod[Seq[(String, String)]] =
        PrivateMethod[Seq[(String, String)]]('matchColumnWithType)
      val resultArray: Seq[(String, String)] = mortonNum invokePrivate privateMethod()
      val expectedArray: Seq[(String, String)] = Seq(("x", "IntegerType"), ("y", "IntegerType"))

      assert(resultArray == expectedArray)
    }
  }

  "matchColumnWithType mixed columns" should {

    "return a tuple with column and data type" in {
      val privateMethod: PrivateMethod[Seq[(String, String)]] =
        PrivateMethod[Seq[(String, String)]]('matchColumnWithType)
      val resultArray: Seq[(String, String)] = mortonMixed invokePrivate privateMethod()
      val expectedArray: Seq[(String, String)] = Seq(("x", "IntegerType"), ("amnt", "DoubleType"), ("id", "StringType"))

      assert(resultArray == expectedArray)
    }
  }

  "getNonStringBinaryDF str columns" should {

    "return the original dataframe" in {
      val privateMethod: PrivateMethod[DataFrame] = PrivateMethod[DataFrame]('getNonStringBinaryDF)
      val resultDF: DataFrame = mortonStr invokePrivate privateMethod()
      val resultChecksum: Int = HashDataFrame.checksumDataFrame(resultDF, 1)
      val expectedChecksum: Int = HashDataFrame.checksumDataFrame(df, 1)

      assert(resultChecksum == expectedChecksum)
    }
  }

  "getNonStringBinaryDF num columns" should {

    "return the original dataframe with the binary columns for numeric" in {
      val privateMethod: PrivateMethod[DataFrame] = PrivateMethod[DataFrame]('getNonStringBinaryDF)
      val resultDF: DataFrame = mortonNum invokePrivate privateMethod()
      val resultChecksum: Int = HashDataFrame.checksumDataFrame(resultDF, 1)
      val expectedDF: DataFrame = spark.read
        .format("parquet")
        .load(getClass.getResource("/numeric_binary").getPath)
      val expectedChecksum: Int = HashDataFrame.checksumDataFrame(expectedDF, 1)

      assert(resultChecksum == expectedChecksum)
    }
  }

  "getBinaryDF str columns" should {

    "return the original dataframe with the binary columns for strings" in {
      val privateMethod: PrivateMethod[DataFrame] = PrivateMethod[DataFrame]('getBinaryDF)
      val resultDF: DataFrame = mortonStr invokePrivate privateMethod()
      val resultChecksum: Int = HashDataFrame.checksumDataFrame(resultDF, 1)
      val expectedDF: DataFrame = spark.read
        .format("parquet")
        .load(getClass.getResource("/str_binary").getPath)
      val expectedChecksum: Int = HashDataFrame.checksumDataFrame(expectedDF, 1)

      assert(resultChecksum == expectedChecksum)
    }
  }

  "getBinaryDF num columns" should {

    "return the original dataframe with the binary columns for numeric" in {
      val privateMethod: PrivateMethod[DataFrame] = PrivateMethod[DataFrame]('getBinaryDF)
      val resultDF: DataFrame = mortonNum invokePrivate privateMethod()
      val resultChecksum: Int = HashDataFrame.checksumDataFrame(resultDF, 1)
      val expectedDF: DataFrame = spark.read
        .format("parquet")
        .load(getClass.getResource("/numeric_binary").getPath)
      val expectedChecksum: Int = HashDataFrame.checksumDataFrame(expectedDF, 1)

      assert(resultChecksum == expectedChecksum)
    }
  }

  "getBinaryDF mixed columns" should {

    "return the original dataframe with the binary columns for numeric and string" in {
      val privateMethod: PrivateMethod[DataFrame] = PrivateMethod[DataFrame]('getBinaryDF)
      val resultDF: DataFrame = mortonMixed invokePrivate privateMethod()
      val resultChecksum: Int = HashDataFrame.checksumDataFrame(resultDF, 1)
      val expectedDF: DataFrame = spark.read
        .format("parquet")
        .load(getClass.getResource("/mixed_binary").getPath)
      val expectedChecksum: Int = HashDataFrame.checksumDataFrame(expectedDF, 1)

      assert(resultChecksum == expectedChecksum)
    }
  }

  // TODO: Figure out testing private UDFs
  "interleaveBits" ignore {

    "interleave the binary bit columns" in {
      val numDF: DataFrame = spark.read
        .format("parquet")
        .load(getClass.getResource("/numeric_binary").getPath)
      val privateMethod: PrivateMethod[DataFrame] = PrivateMethod[DataFrame]('interleaveBits)
    }
  }

  "Morton class" should {

    "throw an exception if the supplied column array is not greater than 1" in {
      val thrown: Exception = the [Exception] thrownBy new Morton(df, Array("x"))

      thrown.getMessage should equal(
        "You need at least 2 columns to morton order your data."
      )
    }
  }

  "mortonIndex" should {

    "return a dataframe with the column z_index" in {
      val privateMethod: PrivateMethod[DataFrame] = PrivateMethod[DataFrame]('mortonIndex)
      val resultDF: DataFrame = mortonNum invokePrivate privateMethod()
      val resultChecksum: Int = HashDataFrame.checksumDataFrame(resultDF, 1)
      val expectedDF: DataFrame = spark.read
        .format("parquet")
        .load(getClass.getResource("/z_index").getPath)
      val expectedChecksum: Int = HashDataFrame.checksumDataFrame(expectedDF, 1)

      assert(resultChecksum == expectedChecksum)
    }
  }
}
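The interleaveBits test above is still ignored because the UDF is private, so as background only: the z_index column rests on interleaving the bits of the per-column binary representations. The following is a self-contained sketch of plain 2-D Morton (Z-order) interleaving; it illustrates the concept and is not the project's implementation, and the name interleave2 is made up.

object InterleaveSketch {
  // Interleave the low `bits` bits of x and y: bit i of x goes to position 2*i,
  // bit i of y goes to position 2*i + 1 (one common Z-order convention).
  def interleave2(x: Int, y: Int, bits: Int = 16): Long =
    (0 until bits).foldLeft(0L) { (acc, i) =>
      acc |
        (((x >> i) & 1L) << (2 * i)) |
        (((y >> i) & 1L) << (2 * i + 1))
    }

  def main(args: Array[String]): Unit = {
    // x = 3 (binary 011) and y = 5 (binary 101) interleave to 100111, i.e. 39.
    println(interleave2(3, 5))
  }
}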
