Skip to content

Commit

Permalink
Use mock grid index system for unittest (#241)
Browse files Browse the repository at this point in the history
* trying out mock index system in tests

* experiment1

* build test helper

* test improvements

* refactor tests for spatial functions

* 1st part of feedback Milos

Co-authored-by: Tim Dikland <[email protected]>
  • Loading branch information
tdikland and TimDikland-DB authored Nov 4, 2022
1 parent c1a69e4 commit 4fe2970
Show file tree
Hide file tree
Showing 4 changed files with 237 additions and 76 deletions.
Original file line number Diff line number Diff line change
@@ -1,76 +1,63 @@
package com.databricks.labs.mosaic.expressions.geometry

import com.databricks.labs.mosaic.core.geometry.api.GeometryAPI
import com.databricks.labs.mosaic.core.index._
import com.databricks.labs.mosaic.functions.MosaicContext
import com.databricks.labs.mosaic.test.mocks
import org.apache.spark.sql.QueryTest
import com.databricks.labs.mosaic.test.{MosaicSpatialQueryTest, mocks}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenerator, CodegenContext}
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.functions.{col, lit}
import org.scalatest.matchers.must.Matchers.noException
import org.scalatest.matchers.should.Matchers.{an, be, convertToAnyShouldWrapper}

trait ST_UnaryUnionBehaviours extends QueryTest {
def unaryUnionBehavior(indexSystem: IndexSystem, geometryAPI: GeometryAPI): Unit = {
val mc = MosaicContext.build(indexSystem, geometryAPI)
import mc.functions._
trait ST_UnaryUnionBehaviours extends MosaicSpatialQueryTest {

def behavior(mc: MosaicContext): Unit = {
val sc = spark
mc.register(sc)
import sc.implicits._
mc.register(spark)

val multiPolygon = List("MULTIPOLYGON (((10 10, 20 10, 20 20, 10 20, 10 10)), ((15 15, 25 15, 25 25, 15 25, 15 15)))")
val expected = List("POLYGON ((20 15, 20 10, 10 10, 10 20, 15 20, 15 25, 25 25, 25 15, 20 15))")
.map(mc.getGeometryAPI.geometry(_, "WKT"))
import mc.functions._

val results = multiPolygon
.toDF("multiPolygon")
.withColumn("result", st_unaryunion($"multiPolygon"))
.select($"result")
.as[String]
.collect()
.map(mc.getGeometryAPI.geometry(_, "WKT"))
val input = List("MULTIPOLYGON (((10 10, 20 10, 20 20, 10 20, 10 10)), ((15 15, 25 15, 25 25, 15 25, 15 15)))").toDF("input_geom")
val expected = List("POLYGON ((20 15, 20 10, 10 10, 10 20, 15 20, 15 25, 25 25, 25 15, 20 15))").toDF("result_geom")
val result = input.withColumn("result_geom", st_unaryunion(col("input_geom")))

results.zip(expected).foreach { case (l, r) => l.equals(r) shouldEqual true }
checkGeometryTopo(mc, result, expected, "result_geom")
}

def unaryUnionCodegen(indexSystem: IndexSystem, geometryAPI: GeometryAPI): Unit = {
def codegenCompilation(mc: MosaicContext): Unit = {
spark.sparkContext.setLogLevel("FATAL")
val mc = MosaicContext.build(indexSystem, geometryAPI)

val sc = spark
import mc.functions._
mc.register(sc)
import sc.implicits._
mc.register(spark)
import mc.functions._

val result = mocks.getWKTRowsDf(mc).select(st_unaryunion($"wkt"))

val queryExecution = result.queryExecution
val plan = queryExecution.executedPlan

val plan = result.queryExecution.executedPlan
val wholeStageCodegenExec = plan.find(_.isInstanceOf[WholeStageCodegenExec])

wholeStageCodegenExec.isDefined shouldBe true

val codeGenStage = wholeStageCodegenExec.get.asInstanceOf[WholeStageCodegenExec]
val (_, code) = codeGenStage.doCodeGen()

noException should be thrownBy CodeGenerator.compile(code)

val stUnaryUnion = ST_UnaryUnion(lit(1).expr, "JTS")
val stUnaryUnion = ST_UnaryUnion(lit(1).expr, "illegalAPI")
val ctx = new CodegenContext
an[Error] should be thrownBy stUnaryUnion.genCode(ctx)
}

def auxiliaryMethods(indexSystem: IndexSystem, geometryAPI: GeometryAPI): Unit = {
def auxiliaryMethods(mc: MosaicContext): Unit = {
spark.sparkContext.setLogLevel("FATAL")
val mc = MosaicContext.build(indexSystem, geometryAPI)
mc.register(spark)

val stUnaryUnion = ST_UnaryUnion(lit("MULTIPOLYGON (((10 10, 20 10, 20 20, 10 20, 10 10)), ((15 15, 25 15, 25 25, 15 25, 15 15)))").expr, "illegalAPI")
val sc = spark
mc.register(sc)

val input = "MULTIPOLYGON (((10 10, 20 10, 20 20, 10 20, 10 10)), ((15 15, 25 15, 25 25, 15 25, 15 15)))"

stUnaryUnion.child shouldEqual lit("MULTIPOLYGON (((10 10, 20 10, 20 20, 10 20, 10 10)), ((15 15, 25 15, 25 25, 15 25, 15 15)))").expr
stUnaryUnion.dataType shouldEqual lit("MULTIPOLYGON (((10 10, 20 10, 20 20, 10 20, 10 10)), ((15 15, 25 15, 25 25, 15 25, 15 15)))").expr.dataType
val stUnaryUnion = ST_UnaryUnion(lit(input).expr, "illegalAPI")
stUnaryUnion.child shouldEqual lit(input).expr
stUnaryUnion.dataType shouldEqual lit(input).expr.dataType
noException should be thrownBy stUnaryUnion.makeCopy(Array(stUnaryUnion.child))
}

}
}
Original file line number Diff line number Diff line change
@@ -1,41 +1,13 @@
package com.databricks.labs.mosaic.expressions.geometry

import com.databricks.labs.mosaic.core.geometry.api.GeometryAPI.{ESRI, JTS}
import com.databricks.labs.mosaic.core.index.{BNGIndexSystem, H3IndexSystem}
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
import org.apache.spark.sql.internal.SQLConf
import com.databricks.labs.mosaic.test.MosaicSpatialQueryTest
import org.apache.spark.sql.test.SharedSparkSession

class ST_UnaryUnionTest extends QueryTest with SharedSparkSession with ST_UnaryUnionBehaviours {
private val noCodegen =
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
SQLConf.CODEGEN_FACTORY_MODE.key -> CodegenObjectFactoryMode.NO_CODEGEN.toString
) _
class ST_UnaryUnionTest extends MosaicSpatialQueryTest with SharedSparkSession with ST_UnaryUnionBehaviours {

private val codegenOnly =
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
SQLConf.CODEGEN_FACTORY_MODE.key -> CodegenObjectFactoryMode.CODEGEN_ONLY.toString
) _

test("Testing stUnaryUnion (H3, JTS) NO_CODEGEN") { noCodegen { unaryUnionBehavior(H3IndexSystem, JTS) } }
test("Testing stUnaryUnion (H3, ESRI) NO_CODEGEN") { noCodegen { unaryUnionBehavior(H3IndexSystem, ESRI) } }
test("Testing stUnaryUnion (BNG, JTS) NO_CODEGEN") { noCodegen { unaryUnionBehavior(BNGIndexSystem, JTS) } }
test("Testing stUnaryUnion (BNG, ESRI) NO_CODEGEN") { noCodegen { unaryUnionBehavior(BNGIndexSystem, ESRI) } }
test("Testing stUnaryUnion (H3, JTS) CODEGEN compilation") { codegenOnly { unaryUnionCodegen(H3IndexSystem, JTS) } }
test("Testing stUnaryUnion (H3, ESRI) CODEGEN compilation") { codegenOnly { unaryUnionCodegen(H3IndexSystem, ESRI) } }
test("Testing stUnaryUnion (BNG, JTS) CODEGEN compilation") { codegenOnly { unaryUnionCodegen(BNGIndexSystem, JTS) } }
test("Testing stUnaryUnion (BNG, ESRI) CODEGEN compilation") { codegenOnly { unaryUnionCodegen(BNGIndexSystem, ESRI) } }
test("Testing stUnaryUnion (H3, JTS) CODEGEN_ONLY") { codegenOnly { unaryUnionBehavior(H3IndexSystem, JTS) } }
test("Testing stUnaryUnion (H3, ESRI) CODEGEN_ONLY") { codegenOnly { unaryUnionBehavior(H3IndexSystem, ESRI) } }
test("Testing stUnaryUnion (BNG, JTS) CODEGEN_ONLY") { codegenOnly { unaryUnionBehavior(BNGIndexSystem, JTS) } }
test("Testing stUnaryUnion (BNG, ESRI) CODEGEN_ONLY") { codegenOnly { unaryUnionBehavior(BNGIndexSystem, ESRI) } }
test("Testing stUnaryUnion auxiliaryMethods (H3, JTS)") { noCodegen { auxiliaryMethods(H3IndexSystem, JTS) } }
test("Testing stUnaryUnion auxiliaryMethods (H3, ESRI)") { noCodegen { auxiliaryMethods(H3IndexSystem, ESRI) } }
test("Testing stUnaryUnion auxiliaryMethods (BNG, JTS)") { noCodegen { auxiliaryMethods(BNGIndexSystem, JTS) } }
test("Testing stUnaryUnion auxiliaryMethods (BNG, ESRI)") { noCodegen { auxiliaryMethods(BNGIndexSystem, ESRI) } }
testAllGeometriesCodegen("ST_UnaryUnion behavior") { behavior }
testAllGeometriesNoCodegen("ST_UnaryUnion behavior") { behavior }
testAllGeometriesCodegen("ST_UnaryUnion codegen compilation") { codegenCompilation }
testAllGeometriesNoCodegen("ST_UnaryUnion auxiliary methods") { auxiliaryMethods }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package com.databricks.labs.mosaic.test

import com.databricks.labs.mosaic.core.geometry.api.GeometryAPI
import com.databricks.labs.mosaic.core.geometry.api.GeometryAPI.{ESRI, JTS}
import com.databricks.labs.mosaic.functions.MosaicContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.plans.PlanTest
import com.databricks.labs.mosaic.core.index.{BNGIndexSystem, H3IndexSystem, IndexSystem}
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.internal.SQLConf
import org.scalatest.{Assertions, BeforeAndAfterEach, Suite, Tag}

/**
* Provides helper methods for running tests against a matrix of geometry apis, grid index systems and SQL confs.
*/
abstract class MosaicSpatialQueryTest extends PlanTest with MosaicHelper {

private val geometryApis = Seq(ESRI, JTS)

private val indexSystems = Seq(H3IndexSystem, BNGIndexSystem)

protected def spark: SparkSession

/**
* Runs the testcase with all different geometry APIs while the grid index system is mocked out.
* Tests the codegen path of the query plan.
*/
protected def testAllGeometriesCodegen(testName: String, testTags: Tag*)(testFun: MosaicContext => Unit): Unit = {
val is = MockIndexSystem
for (geom <- geometryApis) {
super.test(testName + s" (codegen) (${geom.name}, ${is.name})", testTags: _*)(
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
SQLConf.CODEGEN_FACTORY_MODE.key -> CodegenObjectFactoryMode.CODEGEN_ONLY.toString
) {
withMosaicContext(geom, is) {
testFun
}
}
)

}
}

/**
* Runs the testcase with all different geometry APIs while the grid index system is mocked out.
* Tests the interpreted path of the query plan.
*/
protected def testAllGeometriesNoCodegen(testName: String, testTags: Tag*)(testFun: MosaicContext => Unit): Unit = {
val is = MockIndexSystem
for (geom <- geometryApis) {
super.test(testName + s" (no codegen) (${geom.name}, ${is.name})", testTags: _*)(
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
SQLConf.CODEGEN_FACTORY_MODE.key -> CodegenObjectFactoryMode.NO_CODEGEN.toString
) {
withMosaicContext(geom, is) {
testFun
}
}
)
}
}

/**
* Runs the testcase with all valid combinations of geometry API + grid index system.
* Tests the codegen path of the query plan.
*/
protected def testAllCodegen(testName: String, testTags: Tag*)(testFun: MosaicContext => Unit): Unit = {
for (geom <- geometryApis) {
for (is <- indexSystems) {
super.test(testName + s" (codegen) (${geom.name}, ${is.name})", testTags: _*)(
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
SQLConf.CODEGEN_FACTORY_MODE.key -> CodegenObjectFactoryMode.CODEGEN_ONLY.toString
) {
withMosaicContext(geom, is) {
testFun
}
}
)
}
}
}

/**
* Runs the testcase with all valid combinations of geometry API + grid index system.
* Tests the interpreted path of the query plan.
*/
protected def testAllNoCodegen(testName: String, testTags: Tag*)(testFun: MosaicContext => Unit): Unit = {
for (geom <- geometryApis) {
for (is <- indexSystems) {
super.test(testName + s" (no codegen) (${geom.name}, ${is.name})", testTags: _*)(
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
SQLConf.CODEGEN_FACTORY_MODE.key -> CodegenObjectFactoryMode.NO_CODEGEN.toString
) {
withMosaicContext(geom, is) {
testFun
}
}
)
}
}
}

