diff --git a/api-generation/src/main/scala/ai/h2o/sparkling/api/generation/common/MetricsConfigurations.scala b/api-generation/src/main/scala/ai/h2o/sparkling/api/generation/common/MetricsConfigurations.scala index 3edeac9e80..37d95b1311 100644 --- a/api-generation/src/main/scala/ai/h2o/sparkling/api/generation/common/MetricsConfigurations.scala +++ b/api-generation/src/main/scala/ai/h2o/sparkling/api/generation/common/MetricsConfigurations.scala @@ -30,7 +30,7 @@ trait MetricsConfigurations { "The class makes available all metrics that shared across all algorithms, and ML problems." + " (classification, regression, dimension reduction)."), ModelMetricsSubstitutionContext( - "H2OBinomialMetrics", + "H2OBinomialMetricsBase", classOf[ModelMetricsBinomialV3[_, _]], Seq("H2OCommonMetrics"), "The class makes available all metrics that shared across all algorithms supporting binomial classification."), @@ -40,7 +40,7 @@ trait MetricsConfigurations { Seq("H2OBinomialMetrics", "H2OGLMMetrics"), "The class makes available all binomial metrics supported by GLM algorithm."), ModelMetricsSubstitutionContext( - "H2ORegressionMetrics", + "H2ORegressionMetricsBase", classOf[ModelMetricsRegressionV3[_, _]], Seq("H2OCommonMetrics"), "The class makes available all metrics that shared across all algorithms supporting regression."), @@ -55,7 +55,7 @@ trait MetricsConfigurations { Seq("H2ORegressionMetrics"), "The class makes available all regression metrics supported by CoxPH algorithm."), ModelMetricsSubstitutionContext( - "H2OMultinomialMetrics", + "H2OMultinomialMetricsBase", classOf[ModelMetricsMultinomialV3[_, _]], Seq("H2OCommonMetrics"), "The class makes available all metrics that shared across all algorithms supporting multinomial classification."), diff --git a/api-generation/src/main/scala/ai/h2o/sparkling/api/generation/python/MetricsFactoryTemplate.scala b/api-generation/src/main/scala/ai/h2o/sparkling/api/generation/python/MetricsFactoryTemplate.scala index 524b634825..c7db3a9c0a 100644 --- a/api-generation/src/main/scala/ai/h2o/sparkling/api/generation/python/MetricsFactoryTemplate.scala +++ b/api-generation/src/main/scala/ai/h2o/sparkling/api/generation/python/MetricsFactoryTemplate.scala @@ -22,7 +22,7 @@ import ai.h2o.sparkling.api.generation.common.{EntitySubstitutionContext, ModelM object MetricsFactoryTemplate extends ((Seq[ModelMetricsSubstitutionContext]) => String) with PythonEntityTemplate { def apply(metricSubstitutionContexts: Seq[ModelMetricsSubstitutionContext]): String = { - val metricClasses = metricSubstitutionContexts.map(_.entityName) + val metricClasses = getEntityNames(metricSubstitutionContexts) val imports = Seq("py4j.java_gateway.JavaObject") ++ metricClasses.map(metricClass => s"ai.h2o.sparkling.ml.metrics.$metricClass.$metricClass") @@ -46,12 +46,22 @@ object MetricsFactoryTemplate extends ((Seq[ModelMetricsSubstitutionContext]) => } } - private def generatePatternMatchingCases(metricSubstitutionContexts: Seq[ModelMetricsSubstitutionContext]): String = { + private def getEntityNames(metricSubstitutionContexts: Seq[ModelMetricsSubstitutionContext]): Seq[String] = { metricSubstitutionContexts .map { metricSubstitutionContext => - val metricsObjectName = metricSubstitutionContext.entityName - s""" elif javaObject.getClass().getSimpleName() == "$metricsObjectName": - | return $metricsObjectName(javaObject)""".stripMargin + if (metricSubstitutionContext.entityName.endsWith("Base")) { + metricSubstitutionContext.entityName.substring(0, 
metricSubstitutionContext.entityName.length - 4) + } else { + metricSubstitutionContext.entityName + } + } + } + + private def generatePatternMatchingCases(metricSubstitutionContexts: Seq[ModelMetricsSubstitutionContext]): String = { + getEntityNames(metricSubstitutionContexts) + .map { entityName => + s""" elif javaObject.getClass().getSimpleName() == "$entityName": + | return $entityName(javaObject)""".stripMargin } .mkString("\n") } diff --git a/api-generation/src/main/scala/ai/h2o/sparkling/api/generation/python/MetricsInitTemplate.scala b/api-generation/src/main/scala/ai/h2o/sparkling/api/generation/python/MetricsInitTemplate.scala index 1bdb65d56c..859cece2e8 100644 --- a/api-generation/src/main/scala/ai/h2o/sparkling/api/generation/python/MetricsInitTemplate.scala +++ b/api-generation/src/main/scala/ai/h2o/sparkling/api/generation/python/MetricsInitTemplate.scala @@ -22,7 +22,13 @@ import ai.h2o.sparkling.api.generation.common.{EntitySubstitutionContext, ModelM object MetricsInitTemplate extends ((Seq[ModelMetricsSubstitutionContext]) => String) with PythonEntityTemplate { def apply(metricSubstitutionContexts: Seq[ModelMetricsSubstitutionContext]): String = { - val metricClasses = metricSubstitutionContexts.map(_.entityName) + val metricClasses = metricSubstitutionContexts.map { metricSubstitutionContext => + if (metricSubstitutionContext.entityName.endsWith("Base")) { + metricSubstitutionContext.entityName.substring(0, metricSubstitutionContext.entityName.length - 4) + } else { + metricSubstitutionContext.entityName + } + } val imports = metricClasses.map(metricClass => s"ai.h2o.sparkling.ml.metrics.$metricClass.$metricClass") val entitySubstitutionContext = EntitySubstitutionContext(null, null, null, imports) diff --git a/api-generation/src/main/scala/ai/h2o/sparkling/api/generation/r/MetricsFactoryTemplate.scala b/api-generation/src/main/scala/ai/h2o/sparkling/api/generation/r/MetricsFactoryTemplate.scala index bfe6d56b4f..29fdcd1c68 100644 --- a/api-generation/src/main/scala/ai/h2o/sparkling/api/generation/r/MetricsFactoryTemplate.scala +++ b/api-generation/src/main/scala/ai/h2o/sparkling/api/generation/r/MetricsFactoryTemplate.scala @@ -22,7 +22,7 @@ import ai.h2o.sparkling.api.generation.common.ModelMetricsSubstitutionContext object MetricsFactoryTemplate extends ((Seq[ModelMetricsSubstitutionContext]) => String) { def apply(metricSubstitutionContexts: Seq[ModelMetricsSubstitutionContext]): String = { - val metricClasses = metricSubstitutionContexts.map(_.entityName) + val metricClasses = getEntityNames(metricSubstitutionContexts) val imports = metricClasses.map(metricClass => s"""source(file.path("R", "${metricClass}.R"))""").mkString("\n") s"""# @@ -55,12 +55,23 @@ object MetricsFactoryTemplate extends ((Seq[ModelMetricsSubstitutionContext]) => |""".stripMargin } - private def generateCases(metricSubstitutionContexts: Seq[ModelMetricsSubstitutionContext]): String = { + private def getEntityNames(metricSubstitutionContexts: Seq[ModelMetricsSubstitutionContext]): Seq[String] = { metricSubstitutionContexts .map { metricSubstitutionContext => - val metricsObjectName = metricSubstitutionContext.entityName - s""" } else if (invoke(invoke(javaObject, "getClass"), "getSimpleName") == "$metricsObjectName") { - | rsparkling.$metricsObjectName(javaObject)""".stripMargin + if (metricSubstitutionContext.entityName.endsWith("Base")) { + metricSubstitutionContext.entityName.substring(0, metricSubstitutionContext.entityName.length - 4) + } else { + metricSubstitutionContext.entityName + } + 
} + } + + private def generateCases(metricSubstitutionContexts: Seq[ModelMetricsSubstitutionContext]): String = { + val names = getEntityNames(metricSubstitutionContexts) + names + .map { entityName => + s""" } else if (invoke(invoke(javaObject, "getClass"), "getSimpleName") == "$entityName") { + | rsparkling.$entityName(javaObject)""".stripMargin } .mkString("\n") } diff --git a/core/src/test/scala/ai/h2o/sparkling/TestUtils.scala b/core/src/test/scala/ai/h2o/sparkling/TestUtils.scala index feca5a8d02..f90c7d139f 100644 --- a/core/src/test/scala/ai/h2o/sparkling/TestUtils.scala +++ b/core/src/test/scala/ai/h2o/sparkling/TestUtils.scala @@ -24,7 +24,7 @@ import org.apache.spark.mllib import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema -import org.apache.spark.sql.functions.{lit, rand} +import org.apache.spark.sql.functions.{lit, rand, col, abs} import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} import org.scalatest.Matchers @@ -100,6 +100,50 @@ object TestUtils extends Matchers { """.stripMargin) } + def assertDataFramesAreEqual( + expected: DataFrame, + produced: DataFrame, + identityColumn: String, + tolerance: Double): Unit = { + val tolerances = expected.schema.fields + .filterNot(_.name == identityColumn) + .filter(_.dataType.isInstanceOf[NumericType]) + .map(_.name -> tolerance) + .toMap + assertDataFramesAreEqual(expected, produced, identityColumn, tolerances) + } + + def assertDataFramesAreEqual( + expected: DataFrame, + produced: DataFrame, + identityColumn: String, + tolerances: Map[String, Double] = Map.empty): Unit = { + expected.schema shouldEqual produced.schema + val intersection = expected.as("expected").join(produced.as("produced"), identityColumn) + intersection.count() shouldEqual expected.count() + intersection.count() shouldEqual produced.count() + val isEqualExpression = expected.columns.foldLeft(lit(true)) { + case (partialExpression, columnName) => + val columnComparision = if (tolerances.contains(columnName)) { + val difference = abs(col(s"expected.$columnName") - col(s"produced.$columnName")) + difference <= lit(tolerances(columnName)) + } else if (columnName == identityColumn) { + lit(true) + } else { + col(s"expected.$columnName") === col(s"produced.$columnName") + } + partialExpression && columnComparision + } + val withComparisonDF = intersection.withColumn("isEqual", isEqualExpression) + val differentRowsDF = withComparisonDF + .filter(col("isEqual") === lit(false)) + .select(col(s"expected.$identityColumn") as "id") + val differentIds = differentRowsDF.collect().map(_.get(0)) + assert( + differentIds.length == 0, + s"The rows of ids($identityColumn) [${differentIds.mkString(", ")}] are not equal.") + } + def assertDatasetBasicProperties[T <: Product]( ds: Dataset[T], df: H2OFrame, diff --git a/doc/src/main/scala/ai/h2o/sparkling/doc/generation/Runner.scala b/doc/src/main/scala/ai/h2o/sparkling/doc/generation/Runner.scala index 2b0d592ac8..29ded6c37c 100644 --- a/doc/src/main/scala/ai/h2o/sparkling/doc/generation/Runner.scala +++ b/doc/src/main/scala/ai/h2o/sparkling/doc/generation/Runner.scala @@ -98,6 +98,7 @@ object Runner { } } else { val metricClasses = getParamClasses("ai.h2o.sparkling.ml.metrics") + .filter(_.getSimpleName.endsWith("Metrics")) writeResultToFile(MetricsTocTreeTemplate(metricClasses), "metrics", destinationDir) for (metricClass <- metricClasses) { val content 
= MetricsTemplate(metricClass) diff --git a/extensions/src/main/resources/META-INF/services/water.TypeMapExtension b/extensions/src/main/resources/META-INF/services/water.TypeMapExtension new file mode 100644 index 0000000000..9cd8d7c16b --- /dev/null +++ b/extensions/src/main/resources/META-INF/services/water.TypeMapExtension @@ -0,0 +1 @@ +hex.MetricsCalculationTypeExtensions diff --git a/extensions/src/main/scala/hex/MetricsCalculationTypeExtensions.java b/extensions/src/main/scala/hex/MetricsCalculationTypeExtensions.java new file mode 100644 index 0000000000..4a1aa22d49 --- /dev/null +++ b/extensions/src/main/scala/hex/MetricsCalculationTypeExtensions.java @@ -0,0 +1,57 @@ +package hex; + +import java.util.Arrays; +import water.TypeMapExtension; +import water.api.schemas3.*; + +public class MetricsCalculationTypeExtensions implements TypeMapExtension { + public static final String[] MODEL_BUILDER_CLASSES = { + ModelMetrics.MetricBuilder.class.getName(), + ModelMetricsSupervised.MetricBuilderSupervised.class.getName(), + ModelMetricsBinomial.MetricBuilderBinomial.class.getName(), + AUC2.AUCBuilder.class.getName(), + ModelMetricsRegression.MetricBuilderRegression.class.getName(), + Distribution.class.getName(), + GaussianDistribution.class.getName(), + BernoulliDistribution.class.getName(), + QuasibinomialDistribution.class.getName(), + ModifiedHuberDistribution.class.getName(), + MultinomialDistribution.class.getName(), + PoissonDistribution.class.getName(), + GammaDistribution.class.getName(), + TweedieDistribution.class.getName(), + HuberDistribution.class.getName(), + LaplaceDistribution.class.getName(), + QuantileDistribution.class.getName(), + CustomDistribution.class.getName(), + CustomDistributionWrapper.class.getName(), + LinkFunction.class.getName(), + IdentityFunction.class.getName(), + InverseFunction.class.getName(), + LogFunction.class.getName(), + LogitFunction.class.getName(), + OlogitFunction.class.getName(), + OloglogFunction.class.getName(), + OprobitFunction.class.getName(), + ModelMetricsMultinomial.MetricBuilderMultinomial.class.getName() + }; + + public static final String[] SCHEMA_CLASSES = { + ModelMetricsBaseV3.class.getName(), + ModelMetricsBinomialV3.class.getName(), + ModelMetricsMultinomialV3.class.getName(), + ModelMetricsRegressionV3.class.getName(), + ConfusionMatrixV3.class.getName(), + TwoDimTableV3.class.getName(), + TwoDimTableV3.ColumnSpecsBase.class.getName() + }; + + @Override + public String[] getBoostrapClasses() { + String[] result = + Arrays.copyOf(MODEL_BUILDER_CLASSES, MODEL_BUILDER_CLASSES.length + SCHEMA_CLASSES.length); + System.arraycopy( + SCHEMA_CLASSES, 0, result, MODEL_BUILDER_CLASSES.length, SCHEMA_CLASSES.length); + return result; + } +} diff --git a/ml/src/test/scala/ai/h2o/sparkling/ml/algos/BinomialPredictionTestSuite.scala b/ml/src/test/scala/ai/h2o/sparkling/ml/algos/BinomialPredictionTestSuite.scala index 6706574b58..7254f223d0 100644 --- a/ml/src/test/scala/ai/h2o/sparkling/ml/algos/BinomialPredictionTestSuite.scala +++ b/ml/src/test/scala/ai/h2o/sparkling/ml/algos/BinomialPredictionTestSuite.scala @@ -183,57 +183,4 @@ class BinomialPredictionTestSuite extends FunSuite with Matchers with SharedH2OT assert(schema == expectedSchema) assert(schema == expectedSchemaByTransform) } - - private def assertMetrics[T](model: H2OMOJOModel): Unit = { - assertMetrics[T](model.getTrainingMetricsObject(), model.getTrainingMetrics()) - assertMetrics[T](model.getValidationMetricsObject(), model.getValidationMetrics()) - 
assert(model.getCrossValidationMetricsObject() == null) - assert(model.getCrossValidationMetrics() == Map()) - } - - private def assertMetrics[T](metricsObject: H2OMetrics, metrics: Map[String, Double]): Unit = { - metricsObject.isInstanceOf[T] should be(true) - MetricsAssertions.assertMetricsObjectAgainstMetricsMap(metricsObject, metrics) - val binomialObject = metricsObject.asInstanceOf[H2OBinomialMetrics] - binomialObject.getConfusionMatrix().count() > 0 - binomialObject.getConfusionMatrix().columns.length > 0 - binomialObject.getGainsLiftTable().count() > 0 - binomialObject.getGainsLiftTable().columns.length > 0 - binomialObject.getMaxCriteriaAndMetricScores().count() > 0 - binomialObject.getMaxCriteriaAndMetricScores().columns.length > 0 - binomialObject.getThresholdsAndMetricScores().count() > 0 - binomialObject.getThresholdsAndMetricScores().columns.length > 0 - } - - test("test binomial metric objects") { - val algo = new H2OGBM() - .setSplitRatio(0.8) - .setSeed(1) - .setFeaturesCols("sepal_len", "sepal_wid") - .setColumnsToCategorical("class") - .setLabelCol("class") - - val model = algo.fit(dataset) - assertMetrics[H2OBinomialMetrics](model) - - model.write.overwrite().save("ml/build/gbm_binomial_model_metrics") - val loadedModel = H2OGBMMOJOModel.load("ml/build/gbm_binomial_model_metrics") - assertMetrics[H2OBinomialMetrics](loadedModel) - } - - test("test binomial glm metric objects") { - val algo = new H2OGLM() - .setSplitRatio(0.8) - .setSeed(1) - .setFeaturesCols("sepal_len", "sepal_wid") - .setColumnsToCategorical("class") - .setLabelCol("class") - - val model = algo.fit(dataset) - assertMetrics[H2OBinomialGLMMetrics](model) - - model.write.overwrite().save("ml/build/glm_binomial_model_metrics") - val loadedModel = H2OGLMMOJOModel.load("ml/build/glm_binomial_model_metrics") - assertMetrics[H2OBinomialGLMMetrics](loadedModel) - } } diff --git a/ml/src/test/scala/ai/h2o/sparkling/ml/algos/RegressionPredictionTestSuite.scala b/ml/src/test/scala/ai/h2o/sparkling/ml/algos/RegressionPredictionTestSuite.scala index bfcdb5b3bc..637bae371f 100644 --- a/ml/src/test/scala/ai/h2o/sparkling/ml/algos/RegressionPredictionTestSuite.scala +++ b/ml/src/test/scala/ai/h2o/sparkling/ml/algos/RegressionPredictionTestSuite.scala @@ -153,38 +153,4 @@ class RegressionPredictionTestSuite extends FunSuite with Matchers with SharedH2 metricsObject.isInstanceOf[T] should be(true) MetricsAssertions.assertMetricsObjectAgainstMetricsMap(metricsObject, metrics) } - - test("test regression metric objects") { - val algo = new algos.H2OGBM() - .setSplitRatio(0.8) - .setSeed(1) - .setWithContributions(true) - .setWithLeafNodeAssignments(true) - .setWithStageResults(true) - .setFeaturesCols("CAPSULE", "RACE", "DPROS", "DCAPS", "PSA", "VOL", "GLEASON") - .setLabelCol("AGE") - val model = algo.fit(dataset) - assertMetrics[H2ORegressionMetrics](model) - - model.write.overwrite().save("ml/build/gbm_regression_model_metrics") - val loadedModel = H2OGBMMOJOModel.load("ml/build/gbm_regression_model_metrics") - assertMetrics[H2ORegressionMetrics](loadedModel) - } - - test("test regression glm metric objects") { - val algo = new algos.H2OGLM() - .setSplitRatio(0.8) - .setSeed(1) - .setWithContributions(true) - .setWithLeafNodeAssignments(true) - .setWithStageResults(true) - .setFeaturesCols("CAPSULE", "RACE", "DPROS", "DCAPS", "PSA", "VOL", "GLEASON") - .setLabelCol("AGE") - val model = algo.fit(dataset) - assertMetrics[H2ORegressionGLMMetrics](model) - - 
model.write.overwrite().save("ml/build/glm_regression_model_metrics") - val loadedModel = H2OGLMMOJOModel.load("ml/build/glm_regression_model_metrics") - assertMetrics[H2ORegressionGLMMetrics](loadedModel) - } } diff --git a/ml/src/test/scala/ai/h2o/sparkling/ml/metrics/BinomialMetricsTestSuite.scala b/ml/src/test/scala/ai/h2o/sparkling/ml/metrics/BinomialMetricsTestSuite.scala new file mode 100644 index 0000000000..b34a5183f5 --- /dev/null +++ b/ml/src/test/scala/ai/h2o/sparkling/ml/metrics/BinomialMetricsTestSuite.scala @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ai.h2o.sparkling.ml.metrics + +import ai.h2o.sparkling.ml.algos._ +import ai.h2o.sparkling.ml.models.{H2OGBMMOJOModel, H2OGLMMOJOModel, H2OMOJOModel} +import ai.h2o.sparkling.{SharedH2OTestContext, TestUtils} +import org.apache.spark.sql.functions.{rand, col} +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.types._ +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import org.scalatest.{FunSuite, Matchers} + +@RunWith(classOf[JUnitRunner]) +class BinomialMetricsTestSuite extends FunSuite with Matchers with SharedH2OTestContext { + + override def createSparkSession(): SparkSession = sparkSession("local[*]") + + import spark.implicits._ + + private lazy val dataset = spark.read + .option("header", "true") + .option("inferSchema", "true") + .csv(TestUtils.locate("smalldata/prostate/prostate.csv")) + .withColumn("CAPSULE", 'CAPSULE.cast(StringType)) + .withColumn("RACE", 'RACE.cast(StringType)) + .withColumn("DCAPS", 'DCAPS.cast(StringType)) + .withColumn("WEIGHT", rand(42)) + .repartition(20) + + private lazy val Array(trainingDataset, validationDataset) = dataset.randomSplit(Array(0.8, 0.2), 1234L) + + private def assertMetrics[T](model: H2OMOJOModel): Unit = { + assertMetrics[T](model.getTrainingMetricsObject(), model.getTrainingMetrics()) + assertMetrics[T](model.getValidationMetricsObject(), model.getValidationMetrics()) + assert(model.getCrossValidationMetricsObject() == null) + assert(model.getCrossValidationMetrics() == Map()) + } + + private def assertMetrics[T](metricsObject: H2OMetrics, metrics: Map[String, Double]): Unit = { + metricsObject.isInstanceOf[T] should be(true) + MetricsAssertions.assertMetricsObjectAgainstMetricsMap(metricsObject, metrics) + val binomialObject = metricsObject.asInstanceOf[H2OBinomialMetrics] + binomialObject.getConfusionMatrix().count() should be > (0L) + binomialObject.getConfusionMatrix().columns should not be empty + binomialObject.getGainsLiftTable().count() should be > (0L) + binomialObject.getGainsLiftTable().columns should not be empty + binomialObject.getMaxCriteriaAndMetricScores().count() should be > (0L) + 
binomialObject.getMaxCriteriaAndMetricScores().columns should not be empty + binomialObject.getThresholdsAndMetricScores().count() should be > (0L) + binomialObject.getThresholdsAndMetricScores().columns should not be empty + } + + private def assertMetrics( + model: H2OMOJOModel, + trainingMetricObject: H2OBinomialMetrics, + validationMetricObject: H2OBinomialMetrics, + trainingMetricsTolerance: Double = 0.0, + validationMetricsTolerance: Double = 0.0): Unit = { + MetricsAssertions.assertEssentialMetrics( + model, + trainingMetricObject, + validationMetricObject, + trainingMetricsTolerance, + validationMetricsTolerance) + + if (trainingMetricsTolerance < Double.PositiveInfinity) { + val expectedTrainingMetricObject = model.getTrainingMetricsObject().asInstanceOf[H2OBinomialMetrics] + + // Confusion matrix is not correctly calculated in H2O-3 runtime. + val trainingConfusionMatrix = trainingMetricObject.getConfusionMatrix() + val expectedTrainingConfusionMatrix = expectedTrainingMetricObject.getConfusionMatrix() + if (expectedTrainingConfusionMatrix == null) { + trainingConfusionMatrix should be(null) + } else { + trainingConfusionMatrix.count() shouldBe >(0L) + trainingConfusionMatrix.count() shouldEqual expectedTrainingConfusionMatrix.count() + } + + val trainingMetricScores = trainingMetricObject.getThresholdsAndMetricScores().count() + val expectedTrainingMetricScores = expectedTrainingMetricObject.getThresholdsAndMetricScores().count() + trainingMetricScores shouldBe >(0L) + trainingMetricScores shouldEqual expectedTrainingMetricScores + TestUtils.assertDataFramesAreEqual( + trainingMetricObject.getMaxCriteriaAndMetricScores(), + expectedTrainingMetricObject.getMaxCriteriaAndMetricScores(), + "Metric", + trainingMetricsTolerance) + trainingMetricObject.getGainsLiftTable() shouldBe (null) // Gains-lift table is not supported yet. + } + + if (validationMetricsTolerance < Double.PositiveInfinity) { + val expectedValidationMetricObject = model.getValidationMetricsObject().asInstanceOf[H2OBinomialMetrics] + + // Confusion matrix is not correctly calculated in H2O-3 runtime. + val validationConfusionMatrix = validationMetricObject.getConfusionMatrix() + val expectedValidationConfusionMatrix = expectedValidationMetricObject.getConfusionMatrix() + if (expectedValidationConfusionMatrix == null) { + validationConfusionMatrix should be(null) + } else { + validationConfusionMatrix.count() shouldBe >(0L) + validationConfusionMatrix.count() shouldEqual expectedValidationConfusionMatrix.count() + } + + val validationMetricScores = validationMetricObject.getThresholdsAndMetricScores().count() + val expectedValidationMetricScores = expectedValidationMetricObject.getThresholdsAndMetricScores().count() + validationMetricScores shouldBe >(0L) + validationMetricScores shouldEqual expectedValidationMetricScores + TestUtils.assertDataFramesAreEqual( + validationMetricObject.getMaxCriteriaAndMetricScores(), + expectedValidationMetricObject.getMaxCriteriaAndMetricScores(), + "Metric", + validationMetricsTolerance) + validationMetricObject.getGainsLiftTable() shouldBe (null) // Gains-lift table is not supported yet. 
+ } + } + + test("test binomial metric objects") { + val algo = new H2OGBM() + .setSplitRatio(0.8) + .setSeed(1) + .setFeaturesCols("AGE", "RACE", "DPROS", "DCAPS", "PSA", "VOL", "GLEASON") + .setLabelCol("CAPSULE") + + val model = algo.fit(dataset) + assertMetrics[H2OBinomialMetrics](model) + + model.write.overwrite().save("ml/build/gbm_binomial_model_metrics") + val loadedModel = H2OGBMMOJOModel.load("ml/build/gbm_binomial_model_metrics") + assertMetrics[H2OBinomialMetrics](loadedModel) + } + + test("test binomial glm metric objects") { + val algo = new H2OGLM() + .setSplitRatio(0.8) + .setSeed(1) + .setFeaturesCols("AGE", "RACE", "DPROS", "DCAPS", "PSA", "VOL", "GLEASON") + .setLabelCol("CAPSULE") + + val model = algo.fit(dataset) + assertMetrics[H2OBinomialGLMMetrics](model) + + model.write.overwrite().save("ml/build/glm_binomial_model_metrics") + val loadedModel = H2OGLMMOJOModel.load("ml/build/glm_binomial_model_metrics") + assertMetrics[H2OBinomialGLMMetrics](loadedModel) + } + + { + val algorithmsAndTolerances: Seq[(() => H2OSupervisedAlgorithm[_], Double, Double)] = Seq( + (() => new H2ODeepLearning(), 0.00001, 0.000001), + (() => new H2OXGBoost(), 0.0001, 0.0001), + (() => new H2OGBM(), 0.0001, 0.0001), + (() => new H2OGLM(), 0.00001, 0.000001), + (() => new H2ODRF(), Double.PositiveInfinity, 0.0001)) + + for ((algorithmGetter, trainingMetricsTolerance, validationMetricsTolerance) <- algorithmsAndTolerances) { + val algorithmName = algorithmGetter().getClass.getSimpleName + + test(s"test calculation of binomial $algorithmName metrics on arbitrary dataset") { + val algorithm = algorithmGetter() + algorithm + .setValidationDataFrame(validationDataset) + .set(algorithm.getParam("seed"), 1L) + .setFeaturesCols("AGE", "RACE", "DPROS", "DCAPS", "PSA", "VOL", "GLEASON") + .setLabelCol("CAPSULE") + + val model = algorithm.fit(trainingDataset) + + val domain = model.getDomainValues()("CAPSULE") + val trainingMetricObject = + H2OBinomialMetrics.calculate(model.transform(trainingDataset), domain, labelCol = "CAPSULE") + val validationMetricObject = + H2OBinomialMetrics.calculate(model.transform(validationDataset), domain, labelCol = "CAPSULE") + + assertMetrics( + model, + trainingMetricObject, + validationMetricObject, + trainingMetricsTolerance, + validationMetricsTolerance) + } + + test(s"test calculation of binomial $algorithmName metrics with probabilities passed to predictionCol") { + val algorithm = algorithmGetter() + algorithm + .setValidationDataFrame(validationDataset) + .set(algorithm.getParam("seed"), 1L) + .setFeaturesCols("AGE", "RACE", "DPROS", "DCAPS", "PSA", "VOL", "GLEASON") + .setLabelCol("CAPSULE") + + val model = algorithm.fit(trainingDataset) + val domain = model.getDomainValues()("CAPSULE") + + def extractProbability(df: DataFrame): DataFrame = { + df.withColumn("probability", col(s"detailed_prediction.probabilities.${domain(1)}")) + } + + val trainingMetricObject = + H2OBinomialMetrics.calculate( + extractProbability(model.transform(trainingDataset)), + domain, + labelCol = "CAPSULE", + predictionCol = "probability") + val validationMetricObject = + H2OBinomialMetrics.calculate( + extractProbability(model.transform(validationDataset)), + domain, + labelCol = "CAPSULE", + predictionCol = "probability") + + assertMetrics( + model, + trainingMetricObject, + validationMetricObject, + trainingMetricsTolerance, + validationMetricsTolerance) + } + + test(s"test calculation of binomial $algorithmName metrics with weightCol set on arbitrary dataset") { + val algorithm = 
algorithmGetter()
+        algorithm
+          .setValidationDataFrame(validationDataset)
+          .set(algorithm.getParam("seed"), 1L)
+          .setFeaturesCols("AGE", "RACE", "DPROS", "DCAPS", "PSA", "VOL", "GLEASON")
+          .setLabelCol("CAPSULE")
+          .setWeightCol("WEIGHT")
+
+        val model = algorithm.fit(trainingDataset)
+        val domain = model.getDomainValues()("CAPSULE")
+        val trainingMetricObject = H2OBinomialMetrics.calculate(
+          model.transform(trainingDataset),
+          domain,
+          labelCol = "CAPSULE",
+          weightColOption = Some("WEIGHT"))
+        val validationMetricObject = H2OBinomialMetrics.calculate(
+          model.transform(validationDataset),
+          domain,
+          labelCol = "CAPSULE",
+          weightColOption = Some("WEIGHT"))
+
+        assertMetrics(
+          model,
+          trainingMetricObject,
+          validationMetricObject,
+          trainingMetricsTolerance,
+          validationMetricsTolerance)
+      }
+    }
+  }
+  {
+    val algorithmsAndTolerances: Seq[(H2OSupervisedAlgorithm[_], Double, Double)] =
+      Seq((new H2OXGBoost(), 0.00001, 0.00001), (new H2OGBM(), 1, 0.00001), (new H2OGLM(), 0.00001, 0.000001))
+
+    for ((algorithm, trainingMetricsTolerance, validationMetricsTolerance) <- algorithmsAndTolerances) {
+      val algorithmName = algorithm.getClass.getSimpleName
+
+      test(s"test calculation of binomial $algorithmName metrics with offsetCol set on arbitrary dataset") {
+        algorithm
+          .setValidationDataFrame(validationDataset)
+          .set(algorithm.getParam("seed"), 1L)
+          .setFeaturesCols("AGE", "RACE", "DPROS", "DCAPS", "PSA", "VOL", "GLEASON")
+          .setLabelCol("CAPSULE")
+          .setOffsetCol("ID")
+
+        val model = algorithm.fit(trainingDataset)
+        val domain = model.getDomainValues()("CAPSULE")
+        val trainingMetricObject = H2OBinomialMetrics.calculate(
+          model.transform(trainingDataset),
+          domain,
+          labelCol = "CAPSULE",
+          offsetColOption = Some("ID"))
+        val validationMetricObject = H2OBinomialMetrics.calculate(
+          model.transform(validationDataset),
+          domain,
+          labelCol = "CAPSULE",
+          offsetColOption = Some("ID"))
+
+        assertMetrics(
+          model,
+          trainingMetricObject,
+          validationMetricObject,
+          trainingMetricsTolerance,
+          validationMetricsTolerance)
+      }
+    }
+  }
+}
diff --git a/ml/src/test/scala/ai/h2o/sparkling/ml/metrics/MetricsAssertions.scala b/ml/src/test/scala/ai/h2o/sparkling/ml/metrics/MetricsAssertions.scala
index 84f984a3e7..5c94b84316 100644
--- a/ml/src/test/scala/ai/h2o/sparkling/ml/metrics/MetricsAssertions.scala
+++ b/ml/src/test/scala/ai/h2o/sparkling/ml/metrics/MetricsAssertions.scala
@@ -17,13 +17,19 @@
 package ai.h2o.sparkling.ml.metrics
 
+import ai.h2o.sparkling.ml.models.H2OMOJOModel
+import org.apache.spark.sql.DataFrame
 import org.scalatest.Matchers
 
 object MetricsAssertions extends Matchers {
-  def assertMetricsObjectAgainstMetricsMap(metricsObject: H2OMetrics, metrics: Map[String, Double]): Unit = {
+  def assertMetricsObjectAgainstMetricsMap(
+      metricsObject: H2OMetrics,
+      metrics: Map[String, Double],
+      ignoredGetters: Set[String] = Set("getCustomMetricValue"),
+      tolerance: Double = 0.0): Unit = {
     for (getter <- metricsObject.getClass.getMethods
          if getter.getName.startsWith("get")
-         if getter.getName != "getCustomMetricValue"
+         if !ignoredGetters.contains(getter.getName)
          if getter.getParameterCount == 0
          if getter.getReturnType.isPrimitive) {
       val value = getter.invoke(metricsObject)
@@ -32,9 +38,57 @@ object MetricsAssertions extends Matchers {
       val metricValue = metrics.get(metricName).get
       if (metricValue.isNaN) {
         assert(value.asInstanceOf[Double].isNaN)
+      } else if (tolerance > 0.0) {
+        value.asInstanceOf[Double] shouldBe (metricValue +- tolerance)
       } else {
-        value shouldEqual metricValue
+ metricValue shouldBe value } } } + + def assertEqual( + expected: Map[String, Double], + actual: Map[String, Double], + ignored: Set[String] = Set("ScoringTime"), + tolerance: Double = 0.0, + skipExtraMetrics: Boolean = false): Unit = { + val expectedKeys = expected.keySet + val actualKeys = actual.keySet + + if (!skipExtraMetrics) { + expectedKeys shouldEqual actualKeys + } + + for (key <- actualKeys.diff(ignored)) { + if (expected(key).isNaN && actual(key).isNaN) { + // Values are equal + } else if (tolerance > 0.0) { + expected(key) shouldBe (actual(key) +- tolerance) + } else { + expected(key) shouldBe actual(key) + } + } + } + + def assertEssentialMetrics( + model: H2OMOJOModel, + trainingMetricsObject: H2OMetrics, + validationMetricsObject: H2OMetrics, + trainingMetricsTolerance: Double = 0.0, + validationMetricsTolerance: Double = 0.0): Unit = { + val expectedTrainingMetrics = model.getTrainingMetrics() + val expectedValidationMetrics = model.getValidationMetrics() + val ignoredGetters = Set("getCustomMetricValue", "getScoringTime") + + MetricsAssertions.assertMetricsObjectAgainstMetricsMap( + trainingMetricsObject, + expectedTrainingMetrics, + ignoredGetters, + trainingMetricsTolerance) + MetricsAssertions.assertMetricsObjectAgainstMetricsMap( + validationMetricsObject, + expectedValidationMetrics, + ignoredGetters, + validationMetricsTolerance) + } } diff --git a/ml/src/test/scala/ai/h2o/sparkling/ml/metrics/MultinomialMetricsTestSuite.scala b/ml/src/test/scala/ai/h2o/sparkling/ml/metrics/MultinomialMetricsTestSuite.scala new file mode 100644 index 0000000000..5398f2a252 --- /dev/null +++ b/ml/src/test/scala/ai/h2o/sparkling/ml/metrics/MultinomialMetricsTestSuite.scala @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package ai.h2o.sparkling.ml.metrics + +import ai.h2o.sparkling.ml.algos._ +import ai.h2o.sparkling.ml.models.{H2OGBMMOJOModel, H2OGLMMOJOModel, H2OMOJOModel} +import ai.h2o.sparkling.{SharedH2OTestContext, TestUtils} +import hex.genmodel.GenModel +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.functions.{monotonically_increasing_id, rand, udf} +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import org.scalatest.{FunSuite, Matchers} +import org.apache.spark.sql.functions.{array, col} + +@RunWith(classOf[JUnitRunner]) +class MultinomialMetricsTestSuite extends FunSuite with Matchers with SharedH2OTestContext { + + override def createSparkSession(): SparkSession = sparkSession("local[*]") + + private lazy val dataset = spark.read + .option("header", "true") + .option("inferSchema", "true") + .csv(TestUtils.locate("smalldata/iris/iris_wheader.csv")) + .withColumn("ID", monotonically_increasing_id) + .withColumn("WEIGHT", rand(42)) + .repartition(20) + + private lazy val Array(trainingDataset, validationDataset) = dataset.randomSplit(Array(0.8, 0.2), 42L) + + private def assertMetrics[T](model: H2OMOJOModel): Unit = { + assertMetrics[T](model.getTrainingMetricsObject(), model.getTrainingMetrics()) + assertMetrics[T](model.getValidationMetricsObject(), model.getValidationMetrics()) + assert(model.getCrossValidationMetricsObject() == null) + assert(model.getCrossValidationMetrics() == Map()) + } + + private def assertMetrics[T](metricsObject: H2OMetrics, metrics: Map[String, Double]): Unit = { + metricsObject.isInstanceOf[T] should be(true) + MetricsAssertions.assertMetricsObjectAgainstMetricsMap(metricsObject, metrics) + val multinomialObject = metricsObject.asInstanceOf[H2OMultinomialMetrics] + multinomialObject.getConfusionMatrix().count() should be > (0L) + multinomialObject.getConfusionMatrix().columns should not be empty + multinomialObject.getHitRatioTable().count() should be > (0L) + multinomialObject.getHitRatioTable().columns should not be empty + } + + private def assertMetrics( + model: H2OMOJOModel, + trainingMetricObject: H2OMultinomialMetrics, + validationMetricObject: H2OMultinomialMetrics, + trainingMetricsTolerance: Double = 0.0, + validationMetricsTolerance: Double = 0.0): Unit = { + MetricsAssertions.assertEssentialMetrics( + model, + trainingMetricObject, + validationMetricObject, + trainingMetricsTolerance, + validationMetricsTolerance) + + if (trainingMetricsTolerance < Double.PositiveInfinity) { + val expectedTrainingMetricObject = model.getTrainingMetricsObject().asInstanceOf[H2OMultinomialMetrics] + TestUtils.assertDataFramesAreEqual( + trainingMetricObject.getMultinomialAUCTable(), + expectedTrainingMetricObject.getMultinomialAUCTable(), + "Type", + trainingMetricsTolerance) + TestUtils.assertDataFramesAreEqual( + trainingMetricObject.getMultinomialPRAUCTable(), + expectedTrainingMetricObject.getMultinomialPRAUCTable(), + "Type", + trainingMetricsTolerance) + TestUtils.assertDataFramesAreIdentical( + trainingMetricObject.getConfusionMatrix(), + expectedTrainingMetricObject.getConfusionMatrix()) + TestUtils.assertDataFramesAreEqual( + trainingMetricObject.getHitRatioTable(), + expectedTrainingMetricObject.getHitRatioTable(), + "K", + trainingMetricsTolerance) + } + + if (validationMetricsTolerance < Double.PositiveInfinity) { + val expectedValidationMetricObject = model.getValidationMetricsObject().asInstanceOf[H2OMultinomialMetrics] + TestUtils.assertDataFramesAreEqual( + 
validationMetricObject.getMultinomialAUCTable(), + expectedValidationMetricObject.getMultinomialAUCTable(), + "Type", + validationMetricsTolerance) + TestUtils.assertDataFramesAreEqual( + validationMetricObject.getMultinomialPRAUCTable(), + expectedValidationMetricObject.getMultinomialPRAUCTable(), + "Type", + validationMetricsTolerance) + TestUtils.assertDataFramesAreIdentical( + validationMetricObject.getConfusionMatrix(), + expectedValidationMetricObject.getConfusionMatrix()) + TestUtils.assertDataFramesAreEqual( + validationMetricObject.getHitRatioTable(), + expectedValidationMetricObject.getHitRatioTable(), + "K", + validationMetricsTolerance) + } + } + + test("test multinomial metric objects") { + val algo = new H2OGBM() + .setSplitRatio(0.8) + .setSeed(1) + .setFeaturesCols("sepal_len", "sepal_wid", "petal_len", "petal_wid") + .setColumnsToCategorical("class") + .setLabelCol("class") + val model = algo.fit(dataset) + assertMetrics[H2OMultinomialMetrics](model) + + model.write.overwrite().save("ml/build/gbm_multinomial_model_metrics") + val loadedModel = H2OGBMMOJOModel.load("ml/build/gbm_multinomial_model_metrics") + assertMetrics[H2OMultinomialMetrics](loadedModel) + } + + test("test multinomial glm metric objects") { + val algo = new H2OGLM() + .setSplitRatio(0.8) + .setSeed(1) + .setFeaturesCols("sepal_len", "sepal_wid", "petal_len", "petal_wid") + .setColumnsToCategorical("class") + .setLabelCol("class") + val model = algo.fit(dataset) + assertMetrics[H2OMultinomialGLMMetrics](model) + model.write.overwrite().save("ml/build/glm_multinomial_model_metrics") + val loadedModel = H2OGLMMOJOModel.load("ml/build/glm_multinomial_model_metrics") + assertMetrics[H2OMultinomialGLMMetrics](loadedModel) + } + + { + val algorithmsAndTolerances: Seq[(() => H2OSupervisedAlgorithm[_], Double, Double)] = Seq( + (() => new H2ODeepLearning(), 0.00001, 0.00000001), + (() => new H2OXGBoost(), 0.00001, 0.00000001), + (() => new H2OGBM(), 0.00001, 0.00000001), + (() => new H2OGLM(), 0.00001, 0.00000001), + (() => new H2ODRF(), Double.PositiveInfinity, 0.00000001)) + + for ((algorithmGetter, trainingMetricsTolerance, validationMetricsTolerance) <- algorithmsAndTolerances) { + val algorithmName = algorithmGetter().getClass.getSimpleName + + test(s"test calculation of multinomial $algorithmName metrics on arbitrary dataset") { + val algorithm = algorithmGetter() + algorithm + .setValidationDataFrame(validationDataset) + .set(algorithm.getParam("seed"), 1L) + .setFeaturesCols("sepal_len", "sepal_wid", "petal_len", "petal_wid") + .setColumnsToCategorical("class") + .set(algorithm.getParam("aucType"), "MACRO_OVR") + .setLabelCol("class") + + val model = algorithm.fit(trainingDataset) + val domain = model.getDomainValues()("class") + val trainingMetricObject = + H2OMultinomialMetrics.calculate( + model.transform(trainingDataset), + domain, + labelCol = "class", + aucType = "MACRO_OVR") + val validationMetricObject = + H2OMultinomialMetrics.calculate( + model.transform(validationDataset), + domain, + labelCol = "class", + aucType = "MACRO_OVR") + + assertMetrics( + model, + trainingMetricObject, + validationMetricObject, + trainingMetricsTolerance, + validationMetricsTolerance) + } + + test(s"test calculation of multinomial $algorithmName metrics with just probabilities") { + val algorithm = algorithmGetter() + algorithm + .setValidationDataFrame(validationDataset) + .set(algorithm.getParam("seed"), 1L) + .setFeaturesCols("sepal_len", "sepal_wid", "petal_len", "petal_wid") + .setColumnsToCategorical("class") + 
.set(algorithm.getParam("aucType"), "MACRO_OVR") + .setLabelCol("class") + + val model = algorithm.fit(trainingDataset) + val priorClassDistribution = model.unwrapMojoModel()._priorClassDistrib + val domain = model.getDomainValues()("class") + def extractProbabilities(df: DataFrame) = { + val columns = domain.map(label => col(s"detailed_prediction.probabilities.$label")) + df.withColumn("probabilities", array(columns: _*)) + } + + val trainingMetricObject = + H2OMultinomialMetrics.calculate( + extractProbabilities(model.transform(trainingDataset)), + domain, + labelCol = "class", + predictionCol = "probabilities", + aucType = "MACRO_OVR") + val validationMetricObject = + H2OMultinomialMetrics.calculate( + extractProbabilities(model.transform(validationDataset)), + domain, + labelCol = "class", + predictionCol = "probabilities", + aucType = "MACRO_OVR") + + assertMetrics( + model, + trainingMetricObject, + validationMetricObject, + trainingMetricsTolerance, + validationMetricsTolerance) + } + + test(s"test calculation of multinomial $algorithmName metrics with weightCol set on arbitrary dataset") { + val algorithm = algorithmGetter() + algorithm + .setValidationDataFrame(validationDataset) + .set(algorithm.getParam("seed"), 1L) + .setFeaturesCols("sepal_len", "sepal_wid", "petal_len", "petal_wid") + .setColumnsToCategorical("class") + .set(algorithm.getParam("aucType"), "MACRO_OVR") + .setLabelCol("class") + .setWeightCol("WEIGHT") + + val model = algorithm.fit(trainingDataset) + val domain = model.getDomainValues()("class") + val trainingMetricObject = H2OMultinomialMetrics.calculate( + model.transform(trainingDataset), + domain, + labelCol = "class", + weightColOption = Some("WEIGHT"), + aucType = "MACRO_OVR") + val validationMetricObject = H2OMultinomialMetrics.calculate( + model.transform(validationDataset), + domain, + labelCol = "class", + weightColOption = Some("WEIGHT"), + aucType = "MACRO_OVR") + + assertMetrics( + model, + trainingMetricObject, + validationMetricObject, + trainingMetricsTolerance, + validationMetricsTolerance) + } + } + } +} diff --git a/ml/src/test/scala/ai/h2o/sparkling/ml/metrics/NoRuntimeMetricsTestSuite.scala b/ml/src/test/scala/ai/h2o/sparkling/ml/metrics/NoRuntimeMetricsTestSuite.scala new file mode 100644 index 0000000000..de8123d065 --- /dev/null +++ b/ml/src/test/scala/ai/h2o/sparkling/ml/metrics/NoRuntimeMetricsTestSuite.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package ai.h2o.sparkling.ml.metrics + +import ai.h2o.sparkling.ml.models.H2OMOJOModel +import ai.h2o.sparkling.SparkTestContext +import org.apache.spark.sql.SparkSession +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import org.scalatest.{FunSuite, Matchers} + +@RunWith(classOf[JUnitRunner]) +class NoRuntimeMetricsTestSuite extends FunSuite with Matchers with SparkTestContext { + + override def createSparkSession(): SparkSession = sparkSession("local[*]") + + private lazy val irisDataFrame = { + spark.read.option("header", "true").option("inferSchema", "true").csv("examples/smalldata/iris/iris_wheader.csv") + } + + private lazy val prostateDataFrame = { + spark.read + .option("header", "true") + .option("inferSchema", "true") + .csv("examples/smalldata/prostate/prostate.csv") + .withColumnRenamed("CAPSULE", "capsule") + } + + test("Test calculation of metrics on saved binomial model") { + val mojo = H2OMOJOModel.createFromMojo( + this.getClass.getClassLoader.getResourceAsStream("binom_model_prostate.mojo"), + "binom_model_prostate.mojo") + + val domain = mojo.getDomainValues()("capsule") + val metrics = H2OBinomialMetrics.calculate(mojo.transform(prostateDataFrame), domain, labelCol = "capsule") + metrics shouldNot be(null) + } + + test("Test calculation of metrics on saved regression model") { + val mojo = H2OMOJOModel.createFromMojo( + this.getClass.getClassLoader.getResourceAsStream("regre_model_prostate.mojo"), + "regre_model_prostate.mojo") + + val metrics = H2ORegressionMetrics.calculate(mojo.transform(prostateDataFrame), labelCol = "capsule") + metrics shouldNot be(null) + } + + test("Test calculation of metrics on saved multinomial model") { + val mojo = H2OMOJOModel.createFromMojo( + this.getClass.getClassLoader.getResourceAsStream("multi_model_iris.mojo"), + "multi_model_iris.mojo") + + val domain = mojo.getDomainValues()("class") + val metrics = H2OMultinomialMetrics.calculate(mojo.transform(irisDataFrame), domain, labelCol = "class") + metrics shouldNot be(null) + } +} diff --git a/ml/src/test/scala/ai/h2o/sparkling/ml/metrics/RegressionMetricsTestSuite.scala b/ml/src/test/scala/ai/h2o/sparkling/ml/metrics/RegressionMetricsTestSuite.scala new file mode 100644 index 0000000000..2992fa04f1 --- /dev/null +++ b/ml/src/test/scala/ai/h2o/sparkling/ml/metrics/RegressionMetricsTestSuite.scala @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package ai.h2o.sparkling.ml.metrics + +import ai.h2o.sparkling.ml.algos +import ai.h2o.sparkling.ml.algos._ +import ai.h2o.sparkling.ml.models.{H2OGBMMOJOModel, H2OGLMMOJOModel, H2OMOJOModel} +import ai.h2o.sparkling.{SharedH2OTestContext, TestUtils} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.types.StringType +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import org.scalatest.{FunSuite, Matchers} + +@RunWith(classOf[JUnitRunner]) +class RegressionMetricsTestSuite extends FunSuite with Matchers with SharedH2OTestContext { + + override def createSparkSession(): SparkSession = sparkSession("local[*]") + + import spark.implicits._ + + private lazy val dataset = spark.read + .option("header", "true") + .option("inferSchema", "true") + .csv(TestUtils.locate("smalldata/prostate/prostate.csv")) + .withColumn("RACE", 'RACE.cast(StringType)) + .withColumn("DCAPS", 'DCAPS.cast(StringType)) + .repartition(20) + + private lazy val Array(trainingDataset, validationDataset) = dataset.randomSplit(Array(0.8, 0.2), 42L) + + private def assertMetrics[T](model: H2OMOJOModel): Unit = { + assertMetrics[T](model.getTrainingMetricsObject(), model.getTrainingMetrics()) + assertMetrics[T](model.getValidationMetricsObject(), model.getValidationMetrics()) + assert(model.getCrossValidationMetricsObject() == null) + assert(model.getCrossValidationMetrics() == Map()) + } + + private def assertMetrics[T](metricsObject: H2OMetrics, metrics: Map[String, Double]): Unit = { + metricsObject.isInstanceOf[T] should be(true) + MetricsAssertions.assertMetricsObjectAgainstMetricsMap(metricsObject, metrics) + } + + test("test regression metric objects") { + val algo = new algos.H2OGBM() + .setSplitRatio(0.8) + .setSeed(1) + .setFeaturesCols("CAPSULE", "RACE", "DPROS", "DCAPS", "PSA", "VOL", "GLEASON") + .setLabelCol("AGE") + val model = algo.fit(dataset) + assertMetrics[H2ORegressionMetrics](model) + + model.write.overwrite().save("ml/build/gbm_regression_model_metrics") + val loadedModel = H2OGBMMOJOModel.load("ml/build/gbm_regression_model_metrics") + assertMetrics[H2ORegressionMetrics](loadedModel) + } + + test("test regression glm metric objects") { + val algo = new algos.H2OGLM() + .setSplitRatio(0.8) + .setSeed(1) + .setFeaturesCols("CAPSULE", "RACE", "DPROS", "DCAPS", "PSA", "VOL", "GLEASON") + .setLabelCol("AGE") + val model = algo.fit(dataset) + assertMetrics[H2ORegressionGLMMetrics](model) + + model.write.overwrite().save("ml/build/glm_regression_model_metrics") + val loadedModel = H2OGLMMOJOModel.load("ml/build/glm_regression_model_metrics") + assertMetrics[H2ORegressionGLMMetrics](loadedModel) + } + + { + val algorithmsAndTolerances: Seq[(() => H2OSupervisedAlgorithm[_], Double, Double)] = Seq( + (() => new H2ODeepLearning(), 0.00001, 0.00000001), + (() => new H2OXGBoost(), 0.00001, 0.00000001), + (() => new H2OGBM(), 0.0001, 0.00000001), + (() => new H2OGLM(), 0.00001, 0.00000001), + (() => new H2ODRF(), Double.PositiveInfinity, 0.00000001)) // ignore comparision on the training dataset + + for ((algorithmGetter, trainingMetricsTolerance, validationMetricsTolerance) <- algorithmsAndTolerances) { + val algorithmName = algorithmGetter().getClass.getSimpleName + + test(s"test calculation of regression $algorithmName metrics on arbitrary dataset") { + val algorithm = algorithmGetter() + algorithm + .setValidationDataFrame(validationDataset) + .set(algorithm.getParam("seed"), 1L) + .setFeaturesCols("CAPSULE", "RACE", "DPROS", "DCAPS", "VOL", "GLEASON") + 
.setLabelCol("AGE") + + val model = algorithm.fit(trainingDataset) + val trainingMetrics = + H2ORegressionMetrics.calculate(dataFrame = model.transform(trainingDataset), labelCol = "AGE") + val validationMetrics = + H2ORegressionMetrics.calculate(dataFrame = model.transform(validationDataset), labelCol = "AGE") + + MetricsAssertions.assertEssentialMetrics( + model, + trainingMetrics, + validationMetrics, + trainingMetricsTolerance, + validationMetricsTolerance) + } + + test(s"test calculation of regression $algorithmName metrics with weight column set on arbitrary dataset ") { + val algorithm = algorithmGetter() + algorithm + .setValidationDataFrame(validationDataset) + .set(algorithm.getParam("seed"), 1L) + .setFeaturesCols("CAPSULE", "RACE", "DPROS", "DCAPS", "VOL", "GLEASON") + .setLabelCol("AGE") + .setWeightCol("ID") + + val model = algorithm.fit(trainingDataset) + val trainingMetrics = H2ORegressionMetrics.calculate( + dataFrame = model.transform(trainingDataset), + labelCol = "AGE", + weightColOption = Some("ID")) + val validationMetrics = H2ORegressionMetrics.calculate( + dataFrame = model.transform(validationDataset), + labelCol = "AGE", + weightColOption = Some("ID")) + + MetricsAssertions.assertEssentialMetrics( + model, + trainingMetrics, + validationMetrics, + trainingMetricsTolerance, + validationMetricsTolerance) + } + } + } + { + val algorithmsAndTolerances: Seq[(H2OSupervisedAlgorithm[_], Double, Double)] = Seq( + (new H2OXGBoost(), 0.00001, 0.00000001), + (new H2OGBM(), 0.001, 0.00000001), + (new H2OGLM(), 0.00001, 0.00000001)) + + for ((algorithm, trainingMetricsTolerance, validationMetricsTolerance) <- algorithmsAndTolerances) { + val algorithmName = algorithm.getClass.getSimpleName + test(s"test calculation of regression $algorithmName metrics with offset column set on arbitrary dataset ") { + algorithm + .setValidationDataFrame(validationDataset) + .set(algorithm.getParam("seed"), 1L) + .setFeaturesCols("CAPSULE", "RACE", "DPROS", "DCAPS", "VOL", "GLEASON") + .setLabelCol("AGE") + .setOffsetCol("ID") + + val model = algorithm.fit(trainingDataset) + val trainingMetrics = H2ORegressionMetrics.calculate( + dataFrame = model.transform(trainingDataset), + labelCol = "AGE", + offsetColOption = Some("ID")) + val validationMetrics = H2ORegressionMetrics.calculate( + dataFrame = model.transform(validationDataset), + labelCol = "AGE", + offsetColOption = Some("ID")) + + MetricsAssertions.assertEssentialMetrics( + model, + trainingMetrics, + validationMetrics, + trainingMetricsTolerance, + validationMetricsTolerance) + } + } + } +} diff --git a/py-scoring/src/ai/h2o/sparkling/ml/__init__.py b/py-scoring/src/ai/h2o/sparkling/ml/__init__.py index 66247a7147..c06196c636 100644 --- a/py-scoring/src/ai/h2o/sparkling/ml/__init__.py +++ b/py-scoring/src/ai/h2o/sparkling/ml/__init__.py @@ -20,3 +20,4 @@ from ai.h2o.sparkling.ml.models import H2ODeepLearningMOJOModel, H2ODRFMOJOModel, H2OIsolationForestMOJOModel, H2OPCAMOJOModel, H2OGLRMMOJOModel from ai.h2o.sparkling.ml.models import H2OMOJOModel, H2OAlgorithmMOJOModel, H2OFeatureMOJOModel, H2OMOJOPipelineModel, H2OMOJOSettings from ai.h2o.sparkling.ml.models import H2OCoxPHMOJOModel, H2ORuleFitMOJOModel, H2OWord2VecMOJOModel, H2OExtendedIsolationForestMOJOModel +from ai.h2o.sparkling.ml.metrics import H2ORegressionMetrics, H2OBinomialMetrics, H2OMultinomialMetrics diff --git a/py-scoring/src/ai/h2o/sparkling/ml/metrics/H2OBinomialMetrics.py b/py-scoring/src/ai/h2o/sparkling/ml/metrics/H2OBinomialMetrics.py new file mode 100644 index 
0000000000..52b2659549
--- /dev/null
+++ b/py-scoring/src/ai/h2o/sparkling/ml/metrics/H2OBinomialMetrics.py
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pyspark.ml.param import *
+from ai.h2o.sparkling.ml.metrics.H2OBinomialMetricsBase import H2OBinomialMetricsBase
+from ai.h2o.sparkling.Initializer import Initializer
+from pyspark.ml.util import _jvm
+
+
+class H2OBinomialMetrics(H2OBinomialMetricsBase):
+
+    @staticmethod
+    def calculate(dataFrame,
+                  domain,
+                  predictionCol = "detailed_prediction",
+                  labelCol = "label",
+                  weightCol = None,
+                  offsetCol = None):
+        '''
+        The method calculates binomial metrics on a provided data frame with predictions and actual values.
+        :param dataFrame: A data frame with predictions and actual values.
+        :param domain: A list of classes representing the negative and positive response. The negative class must be
+        at position 0 and the positive class at position 1.
+        :param predictionCol: The name of the prediction column. The prediction column must have the same type as
+        the detailed_prediction column coming from the transform method of an H2OMOJOModel descendant, or it must
+        be of FloatType/DoubleType where values represent probabilities of the positive response.
+        :param labelCol: The name of the label column that contains actual values.
+        :param weightCol: The name of a weight column.
+        :param offsetCol: The name of an offset column.
+        :return: Calculated binomial metrics
+        '''
+        # We need to make sure that Sparkling Water classes are available on the Spark driver and executor paths
+        Initializer.load_sparkling_jar()
+        javaMetrics = _jvm().ai.h2o.sparkling.ml.metrics.H2OBinomialMetrics.calculateInternal(dataFrame._jdf,
+                                                                                              domain,
+                                                                                              predictionCol,
+                                                                                              labelCol,
+                                                                                              weightCol,
+                                                                                              offsetCol)
+        return H2OBinomialMetrics(javaMetrics)
diff --git a/py-scoring/src/ai/h2o/sparkling/ml/metrics/H2OMultinomialMetrics.py b/py-scoring/src/ai/h2o/sparkling/ml/metrics/H2OMultinomialMetrics.py
new file mode 100644
index 0000000000..8e41d56f3f
--- /dev/null
+++ b/py-scoring/src/ai/h2o/sparkling/ml/metrics/H2OMultinomialMetrics.py
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +from pyspark.ml.param import * +from ai.h2o.sparkling.ml.metrics.H2OMultinomialMetricsBase import H2OMultinomialMetricsBase +from ai.h2o.sparkling.Initializer import Initializer +from pyspark.ml.util import _jvm + + +class H2OMultinomialMetrics(H2OMultinomialMetricsBase): + + @staticmethod + def calculate(dataFrame, + domain, + predictionCol = "detailed_prediction", + labelCol = "label", + weightCol = None, + aucType = "AUTO"): + ''' + The method calculates multinomial metrics on a provided data frame with predictions and actual values. + :param dataFrame: A data frame with predictions and actual values. + :param domain: A list of response classes. + :param predictionCol: The name of the prediction column. The prediction column must have the same type as + a detailed_prediction column coming from the transform method of an H2OMOJOModel descendant or + an array type or vector of doubles where particular arrays represent class probabilities. + The order of probabilities must correspond to the order of labels in the passed domain. + :param labelCol: The name of the label column that contains actual values. + :param weightCol: The name of a weight column. + :param aucType: Type of multinomial AUC/AUCPR calculation. Possible values: + - AUTO, + - NONE, + - MACRO_OVR, + - WEIGHTED_OVR, + - MACRO_OVO, + - WEIGHTED_OVO + :return: Calculated multinomial metrics + ''' + # We need to make sure that Sparkling Water classes are available on the Spark driver and executor paths + Initializer.load_sparkling_jar() + javaMetrics = _jvm().ai.h2o.sparkling.ml.metrics.H2OMultinomialMetrics.calculateInternal(dataFrame._jdf, + domain, + predictionCol, + labelCol, + weightCol, + aucType) + return H2OMultinomialMetrics(javaMetrics) diff --git a/py-scoring/src/ai/h2o/sparkling/ml/metrics/H2ORegressionMetrics.py b/py-scoring/src/ai/h2o/sparkling/ml/metrics/H2ORegressionMetrics.py new file mode 100644 index 0000000000..b82dd6ded4 --- /dev/null +++ b/py-scoring/src/ai/h2o/sparkling/ml/metrics/H2ORegressionMetrics.py @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from pyspark.ml.param import * +from ai.h2o.sparkling.ml.metrics.H2ORegressionMetricsBase import H2ORegressionMetricsBase +from ai.h2o.sparkling.Initializer import Initializer +from pyspark.ml.util import _jvm + + +class H2ORegressionMetrics(H2ORegressionMetricsBase): + + @staticmethod + def calculate(dataFrame, + predictionCol = "detailed_prediction", + labelCol = "label", + weightCol = None, + offsetCol = None): + ''' + The method calculates regression metrics on a provided data frame with predictions and actual values.
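+        Example (an illustrative sketch only; assumes ``model`` is a fitted H2O MOJO model and ``df`` is a Spark data frame with an "AGE" label column)::
+
+            metrics = H2ORegressionMetrics.calculate(model.transform(df), labelCol = "AGE")
+            print(metrics.getMAE())
+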
+ :param dataFrame: A data frame with predictions and actual values + :param predictionCol: The name of the prediction column. The prediction column must have the same type as + a detailed_prediction column coming from the transform method of an H2OMOJOModel descendant or + it must be of DoubleType or FloatType. + :param labelCol: The name of the label column that contains actual values. + :param weightCol: The name of a weight column. + :param offsetCol: The name of an offset column. + :return: Calculated regression metrics + ''' + # We need to make sure that Sparkling Water classes are available on the Spark driver and executor paths + Initializer.load_sparkling_jar() + javaMetrics = _jvm().ai.h2o.sparkling.ml.metrics.H2ORegressionMetrics.calculateInternal(dataFrame._jdf, + predictionCol, + labelCol, + weightCol, + offsetCol) + return H2ORegressionMetrics(javaMetrics) diff --git a/py-scoring/src/pysparkling/ml/__init__.py b/py-scoring/src/pysparkling/ml/__init__.py index f4d2f558f7..f339c94e3b 100644 --- a/py-scoring/src/pysparkling/ml/__init__.py +++ b/py-scoring/src/pysparkling/ml/__init__.py @@ -16,13 +16,15 @@ # from pysparkling.ml.models import * +from pysparkling.ml.metrics import * __all__ = ["H2OMOJOModel", "H2OSupervisedMOJOModel", "H2OTreeBasedSupervisedMOJOModel", "H2OUnsupervisedMOJOModel", "H2OTreeBasedUnsupervisedMOJOModel", "H2OMOJOPipelineModel", "H2OMOJOSettings", "H2OBinaryModel", "H2OKMeansMOJOModel", "H2OGLMMOJOModel", "H2OGAMMOJOModel", "H2OGBMMOJOModel", "H2OXGBoostMOJOModel", - "H2ODeepLearningMOJOModel", "H2ODRFMOJOModel", "H2OIsolationForestMOJOModel", - "H2OExtendedIsolationForestMOJOModel", "H2OPCAMOJOModel", "H2OGLRMMOJOModel", "H2OCoxPHMOJOModel", - "H2ORuleFitMOJOModel", "H2OWord2VecMOJOModel"] + "H2ODeepLearningMOJOModel", "H2ODRFMOJOModel", "H2OIsolationForestMOJOModel", "H2OPCAMOJOModel", + "H2OGLRMMOJOModel", "H2OCoxPHMOJOModel", "H2ORuleFitMOJOModel", "H2OWord2VecMOJOModel", + "H2OExtendedIsolationForestMOJOModel", "H2ORegressionMetrics", "H2OMultinomialMetrics", + "H2OBinomialMetrics"] from pysparkling.initializer import Initializer diff --git a/py-scoring/src/pysparkling/ml/metrics/__init__.py b/py-scoring/src/pysparkling/ml/metrics/__init__.py new file mode 100644 index 0000000000..a24e87398c --- /dev/null +++ b/py-scoring/src/pysparkling/ml/metrics/__init__.py @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+# + +from ai.h2o.sparkling.ml.metrics import H2ORegressionMetrics, H2OMultinomialMetrics, H2OBinomialMetrics + +__all__ = ["H2ORegressionMetrics", "H2OMultinomialMetrics", "H2OBinomialMetrics"] diff --git a/py/src/ai/h2o/sparkling/ml/__init__.py b/py/src/ai/h2o/sparkling/ml/__init__.py index dbac500a71..e1e120d5a9 100644 --- a/py/src/ai/h2o/sparkling/ml/__init__.py +++ b/py/src/ai/h2o/sparkling/ml/__init__.py @@ -27,3 +27,4 @@ from ai.h2o.sparkling.ml.models import H2ODeepLearningMOJOModel, H2OWord2VecMOJOModel, H2OAutoEncoderMOJOModel, H2ODRFMOJOModel, H2OPCAMOJOModel, H2OGLRMMOJOModel from ai.h2o.sparkling.ml.models import H2OIsolationForestMOJOModel, H2OCoxPHMOJOModel, H2ORuleFitMOJOModel, H2OExtendedIsolationForestMOJOModel, H2OStackedEnsembleMOJOModel from ai.h2o.sparkling.ml.models import H2OMOJOModel, H2OAlgorithmMOJOModel, H2OFeatureMOJOModel, H2OMOJOPipelineModel, H2OMOJOSettings +from ai.h2o.sparkling.ml.metrics import H2ORegressionMetrics, H2OBinomialMetrics, H2OMultinomialMetrics diff --git a/py/src/pysparkling/ml/__init__.py b/py/src/pysparkling/ml/__init__.py index b08eb61eb1..7162c90e15 100644 --- a/py/src/pysparkling/ml/__init__.py +++ b/py/src/pysparkling/ml/__init__.py @@ -19,6 +19,7 @@ from pysparkling.ml.algos.regression import * from pysparkling.ml.features import * from pysparkling.ml.models import * +from pysparkling.ml.metrics import * __all__ = ["ColumnPruner", "H2OGBM", "H2ODeepLearning", "H2OAutoML", "H2OXGBoost", "H2OGLM", "H2OCoxPH", "H2OGAM", "H2OMOJOModel", "H2OAlgorithmMOJOModel", "H2OFeatureMOJOModel", "H2OSupervisedMOJOModel", @@ -32,7 +33,8 @@ "H2ODRFMOJOModel", "H2OIsolationForestMOJOModel", "H2OWord2Vec", "H2OWord2VecMOJOModel", "H2OAutoEncoder", "H2OAutoEncoderMOJOModel", "H2OPCA", "H2OPCAMOJOModel", "H2OGLRM", "H2OGLRMMOJOModel", "H2ORuleFit", "H2ORuleFitClassifier", "H2ORuleFitRegressor", "H2ORuleFitMOJOModel", "H2OStackedEnsemble", - "H2OStackedEnsembleMOJOModel", "H2OExtendedIsolationForest", "H2OExtendedIsolationForestMOJOModel"] + "H2OStackedEnsembleMOJOModel", "H2ORegressionMetrics", "H2OBinomialMetrics", "H2OMultinomialMetrics", + "H2OExtendedIsolationForest", "H2OExtendedIsolationForestMOJOModel"] from pysparkling.initializer import Initializer diff --git a/py/src/pysparkling/ml/metrics/__init__.py b/py/src/pysparkling/ml/metrics/__init__.py new file mode 100644 index 0000000000..9bec18e1f3 --- /dev/null +++ b/py/src/pysparkling/ml/metrics/__init__.py @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+# + +from ai.h2o.sparkling.ml.metrics import H2ORegressionMetrics, H2OBinomialMetrics, H2OMultinomialMetrics + + +__all__ = ["H2ORegressionMetrics", "H2OBinomialMetrics", "H2OMultinomialMetrics"] diff --git a/py/tests/unit/with_runtime_sparkling/conftest.py b/py/tests/unit/with_runtime_sparkling/conftest.py index 2b8c0799d2..e7350c7d13 100644 --- a/py/tests/unit/with_runtime_sparkling/conftest.py +++ b/py/tests/unit/with_runtime_sparkling/conftest.py @@ -60,6 +60,11 @@ def irisDatasetPath(): return "file://" + os.path.abspath("../examples/smalldata/iris/iris_wheader.csv") +@pytest.fixture(scope="module") +def irisDataset(spark, irisDatasetPath): + return spark.read.csv(irisDatasetPath, header=True, inferSchema=True) + + @pytest.fixture(scope="module") def airlinesDatasetPath(): return "file://" + os.path.abspath("../examples/smalldata/airlines/allyears2k_headers.csv") diff --git a/py/tests/unit/with_runtime_sparkling/test_metric_calculation.py b/py/tests/unit/with_runtime_sparkling/test_metric_calculation.py new file mode 100644 index 0000000000..8af4f13e06 --- /dev/null +++ b/py/tests/unit/with_runtime_sparkling/test_metric_calculation.py @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import os +from pysparkling.ml import * + + +def testRegressionMetricsCalculation(prostateDataset): + mojo = H2OMOJOModel.createFromMojo( + "file://" + os.path.abspath("../ml/src/test/resources/regre_model_prostate.mojo")) + metrics = H2ORegressionMetrics.calculate(mojo.transform(prostateDataset), labelCol = "CAPSULE") + assert metrics is not None + assert metrics.getMAE() > 0.0 + assert metrics.getRMSLE() > 0.0 + + +def testBinomialMetricsCalculation(prostateDataset): + mojo = H2OMOJOModel.createFromMojo( + "file://" + os.path.abspath("../ml/src/test/resources/binom_model_prostate.mojo")) + domain = mojo.getDomainValues()["capsule"] + metrics = H2OBinomialMetrics.calculate(mojo.transform(prostateDataset), domain, labelCol = "CAPSULE") + assert metrics is not None + assert metrics.getAUC() > 0.5 + assert metrics.getConfusionMatrix().count() > 0 + + +def testMultinomialMetricsCalculation(irisDataset): + mojo = H2OMOJOModel.createFromMojo( + "file://" + os.path.abspath("../ml/src/test/resources/multi_model_iris.mojo")) + domain = mojo.getDomainValues()["class"] + metrics = H2OMultinomialMetrics.calculate(mojo.transform(irisDataset), domain, labelCol = "class") + assert metrics is not None + assert metrics.getLogloss() > 0.0 + assert metrics.getConfusionMatrix().count() > 0 diff --git a/py/tests/unit/with_runtime_sparkling/test_mojo.py b/py/tests/unit/with_runtime_sparkling/test_mojo.py index 78c328cb39..fe395b0fcb 100644 --- a/py/tests/unit/with_runtime_sparkling/test_mojo.py +++ b/py/tests/unit/with_runtime_sparkling/test_mojo.py @@ -31,7 +31,7 @@ @pytest.fixture(scope="module") def gbmModel(prostateDataset): - gbm = H2OGBM(ntrees=2, seed=42, distribution="bernoulli", labelCol="capsule") + gbm = H2OGBM(ntrees=2, seed=42, distribution="bernoulli", labelCol="CAPSULE") return gbm.fit(prostateDataset) @@ -42,7 +42,7 @@ def testDomainColumns(gbmModel): assert domainValues["VOL"] is None assert domainValues["AGE"] is None assert domainValues["PSA"] is None - assert domainValues["capsule"] == ["0", "1"] + assert domainValues["CAPSULE"] == ["0", "1"] assert domainValues["RACE"] is None assert domainValues["ID"] is None @@ -74,7 +74,7 @@ def testFeatureTypes(gbmModel): assert types["VOL"] == "Numeric" assert types["AGE"] == "Numeric" assert types["PSA"] == "Numeric" - assert types["capsule"] == "Enum" + assert types["CAPSULE"] == "Enum" assert types["RACE"] == "Numeric" assert types["ID"] == "Numeric" assert len(types) == 9 @@ -208,7 +208,7 @@ def testGetCrossValidationSummary(): def testCrossValidationModelsAreAvailableAfterSavingAndLoading(prostateDataset): path = "file://" + os.path.abspath("build/testCrossValidationModelsAreAvialableAfterSavingAndLoading") nfolds = 3 - gbm = H2OGBM(ntrees=2, seed=42, distribution="bernoulli", labelCol="capsule", + gbm = H2OGBM(ntrees=2, seed=42, distribution="bernoulli", labelCol="CAPSULE", nfolds=nfolds, keepCrossValidationModels=True) model = gbm.fit(prostateDataset) model.write().overwrite().save(path) @@ -229,7 +229,7 @@ def testCrossValidationModelsAreAvailableAfterSavingAndLoading(prostateDataset): def testCrossValidationModelsAreNoneIfKeepCrossValidationModelsIsFalse(prostateDataset): - gbm = H2OGBM(ntrees=2, seed=42, distribution="bernoulli", labelCol="capsule", + gbm = H2OGBM(ntrees=2, seed=42, distribution="bernoulli", labelCol="CAPSULE", nfolds=3, keepCrossValidationModels=False) model = gbm.fit(prostateDataset) @@ -237,7 +237,7 @@ def testCrossValidationModelsAreNoneIfKeepCrossValidationModelsIsFalse(prostateD def 
testMetricObjects(prostateDataset): - gbm = H2OGBM(ntrees=2, seed=42, distribution="bernoulli", labelCol="capsule", + gbm = H2OGBM(ntrees=2, seed=42, distribution="bernoulli", labelCol="CAPSULE", nfolds=3, keepCrossValidationModels=False) model = gbm.fit(prostateDataset) diff --git a/r/src/R/ai/h2o/sparkling/ml/metrics/H2OBinomialMetrics.R b/r/src/R/ai/h2o/sparkling/ml/metrics/H2OBinomialMetrics.R new file mode 100644 index 0000000000..6e984bfa29 --- /dev/null +++ b/r/src/R/ai/h2o/sparkling/ml/metrics/H2OBinomialMetrics.R @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +source(file.path("R", "H2OBinomialMetricsBase.R")) + +#' @export rsparkling.H2OBinomialMetrics +rsparkling.H2OBinomialMetrics <- setRefClass("rsparkling.H2OBinomialMetrics", contains = ("rsparkling.H2OBinomialMetricsBase")) + +H2OBinomialMetrics.calculate <- function(sparkFrame, + domain, + predictionCol = "detailed_prediction", + labelCol = "label", + weightCol = NULL, + offsetCol = NULL) { + sc <- spark_connection_find()[[1]] + sparkFrame <- spark_dataframe(sparkFrame) + javaMetrics <- invoke_static(sc, + "ai.h2o.sparkling.ml.metrics.H2OBinomialMetrics", + "calculateInternal", + sparkFrame, + domain, + predictionCol, + labelCol, + weightCol, + offsetCol) + rsparkling.H2OBinomialMetrics(javaMetrics) +} diff --git a/r/src/R/ai/h2o/sparkling/ml/metrics/H2OMultinomialMetrics.R b/r/src/R/ai/h2o/sparkling/ml/metrics/H2OMultinomialMetrics.R new file mode 100644 index 0000000000..6362721c71 --- /dev/null +++ b/r/src/R/ai/h2o/sparkling/ml/metrics/H2OMultinomialMetrics.R @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# + +source(file.path("R", "H2OMultinomialMetricsBase.R")) + +#' @export rsparkling.H2OMultinomialMetricsBase +rsparkling.H2OMultinomialMetrics <- setRefClass("rsparkling.H2OMultinomialMetrics", contains = ("rsparkling.H2OMultinomialMetricsBase")) + +H2OMultinomialMetrics.calculate <- function(sparkFrame, + domain, + predictionCol = "detailed_prediction", + labelCol = "label", + weightCol = NULL, + aucType = "AUTO") { + sc <- spark_connection_find()[[1]] + sparkFrame <- spark_dataframe(sparkFrame) + javaMetrics <- invoke_static(sc, + "ai.h2o.sparkling.ml.metrics.H2OMultinomialMetrics", + "calculateInternal", + sparkFrame, + domain, + predictionCol, + labelCol, + weightCol, + aucType) + rsparkling.H2OMultinomialMetrics(javaMetrics) +} diff --git a/r/src/R/ai/h2o/sparkling/ml/metrics/H2ORegressionMetrics.R b/r/src/R/ai/h2o/sparkling/ml/metrics/H2ORegressionMetrics.R new file mode 100644 index 0000000000..26996608c4 --- /dev/null +++ b/r/src/R/ai/h2o/sparkling/ml/metrics/H2ORegressionMetrics.R @@ -0,0 +1,39 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +source(file.path("R", "H2ORegressionMetricsBase.R")) + +#' @export rsparkling.H2ORegressionMetricsBase +rsparkling.H2ORegressionMetrics <- setRefClass("rsparkling.H2ORegressionMetrics", contains = ("rsparkling.H2ORegressionMetricsBase")) + +H2ORegressionMetrics.calculate <- function(sparkFrame, + predictionCol = "detailed_prediction", + labelCol = "label", + weightCol = NULL, + offsetCol = NULL) { + sc <- spark_connection_find()[[1]] + sparkFrame <- spark_dataframe(sparkFrame) + javaMetrics <- invoke_static(sc, + "ai.h2o.sparkling.ml.metrics.H2ORegressionMetrics", + "calculateInternal", + sparkFrame, + predictionCol, + labelCol, + weightCol, + offsetCol) + rsparkling.H2ORegressionMetrics(javaMetrics) +} diff --git a/r/src/tests/testthat/testMetricCalculation.R b/r/src/tests/testthat/testMetricCalculation.R new file mode 100644 index 0000000000..d76c44d45d --- /dev/null +++ b/r/src/tests/testthat/testMetricCalculation.R @@ -0,0 +1,145 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +context("Test metrics calculation") + +config <- spark_config() +config <- c(config, list( + "spark.hadoop.yarn.timeline-service.enabled" = "false", + "spark.ext.h2o.external.cluster.size" = "1", + "spark.ext.h2o.backend.cluster.mode" = Sys.getenv("spark.ext.h2o.backend.cluster.mode"), + "sparklyr.connect.enablehivesupport" = FALSE, + "sparklyr.gateway.connect.timeout" = 240, + "sparklyr.gateway.start.timeout" = 240, + "sparklyr.backend.timeout" = 240, + "sparklyr.log.console" = TRUE, + "spark.ext.h2o.external.start.mode" = "auto", + "spark.ext.h2o.external.disable.version.check" = "true", + "sparklyr.gateway.port" = 55555, + "sparklyr.connect.timeout" = 60 * 5, + "spark.master" = "local[*]" +)) + +for (i in 1:4) { + tryCatch( + { + sc <- spark_connect(master = "local[*]", config = config) + }, error = function(e) { } + ) +} + +locate <- function(fileName) { + normalizePath(file.path("../../../../../examples/", fileName)) +} + +test_that("test training metrics", { + model <- H2OMOJOModel.createFromMojo(paste0("file://", normalizePath("../../../../../ml/src/test/resources/binom_model_prostate.mojo"))) + metrics <- model$getTrainingMetrics() + expect_equal(as.character(metrics[["AUC"]]), "0.896878869021911") + expect_equal(length(metrics), 10) +}) + +test_that("test training metrics object", { + model <- H2OMOJOModel.createFromMojo(paste0("file://", normalizePath("../../../../../ml/src/test/resources/binom_model_prostate.mojo"))) + metrics <- model$getTrainingMetricsObject() + aucValue <- metrics$getAUC() + scoringTime <- metrics$getScoringTime() + + thresholdsAndScores <- metrics$getThresholdsAndMetricScores() + thresholdsAndScoresFrame <- dplyr::tally(thresholdsAndScores) + thresholdsAndScoresCount <- as.double(dplyr::collect(thresholdsAndScoresFrame)[[1]]) + + gainsLiftTable <- metrics$getGainsLiftTable() + gainsLiftTableFrame <- dplyr::tally(gainsLiftTable) + gainsLiftTableCount <- as.double(dplyr::collect(gainsLiftTableFrame)[[1]]) + + expect_equal(as.character(aucValue), "0.896878869021911") + expect_true(scoringTime > 0) + expect_true(thresholdsAndScoresCount > 0) + expect_true(gainsLiftTableCount > 0) +}) + +test_that("test null cross validation metrics object", { + model <- H2OMOJOModel.createFromMojo(paste0("file://", normalizePath("../../../../../ml/src/test/resources/binom_model_prostate.mojo"))) + cvObject <- model$getCrossValidationMetricsObject() + expect_true(is.null(cvObject)) +}) + +test_that("test current metrics", { + model <- H2OMOJOModel.createFromMojo(paste0("file://", normalizePath("../../../../../ml/src/test/resources/binom_model_prostate.mojo"))) + metrics <- model$getCurrentMetrics() + expect_equal(metrics, model$getTrainingMetrics()) +}) + +test_that("test calculation of regression metrics", { + path <- paste0("file://", locate("smalldata/prostate/prostate.csv")) + dataset <- spark_read_csv(sc, path = path, infer_schema = TRUE, header = TRUE) + model <- H2OMOJOModel.createFromMojo(paste0("file://", normalizePath("../../../../../ml/src/test/resources/regre_model_prostate.mojo"))) + predictions <- model$transform(dataset) + + metrics <- H2ORegressionMetrics.calculate(predictions, labelCol = "CAPSULE") + + mae <- metrics$getMAE() + rmsle <- metrics$getRMSLE() + + expect_true(mae > 0.0) + expect_true(rmsle > 0.0) +}) + +test_that("test calculation of binomial metrics", { + path <- paste0("file://", locate("smalldata/prostate/prostate.csv")) + dataset <- 
spark_read_csv(sc, path = path, infer_schema = TRUE, header = TRUE) + model <- H2OMOJOModel.createFromMojo(paste0("file://", normalizePath("../../../../../ml/src/test/resources/binom_model_prostate.mojo"))) + predictions <- model$transform(dataset) + domainValues <- model$getDomainValues() + + metrics <- H2OBinomialMetrics.calculate(predictions, domainValues[["capsule"]], labelCol = "CAPSULE") + + aucValue <- metrics$getAUC() + scoringTime <- metrics$getScoringTime() + + thresholdsAndScores <- metrics$getThresholdsAndMetricScores() + thresholdsAndScoresFrame <- dplyr::tally(thresholdsAndScores) + thresholdsAndScoresCount <- as.double(dplyr::collect(thresholdsAndScoresFrame)[[1]]) + + expect_true(aucValue > 0.6) + expect_true(scoringTime > 0) + expect_true(thresholdsAndScoresCount > 0) +}) + +test_that("test calculation of multinomial metrics", { + path <- paste0("file://", locate("smalldata/iris/iris_wheader.csv")) + dataset <- spark_read_csv(sc, path = path, infer_schema = TRUE, header = TRUE) + model <- H2OMOJOModel.createFromMojo(paste0("file://", normalizePath("../../../../../ml/src/test/resources/multi_model_iris.mojo"))) + predictions <- model$transform(dataset) + domainValues <- model$getDomainValues() + + metrics <- H2OMultinomialMetrics.calculate(predictions, domainValues[["class"]], labelCol = "class") + + logloss <- metrics$getLogloss() + scoringTime <- metrics$getScoringTime() + + confusionMatrix <- metrics$getConfusionMatrix() + confusionMatrixFrame <- dplyr::tally(confusionMatrix) + confusionMatrixCount <- as.double(dplyr::collect(confusionMatrixFrame)[[1]]) + + expect_true(logloss > 0.0) + expect_true(scoringTime > 0) + expect_true(confusionMatrixCount > 0) +}) + +spark_disconnect(sc) diff --git a/r/src/tests/testthat/testMojo.R b/r/src/tests/testthat/testMojo.R index 3c830c8e5e..099274bf64 100644 --- a/r/src/tests/testthat/testMojo.R +++ b/r/src/tests/testthat/testMojo.R @@ -115,45 +115,6 @@ test_that("test model category", { expect_equal(category, "Binomial") }) -test_that("test training metrics", { - model <- H2OMOJOModel.createFromMojo(paste0("file://", normalizePath("../../../../../ml/src/test/resources/binom_model_prostate.mojo"))) - metrics <- model$getTrainingMetrics() - expect_equal(as.character(metrics[["AUC"]]), "0.896878869021911") - expect_equal(length(metrics), 10) -}) - -test_that("test training metrics object", { - model <- H2OMOJOModel.createFromMojo(paste0("file://", normalizePath("../../../../../ml/src/test/resources/binom_model_prostate.mojo"))) - metrics <- model$getTrainingMetricsObject() - aucValue <- metrics$getAUC() - scoringTime <- metrics$getScoringTime() - - thresholdsAndScores <- metrics$getThresholdsAndMetricScores() - thresholdsAndScoresFrame <- dplyr::tally(thresholdsAndScores) - thresholdsAndScoresCount <- as.double(dplyr::collect(thresholdsAndScoresFrame)[[1]]) - - gainsLiftTable <- metrics$getGainsLiftTable() - gainsLiftTableFrame <- dplyr::tally(gainsLiftTable) - gainsLiftTableCount <- as.double(dplyr::collect(gainsLiftTableFrame)[[1]]) - - expect_equal(as.character(aucValue), "0.896878869021911") - expect_true(scoringTime > 0) - expect_true(thresholdsAndScoresCount > 0) - expect_true(gainsLiftTableCount > 0) -}) - -test_that("test null cross validation metrics object", { - model <- H2OMOJOModel.createFromMojo(paste0("file://", normalizePath("../../../../../ml/src/test/resources/binom_model_prostate.mojo"))) - cvObject <- model$getCrossValidationMetricsObject() - expect_true(is.null(cvObject)) -}) - -test_that("test current metrics", { - 
model <- H2OMOJOModel.createFromMojo(paste0("file://", normalizePath("../../../../../ml/src/test/resources/binom_model_prostate.mojo"))) - metrics <- model$getCurrentMetrics() - expect_equal(metrics, model$getTrainingMetrics()) -}) - test_that("test MOJO predictions on unseen categoricals", { path <- paste0("file://", normalizePath("../../../../../ml/src/test/resources/deep_learning_airlines_categoricals.zip")) settings <- H2OMOJOSettings(convertUnknownCategoricalLevelsToNa = TRUE) diff --git a/scoring/build.gradle b/scoring/build.gradle index 2188d58962..6b65d1679a 100644 --- a/scoring/build.gradle +++ b/scoring/build.gradle @@ -26,6 +26,10 @@ dependencies { api("ai.h2o:h2o-genmodel-ext-xgboost:${h2oVersion}") api(project(":sparkling-water-utils")) + // Required for model metrics calculation + implementation("ai.h2o:h2o-core:${h2oVersion}") + implementation(project(":sparkling-water-extensions")) + compileOnly(project(':sparkling-water-api-generation')) compileOnly("ai.h2o:h2o-ext-xgboost:${h2oVersion}") compileOnly("org.apache.spark:spark-core_${scalaBaseVersion}:${sparkVersion}") diff --git a/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2OBinomialMetrics.scala b/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2OBinomialMetrics.scala new file mode 100644 index 0000000000..6eed9819b3 --- /dev/null +++ b/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2OBinomialMetrics.scala @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ai.h2o.sparkling.ml.metrics + +import hex.ModelMetricsBinomial.MetricBuilderBinomial +import org.apache.spark.ml.util.Identifiable +import org.apache.spark.sql.types._ +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.functions.col + +@MetricsDescription( + description = + "The class makes available all metrics that shared across all algorithms supporting binomial classification.") +class H2OBinomialMetrics(override val uid: String) extends H2OBinomialMetricsBase(uid) { + + def this() = this(Identifiable.randomUID("H2OBinomialMetrics")) +} + +object H2OBinomialMetrics extends MetricCalculation { + + /** + * The method calculates binomial metrics on a provided data frame with predictions and actual values. + * + * @param dataFrame A data frame with predictions and actual values + * @param domain Array of classes representing negative and positive response. Negative class must at position 0 and + * positive at 1. + * @param predictionCol The name of prediction column. The prediction column must have the same type as + * a detailed_prediction column coming from the transform method of H2OMOJOModel descendant. + * Or the type must be FloatType/DoubleType where values represent probabilities of + * the positive response. 
+ * @param labelCol The name of label column that contains actual values. + * @param weightColOption The name of a weight column. + * @param offsetColOption The name of a offset column. + * @return Calculated binomial metrics + */ + def calculate( + dataFrame: DataFrame, + domain: Array[String], + predictionCol: String = "detailed_prediction", + labelCol: String = "label", + weightColOption: Option[String] = None, + offsetColOption: Option[String] = None): H2OBinomialMetrics = { + validateDataFrameForMetricCalculation(dataFrame, domain, predictionCol, labelCol, offsetColOption, weightColOption) + val getMetricBuilder = () => new MetricBuilderBinomial(domain) + val castedLabelDF = dataFrame.withColumn(labelCol, col(labelCol) cast StringType) + + val gson = + getMetricGson(getMetricBuilder, castedLabelDF, predictionCol, labelCol, offsetColOption, weightColOption, domain) + val result = new H2OBinomialMetrics() + result.setMetrics(gson, "H2OBinomialMetrics.calculate") + result + } + + // The method serves for call from Python API + def calculateInternal( + dataFrame: DataFrame, + domain: java.util.ArrayList[String], + predictionCol: String, + labelCol: String, + weightCol: String, + offsetCol: String): H2OBinomialMetrics = { + calculate( + dataFrame, + domain.toArray[String](new Array[String](0)), + predictionCol, + labelCol, + Option(weightCol), + Option(offsetCol)) + } + + // The method serves for call from R API + def calculateInternal( + dataFrame: DataFrame, + domain: Array[String], + predictionCol: String, + labelCol: String, + weightCol: String, + offsetCol: String): H2OBinomialMetrics = { + calculate(dataFrame, domain, predictionCol, labelCol, Option(weightCol), Option(offsetCol)) + } + + private val unusedLabelIndex: Double = -1.0 + + override protected def getPredictionValues(dataType: DataType, domain: Array[String], row: Row): Array[Double] = { + dataType match { + case StructType(fields) + if fields(0).dataType == StringType && fields(1).dataType.isInstanceOf[StructType] && + fields(1).dataType.asInstanceOf[StructType].fields.forall(_.dataType == DoubleType) && + fields(1).dataType.asInstanceOf[StructType].fields.length == 2 => + val predictionStructure = row.getStruct(0) + val prediction = predictionStructure.getString(0) + val index = domain.indexOf(prediction).toDouble + val probabilities = predictionStructure.getStruct(1) + Array(index) ++ probabilities.toSeq.map(_.asInstanceOf[Double]) + case StructType(fields) if fields.forall(_.dataType == DoubleType) && fields.length == 2 => + val probabilities = row.getStruct(0) + Array(unusedLabelIndex) ++ probabilities.toSeq.map(_.asInstanceOf[Double]) + case DoubleType => probabilityToArray(row.getDouble(0)) + case FloatType => probabilityToArray(row.getFloat(0).toDouble) + } + } + + private def probabilityToArray(probability: Double): Array[Double] = { + Array[Double](unusedLabelIndex, 1 - probability, probability) + } + + override protected def getActualValue(dataType: DataType, domain: Array[String], row: Row): Double = { + val label = row.getString(1) + domain.indexOf(label).toDouble + } + + override protected def validateDataFrameForMetricCalculation( + dataFrame: DataFrame, + domain: Array[String], + predictionCol: String, + labelCol: String, + offsetColOption: Option[String], + weightColOption: Option[String]): Unit = { + super.validateDataFrameForMetricCalculation( + dataFrame, + domain, + predictionCol, + labelCol, + offsetColOption, + weightColOption) + val predictionType = dataFrame.schema.fields.find(_.name == 
predictionCol).get.dataType + val isPredictionTypeValid = predictionType match { + case StructType(fields) + if fields(0).dataType == StringType && fields(1).dataType.isInstanceOf[StructType] && + fields(1).dataType.asInstanceOf[StructType].fields.forall(_.dataType == DoubleType) && + fields(1).dataType.asInstanceOf[StructType].fields.length == 2 => + true + case StructType(fields) if fields.forall(_.dataType == DoubleType) && fields.length == 2 => true + case DoubleType => true + case FloatType => true + case _ => false + } + if (!isPredictionTypeValid) { + throw new IllegalArgumentException( + s"The type of the prediction column '$predictionCol' is not valid. " + + "The prediction column must have the same type as a detailed_prediction column coming from the transform " + + "method of H2OMOJOModel descendant or a array type or vector of doubles. Or the type must be " + + "FloatType/DoubleType where values represent probabilities of positive response.") + } + } +} diff --git a/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2OMetrics.scala b/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2OMetrics.scala index 3c6a696881..7e145aaece 100644 --- a/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2OMetrics.scala +++ b/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2OMetrics.scala @@ -55,13 +55,13 @@ object H2OMetrics { dataFrameSerializerGetter: () => String): H2OMetrics = { val metricsObject = modelCategory match { - case H2OModelCategory.Binomial if algoName == "glm" => new H2OBinomialGLMMetrics() + case H2OModelCategory.Binomial if Set("glm", "gam").contains(algoName) => new H2OBinomialGLMMetrics() case H2OModelCategory.Binomial => new H2OBinomialMetrics() - case H2OModelCategory.Multinomial if algoName == "glm" => new H2OMultinomialGLMMetrics() + case H2OModelCategory.Multinomial if Set("glm", "gam").contains(algoName) => new H2OMultinomialGLMMetrics() case H2OModelCategory.Multinomial => new H2OMultinomialMetrics() - case H2OModelCategory.Ordinal if algoName == "glm" => new H2OOrdinalGLMMetrics() + case H2OModelCategory.Ordinal if Set("glm", "gam").contains(algoName) => new H2OOrdinalGLMMetrics() case H2OModelCategory.Ordinal => new H2OOrdinalMetrics() - case H2OModelCategory.Regression if algoName == "glm" => new H2ORegressionGLMMetrics() + case H2OModelCategory.Regression if Set("glm", "gam").contains(algoName) => new H2ORegressionGLMMetrics() case H2OModelCategory.Regression => new H2ORegressionMetrics() case H2OModelCategory.Clustering => new H2OClusteringMetrics() case H2OModelCategory.AnomalyDetection => new H2OAnomalyMetrics() diff --git a/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2OMultinomialMetrics.scala b/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2OMultinomialMetrics.scala new file mode 100644 index 0000000000..05b6f85584 --- /dev/null +++ b/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2OMultinomialMetrics.scala @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ai.h2o.sparkling.ml.metrics + +import hex.ModelMetricsMultinomial.MetricBuilderMultinomial +import hex.MultinomialAucType +import hex.genmodel.GenModel +import org.apache.spark.{ExposeUtils, ml, mllib} +import org.apache.spark.ml.util.Identifiable +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.types.{ArrayType, DataType, DoubleType, FloatType, StringType, StructType} + +@MetricsDescription( + description = + "The class makes available all metrics that shared across all algorithms supporting multinomial classification.") +class H2OMultinomialMetrics(override val uid: String) extends H2OMultinomialMetricsBase(uid) { + + def this() = this(Identifiable.randomUID("H2OMultinomialMetrics")) +} + +object H2OMultinomialMetrics extends MetricCalculation { + + /** + * The method calculates multinomial metrics on a provided data frame with predictions and actual values. + * + * @param dataFrame A data frame with predictions and actual values. + * @param domain Array of response classes. + * @param predictionCol The name of prediction column. The prediction column must have the same type as + * a detailed_prediction column coming from the transform method of H2OMOJOModel descendant or + * a array type or vector of doubles where particular arrays represent class probabilities. + * The order of probabilities must correspond to the order of labels in the passed domain. + * @param labelCol The name of label column that contains actual values. + * @param weightColOption The name of a weight column. + * @param aucType Type of multinomial AUC/AUCPR calculation. 
Possible values: + * - AUTO, + * - NONE, + * - MACRO_OVR, + * - WEIGHTED_OVR, + * - MACRO_OVO, + * - WEIGHTED_OVO + * @return Calculated multinomial metrics + */ + def calculate( + dataFrame: DataFrame, + domain: Array[String], + predictionCol: String = "detailed_prediction", + labelCol: String = "label", + weightColOption: Option[String] = None, + aucType: String = "AUTO"): H2OMultinomialMetrics = { + validateDataFrameForMetricCalculation(dataFrame, domain, predictionCol, labelCol, None, weightColOption) + val aucTypeEnum = MultinomialAucType.valueOf(aucType) + val nclasses = domain.length + val getMetricBuilder = + () => new MetricBuilderMultinomial(nclasses, domain, aucTypeEnum) + val castedLabelDF = dataFrame.withColumn(labelCol, col(labelCol) cast StringType) + + val gson = + getMetricGson(getMetricBuilder, castedLabelDF, predictionCol, labelCol, None, weightColOption, domain) + val result = new H2OMultinomialMetrics() + result.setMetrics(gson, "H2OMultinomialMetrics.calculate") + result + } + + // The method serves for call from Python API + def calculateInternal( + dataFrame: DataFrame, + domain: java.util.ArrayList[String], + predictionCol: String, + labelCol: String, + weightCol: String, + aucType: String): H2OMultinomialMetrics = { + calculate( + dataFrame, + domain.toArray[String](new Array[String](0)), + predictionCol, + labelCol, + Option(weightCol), + aucType) + } + + // The method serves for call from R API + def calculateInternal( + dataFrame: DataFrame, + domain: Array[String], + predictionCol: String, + labelCol: String, + weightCol: String, + aucType: String): H2OMultinomialMetrics = { + calculate(dataFrame, domain, predictionCol, labelCol, Option(weightCol), aucType) + } + + override protected def getPredictionValues(dataType: DataType, domain: Array[String], row: Row): Array[Double] = { + dataType match { + case StructType(fields) + if fields(0).dataType == StringType && fields(1).dataType.isInstanceOf[StructType] && + fields(1).dataType.asInstanceOf[StructType].fields.forall(_.dataType == DoubleType) && + fields(1).dataType.asInstanceOf[StructType].fields.length == domain.length => + val predictionStructure = row.getStruct(0) + val prediction = predictionStructure.getString(0) + val index = domain.indexOf(prediction).toDouble + val probabilities = predictionStructure.getStruct(1) + + Array(index) ++ probabilities.toSeq.map(_.asInstanceOf[Double]) + case StructType(fields) if fields.forall(_.dataType == DoubleType) && fields.length == domain.length => + val probabilities = row.getStruct(0).toSeq.map(_.asInstanceOf[Double]).toArray + probabilitiesToPredictedValues(probabilities) + case ArrayType(DoubleType, _) => probabilitiesToPredictedValues(row.getSeq[Double](0).toArray) + case ArrayType(FloatType, _) => probabilitiesToPredictedValues(row.getSeq[Float](0).map(_.toDouble).toArray) + case v if ExposeUtils.isMLVectorUDT(v) => + probabilitiesToPredictedValues(row.getAs[ml.linalg.Vector](0).toDense.values) + case _: mllib.linalg.VectorUDT => + probabilitiesToPredictedValues(row.getAs[mllib.linalg.Vector](0).toDense.values) + } + } + + private def probabilitiesToPredictedValues(probabilities: Array[Double]): Array[Double] = { + val result = new Array[Double](probabilities.length + 1) + Array.copy(probabilities, 0, result, 1, probabilities.length) + result(0) = GenModel.getPredictionMultinomial(result, null, result).toDouble + result + } + + override protected def getActualValue(dataType: DataType, domain: Array[String], row: Row): Double = { + val label = row.getString(1) 
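+      // The label column is cast to StringType in calculate(), so map the string label to its index within the domain (-1.0 if absent).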
+ domain.indexOf(label).toDouble + } + + override protected def validateDataFrameForMetricCalculation( + dataFrame: DataFrame, + domain: Array[String], + predictionCol: String, + labelCol: String, + offsetColOption: Option[String], + weightColOption: Option[String]): Unit = { + super.validateDataFrameForMetricCalculation( + dataFrame, + domain, + predictionCol, + labelCol, + offsetColOption, + weightColOption) + val predictionType = dataFrame.schema.fields.find(_.name == predictionCol).get.dataType + val isPredictionTypeValid = predictionType match { + case StructType(fields) + if fields(0).dataType == StringType && fields(1).dataType.isInstanceOf[StructType] && + fields(1).dataType.asInstanceOf[StructType].fields.forall(_.dataType == DoubleType) && + fields(1).dataType.asInstanceOf[StructType].fields.length == domain.length => + true + case StructType(fields) if fields.forall(_.dataType == DoubleType) && fields.length == domain.length => + true + case ArrayType(DoubleType, _) => true + case ArrayType(FloatType, _) => true + case v if ExposeUtils.isMLVectorUDT(v) => true + case _: mllib.linalg.VectorUDT => true + case _ => false + } + if (!isPredictionTypeValid) { + throw new IllegalArgumentException(s"The type of the prediction column '$predictionCol' is not valid. " + + "The prediction column must have the same type as a detailed_prediction column coming from the transform " + + "method of H2OMOJOModel descendant or a array type or vector of doubles where particular arrays represent " + + "class probabilities. The order of probabilities must correspond to the order of labels in the passed domain.") + } + } +} diff --git a/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2ORegressionMetrics.scala b/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2ORegressionMetrics.scala new file mode 100644 index 0000000000..0bdd1a70ed --- /dev/null +++ b/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2ORegressionMetrics.scala @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package ai.h2o.sparkling.ml.metrics + +import hex.DistributionFactory +import hex.ModelMetricsRegression.MetricBuilderRegression +import hex.genmodel.utils.DistributionFamily +import org.apache.spark.ml.util.Identifiable +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.types._ + +@MetricsDescription( + description = "The class makes available all metrics that shared across all algorithms supporting regression.") +class H2ORegressionMetrics(override val uid: String) extends H2ORegressionMetricsBase(uid) { + + def this() = this(Identifiable.randomUID("H2ORegressionMetrics")) +} + +object H2ORegressionMetrics extends MetricCalculation { + + /** + * The method calculates regression metrics on a provided data frame with predictions and actual values. + * + * @param dataFrame A data frame with predictions and actual values + * @param predictionCol The name of prediction column. The prediction column must have the same type as + * a detailed_prediction column coming from the transform method of H2OMOJOModel descendant or + * it must be of DoubleType or FloatType. + * @param labelCol The name of label column that contains actual values. + * @param weightColOption The name of a weight column. + * @param offsetColOption The name of a offset column. + * @return Calculated regression metrics + */ + def calculate( + dataFrame: DataFrame, + predictionCol: String = "detailed_prediction", + labelCol: String = "label", + weightColOption: Option[String] = None, + offsetColOption: Option[String] = None): H2ORegressionMetrics = { + validateDataFrameForMetricCalculation(dataFrame, null, predictionCol, labelCol, offsetColOption, weightColOption) + val getMetricBuilder = + () => new MetricBuilderRegression(DistributionFactory.getDistribution(DistributionFamily.AUTO)) + val castedLabelDF = dataFrame.withColumn(labelCol, col(labelCol) cast DoubleType) + val gson = + getMetricGson(getMetricBuilder, castedLabelDF, predictionCol, labelCol, offsetColOption, weightColOption, null) + val result = new H2ORegressionMetrics() + result.setMetrics(gson, "H2ORegressionMetrics.calculate") + result + } + + // The method serves for call from Python/R API + def calculateInternal( + dataFrame: DataFrame, + predictionCol: String, + labelCol: String, + weightCol: String, + offsetCol: String): H2ORegressionMetrics = { + calculate(dataFrame, predictionCol, labelCol, Option(weightCol), Option(offsetCol)) + } + + override protected def getPredictionValues(dataType: DataType, domain: Array[String], row: Row): Array[Double] = { + dataType match { + case StructType(fields) if fields.head.dataType == DoubleType => Array(row.getStruct(0).getDouble(0)) + case DoubleType => Array(row.getDouble(0)) + case FloatType => Array(row.getFloat(0).toDouble) + } + } + + override protected def getActualValue(dataType: DataType, domain: Array[String], row: Row): Double = dataType match { + case DoubleType => row.getDouble(1) + } + + override protected def validateDataFrameForMetricCalculation( + dataFrame: DataFrame, + domain: Array[String], + predictionCol: String, + labelCol: String, + offsetColOption: Option[String], + weightColOption: Option[String]): Unit = { + super.validateDataFrameForMetricCalculation( + dataFrame, + domain, + predictionCol, + labelCol, + offsetColOption, + weightColOption) + val predictionType = dataFrame.schema.fields.find(_.name == predictionCol).get.dataType + val isPredictionTypeValid = predictionType match { + case StructType(fields) if 
fields.head.dataType == DoubleType => true + case DoubleType => true + case FloatType => true + case _ => false + } + if (!isPredictionTypeValid) { + throw new IllegalArgumentException( + s"The type of the prediction column '$predictionCol' is not valid. " + + "The prediction column must have the same type as a detailed_prediction column coming from the transform " + + "method of H2OMOJOModel descendant or it must be of DoubleType or FloatType.") + } + + val labelType = dataFrame.schema.fields.find(_.name == labelCol).get.dataType + if (!labelType.isInstanceOf[NumericType]) { + throw new IllegalArgumentException(s"The label column '$labelCol' must be a numeric type.") + } + } +} diff --git a/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/MetricCalculation.scala b/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/MetricCalculation.scala new file mode 100644 index 0000000000..d207b3a277 --- /dev/null +++ b/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/MetricCalculation.scala @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package ai.h2o.sparkling.ml.metrics + +import com.google.gson.{GsonBuilder, JsonObject} +import hex._ +import hex.ModelMetrics.MetricBuilder +import org.apache.spark.sql.{DataFrame, Row} +import water.api.{Schema, SchemaServer} +import water.api.schemas3._ +import org.apache.spark.sql.types._ +import org.apache.spark.sql.functions.{col, lit} + +trait MetricCalculation { + + protected def validateDataFrameForMetricCalculation( + dataFrame: DataFrame, + domain: Array[String], + predictionCol: String, + labelCol: String, + offsetColOption: Option[String], + weightColOption: Option[String]): Unit = { + + if (predictionCol != null && !dataFrame.columns.contains(predictionCol)) { + throw new IllegalArgumentException( + s"DataFrame passed as a parameter does not contain prediction column '$predictionCol'.") + } + + if (labelCol != null && !dataFrame.columns.contains(labelCol)) { + throw new IllegalArgumentException(s"DataFrame passed as a parameter does not contain label column '$labelCol'.") + } + + if (offsetColOption.isDefined) { + val offsetCol = offsetColOption.get + if (!dataFrame.columns.contains(offsetCol)) { + throw new IllegalArgumentException( + s"DataFrame passed as a parameter does not contain offset column '$offsetCol'.") + } + val offsetType = dataFrame.schema.fields.find(_.name == offsetCol).get.dataType + if (!offsetType.isInstanceOf[NumericType]) { + throw new IllegalArgumentException(s"The offset column '$offsetCol' must be a numeric type.") + } + } + + if (weightColOption.isDefined) { + val weightCol = weightColOption.get + if (!dataFrame.columns.contains(weightCol)) { + throw new IllegalArgumentException( + s"DataFrame passed as a parameter does not contain weight column '$weightCol'.") + } + val weightType = dataFrame.schema.fields.find(_.name == weightCol).get.dataType + if (!weightType.isInstanceOf[NumericType]) { + throw new IllegalArgumentException(s"The weight column '$weightCol' must be a numeric type.") + } + } + } + + private def metricsToSchema(metrics: ModelMetrics): Schema[_, _] = { + val schemas = + MetricsCalculationTypeExtensions.SCHEMA_CLASSES.map(c => + Class.forName(c).getConstructor().newInstance().asInstanceOf[Schema[Nothing, Nothing]]) + schemas.foreach(SchemaServer.register) + val schema = SchemaServer.schema(3, metrics) + schema match { + case s: ModelMetricsBinomialV3[ModelMetricsBinomial, _] => + s.fillFromImpl(metrics.asInstanceOf[ModelMetricsBinomial]) + case s: ModelMetricsMultinomialV3[ModelMetricsMultinomial, _] => + s.fillFromImpl(metrics.asInstanceOf[ModelMetricsMultinomial]) + case s: ModelMetricsRegressionV3[ModelMetricsRegression, _] => + s.fillFromImpl(metrics.asInstanceOf[ModelMetricsRegression]) + } + schema + } + + protected def getPredictionValues(dataType: DataType, domain: Array[String], row: Row): Array[Double] + + protected def getActualValue(dataType: DataType, domain: Array[String], row: Row): Double + + protected def getMetricGson( + createMetricBuilder: () => MetricBuilder[_], + dataFrame: DataFrame, + predictionCol: String, + labelCol: String, + offsetColOption: Option[String], + weightColOption: Option[String], + domain: Array[String]): JsonObject = { + val flatDF = dataFrame.select(col(predictionCol) as "prediction", col(labelCol) as "label", weightColOption match { + case Some(weightCol) => col(weightCol) cast DoubleType as "weight" + case None => lit(1.0d) as "weight" + }, offsetColOption match { + case Some(offsetCol) => col(offsetCol) cast DoubleType as "offset" + case None => lit(0.0d) as "offset" + }) + val
predictionType = flatDF.schema.fields(0).dataType + val actualType = flatDF.schema.fields(1).dataType + val filledMetricsBuilder = flatDF.rdd + .mapPartitions[MetricBuilder[_]] { rows => + val metricBuilder = createMetricBuilder() + while (rows.hasNext) { + val row = rows.next() + val prediction = getPredictionValues(predictionType, domain, row) + val actualValue: Double = getActualValue(actualType, domain, row) + val weight = row.getDouble(2) + val offset = row.getDouble(3) + metricBuilder.perRow(prediction, Array(actualValue.toFloat), weight, offset, null) + } + Iterator.single(metricBuilder) + } + .reduce((f, s) => { f.reduce(s); f }) + + filledMetricsBuilder.postGlobal() + + // Setting parameters of makeModelMetrics to null since they are required only by H2O runtime + val model = null + val frame = null + val adaptedFrame = null + val predictions = null + val metrics = filledMetricsBuilder.makeModelMetrics(model, frame, adaptedFrame, predictions) + + val schema = metricsToSchema(metrics) + val json = schema.toJsonString + new GsonBuilder().create().fromJson(json, classOf[JsonObject]) + } +}
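As a usage illustration of the API added above, a minimal PySparkling sketch follows. It assumes an active SparkSession named `spark`, a running Sparkling Water backend, and the prostate dataset plus the `binom_model_prostate.mojo` test resource referenced by the tests; paths are shown relative to the repository root.

from pysparkling.ml import H2OMOJOModel, H2OBinomialMetrics

# Score the dataset with an existing MOJO model and compute binomial metrics on its predictions.
prostate = spark.read.csv("examples/smalldata/prostate/prostate.csv", header=True, inferSchema=True)
mojo = H2OMOJOModel.createFromMojo("ml/src/test/resources/binom_model_prostate.mojo")
predictions = mojo.transform(prostate)

# The domain supplies the order of the negative and positive response classes; here it is taken from the model itself.
domain = mojo.getDomainValues()["capsule"]
metrics = H2OBinomialMetrics.calculate(predictions, domain, labelCol = "CAPSULE")
print(metrics.getAUC())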