diff --git a/.github/workflows/core-ci.yml b/.github/workflows/core-ci.yml new file mode 100644 index 00000000..3ef8368c --- /dev/null +++ b/.github/workflows/core-ci.yml @@ -0,0 +1,22 @@ +name: core-ci + +on: + push: + branches: + - main + pull_request: + +jobs: + build: + strategy: + fail-fast: false + matrix: + spark: ["3.0.1", "3.1.3", "3.2.4", "3.3.4"] + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v1 + - uses: olafurpg/setup-scala@v10 + - name: Test + run: sbt -Dspark.testVersion=${{ matrix.spark }} +"project core" test + - name: Code Quality + run: sbt "project core" scalafmtCheckAll diff --git a/.github/workflows/ci.yml b/.github/workflows/unsafe-ci.yml similarity index 57% rename from .github/workflows/ci.yml rename to .github/workflows/unsafe-ci.yml index 64e912e4..9b25c316 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/unsafe-ci.yml @@ -1,22 +1,22 @@ -name: ci +name: unsafe-ci on: push: branches: - main + pull_request: jobs: build: strategy: fail-fast: false matrix: - scala: ["2.12.12"] - spark: ["3.0.1"] + spark: ["3.2.4", "3.3.4"] runs-on: ubuntu-latest steps: - uses: actions/checkout@v1 - uses: olafurpg/setup-scala@v10 - name: Test - run: sbt -Dspark.testVersion=${{ matrix.spark }} ++${{ matrix.scala }} test + run: sbt -Dspark.testVersion=${{ matrix.spark }} +"project unsafe" test - name: Code Quality - run: sbt scalafmtCheckAll + run: sbt "project unsafe" scalafmtCheckAll diff --git a/.scalafmt.conf b/.scalafmt.conf index 9d5ae7e6..9d222f86 100644 --- a/.scalafmt.conf +++ b/.scalafmt.conf @@ -1,5 +1,5 @@ version = 2.6.3 - +lineEndings = preserve align = more maxColumn = 150 docstrings = JavaDoc diff --git a/build.sbt b/build.sbt index 94057629..2bf0b032 100644 --- a/build.sbt +++ b/build.sbt @@ -4,27 +4,62 @@ organization := "com.github.mrpowers" name := "spark-daria" version := "1.2.3" + crossScalaVersions := Seq("2.12.15", "2.13.8") scalaVersion := "2.12.15" -val sparkVersion = "3.2.1" +val versionRegex = """^(.*)\.(.*)\.(.*)$""".r + +val scala2_13 = "2.13.14" +val scala2_12 = "2.12.20" + +val sparkVersion = System.getProperty("spark.testVersion", "3.3.4") +crossScalaVersions := { + sparkVersion match { + case versionRegex("3", m, _) if m.toInt >= 2 => Seq(scala2_12, scala2_13) + case versionRegex("3", _, _) => Seq(scala2_12) + } +} + +scalaVersion := crossScalaVersions.value.head + +lazy val commonSettings = Seq( + javaOptions ++= { + Seq("-Xms512M", "-Xmx2048M", "-Duser.timezone=GMT") ++ (if (System.getProperty("java.version").startsWith("1.8.0")) + Seq("-XX:+CMSClassUnloadingEnabled") + else Seq.empty) + }, + libraryDependencies ++= Seq( + "org.apache.spark" %% "spark-sql" % sparkVersion % "provided", + "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided", + "com.github.mrpowers" %% "spark-fast-tests" % "1.1.0" % "test", + "com.lihaoyi" %% "utest" % "0.7.11" % "test", + "com.lihaoyi" %% "os-lib" % "0.8.0" % "test" + ), +) + +lazy val core = (project in file("core")) + .settings( + commonSettings, + name := "core", + ) + +lazy val unsafe = (project in file("unsafe")) + .settings( + commonSettings, + name := "unsafe", + ) -libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion % "provided" -libraryDependencies += "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided" -libraryDependencies += "com.github.mrpowers" %% "spark-fast-tests" % "1.1.0" % "test" -libraryDependencies += "com.lihaoyi" %% "utest" % "0.7.11" % "test" -libraryDependencies += "com.lihaoyi" %% "os-lib" % "0.8.0" % "test" testFrameworks += new TestFramework("com.github.mrpowers.spark.daria.CustomFramework") credentials += Credentials(Path.userHome / ".sbt" / "sonatype_credentials") Test / fork := true -javaOptions ++= Seq("-Xms512M", "-Xmx2048M", "-XX:+CMSClassUnloadingEnabled", "-Duser.timezone=GMT") - licenses := Seq("MIT" -> url("http://opensource.org/licenses/MIT")) homepage := Some(url("https://github.com/MrPowers/spark-daria")) + developers ++= List( Developer("MrPowers", "Matthew Powers", "@MrPowers", url("https://github.com/MrPowers")) ) diff --git a/src/main/scala/com/github/mrpowers/spark/daria/delta/DeltaLogHelpers.scala b/core/src/main/scala/com/github/mrpowers/spark/daria/delta/DeltaLogHelpers.scala similarity index 100% rename from src/main/scala/com/github/mrpowers/spark/daria/delta/DeltaLogHelpers.scala rename to core/src/main/scala/com/github/mrpowers/spark/daria/delta/DeltaLogHelpers.scala diff --git a/src/main/scala/com/github/mrpowers/spark/daria/elt/Parser.scala b/core/src/main/scala/com/github/mrpowers/spark/daria/elt/Parser.scala similarity index 100% rename from src/main/scala/com/github/mrpowers/spark/daria/elt/Parser.scala rename to core/src/main/scala/com/github/mrpowers/spark/daria/elt/Parser.scala diff --git a/src/main/scala/com/github/mrpowers/spark/daria/hadoop/FsHelpers.scala b/core/src/main/scala/com/github/mrpowers/spark/daria/hadoop/FsHelpers.scala similarity index 100% rename from src/main/scala/com/github/mrpowers/spark/daria/hadoop/FsHelpers.scala rename to core/src/main/scala/com/github/mrpowers/spark/daria/hadoop/FsHelpers.scala diff --git a/src/main/scala/com/github/mrpowers/spark/daria/sql/ColumnExt.scala b/core/src/main/scala/com/github/mrpowers/spark/daria/sql/ColumnExt.scala similarity index 100% rename from src/main/scala/com/github/mrpowers/spark/daria/sql/ColumnExt.scala rename to core/src/main/scala/com/github/mrpowers/spark/daria/sql/ColumnExt.scala diff --git a/src/main/scala/com/github/mrpowers/spark/daria/sql/CustomTransform.scala b/core/src/main/scala/com/github/mrpowers/spark/daria/sql/CustomTransform.scala similarity index 100% rename from src/main/scala/com/github/mrpowers/spark/daria/sql/CustomTransform.scala rename to core/src/main/scala/com/github/mrpowers/spark/daria/sql/CustomTransform.scala diff --git a/src/main/scala/com/github/mrpowers/spark/daria/sql/DariaValidator.scala b/core/src/main/scala/com/github/mrpowers/spark/daria/sql/DariaValidator.scala similarity index 100% rename from src/main/scala/com/github/mrpowers/spark/daria/sql/DariaValidator.scala rename to core/src/main/scala/com/github/mrpowers/spark/daria/sql/DariaValidator.scala diff --git a/src/main/scala/com/github/mrpowers/spark/daria/sql/DariaWriters.scala b/core/src/main/scala/com/github/mrpowers/spark/daria/sql/DariaWriters.scala similarity index 100% rename from src/main/scala/com/github/mrpowers/spark/daria/sql/DariaWriters.scala rename to core/src/main/scala/com/github/mrpowers/spark/daria/sql/DariaWriters.scala diff --git a/src/main/scala/com/github/mrpowers/spark/daria/sql/DataFrameColumnsAbsence.scala b/core/src/main/scala/com/github/mrpowers/spark/daria/sql/DataFrameColumnsAbsence.scala similarity index 100% rename from src/main/scala/com/github/mrpowers/spark/daria/sql/DataFrameColumnsAbsence.scala rename to core/src/main/scala/com/github/mrpowers/spark/daria/sql/DataFrameColumnsAbsence.scala diff --git a/src/main/scala/com/github/mrpowers/spark/daria/sql/DataFrameColumnsChecker.scala b/core/src/main/scala/com/github/mrpowers/spark/daria/sql/DataFrameColumnsChecker.scala similarity index 100% rename from src/main/scala/com/github/mrpowers/spark/daria/sql/DataFrameColumnsChecker.scala rename to core/src/main/scala/com/github/mrpowers/spark/daria/sql/DataFrameColumnsChecker.scala diff --git a/src/main/scala/com/github/mrpowers/spark/daria/sql/DataFrameExt.scala b/core/src/main/scala/com/github/mrpowers/spark/daria/sql/DataFrameExt.scala similarity index 100% rename from src/main/scala/com/github/mrpowers/spark/daria/sql/DataFrameExt.scala rename to core/src/main/scala/com/github/mrpowers/spark/daria/sql/DataFrameExt.scala diff --git a/src/main/scala/com/github/mrpowers/spark/daria/sql/DataFrameHelpers.scala b/core/src/main/scala/com/github/mrpowers/spark/daria/sql/DataFrameHelpers.scala similarity index 100% rename from src/main/scala/com/github/mrpowers/spark/daria/sql/DataFrameHelpers.scala rename to core/src/main/scala/com/github/mrpowers/spark/daria/sql/DataFrameHelpers.scala diff --git a/src/main/scala/com/github/mrpowers/spark/daria/sql/DataFrameSchemaChecker.scala b/core/src/main/scala/com/github/mrpowers/spark/daria/sql/DataFrameSchemaChecker.scala similarity index 100% rename from src/main/scala/com/github/mrpowers/spark/daria/sql/DataFrameSchemaChecker.scala rename to core/src/main/scala/com/github/mrpowers/spark/daria/sql/DataFrameSchemaChecker.scala diff --git a/src/main/scala/com/github/mrpowers/spark/daria/sql/DataFrameValidator.scala b/core/src/main/scala/com/github/mrpowers/spark/daria/sql/DataFrameValidator.scala similarity index 100% rename from src/main/scala/com/github/mrpowers/spark/daria/sql/DataFrameValidator.scala rename to core/src/main/scala/com/github/mrpowers/spark/daria/sql/DataFrameValidator.scala diff --git a/src/main/scala/com/github/mrpowers/spark/daria/sql/EtlDefinition.scala b/core/src/main/scala/com/github/mrpowers/spark/daria/sql/EtlDefinition.scala similarity index 100% rename from src/main/scala/com/github/mrpowers/spark/daria/sql/EtlDefinition.scala rename to core/src/main/scala/com/github/mrpowers/spark/daria/sql/EtlDefinition.scala diff --git a/src/main/scala/com/github/mrpowers/spark/daria/sql/FunctionsAsColumnExt.scala b/core/src/main/scala/com/github/mrpowers/spark/daria/sql/FunctionsAsColumnExt.scala similarity index 100% rename from src/main/scala/com/github/mrpowers/spark/daria/sql/FunctionsAsColumnExt.scala rename to core/src/main/scala/com/github/mrpowers/spark/daria/sql/FunctionsAsColumnExt.scala diff --git a/src/main/scala/com/github/mrpowers/spark/daria/sql/ParquetCompactor.scala b/core/src/main/scala/com/github/mrpowers/spark/daria/sql/ParquetCompactor.scala similarity index 100% rename from src/main/scala/com/github/mrpowers/spark/daria/sql/ParquetCompactor.scala rename to core/src/main/scala/com/github/mrpowers/spark/daria/sql/ParquetCompactor.scala diff --git a/src/main/scala/com/github/mrpowers/spark/daria/sql/SparkSessionExt.scala b/core/src/main/scala/com/github/mrpowers/spark/daria/sql/SparkSessionExt.scala similarity index 100% rename from src/main/scala/com/github/mrpowers/spark/daria/sql/SparkSessionExt.scala rename to core/src/main/scala/com/github/mrpowers/spark/daria/sql/SparkSessionExt.scala diff --git a/src/main/scala/com/github/mrpowers/spark/daria/sql/functions.scala b/core/src/main/scala/com/github/mrpowers/spark/daria/sql/functions.scala similarity index 100% rename from src/main/scala/com/github/mrpowers/spark/daria/sql/functions.scala rename to core/src/main/scala/com/github/mrpowers/spark/daria/sql/functions.scala diff --git a/src/main/scala/com/github/mrpowers/spark/daria/sql/transformations.scala b/core/src/main/scala/com/github/mrpowers/spark/daria/sql/transformations.scala similarity index 100% rename from src/main/scala/com/github/mrpowers/spark/daria/sql/transformations.scala rename to core/src/main/scala/com/github/mrpowers/spark/daria/sql/transformations.scala diff --git a/src/main/scala/com/github/mrpowers/spark/daria/sql/types/StructFieldHelpers.scala b/core/src/main/scala/com/github/mrpowers/spark/daria/sql/types/StructFieldHelpers.scala similarity index 100% rename from src/main/scala/com/github/mrpowers/spark/daria/sql/types/StructFieldHelpers.scala rename to core/src/main/scala/com/github/mrpowers/spark/daria/sql/types/StructFieldHelpers.scala diff --git a/src/main/scala/com/github/mrpowers/spark/daria/sql/types/StructTypeHelpers.scala b/core/src/main/scala/com/github/mrpowers/spark/daria/sql/types/StructTypeHelpers.scala similarity index 100% rename from src/main/scala/com/github/mrpowers/spark/daria/sql/types/StructTypeHelpers.scala rename to core/src/main/scala/com/github/mrpowers/spark/daria/sql/types/StructTypeHelpers.scala diff --git a/src/main/scala/com/github/mrpowers/spark/daria/sql/udafs/ArrayConcat.scala b/core/src/main/scala/com/github/mrpowers/spark/daria/sql/udafs/ArrayConcat.scala similarity index 100% rename from src/main/scala/com/github/mrpowers/spark/daria/sql/udafs/ArrayConcat.scala rename to core/src/main/scala/com/github/mrpowers/spark/daria/sql/udafs/ArrayConcat.scala diff --git a/src/main/scala/com/github/mrpowers/spark/daria/utils/ArrayHelpers.scala b/core/src/main/scala/com/github/mrpowers/spark/daria/utils/ArrayHelpers.scala similarity index 100% rename from src/main/scala/com/github/mrpowers/spark/daria/utils/ArrayHelpers.scala rename to core/src/main/scala/com/github/mrpowers/spark/daria/utils/ArrayHelpers.scala diff --git a/src/main/scala/com/github/mrpowers/spark/daria/utils/DirHelpers.scala b/core/src/main/scala/com/github/mrpowers/spark/daria/utils/DirHelpers.scala similarity index 100% rename from src/main/scala/com/github/mrpowers/spark/daria/utils/DirHelpers.scala rename to core/src/main/scala/com/github/mrpowers/spark/daria/utils/DirHelpers.scala diff --git a/src/main/scala/com/github/mrpowers/spark/daria/utils/NioUtils.scala b/core/src/main/scala/com/github/mrpowers/spark/daria/utils/NioUtils.scala similarity index 100% rename from src/main/scala/com/github/mrpowers/spark/daria/utils/NioUtils.scala rename to core/src/main/scala/com/github/mrpowers/spark/daria/utils/NioUtils.scala diff --git a/src/main/scala/com/github/mrpowers/spark/daria/utils/RowHelpers.scala b/core/src/main/scala/com/github/mrpowers/spark/daria/utils/RowHelpers.scala similarity index 100% rename from src/main/scala/com/github/mrpowers/spark/daria/utils/RowHelpers.scala rename to core/src/main/scala/com/github/mrpowers/spark/daria/utils/RowHelpers.scala diff --git a/src/main/scala/com/github/mrpowers/spark/daria/utils/SchemaSafeWriter.scala b/core/src/main/scala/com/github/mrpowers/spark/daria/utils/SchemaSafeWriter.scala similarity index 100% rename from src/main/scala/com/github/mrpowers/spark/daria/utils/SchemaSafeWriter.scala rename to core/src/main/scala/com/github/mrpowers/spark/daria/utils/SchemaSafeWriter.scala diff --git a/src/main/scala/com/github/mrpowers/spark/daria/utils/StringHelpers.scala b/core/src/main/scala/com/github/mrpowers/spark/daria/utils/StringHelpers.scala similarity index 100% rename from src/main/scala/com/github/mrpowers/spark/daria/utils/StringHelpers.scala rename to core/src/main/scala/com/github/mrpowers/spark/daria/utils/StringHelpers.scala diff --git a/src/test/resources/csvs/file1.csv b/core/src/test/resources/csvs/file1.csv similarity index 100% rename from src/test/resources/csvs/file1.csv rename to core/src/test/resources/csvs/file1.csv diff --git a/src/test/resources/csvs/file2.csv b/core/src/test/resources/csvs/file2.csv similarity index 100% rename from src/test/resources/csvs/file2.csv rename to core/src/test/resources/csvs/file2.csv diff --git a/src/test/resources/csvs/file3.csv b/core/src/test/resources/csvs/file3.csv similarity index 100% rename from src/test/resources/csvs/file3.csv rename to core/src/test/resources/csvs/file3.csv diff --git a/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties similarity index 100% rename from src/test/resources/log4j.properties rename to core/src/test/resources/log4j.properties diff --git a/src/test/resources/people.csv b/core/src/test/resources/people.csv similarity index 100% rename from src/test/resources/people.csv rename to core/src/test/resources/people.csv diff --git a/src/test/resources/some_data.csv b/core/src/test/resources/some_data.csv similarity index 100% rename from src/test/resources/some_data.csv rename to core/src/test/resources/some_data.csv diff --git a/src/test/resources/zipcodes.txt b/core/src/test/resources/zipcodes.txt similarity index 100% rename from src/test/resources/zipcodes.txt rename to core/src/test/resources/zipcodes.txt diff --git a/src/test/scala/com/github/mrpowers/spark/daria/CustomFramework.scala b/core/src/test/scala/com/github/mrpowers/spark/daria/CustomFramework.scala similarity index 100% rename from src/test/scala/com/github/mrpowers/spark/daria/CustomFramework.scala rename to core/src/test/scala/com/github/mrpowers/spark/daria/CustomFramework.scala diff --git a/src/test/scala/com/github/mrpowers/spark/daria/delta/DeltaLogHelpersTest.scala b/core/src/test/scala/com/github/mrpowers/spark/daria/delta/DeltaLogHelpersTest.scala similarity index 100% rename from src/test/scala/com/github/mrpowers/spark/daria/delta/DeltaLogHelpersTest.scala rename to core/src/test/scala/com/github/mrpowers/spark/daria/delta/DeltaLogHelpersTest.scala diff --git a/src/test/scala/com/github/mrpowers/spark/daria/elt/StagingParser.scala b/core/src/test/scala/com/github/mrpowers/spark/daria/elt/StagingParser.scala similarity index 100% rename from src/test/scala/com/github/mrpowers/spark/daria/elt/StagingParser.scala rename to core/src/test/scala/com/github/mrpowers/spark/daria/elt/StagingParser.scala diff --git a/src/test/scala/com/github/mrpowers/spark/daria/elt/StagingParserTest.scala b/core/src/test/scala/com/github/mrpowers/spark/daria/elt/StagingParserTest.scala similarity index 100% rename from src/test/scala/com/github/mrpowers/spark/daria/elt/StagingParserTest.scala rename to core/src/test/scala/com/github/mrpowers/spark/daria/elt/StagingParserTest.scala diff --git a/src/test/scala/com/github/mrpowers/spark/daria/hadoop/FsHelpersTest.scala b/core/src/test/scala/com/github/mrpowers/spark/daria/hadoop/FsHelpersTest.scala similarity index 100% rename from src/test/scala/com/github/mrpowers/spark/daria/hadoop/FsHelpersTest.scala rename to core/src/test/scala/com/github/mrpowers/spark/daria/hadoop/FsHelpersTest.scala diff --git a/src/test/scala/com/github/mrpowers/spark/daria/sql/ColumnExtTest.scala b/core/src/test/scala/com/github/mrpowers/spark/daria/sql/ColumnExtTest.scala similarity index 100% rename from src/test/scala/com/github/mrpowers/spark/daria/sql/ColumnExtTest.scala rename to core/src/test/scala/com/github/mrpowers/spark/daria/sql/ColumnExtTest.scala diff --git a/src/test/scala/com/github/mrpowers/spark/daria/sql/DariaValidatorTest.scala b/core/src/test/scala/com/github/mrpowers/spark/daria/sql/DariaValidatorTest.scala similarity index 100% rename from src/test/scala/com/github/mrpowers/spark/daria/sql/DariaValidatorTest.scala rename to core/src/test/scala/com/github/mrpowers/spark/daria/sql/DariaValidatorTest.scala diff --git a/src/test/scala/com/github/mrpowers/spark/daria/sql/DariaWritersTest.scala b/core/src/test/scala/com/github/mrpowers/spark/daria/sql/DariaWritersTest.scala similarity index 100% rename from src/test/scala/com/github/mrpowers/spark/daria/sql/DariaWritersTest.scala rename to core/src/test/scala/com/github/mrpowers/spark/daria/sql/DariaWritersTest.scala diff --git a/src/test/scala/com/github/mrpowers/spark/daria/sql/DataFrameColumnsAbsenceTest.scala b/core/src/test/scala/com/github/mrpowers/spark/daria/sql/DataFrameColumnsAbsenceTest.scala similarity index 100% rename from src/test/scala/com/github/mrpowers/spark/daria/sql/DataFrameColumnsAbsenceTest.scala rename to core/src/test/scala/com/github/mrpowers/spark/daria/sql/DataFrameColumnsAbsenceTest.scala diff --git a/src/test/scala/com/github/mrpowers/spark/daria/sql/DataFrameColumnsCheckerTest.scala b/core/src/test/scala/com/github/mrpowers/spark/daria/sql/DataFrameColumnsCheckerTest.scala similarity index 100% rename from src/test/scala/com/github/mrpowers/spark/daria/sql/DataFrameColumnsCheckerTest.scala rename to core/src/test/scala/com/github/mrpowers/spark/daria/sql/DataFrameColumnsCheckerTest.scala diff --git a/src/test/scala/com/github/mrpowers/spark/daria/sql/DataFrameExtTest.scala b/core/src/test/scala/com/github/mrpowers/spark/daria/sql/DataFrameExtTest.scala similarity index 100% rename from src/test/scala/com/github/mrpowers/spark/daria/sql/DataFrameExtTest.scala rename to core/src/test/scala/com/github/mrpowers/spark/daria/sql/DataFrameExtTest.scala diff --git a/src/test/scala/com/github/mrpowers/spark/daria/sql/DataFrameHelpersTest.scala b/core/src/test/scala/com/github/mrpowers/spark/daria/sql/DataFrameHelpersTest.scala similarity index 100% rename from src/test/scala/com/github/mrpowers/spark/daria/sql/DataFrameHelpersTest.scala rename to core/src/test/scala/com/github/mrpowers/spark/daria/sql/DataFrameHelpersTest.scala diff --git a/src/test/scala/com/github/mrpowers/spark/daria/sql/DataFrameSchemaCheckerTest.scala b/core/src/test/scala/com/github/mrpowers/spark/daria/sql/DataFrameSchemaCheckerTest.scala similarity index 100% rename from src/test/scala/com/github/mrpowers/spark/daria/sql/DataFrameSchemaCheckerTest.scala rename to core/src/test/scala/com/github/mrpowers/spark/daria/sql/DataFrameSchemaCheckerTest.scala diff --git a/src/test/scala/com/github/mrpowers/spark/daria/sql/DataFrameValidatorTest.scala b/core/src/test/scala/com/github/mrpowers/spark/daria/sql/DataFrameValidatorTest.scala similarity index 100% rename from src/test/scala/com/github/mrpowers/spark/daria/sql/DataFrameValidatorTest.scala rename to core/src/test/scala/com/github/mrpowers/spark/daria/sql/DataFrameValidatorTest.scala diff --git a/src/test/scala/com/github/mrpowers/spark/daria/sql/EtlDefinitionTest.scala b/core/src/test/scala/com/github/mrpowers/spark/daria/sql/EtlDefinitionTest.scala similarity index 100% rename from src/test/scala/com/github/mrpowers/spark/daria/sql/EtlDefinitionTest.scala rename to core/src/test/scala/com/github/mrpowers/spark/daria/sql/EtlDefinitionTest.scala diff --git a/src/test/scala/com/github/mrpowers/spark/daria/sql/EtlHelpers.scala b/core/src/test/scala/com/github/mrpowers/spark/daria/sql/EtlHelpers.scala similarity index 100% rename from src/test/scala/com/github/mrpowers/spark/daria/sql/EtlHelpers.scala rename to core/src/test/scala/com/github/mrpowers/spark/daria/sql/EtlHelpers.scala diff --git a/src/test/scala/com/github/mrpowers/spark/daria/sql/ExampleTransforms.scala b/core/src/test/scala/com/github/mrpowers/spark/daria/sql/ExampleTransforms.scala similarity index 100% rename from src/test/scala/com/github/mrpowers/spark/daria/sql/ExampleTransforms.scala rename to core/src/test/scala/com/github/mrpowers/spark/daria/sql/ExampleTransforms.scala diff --git a/src/test/scala/com/github/mrpowers/spark/daria/sql/FunctionsAsColumnExtTest.scala b/core/src/test/scala/com/github/mrpowers/spark/daria/sql/FunctionsAsColumnExtTest.scala similarity index 100% rename from src/test/scala/com/github/mrpowers/spark/daria/sql/FunctionsAsColumnExtTest.scala rename to core/src/test/scala/com/github/mrpowers/spark/daria/sql/FunctionsAsColumnExtTest.scala diff --git a/src/test/scala/com/github/mrpowers/spark/daria/sql/FunctionsTest.scala b/core/src/test/scala/com/github/mrpowers/spark/daria/sql/FunctionsTest.scala similarity index 100% rename from src/test/scala/com/github/mrpowers/spark/daria/sql/FunctionsTest.scala rename to core/src/test/scala/com/github/mrpowers/spark/daria/sql/FunctionsTest.scala diff --git a/src/test/scala/com/github/mrpowers/spark/daria/sql/ParquetCompactorTest.scala b/core/src/test/scala/com/github/mrpowers/spark/daria/sql/ParquetCompactorTest.scala similarity index 100% rename from src/test/scala/com/github/mrpowers/spark/daria/sql/ParquetCompactorTest.scala rename to core/src/test/scala/com/github/mrpowers/spark/daria/sql/ParquetCompactorTest.scala diff --git a/src/test/scala/com/github/mrpowers/spark/daria/sql/SparkSessionExtTest.scala b/core/src/test/scala/com/github/mrpowers/spark/daria/sql/SparkSessionExtTest.scala similarity index 100% rename from src/test/scala/com/github/mrpowers/spark/daria/sql/SparkSessionExtTest.scala rename to core/src/test/scala/com/github/mrpowers/spark/daria/sql/SparkSessionExtTest.scala diff --git a/src/test/scala/com/github/mrpowers/spark/daria/sql/SparkSessionTestWrapper.scala b/core/src/test/scala/com/github/mrpowers/spark/daria/sql/SparkSessionTestWrapper.scala similarity index 78% rename from src/test/scala/com/github/mrpowers/spark/daria/sql/SparkSessionTestWrapper.scala rename to core/src/test/scala/com/github/mrpowers/spark/daria/sql/SparkSessionTestWrapper.scala index 7e1aff81..0516c8eb 100644 --- a/src/test/scala/com/github/mrpowers/spark/daria/sql/SparkSessionTestWrapper.scala +++ b/core/src/test/scala/com/github/mrpowers/spark/daria/sql/SparkSessionTestWrapper.scala @@ -5,7 +5,7 @@ import org.apache.spark.sql.SparkSession trait SparkSessionTestWrapper { lazy val spark: SparkSession = { - SparkSession + val session = SparkSession .builder() .master("local") .appName("spark session") @@ -14,6 +14,8 @@ trait SparkSessionTestWrapper { "1" ) .getOrCreate() + session.sparkContext.setLogLevel("ERROR") + session } } diff --git a/src/test/scala/com/github/mrpowers/spark/daria/sql/TransformationsTest.scala b/core/src/test/scala/com/github/mrpowers/spark/daria/sql/TransformationsTest.scala similarity index 98% rename from src/test/scala/com/github/mrpowers/spark/daria/sql/TransformationsTest.scala rename to core/src/test/scala/com/github/mrpowers/spark/daria/sql/TransformationsTest.scala index dbe913b6..e3276b96 100644 --- a/src/test/scala/com/github/mrpowers/spark/daria/sql/TransformationsTest.scala +++ b/core/src/test/scala/com/github/mrpowers/spark/daria/sql/TransformationsTest.scala @@ -1047,22 +1047,6 @@ object TransformationsTest extends TestSuite with DataFrameComparer with ColumnC } 'withParquetCompatibleColumnNames - { - "blows up if the column name is invalid for Parquet" - { - val df = spark - .createDF( - List( - ("pablo") - ), - List( - ("Column That {Will} Break\t;", StringType, true) - ) - ) - val path = new java.io.File("./tmp/blowup/example").getCanonicalPath - val e = intercept[org.apache.spark.sql.AnalysisException] { - df.write.parquet(path) - } - } - "converts column names to be Parquet compatible" - { val actualDF = spark .createDF( diff --git a/src/test/scala/com/github/mrpowers/spark/daria/sql/types/StructFieldHelpersTest.scala b/core/src/test/scala/com/github/mrpowers/spark/daria/sql/types/StructFieldHelpersTest.scala similarity index 100% rename from src/test/scala/com/github/mrpowers/spark/daria/sql/types/StructFieldHelpersTest.scala rename to core/src/test/scala/com/github/mrpowers/spark/daria/sql/types/StructFieldHelpersTest.scala diff --git a/src/test/scala/com/github/mrpowers/spark/daria/sql/types/StructTypeHelpersTest.scala b/core/src/test/scala/com/github/mrpowers/spark/daria/sql/types/StructTypeHelpersTest.scala similarity index 100% rename from src/test/scala/com/github/mrpowers/spark/daria/sql/types/StructTypeHelpersTest.scala rename to core/src/test/scala/com/github/mrpowers/spark/daria/sql/types/StructTypeHelpersTest.scala diff --git a/src/test/scala/com/github/mrpowers/spark/daria/sql/udafs/ArrayConcatTest.scala b/core/src/test/scala/com/github/mrpowers/spark/daria/sql/udafs/ArrayConcatTest.scala similarity index 100% rename from src/test/scala/com/github/mrpowers/spark/daria/sql/udafs/ArrayConcatTest.scala rename to core/src/test/scala/com/github/mrpowers/spark/daria/sql/udafs/ArrayConcatTest.scala diff --git a/src/test/scala/com/github/mrpowers/spark/daria/utils/ArrayHelpersTest.scala b/core/src/test/scala/com/github/mrpowers/spark/daria/utils/ArrayHelpersTest.scala similarity index 100% rename from src/test/scala/com/github/mrpowers/spark/daria/utils/ArrayHelpersTest.scala rename to core/src/test/scala/com/github/mrpowers/spark/daria/utils/ArrayHelpersTest.scala diff --git a/src/test/scala/com/github/mrpowers/spark/daria/utils/RowHelpersTest.scala b/core/src/test/scala/com/github/mrpowers/spark/daria/utils/RowHelpersTest.scala similarity index 100% rename from src/test/scala/com/github/mrpowers/spark/daria/utils/RowHelpersTest.scala rename to core/src/test/scala/com/github/mrpowers/spark/daria/utils/RowHelpersTest.scala diff --git a/src/test/scala/com/github/mrpowers/spark/daria/utils/SchemaSafeWriterTest.scala b/core/src/test/scala/com/github/mrpowers/spark/daria/utils/SchemaSafeWriterTest.scala similarity index 100% rename from src/test/scala/com/github/mrpowers/spark/daria/utils/SchemaSafeWriterTest.scala rename to core/src/test/scala/com/github/mrpowers/spark/daria/utils/SchemaSafeWriterTest.scala diff --git a/src/test/scala/com/github/mrpowers/spark/daria/utils/StringHelpersTest.scala b/core/src/test/scala/com/github/mrpowers/spark/daria/utils/StringHelpersTest.scala similarity index 100% rename from src/test/scala/com/github/mrpowers/spark/daria/utils/StringHelpersTest.scala rename to core/src/test/scala/com/github/mrpowers/spark/daria/utils/StringHelpersTest.scala diff --git a/src/main/scala/org/apache/spark/sql/catalyst/expressions/RandGamma.scala b/src/main/scala/org/apache/spark/sql/catalyst/expressions/RandGamma.scala deleted file mode 100644 index e8b9dd55..00000000 --- a/src/main/scala/org/apache/spark/sql/catalyst/expressions/RandGamma.scala +++ /dev/null @@ -1,84 +0,0 @@ -package org.apache.spark.sql.catalyst.expressions - -import org.apache.commons.math3.distribution.GammaDistribution -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.analysis.UnresolvedSeed -import org.apache.spark.sql.catalyst.expressions.codegen.FalseLiteral -import org.apache.spark.sql.catalyst.expressions.codegen.Block.BlockHelper -import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenerator, CodegenContext, ExprCode} -import org.apache.spark.sql.types._ -import org.apache.spark.util.random.XORShiftRandomAdapted - -case class RandGamma(child: Expression, shape: Expression, scale: Expression, hideSeed: Boolean = false) extends TernaryExpression - with ExpectsInputTypes - with Stateful - with ExpressionWithRandomSeed { - - override def seedExpression: Expression = child - - @transient protected lazy val seed: Long = seedExpression match { - case e if e.dataType == IntegerType => e.eval().asInstanceOf[Int] - case e if e.dataType == LongType => e.eval().asInstanceOf[Long] - } - - @transient protected lazy val shapeVal: Double = shape.dataType match { - case IntegerType => shape.eval().asInstanceOf[Int] - case LongType => shape.eval().asInstanceOf[Long] - case FloatType | DoubleType => shape.eval().asInstanceOf[Double] - } - - @transient protected lazy val scaleVal: Double = scale.dataType match { - case IntegerType => scale.eval().asInstanceOf[Int] - case LongType => scale.eval().asInstanceOf[Long] - case FloatType | DoubleType => scale.eval().asInstanceOf[Double] - } - - override protected def initializeInternal(partitionIndex: Int): Unit = { - distribution = new GammaDistribution(new XORShiftRandomAdapted(seed + partitionIndex), shapeVal, scaleVal) - } - @transient private var distribution: GammaDistribution = _ - - def this() = this(UnresolvedSeed, Literal(1.0, DoubleType), Literal(1.0, DoubleType), true) - - def this(child: Expression, shape: Expression, scale: Expression) = this(child, shape, scale, false) - - override def withNewSeed(seed: Long): RandGamma = RandGamma(Literal(seed, LongType), shape, scale, hideSeed) - - override protected def evalInternal(input: InternalRow): Double = distribution.sample() - - override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { - val distributionClassName = classOf[GammaDistribution].getName - val rngClassName = classOf[XORShiftRandomAdapted].getName - val disTerm = ctx.addMutableState(distributionClassName, "distribution") - ctx.addPartitionInitializationStatement( - s"$disTerm = new $distributionClassName(new $rngClassName(${seed}L + partitionIndex), $shapeVal, $scaleVal);") - ev.copy(code = code""" - final ${CodeGenerator.javaType(dataType)} ${ev.value} = $disTerm.sample();""", - isNull = FalseLiteral) - } - - override def freshCopy(): RandGamma = RandGamma(child, shape, scale, hideSeed) - - override def flatArguments: Iterator[Any] = Iterator(child, shape, scale) - - override def prettyName: String = "rand_gamma" - - override def sql: String = s"rand_gamma(${if (hideSeed) "" else s"${child.sql}, ${shape.sql}, ${scale.sql}"})" - - override def inputTypes: Seq[AbstractDataType] = Seq(LongType, DoubleType, DoubleType) - - override def dataType: DataType = DoubleType - - override def first: Expression = child - - override def second: Expression = shape - - override def third: Expression = scale - - override protected def withNewChildrenInternal(newFirst: Expression, newSecond: Expression, newThird: Expression): Expression = - copy(child = newFirst, shape = newSecond, scale = newThird) -} - -object RandGamma { - def apply(seed: Long, shape: Double, scale: Double): RandGamma = RandGamma(Literal(seed, LongType), Literal(shape, DoubleType), Literal(scale, DoubleType)) -} diff --git a/unsafe/src/main/scala/org/apache/spark/sql/catalyst/expressions/RandGamma.scala b/unsafe/src/main/scala/org/apache/spark/sql/catalyst/expressions/RandGamma.scala new file mode 100644 index 00000000..ce3aed5a --- /dev/null +++ b/unsafe/src/main/scala/org/apache/spark/sql/catalyst/expressions/RandGamma.scala @@ -0,0 +1,96 @@ +package org.apache.spark.sql.catalyst.expressions + +import org.apache.commons.math3.distribution.GammaDistribution +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.RandGamma.defaultSeedExpression +import org.apache.spark.sql.catalyst.expressions.codegen.FalseLiteral +import org.apache.spark.sql.catalyst.expressions.codegen.Block.BlockHelper +import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenerator, CodegenContext, ExprCode} +import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils +import org.apache.spark.util.random.XORShiftRandomAdapted + +import scala.util.{Success, Try} + +case class RandGamma(child: Expression, shape: Expression, scale: Expression, hideSeed: Boolean = false) + extends TernaryExpression + with ExpectsInputTypes + with Stateful + with ExpressionWithRandomSeed { + + def seedExpression: Expression = child + + @transient protected lazy val seed: Long = seedExpression match { + case e if e.dataType == IntegerType => e.eval().asInstanceOf[Int] + case e if e.dataType == LongType => e.eval().asInstanceOf[Long] + } + + @transient protected lazy val shapeVal: Double = shape.dataType match { + case IntegerType => shape.eval().asInstanceOf[Int] + case LongType => shape.eval().asInstanceOf[Long] + case FloatType | DoubleType => shape.eval().asInstanceOf[Double] + } + + @transient protected lazy val scaleVal: Double = scale.dataType match { + case IntegerType => scale.eval().asInstanceOf[Int] + case LongType => scale.eval().asInstanceOf[Long] + case FloatType | DoubleType => scale.eval().asInstanceOf[Double] + } + + @transient private var distribution: GammaDistribution = _ + + protected def initializeInternal(partitionIndex: Int): Unit = { + distribution = new GammaDistribution(new XORShiftRandomAdapted(seed + partitionIndex), shapeVal, scaleVal) + } + + def this() = this(defaultSeedExpression, Literal(1.0, DoubleType), Literal(1.0, DoubleType), true) + + def this(child: Expression, shape: Expression, scale: Expression) = this(child, shape, scale, false) + + def withNewSeed(seed: Long): RandGamma = RandGamma(Literal(seed, LongType), shape, scale, hideSeed) + + protected def evalInternal(input: InternalRow): Double = distribution.sample() + + def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val distributionClassName = classOf[GammaDistribution].getName + val rngClassName = classOf[XORShiftRandomAdapted].getName + val disTerm = ctx.addMutableState(distributionClassName, "distribution") + ctx.addPartitionInitializationStatement( + s"$disTerm = new $distributionClassName(new $rngClassName(${seed}L + partitionIndex), $shapeVal, $scaleVal);" + ) + ev.copy(code = code""" + final ${CodeGenerator.javaType(dataType)} ${ev.value} = $disTerm.sample();""", isNull = FalseLiteral) + } + + def freshCopy(): RandGamma = RandGamma(child, shape, scale, hideSeed) + + override def flatArguments: Iterator[Any] = Iterator(child, shape, scale) + + override def prettyName: String = "rand_gamma" + + override def sql: String = s"rand_gamma(${if (hideSeed) "" else s"${child.sql}, ${shape.sql}, ${scale.sql}"})" + + def inputTypes: Seq[AbstractDataType] = Seq(LongType, DoubleType, DoubleType) + + def dataType: DataType = DoubleType + + def first: Expression = child + + def second: Expression = shape + + def third: Expression = scale + + protected def withNewChildrenInternal(newFirst: Expression, newSecond: Expression, newThird: Expression): Expression = + copy(child = newFirst, shape = newSecond, scale = newThird) +} + +object RandGamma { + def apply(seed: Long, shape: Double, scale: Double): RandGamma = + RandGamma(Literal(seed, LongType), Literal(shape, DoubleType), Literal(scale, DoubleType)) + + def defaultSeedExpression: Expression = + Try(Class.forName("org.apache.spark.sql.catalyst.analysis.UnresolvedSeed")) match { + case Success(clazz) => clazz.getConstructor().newInstance().asInstanceOf[Expression] + case _ => Literal(Utils.random.nextLong(), LongType) + } +} diff --git a/src/main/scala/org/apache/spark/sql/daria/functions.scala b/unsafe/src/main/scala/org/apache/spark/sql/daria/functions.scala similarity index 70% rename from src/main/scala/org/apache/spark/sql/daria/functions.scala rename to unsafe/src/main/scala/org/apache/spark/sql/daria/functions.scala index c88ac9c3..d4322e5b 100644 --- a/src/main/scala/org/apache/spark/sql/daria/functions.scala +++ b/unsafe/src/main/scala/org/apache/spark/sql/daria/functions.scala @@ -9,18 +9,18 @@ object functions { private def withExpr(expr: Expression): Column = Column(expr) def randGamma(seed: Long, shape: Double, scale: Double): Column = withExpr(RandGamma(seed, shape, scale)).alias("gamma_random") - def randGamma(shape: Double, scale: Double): Column = randGamma(Utils.random.nextLong, shape, scale) - def randGamma(): Column = randGamma(1.0, 1.0) + def randGamma(shape: Double, scale: Double): Column = randGamma(Utils.random.nextLong, shape, scale) + def randGamma(): Column = randGamma(1.0, 1.0) def randLaplace(seed: Long, mu: Double, beta: Double): Column = { - val mu_ = lit(mu) + val mu_ = lit(mu) val beta_ = lit(beta) - val u = rand(seed) + val u = rand(seed) when(u < 0.5, mu_ + beta_ * log(lit(2) * u)) .otherwise(mu_ - beta_ * log(lit(2) * (lit(1) - u))) .alias("laplace_random") } def randLaplace(mu: Double, beta: Double): Column = randLaplace(Utils.random.nextLong, mu, beta) - def randLaplace(): Column = randLaplace(0.0, 1.0) + def randLaplace(): Column = randLaplace(0.0, 1.0) } diff --git a/src/main/scala/org/apache/spark/util/random/XORShiftRandomAdapted.scala b/unsafe/src/main/scala/org/apache/spark/util/random/XORShiftRandomAdapted.scala similarity index 94% rename from src/main/scala/org/apache/spark/util/random/XORShiftRandomAdapted.scala rename to unsafe/src/main/scala/org/apache/spark/util/random/XORShiftRandomAdapted.scala index ac9b5d7e..cd2bcf6d 100644 --- a/src/main/scala/org/apache/spark/util/random/XORShiftRandomAdapted.scala +++ b/unsafe/src/main/scala/org/apache/spark/util/random/XORShiftRandomAdapted.scala @@ -14,7 +14,7 @@ class XORShiftRandomAdapted(init: Long) extends java.util.Random(init: Long) wit nextSeed ^= (nextSeed >>> 35) nextSeed ^= (nextSeed << 4) seed = nextSeed - (nextSeed & ((1L << bits) -1)).asInstanceOf[Int] + (nextSeed & ((1L << bits) - 1)).asInstanceOf[Int] } override def setSeed(s: Long): Unit = { @@ -29,4 +29,3 @@ class XORShiftRandomAdapted(init: Long) extends java.util.Random(init: Long) wit this.seed = XORShiftRandom.hashSeed(RandomGeneratorFactory.convertToLong(seed)) } } - diff --git a/unsafe/src/test/scala/org/apache/spark/sql/daria/SparkSessionTestWrapper.scala b/unsafe/src/test/scala/org/apache/spark/sql/daria/SparkSessionTestWrapper.scala new file mode 100644 index 00000000..bc0706c9 --- /dev/null +++ b/unsafe/src/test/scala/org/apache/spark/sql/daria/SparkSessionTestWrapper.scala @@ -0,0 +1,21 @@ +package org.apache.spark.sql.daria + +import org.apache.spark.sql.SparkSession + +trait SparkSessionTestWrapper { + + lazy val spark: SparkSession = { + val session = SparkSession + .builder() + .master("local") + .appName("spark session") + .config( + "spark.sql.shuffle.partitions", + "1" + ) + .getOrCreate() + session.sparkContext.setLogLevel("ERROR") + session + } + +} diff --git a/src/test/scala/org/apache/spark/sql/daria/functionsTests.scala b/unsafe/src/test/scala/org/apache/spark/sql/daria/functionsTests.scala similarity index 72% rename from src/test/scala/org/apache/spark/sql/daria/functionsTests.scala rename to unsafe/src/test/scala/org/apache/spark/sql/daria/functionsTests.scala index 73e75613..5147c135 100644 --- a/src/test/scala/org/apache/spark/sql/daria/functionsTests.scala +++ b/unsafe/src/test/scala/org/apache/spark/sql/daria/functionsTests.scala @@ -1,6 +1,5 @@ package org.apache.spark.sql.daria -import com.github.mrpowers.spark.daria.sql.SparkSessionTestWrapper import com.github.mrpowers.spark.fast.tests.{ColumnComparer, DataFrameComparer} import org.apache.spark.sql.daria.functions._ import org.apache.spark.sql.functions._ @@ -13,12 +12,14 @@ object functionsTests extends TestSuite with DataFrameComparer with ColumnCompar 'rand_gamma - { "has correct mean and standard deviation" - { val sourceDF = spark.range(100000).select(randGamma(2.0, 2.0)) - val stats = sourceDF.agg( - mean("gamma_random").as("mean"), - stddev("gamma_random").as("stddev") - ).collect()(0) - - val gammaMean = stats.getAs[Double]("mean") + val stats = sourceDF + .agg( + mean("gamma_random").as("mean"), + stddev("gamma_random").as("stddev") + ) + .collect()(0) + + val gammaMean = stats.getAs[Double]("mean") val gammaStddev = stats.getAs[Double]("stddev") // Gamma distribution with shape=2.0 and scale=2.0 has mean=4.0 and stddev=sqrt(8.0) @@ -31,12 +32,14 @@ object functionsTests extends TestSuite with DataFrameComparer with ColumnCompar 'rand_laplace - { "has correct mean and standard deviation" - { val sourceDF = spark.range(100000).select(randLaplace()) - val stats = sourceDF.agg( - mean("laplace_random").as("mean"), - stddev("laplace_random").as("std_dev") - ).collect()(0) - - val laplaceMean = stats.getAs[Double]("mean") + val stats = sourceDF + .agg( + mean("laplace_random").as("mean"), + stddev("laplace_random").as("std_dev") + ) + .collect()(0) + + val laplaceMean = stats.getAs[Double]("mean") val laplaceStdDev = stats.getAs[Double]("std_dev") // Laplace distribution with mean=0.0 and scale=1.0 has mean=0.0 and stddev=sqrt(2.0)