def checkGeometryTopo(
mc: MosaicContext,
actualAnswer: DataFrame,
expectedAnswer: DataFrame,
geometryFieldName: String
): Unit = {
MosaicSpatialQueryTest.checkGeometryTopo(mc, actualAnswer, expectedAnswer, geometryFieldName)
}

}

object MosaicSpatialQueryTest extends Assertions {
/**
* Runs the query plan and checks if the answer is toplogogically equal to the expected result.
*
* @param mc the mosaic context that performs the equality check.
* @param actualAnswer the actual result as a [[DataFrame]].
* @param expectedAnswer the expected result as a [[DataFrame]].
* @param geometryFieldName the name of the column containing the geometries to be compared.
*/
def checkGeometryTopo(
mc: MosaicContext,
actualAnswer: DataFrame,
expectedAnswer: DataFrame,
geometryFieldName: String
): Unit = {
import mc.functions.st_aswkt

val actualGeoms = actualAnswer
.withColumn("answer_wkt", st_aswkt(col(geometryFieldName)))
.select(col("answer_wkt"))
.collect()
.map(_.getString(0))
.map(mc.getGeometryAPI.geometry(_, "WKT"))
val expectedGeoms = expectedAnswer
.withColumn("answer_wkt", st_aswkt(col(geometryFieldName)))
.select(col("answer_wkt"))
.collect()
.map(_.getString(0))
.map(mc.getGeometryAPI.geometry(_, "WKT"))

actualGeoms.zip(expectedGeoms).foreach { case (actualGeom, expectedGeom) =>
assert(actualGeom.equalsTopo(expectedGeom), s"$actualGeom did not topologically equal $expectedGeom")
}
}
}

