diff --git a/ml/src/test/scala/ai/h2o/sparkling/ml/metrics/BinomialMetricsTestSuite.scala b/ml/src/test/scala/ai/h2o/sparkling/ml/metrics/BinomialMetricsTestSuite.scala index dbd1ce0916..5a36495647 100644 --- a/ml/src/test/scala/ai/h2o/sparkling/ml/metrics/BinomialMetricsTestSuite.scala +++ b/ml/src/test/scala/ai/h2o/sparkling/ml/metrics/BinomialMetricsTestSuite.scala @@ -20,8 +20,8 @@ package ai.h2o.sparkling.ml.metrics import ai.h2o.sparkling.ml.algos._ import ai.h2o.sparkling.ml.models.{H2OGBMMOJOModel, H2OGLMMOJOModel, H2OMOJOModel} import ai.h2o.sparkling.{SharedH2OTestContext, TestUtils} -import org.apache.spark.sql.functions.rand -import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.functions.{rand, col} +import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.types._ import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner @@ -196,6 +196,42 @@ class BinomialMetricsTestSuite extends FunSuite with Matchers with SharedH2OTest validationMetricsTolerance) } + test(s"test calculation of binomial $algorithmName metrics with probabilities passed to predictionCol") { + val algorithm = algorithmGetter() + algorithm + .setValidationDataFrame(validationDataset) + .set(algorithm.getParam("seed"), 1L) + .setFeaturesCols("AGE", "RACE", "DPROS", "DCAPS", "PSA", "VOL", "GLEASON") + .setLabelCol("CAPSULE") + + val model = algorithm.fit(trainingDataset) + val domain = model.getDomainValues()("CAPSULE") + + def extractProbability(df: DataFrame): DataFrame = { + df.withColumn("probability", col(s"detailed_prediction.probabilities.${domain(1)}")) + } + + val trainingMetricObject = + H2OBinomialMetrics.calculate( + extractProbability(model.transform(trainingDataset)), + domain, + labelCol = "CAPSULE", + predictionCol = "probability") + val validationMetricObject = + H2OBinomialMetrics.calculate( + extractProbability(model.transform(validationDataset)), + domain, + labelCol = "CAPSULE", + predictionCol = "probability") + + assertMetrics( + model, + trainingMetricObject, + validationMetricObject, + trainingMetricsTolerance, + validationMetricsTolerance) + } + test(s"test calculation of binomial $algorithmName metrics with weightCol set on arbitrary dataset") { val algorithm = algorithmGetter() algorithm diff --git a/ml/src/test/scala/ai/h2o/sparkling/ml/metrics/MultinomialMetricsTestSuite.scala b/ml/src/test/scala/ai/h2o/sparkling/ml/metrics/MultinomialMetricsTestSuite.scala index 02684ee4fe..15a35ba40d 100644 --- a/ml/src/test/scala/ai/h2o/sparkling/ml/metrics/MultinomialMetricsTestSuite.scala +++ b/ml/src/test/scala/ai/h2o/sparkling/ml/metrics/MultinomialMetricsTestSuite.scala @@ -20,11 +20,13 @@ package ai.h2o.sparkling.ml.metrics import ai.h2o.sparkling.ml.algos._ import ai.h2o.sparkling.ml.models.{H2OGBMMOJOModel, H2OGLMMOJOModel, H2OMOJOModel} import ai.h2o.sparkling.{SharedH2OTestContext, TestUtils} +import hex.genmodel.GenModel import org.apache.spark.sql.{DataFrame, SparkSession} -import org.apache.spark.sql.functions.{monotonically_increasing_id, rand} +import org.apache.spark.sql.functions.{monotonically_increasing_id, rand, udf} import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner import org.scalatest.{FunSuite, Matchers} +import org.apache.spark.sql.functions.{array, col} @RunWith(classOf[JUnitRunner]) class MultinomialMetricsTestSuite extends FunSuite with Matchers with SharedH2OTestContext { @@ -189,6 +191,47 @@ class MultinomialMetricsTestSuite extends FunSuite with Matchers with SharedH2OT validationMetricsTolerance) } + test(s"test calculation of multinomial $algorithmName metrics with just probabilities") { + val algorithm = algorithmGetter() + algorithm + .setValidationDataFrame(validationDataset) + .set(algorithm.getParam("seed"), 1L) + .setFeaturesCols("sepal_len", "sepal_wid", "petal_len", "petal_wid") + .setColumnsToCategorical("class") + .set(algorithm.getParam("aucType"), "MACRO_OVR") + .setLabelCol("class") + + val model = algorithm.fit(trainingDataset) + val priorClassDistribution = model.unwrapMojoModel()._priorClassDistrib + val domain = model.getDomainValues()("class") + def extractProbabilities(df: DataFrame) = { + val columns = domain.map(label => col(s"detailed_prediction.probabilities.$label")) + df.withColumn("probabilities", array(columns: _*)) + } + + val trainingMetricObject = + H2OMultinomialMetrics.calculate( + extractProbabilities(model.transform(trainingDataset)), + domain, + labelCol = "class", + predictionCol = "probabilities", + aucType = "MACRO_OVR") + val validationMetricObject = + H2OMultinomialMetrics.calculate( + extractProbabilities(model.transform(validationDataset)), + domain, + labelCol = "class", + predictionCol = "probabilities", + aucType = "MACRO_OVR") + + assertMetrics( + model, + trainingMetricObject, + validationMetricObject, + trainingMetricsTolerance, + validationMetricsTolerance) + } + test(s"test calculation of multinomial $algorithmName metrics with weightCol set on arbitrary dataset") { val algorithm = algorithmGetter() algorithm diff --git a/py-scoring/src/ai/h2o/sparkling/ml/metrics/H2OBinomialMetrics.py b/py-scoring/src/ai/h2o/sparkling/ml/metrics/H2OBinomialMetrics.py index be6a87ef17..52b2659549 100644 --- a/py-scoring/src/ai/h2o/sparkling/ml/metrics/H2OBinomialMetrics.py +++ b/py-scoring/src/ai/h2o/sparkling/ml/metrics/H2OBinomialMetrics.py @@ -36,9 +36,8 @@ def calculate(dataFrame, :param domain: A list of classes representing negative and positive response. Negative class must at position 0 and positive at 1 :param predictionCol: The name of prediction column. The prediction column must have the same type as - a detailed_prediction column coming from the transform method of H2OMOJOModel descendant or a array type or - vector of doubles. First item is must be 0.0 or 1.0 representing negative or positive response. The other items - must be probabilities to predict given probability classes. + a detailed_prediction column coming from the transform method of H2OMOJOModel descendant. Or the type must + be FloatType/DoubleType where values represent probabilities of the positive response. :param labelCol: The name of label column that contains actual values. :param weightCol: The name of a weight column. :param offsetCol: The name of a offset column. diff --git a/py-scoring/src/ai/h2o/sparkling/ml/metrics/H2OMultinomialMetrics.py b/py-scoring/src/ai/h2o/sparkling/ml/metrics/H2OMultinomialMetrics.py index 7a7854b455..8e41d56f3f 100644 --- a/py-scoring/src/ai/h2o/sparkling/ml/metrics/H2OMultinomialMetrics.py +++ b/py-scoring/src/ai/h2o/sparkling/ml/metrics/H2OMultinomialMetrics.py @@ -35,9 +35,9 @@ def calculate(dataFrame, :param dataFrame: A data frame with predictions and actual values. :param domain: List of response classes. :param predictionCol: The name of prediction column. The prediction column must have the same type as - a detailed_prediction column coming from the transform method of H2OMOJOModel descendant or a array type or - vector of doubles. First item is must be 0.0, 1.0, 2.0 representing indexes of response classes. The other - items must be probabilities to predict given probability classes. + a detailed_prediction column coming from the transform method of H2OMOJOModel descendant or + a array type or vector of doubles where particular arrays represent class probabilities. + The order of probabilities must correspond to the order of labels in the passed domain. :param labelCol: The name of label column that contains actual values. :param weightCol: The name of a weight column. :param aucType: Type of multinomial AUC/AUCPR calculation. Possible values: diff --git a/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2OBinomialMetrics.scala b/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2OBinomialMetrics.scala index 9c54fd4f06..2d78d7706c 100644 --- a/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2OBinomialMetrics.scala +++ b/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2OBinomialMetrics.scala @@ -18,7 +18,6 @@ package ai.h2o.sparkling.ml.metrics import hex.ModelMetricsBinomial.MetricBuilderBinomial -import org.apache.spark.{ExposeUtils, ml, mllib} import org.apache.spark.ml.util.Identifiable import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Row} @@ -41,10 +40,9 @@ object H2OBinomialMetrics extends MetricCalculation { * @param domain Array of classes representing negative and positive response. Negative class must at position 0 and * positive at 1. * @param predictionCol The name of prediction column. The prediction column must have the same type as - * a detailed_prediction column coming from the transform method of H2OMOJOModel descendant or - * a array type or vector of doubles. First item is must be 0.0 or 1.0 representing - * negative or positive response. The other items must be probabilities to predict given probability - * classes. + * a detailed_prediction column coming from the transform method of H2OMOJOModel descendant. + * Or the type must be FloatType/DoubleType where values represent probabilities of + * the positive response. * @param labelCol The name of label column that contains actual values. * @param weightColOption The name of a weight column. * @param offsetColOption The name of a offset column. @@ -57,7 +55,7 @@ object H2OBinomialMetrics extends MetricCalculation { labelCol: String = "label", weightColOption: Option[String] = None, offsetColOption: Option[String] = None): H2OBinomialMetrics = { - validateDataFrameForMetricCalculation(dataFrame, predictionCol, labelCol, offsetColOption, weightColOption) + validateDataFrameForMetricCalculation(dataFrame, domain, predictionCol, labelCol, offsetColOption, weightColOption) val getMetricBuilder = () => new MetricBuilderBinomial(domain) val castedLabelDF = dataFrame.withColumn(labelCol, col(labelCol) cast StringType) @@ -100,20 +98,25 @@ object H2OBinomialMetrics extends MetricCalculation { dataType match { case StructType(fields) if fields(0).dataType == StringType && fields(1).dataType.isInstanceOf[StructType] && - fields(1).dataType.asInstanceOf[StructType].fields.forall(_.dataType == DoubleType) => + fields(1).dataType.asInstanceOf[StructType].fields.forall(_.dataType == DoubleType) && + fields(1).dataType.asInstanceOf[StructType].fields.length == 2 => val predictionStructure = row.getStruct(0) val prediction = predictionStructure.getString(0) val index = domain.indexOf(prediction).toDouble val probabilities = predictionStructure.getStruct(1) - Array(index) ++ probabilities.toSeq.map(_.asInstanceOf[Double]) - case ArrayType(DoubleType, _) => row.getSeq[Double](0).toArray - case ArrayType(FloatType, _) => row.getSeq[Float](0).map(_.toDouble).toArray - case v if ExposeUtils.isMLVectorUDT(v) => row.getAs[ml.linalg.Vector](0).toDense.values - case _: mllib.linalg.VectorUDT => row.getAs[mllib.linalg.Vector](0).toDense.values + case StructType(fields) if fields.forall(_.dataType == DoubleType) && fields.length == 2 => + val probabilities = row.getStruct(0) + Array(-1.0) ++ probabilities.toSeq.map(_.asInstanceOf[Double]) + case DoubleType => probabilityToArray(row.getDouble(0)) + case FloatType => probabilityToArray(row.getFloat(0).toDouble) } } + private def probabilityToArray(probability: Double): Array[Double] = { + Array[Double](-1 /* unused */, 1 - probability, probability) + } + override protected def getActualValue(dataType: DataType, domain: Array[String], row: Row): Double = { val label = row.getString(1) domain.indexOf(label).toDouble @@ -121,29 +124,36 @@ object H2OBinomialMetrics extends MetricCalculation { override protected def validateDataFrameForMetricCalculation( dataFrame: DataFrame, + domain: Array[String], predictionCol: String, labelCol: String, offsetColOption: Option[String], weightColOption: Option[String]): Unit = { - super.validateDataFrameForMetricCalculation(dataFrame, predictionCol, labelCol, offsetColOption, weightColOption) + super.validateDataFrameForMetricCalculation( + dataFrame, + domain, + predictionCol, + labelCol, + offsetColOption, + weightColOption) val predictionType = dataFrame.schema.fields.find(_.name == predictionCol).get.dataType val isPredictionTypeValid = predictionType match { case StructType(fields) if fields(0).dataType == StringType && fields(1).dataType.isInstanceOf[StructType] && - fields(1).dataType.asInstanceOf[StructType].fields.forall(_.dataType == DoubleType) => + fields(1).dataType.asInstanceOf[StructType].fields.forall(_.dataType == DoubleType) && + fields(1).dataType.asInstanceOf[StructType].fields.length == 2 => true - case ArrayType(DoubleType, _) => true - case ArrayType(FloatType, _) => true - case v if ExposeUtils.isMLVectorUDT(v) => true - case _: mllib.linalg.VectorUDT => true + case StructType(fields) if fields.forall(_.dataType == DoubleType) && fields.length == 2 => true + case DoubleType => true + case FloatType => true case _ => false } if (!isPredictionTypeValid) { - throw new IllegalArgumentException(s"The type of the prediction column '$predictionCol' is not valid. " + - "The prediction column must have the same type as a detailed_prediction column coming from the transform " + - "method of H2OMOJOModel descendant or a array type or vector of doubles. First item is must be 0.0 or 1.0" + - "representing negative or positive response. The other items must be probabilities to predict given probability" + - "classes.") + throw new IllegalArgumentException( + s"The type of the prediction column '$predictionCol' is not valid. " + + "The prediction column must have the same type as a detailed_prediction column coming from the transform " + + "method of H2OMOJOModel descendant or a array type or vector of doubles. Or the type must be " + + "FloatType/DoubleType where values represent probabilities of positive response.") } } } diff --git a/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2OMultinomialMetrics.scala b/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2OMultinomialMetrics.scala index e7b75cdae0..05b6f85584 100644 --- a/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2OMultinomialMetrics.scala +++ b/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2OMultinomialMetrics.scala @@ -19,10 +19,11 @@ package ai.h2o.sparkling.ml.metrics import hex.ModelMetricsMultinomial.MetricBuilderMultinomial import hex.MultinomialAucType +import hex.genmodel.GenModel import org.apache.spark.{ExposeUtils, ml, mllib} import org.apache.spark.ml.util.Identifiable import org.apache.spark.sql.functions.col -import org.apache.spark.sql.{DataFrame, Dataset, Row} +import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.types.{ArrayType, DataType, DoubleType, FloatType, StringType, StructType} @MetricsDescription( @@ -42,9 +43,8 @@ object H2OMultinomialMetrics extends MetricCalculation { * @param domain Array of response classes. * @param predictionCol The name of prediction column. The prediction column must have the same type as * a detailed_prediction column coming from the transform method of H2OMOJOModel descendant or - * a array type or vector of doubles. First item is must be 0.0, 1.0, 2.0 representing - * indexes of response classes. The other items must be probabilities to predict given probability - * classes. + * a array type or vector of doubles where particular arrays represent class probabilities. + * The order of probabilities must correspond to the order of labels in the passed domain. * @param labelCol The name of label column that contains actual values. * @param weightColOption The name of a weight column. * @param aucType Type of multinomial AUC/AUCPR calculation. Possible values: @@ -63,7 +63,7 @@ object H2OMultinomialMetrics extends MetricCalculation { labelCol: String = "label", weightColOption: Option[String] = None, aucType: String = "AUTO"): H2OMultinomialMetrics = { - validateDataFrameForMetricCalculation(dataFrame, predictionCol, labelCol, None, weightColOption) + validateDataFrameForMetricCalculation(dataFrame, domain, predictionCol, labelCol, None, weightColOption) val aucTypeEnum = MultinomialAucType.valueOf(aucType) val nclasses = domain.length val getMetricBuilder = @@ -109,20 +109,33 @@ object H2OMultinomialMetrics extends MetricCalculation { dataType match { case StructType(fields) if fields(0).dataType == StringType && fields(1).dataType.isInstanceOf[StructType] && - fields(1).dataType.asInstanceOf[StructType].fields.forall(_.dataType == DoubleType) => + fields(1).dataType.asInstanceOf[StructType].fields.forall(_.dataType == DoubleType) && + fields(1).dataType.asInstanceOf[StructType].fields.length == domain.length => val predictionStructure = row.getStruct(0) val prediction = predictionStructure.getString(0) val index = domain.indexOf(prediction).toDouble val probabilities = predictionStructure.getStruct(1) Array(index) ++ probabilities.toSeq.map(_.asInstanceOf[Double]) - case ArrayType(DoubleType, _) => row.getSeq[Double](0).toArray - case ArrayType(FloatType, _) => row.getSeq[Float](0).map(_.toDouble).toArray - case v if ExposeUtils.isMLVectorUDT(v) => row.getAs[ml.linalg.Vector](0).toDense.values - case _: mllib.linalg.VectorUDT => row.getAs[mllib.linalg.Vector](0).toDense.values + case StructType(fields) if fields.forall(_.dataType == DoubleType) && fields.length == domain.length => + val probabilities = row.getStruct(0).toSeq.map(_.asInstanceOf[Double]).toArray + probabilitiesToPredictedValues(probabilities) + case ArrayType(DoubleType, _) => probabilitiesToPredictedValues(row.getSeq[Double](0).toArray) + case ArrayType(FloatType, _) => probabilitiesToPredictedValues(row.getSeq[Float](0).map(_.toDouble).toArray) + case v if ExposeUtils.isMLVectorUDT(v) => + probabilitiesToPredictedValues(row.getAs[ml.linalg.Vector](0).toDense.values) + case _: mllib.linalg.VectorUDT => + probabilitiesToPredictedValues(row.getAs[mllib.linalg.Vector](0).toDense.values) } } + private def probabilitiesToPredictedValues(probabilities: Array[Double]): Array[Double] = { + val result = new Array[Double](probabilities.length + 1) + Array.copy(probabilities, 0, result, 1, probabilities.length) + result(0) = GenModel.getPredictionMultinomial(result, null, result).toDouble + result + } + override protected def getActualValue(dataType: DataType, domain: Array[String], row: Row): Double = { val label = row.getString(1) domain.indexOf(label).toDouble @@ -130,16 +143,26 @@ object H2OMultinomialMetrics extends MetricCalculation { override protected def validateDataFrameForMetricCalculation( dataFrame: DataFrame, + domain: Array[String], predictionCol: String, labelCol: String, offsetColOption: Option[String], weightColOption: Option[String]): Unit = { - super.validateDataFrameForMetricCalculation(dataFrame, predictionCol, labelCol, offsetColOption, weightColOption) + super.validateDataFrameForMetricCalculation( + dataFrame, + domain, + predictionCol, + labelCol, + offsetColOption, + weightColOption) val predictionType = dataFrame.schema.fields.find(_.name == predictionCol).get.dataType val isPredictionTypeValid = predictionType match { case StructType(fields) if fields(0).dataType == StringType && fields(1).dataType.isInstanceOf[StructType] && - fields(1).dataType.asInstanceOf[StructType].fields.forall(_.dataType == DoubleType) => + fields(1).dataType.asInstanceOf[StructType].fields.forall(_.dataType == DoubleType) && + fields(1).dataType.asInstanceOf[StructType].fields.length == domain.length => + true + case StructType(fields) if fields.forall(_.dataType == DoubleType) && fields.length == domain.length => true case ArrayType(DoubleType, _) => true case ArrayType(FloatType, _) => true @@ -150,9 +173,8 @@ object H2OMultinomialMetrics extends MetricCalculation { if (!isPredictionTypeValid) { throw new IllegalArgumentException(s"The type of the prediction column '$predictionCol' is not valid. " + "The prediction column must have the same type as a detailed_prediction column coming from the transform " + - "method of H2OMOJOModel descendant or a array type or vector of doubles. First item is must be 0.0, 1.0, 2.0 " + - "representing indexes of response classes. The other items must be probabilities to predict given " + - "probability classes.") + "method of H2OMOJOModel descendant or a array type or vector of doubles where particular arrays represent " + + "class probabilities. The order of probabilities must correspond to the order of labels in the passed domain.") } } } diff --git a/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2ORegressionMetrics.scala b/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2ORegressionMetrics.scala index 54e7ac3209..0bdd1a70ed 100644 --- a/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2ORegressionMetrics.scala +++ b/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/H2ORegressionMetrics.scala @@ -48,11 +48,11 @@ object H2ORegressionMetrics extends MetricCalculation { */ def calculate( dataFrame: DataFrame, - predictionCol: String = "prediction", + predictionCol: String = "detailed_prediction", labelCol: String = "label", weightColOption: Option[String] = None, offsetColOption: Option[String] = None): H2ORegressionMetrics = { - validateDataFrameForMetricCalculation(dataFrame, predictionCol, labelCol, offsetColOption, weightColOption) + validateDataFrameForMetricCalculation(dataFrame, null, predictionCol, labelCol, offsetColOption, weightColOption) val getMetricBuilder = () => new MetricBuilderRegression(DistributionFactory.getDistribution(DistributionFamily.AUTO)) val castedLabelDF = dataFrame.withColumn(labelCol, col(labelCol) cast DoubleType) @@ -87,12 +87,18 @@ object H2ORegressionMetrics extends MetricCalculation { override protected def validateDataFrameForMetricCalculation( dataFrame: DataFrame, + domain: Array[String], predictionCol: String, labelCol: String, offsetColOption: Option[String], weightColOption: Option[String]): Unit = { - super.validateDataFrameForMetricCalculation(dataFrame, predictionCol, labelCol, offsetColOption, weightColOption) - + super.validateDataFrameForMetricCalculation( + dataFrame, + domain, + predictionCol, + labelCol, + offsetColOption, + weightColOption) val predictionType = dataFrame.schema.fields.find(_.name == predictionCol).get.dataType val isPredictionTypeValid = predictionType match { case StructType(fields) if fields.head.dataType == DoubleType => true diff --git a/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/MetricCalculation.scala b/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/MetricCalculation.scala index 169b054cc1..6d6274978d 100644 --- a/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/MetricCalculation.scala +++ b/scoring/src/main/scala/ai/h2o/sparkling/ml/metrics/MetricCalculation.scala @@ -30,6 +30,7 @@ trait MetricCalculation { protected def validateDataFrameForMetricCalculation( dataFrame: DataFrame, + domain: Array[String], predictionCol: String, labelCol: String, offsetColOption: Option[String],