trait MosaicHelper extends BeforeAndAfterEach { self: Suite =>

/**
* Constructs the MosaicContext from its parts and calls `f`.
*/
protected def withMosaicContext(geometry: GeometryAPI, indexSystem: IndexSystem)(f: MosaicContext => Unit): Unit = {
val mc: MosaicContext = MosaicContext.build(indexSystem, geometry)
f(mc)

}
}
41 changes: 38 additions & 3 deletions src/test/scala/com/databricks/labs/mosaic/test/package.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package com.databricks.labs.mosaic

import com.databricks.labs.mosaic.core.index.{BNGIndexSystem, H3IndexSystem}
import com.databricks.labs.mosaic.core.geometry.MosaicGeometry
import com.databricks.labs.mosaic.core.geometry.api.GeometryAPI
import com.databricks.labs.mosaic.core.index.{BNGIndexSystem, H3IndexSystem, IndexSystem, IndexSystemID}
import com.databricks.labs.mosaic.functions.MosaicContext

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StringType}
import org.apache.spark.sql.types.{DataType, IntegerType, StringType}

package object test {

Expand Down Expand Up @@ -256,6 +257,7 @@ package object test {
val rows = indexSystem match {
case H3IndexSystem => wkt_rows_epsg4326.map { x => Row(x: _*) }
case BNGIndexSystem => wkt_rows_epsg27700.map { x => Row(x: _*) }
case _ => wkt_rows_epsg4326.map { x => Row(x: _*) }
}
val rdd = spark.sparkContext.makeRDD(rows)
val schema = StructType(
Expand Down Expand Up @@ -326,4 +328,37 @@ package object test {
}

}

object MockIndexSystem extends IndexSystem {

override def name: String = "MOCK"

override def defaultDataTypeID: DataType = StringType

override def getIndexSystemID: IndexSystemID = ???

override def polyfill(geometry: MosaicGeometry, resolution: Int, geometryAPI: Option[GeometryAPI]): Seq[Long] = ???

override def format(id: Long): String = ???

override def getResolutionStr(resolution: Int): String = ???

override def pointToIndex(lon: Double, lat: Double, resolution: Int): Long = ???

override def kDisk(index: Long, n: Int): Seq[Long] = ???

override def kRing(index: Long, n: Int): Seq[Long] = ???

override def getResolution(res: Any): Int = ???

override def resolutions: Set[Int] = ???

override def indexToGeometry(index: Long, geometryAPI: GeometryAPI): MosaicGeometry = ???

override def indexToGeometry(index: String, geometryAPI: GeometryAPI): MosaicGeometry = ???

override def getBufferRadius(geometry: MosaicGeometry, resolution: Int, geometryAPI: GeometryAPI): Double = ???

}

}

0 comments on commit 4fe2970

Please sign in to comment.