From b3eaa6efa2ef410a9c6a0d7d0c934d6190411e18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Novotn=C3=BD?= Date: Tue, 5 Oct 2021 17:13:32 +0200 Subject: [PATCH 1/6] [SW-2622] Avoid Multiple Materialization of Spark DataFrame During Conversion to H2OFrame --- .../h2o/sparkling/backend/H2OAwareRDD.scala | 2 +- .../ai/h2o/sparkling/backend/H2OChunk.scala | 2 - .../ai/h2o/sparkling/backend/Writer.scala | 53 +++++++------------ .../backend/utils/H2OClientUtils.scala | 3 +- .../DataFrameConverterTestSuite.scala | 11 ++++ .../extensions/rest/api/ChunkServlet.scala | 9 +--- .../serde/ChunkAutoBufferReader.scala | 8 +-- 7 files changed, 39 insertions(+), 49 deletions(-) diff --git a/core/src/main/scala/ai/h2o/sparkling/backend/H2OAwareRDD.scala b/core/src/main/scala/ai/h2o/sparkling/backend/H2OAwareRDD.scala index c227699576..909ef9007b 100644 --- a/core/src/main/scala/ai/h2o/sparkling/backend/H2OAwareRDD.scala +++ b/core/src/main/scala/ai/h2o/sparkling/backend/H2OAwareRDD.scala @@ -21,7 +21,7 @@ import org.apache.spark.{Partition, TaskContext} import scala.reflect.ClassTag -private[backend] class H2OAwareRDD[U: ClassTag](nodes: Array[NodeDesc], prev: RDD[U]) +private[backend] class H2OAwareRDD[U: ClassTag](val nodes: Array[NodeDesc], prev: RDD[U]) extends H2OAwareBaseRDD[U](nodes, prev) { override def compute(split: Partition, context: TaskContext): Iterator[U] = prev.compute(split, context) diff --git a/core/src/main/scala/ai/h2o/sparkling/backend/H2OChunk.scala b/core/src/main/scala/ai/h2o/sparkling/backend/H2OChunk.scala index 685be35371..8427cea404 100644 --- a/core/src/main/scala/ai/h2o/sparkling/backend/H2OChunk.scala +++ b/core/src/main/scala/ai/h2o/sparkling/backend/H2OChunk.scala @@ -58,7 +58,6 @@ private[sparkling] object H2OChunk extends RestCommunication { node: NodeDesc, conf: H2OConf, frameName: String, - numRows: Int, chunkId: Int, expectedTypes: Array[ExpectedType], maxVecSizes: Array[Int]): OutputStream = { @@ -67,7 +66,6 @@ private[sparkling] object H2OChunk extends RestCommunication { val parameters = Map( "frame_name" -> frameName, - "num_rows" -> numRows, "chunk_id" -> chunkId, "expected_types" -> expectedTypesString, "maximum_vector_sizes" -> maxVecSizesString, diff --git a/core/src/main/scala/ai/h2o/sparkling/backend/Writer.scala b/core/src/main/scala/ai/h2o/sparkling/backend/Writer.scala index 978aefe9d3..8a5f468827 100644 --- a/core/src/main/scala/ai/h2o/sparkling/backend/Writer.scala +++ b/core/src/main/scala/ai/h2o/sparkling/backend/Writer.scala @@ -33,20 +33,21 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.types._ import org.apache.spark.{ExposeUtils, TaskContext, ml, mllib} -private[backend] class Writer(nodeDesc: NodeDesc, metadata: WriterMetadata, numRows: Int, chunkId: Int) +private[backend] class Writer(nodeDesc: NodeDesc, metadata: WriterMetadata, chunkId: Int) extends Closeable { private val outputStream = H2OChunk.putChunk( nodeDesc, metadata.conf, metadata.frameId, - numRows, chunkId, metadata.expectedTypes, metadata.maxVectorSizes) private val chunkWriter = new ChunkAutoBufferWriter(outputStream) + def setHasNext(data: Boolean): Unit = put(data) + def put(data: Boolean): Unit = chunkWriter.writeBoolean(data) def put(data: Byte): Unit = chunkWriter.writeByte(data) @@ -91,13 +92,12 @@ private[backend] object Writer { def convert(rdd: H2OAwareRDD[Row], colNames: Array[String], metadata: WriterMetadata): H2OFrame = { H2OFrame.initializeFrame(metadata.conf, metadata.frameId, colNames) - val partitionSizes = getNonEmptyPartitionSizes(rdd) - val 
nonEmptyPartitions = getNonEmptyPartitions(partitionSizes) + val numPartitions = rdd.getNumPartitions - val uploadPlan = getUploadPlan(metadata.conf, nonEmptyPartitions.length) - val operation: SparkJob = perDataFramePartition(metadata, uploadPlan, nonEmptyPartitions, partitionSizes) - val rows = SparkSessionUtils.active.sparkContext.runJob(rdd, operation, nonEmptyPartitions) - val res = new Array[Long](nonEmptyPartitions.size) + val uploadPlan = getUploadPlan(metadata.conf, numPartitions) + val operation: SparkJob = perDataFramePartition(metadata, uploadPlan) + val rows = SparkSessionUtils.active.sparkContext.runJob(rdd, operation) + val res = new Array[Long](numPartitions) rows.foreach { case (chunkIdx, numRows) => res(chunkIdx) = numRows } val types = SerdeUtils.expectedTypesToVecTypes(metadata.expectedTypes, metadata.maxVectorSizes) H2OFrame.finalizeFrame(metadata.conf, metadata.frameId, res, types) @@ -106,17 +106,18 @@ private[backend] object Writer { private def perDataFramePartition( metadata: WriterMetadata, - uploadPlan: UploadPlan, - partitions: Seq[Int], - partitionSizes: Map[Int, Int])(context: TaskContext, it: Iterator[Row]): (Int, Long) = { - val chunkIdx = partitions.indexOf(context.partitionId()) - val numRows = partitionSizes(context.partitionId()) + uploadPlan: UploadPlan)(context: TaskContext, it: Iterator[Row]): (Int, Long) = { + val chunkIdx = context.partitionId() + var numRows = 0 val domainBuilder = new CategoricalDomainBuilder(metadata.expectedTypes) val h2oNode = uploadPlan(chunkIdx) - withResource(new Writer(h2oNode, metadata, numRows, chunkIdx)) { writer => + withResource(new Writer(h2oNode, metadata, chunkIdx)) { writer => it.foreach { row => + writer.setHasNext(true) sparkRowToH2ORow(row, writer, metadata, domainBuilder) + numRows += 1 } + writer.setHasNext(false) } H2OChunk.putChunkCategoricalDomains(h2oNode, metadata.conf, metadata.frameId, chunkIdx, domainBuilder.getDomains()) (chunkIdx, numRows) @@ -161,30 +162,16 @@ private[backend] object Writer { } } - private def getNonEmptyPartitionSizes[T](rdd: RDD[T]): Map[Int, Int] = { - rdd - .mapPartitionsWithIndex { - case (idx, it) => - if (it.nonEmpty) { - Iterator.single((idx, it.size)) - } else { - Iterator.empty - } - } - .collect() - .toMap - } - - private def getNonEmptyPartitions(partitionSizes: Map[Int, Int]): Seq[Int] = { - partitionSizes.keys.toSeq.sorted - } - private def getUploadPlan(conf: H2OConf, numberOfPartitions: Int): UploadPlan = { val endpoint = getClusterEndpoint(conf) val parameters = Map("number_of_chunks" -> numberOfPartitions) val rawPlan = query[UploadPlanV3](endpoint, Paths.UPLOAD_PLAN, conf, parameters) rawPlan.layout.map { chunkAssignment => - chunkAssignment.chunk_id -> NodeDesc(chunkAssignment.node_idx.toString, chunkAssignment.ip, chunkAssignment.port) + chunkAssignment.chunk_id -> NodeDesc( + chunkAssignment.node_idx.toString, + chunkAssignment.ip, + chunkAssignment.port, + chunkAssignment.cpus_allowed) }.toMap } } diff --git a/core/src/main/scala/ai/h2o/sparkling/backend/utils/H2OClientUtils.scala b/core/src/main/scala/ai/h2o/sparkling/backend/utils/H2OClientUtils.scala index 2c97e5cd21..5e81288f82 100644 --- a/core/src/main/scala/ai/h2o/sparkling/backend/utils/H2OClientUtils.scala +++ b/core/src/main/scala/ai/h2o/sparkling/backend/utils/H2OClientUtils.scala @@ -66,7 +66,7 @@ object H2OClientUtils extends SharedBackendUtils { .buildArgs() } - def startH2OClient(hc: H2OContext, conf: H2OConf, nodes: Array[NodeDesc]): NodeDesc = { + def startH2OClient(hc: H2OContext, conf: 
H2OConf, nodes: Array[NodeDesc]): Unit = { if (conf.runsInExternalClusterMode) { setClientIp(conf) } else { @@ -91,7 +91,6 @@ object H2OClientUtils extends SharedBackendUtils { } } } - NodeDesc(SparkEnv.get.executorId, H2O.SELF_ADDRESS.getHostAddress, H2O.API_PORT) } /** diff --git a/core/src/test/scala/ai/h2o/sparkling/backend/converters/DataFrameConverterTestSuite.scala b/core/src/test/scala/ai/h2o/sparkling/backend/converters/DataFrameConverterTestSuite.scala index 8b04e6014b..c76a5a95db 100644 --- a/core/src/test/scala/ai/h2o/sparkling/backend/converters/DataFrameConverterTestSuite.scala +++ b/core/src/test/scala/ai/h2o/sparkling/backend/converters/DataFrameConverterTestSuite.scala @@ -252,6 +252,17 @@ class DataFrameConverterTestSuite extends FunSuite with SharedH2OTestContext { h2oFrame.delete() } + test("DataFrame is evaluated only once during conversion to H2OFrame") { + val numberOfEvaluationsAccumulator = spark.sparkContext.longAccumulator("NumberOfEvaluationsAccumulator") + val baseDF = spark.range(1000).repartition(1) + val finalDF = baseDF.mapPartitions { partitions => numberOfEvaluationsAccumulator.add(1); partitions } + + val h2oFrame = hc.asH2OFrame(finalDF) + h2oFrame.delete() + + numberOfEvaluationsAccumulator.value shouldEqual 1 + } + test("DataFrame[FloatField] to H2OFrame[Numeric]") { val values = Seq(Float.MinValue, Float.MaxValue, -33.33.toFloat, 200.001.toFloat, -5000.34.toFloat) val df = sc.parallelize(values).map(v => FloatField(v)).toDF diff --git a/extensions/src/main/scala/ai/h2o/sparkling/extensions/rest/api/ChunkServlet.scala b/extensions/src/main/scala/ai/h2o/sparkling/extensions/rest/api/ChunkServlet.scala index b8d389657d..ed41ebab76 100644 --- a/extensions/src/main/scala/ai/h2o/sparkling/extensions/rest/api/ChunkServlet.scala +++ b/extensions/src/main/scala/ai/h2o/sparkling/extensions/rest/api/ChunkServlet.scala @@ -135,7 +135,6 @@ final class ChunkServlet extends ServletBase { private case class PUTRequestParameters( frameName: String, - numRows: Int, chunkId: Int, expectedTypes: Array[ExpectedType], maxVecSizes: Array[Int], @@ -160,8 +159,6 @@ final class ChunkServlet extends ServletBase { private object PUTRequestParameters { def parse(request: HttpServletRequest): PUTRequestParameters = { val frameName = getParameterAsString(request, "frame_name") - val numRowsString = getParameterAsString(request, "num_rows") - val numRows = numRowsString.toInt val chunkIdString = getParameterAsString(request, "chunk_id") val chunkId = chunkIdString.toInt val expectedTypesString = getParameterAsString(request, "expected_types") @@ -169,15 +166,14 @@ final class ChunkServlet extends ServletBase { val maximumVectorSizesString = getParameterAsString(request, "maximum_vector_sizes") val maxVecSizes = Base64Encoding.decodeToIntArray(maximumVectorSizesString) val compression = getParameterAsString(request, "compression") - PUTRequestParameters(frameName, numRows, chunkId, expectedTypes, maxVecSizes, compression) + PUTRequestParameters(frameName, chunkId, expectedTypes, maxVecSizes, compression) } } /* * The method handles handles PUT requests for the path /3/Chunk - * It requires 6 GET parameters + * It requires 5 GET parameters * - frame_name - a unique string identifier of H2O Frame - * - num_rows - a number of rows forming by the body of the request * - chunk_id - a unique identifier of the chunk within the H2O Frame * - expected_type - byte array encoded in Base64 encoding. 
The types corresponds to the `selected_columns` parameter * - maximum_vector_sizes - maximum vector sizes for each vector column encoded into Base64 encoding. @@ -193,7 +189,6 @@ final class ChunkServlet extends ServletBase { withResource(new ChunkAutoBufferReader(decompressed)) { reader => reader.readChunk( parameters.frameName, - parameters.numRows, parameters.chunkId, parameters.expectedTypes, parameters.maxVecSizes) diff --git a/extensions/src/main/scala/ai/h2o/sparkling/extensions/serde/ChunkAutoBufferReader.scala b/extensions/src/main/scala/ai/h2o/sparkling/extensions/serde/ChunkAutoBufferReader.scala index 149866e34b..c6c021e0e8 100644 --- a/extensions/src/main/scala/ai/h2o/sparkling/extensions/serde/ChunkAutoBufferReader.scala +++ b/extensions/src/main/scala/ai/h2o/sparkling/extensions/serde/ChunkAutoBufferReader.scala @@ -31,7 +31,6 @@ final class ChunkAutoBufferReader(val inputStream: InputStream) extends Closeabl def readChunk( frameName: String, - numRows: Int, chunkId: Int, expectedTypes: Array[ExpectedType], maxVecSizes: Array[Int]): Unit = { @@ -39,8 +38,7 @@ final class ChunkAutoBufferReader(val inputStream: InputStream) extends Closeabl val elementSizes = getElementSizes(expectedTypes, maxVecSizes) val startPositions = getStartPositions(elementSizes) val chunks = ChunkUtils.createNewChunks(frameName, vecTypes, chunkId) - var rowIdx = 0 - while (rowIdx < numRows) { + while (readHasNext()) { var typeIdx = 0 while (typeIdx < expectedTypes.length) { expectedTypes(typeIdx) match { @@ -60,7 +58,7 @@ final class ChunkAutoBufferReader(val inputStream: InputStream) extends Closeabl } typeIdx += 1 } - rowIdx += 1 + //rowIdx += 1 } ChunkUtils.closeNewChunks(chunks) } @@ -142,6 +140,8 @@ final class ChunkAutoBufferReader(val inputStream: InputStream) extends Closeabl } } + def readHasNext() = readBoolean() + def readBoolean(): Boolean = { val data = buffer.getZ isLastNAVar = isNA(data) From a9af31f2ad5e46c47b4e8942162ab5f5815cc2a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Novotn=C3=BD?= Date: Mon, 11 Oct 2021 16:24:05 +0200 Subject: [PATCH 2/6] spotless apply --- .../src/main/scala/ai/h2o/sparkling/backend/Writer.scala | 9 ++++----- .../backend/converters/DataFrameConverterTestSuite.scala | 4 +++- .../h2o/sparkling/extensions/rest/api/ChunkServlet.scala | 6 +----- .../extensions/serde/ChunkAutoBufferReader.scala | 6 +----- 4 files changed, 9 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/ai/h2o/sparkling/backend/Writer.scala b/core/src/main/scala/ai/h2o/sparkling/backend/Writer.scala index 8a5f468827..78e3e027cc 100644 --- a/core/src/main/scala/ai/h2o/sparkling/backend/Writer.scala +++ b/core/src/main/scala/ai/h2o/sparkling/backend/Writer.scala @@ -33,8 +33,7 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.types._ import org.apache.spark.{ExposeUtils, TaskContext, ml, mllib} -private[backend] class Writer(nodeDesc: NodeDesc, metadata: WriterMetadata, chunkId: Int) - extends Closeable { +private[backend] class Writer(nodeDesc: NodeDesc, metadata: WriterMetadata, chunkId: Int) extends Closeable { private val outputStream = H2OChunk.putChunk( nodeDesc, @@ -104,9 +103,9 @@ private[backend] object Writer { H2OFrame(metadata.frameId) } - private def perDataFramePartition( - metadata: WriterMetadata, - uploadPlan: UploadPlan)(context: TaskContext, it: Iterator[Row]): (Int, Long) = { + private def perDataFramePartition(metadata: WriterMetadata, uploadPlan: UploadPlan)( + context: TaskContext, + it: Iterator[Row]): (Int, Long) = { val chunkIdx = 
context.partitionId() var numRows = 0 val domainBuilder = new CategoricalDomainBuilder(metadata.expectedTypes) diff --git a/core/src/test/scala/ai/h2o/sparkling/backend/converters/DataFrameConverterTestSuite.scala b/core/src/test/scala/ai/h2o/sparkling/backend/converters/DataFrameConverterTestSuite.scala index c76a5a95db..3d62b83319 100644 --- a/core/src/test/scala/ai/h2o/sparkling/backend/converters/DataFrameConverterTestSuite.scala +++ b/core/src/test/scala/ai/h2o/sparkling/backend/converters/DataFrameConverterTestSuite.scala @@ -255,7 +255,9 @@ class DataFrameConverterTestSuite extends FunSuite with SharedH2OTestContext { test("DataFrame is evaluated only once during conversion to H2OFrame") { val numberOfEvaluationsAccumulator = spark.sparkContext.longAccumulator("NumberOfEvaluationsAccumulator") val baseDF = spark.range(1000).repartition(1) - val finalDF = baseDF.mapPartitions { partitions => numberOfEvaluationsAccumulator.add(1); partitions } + val finalDF = baseDF.mapPartitions { partitions => + numberOfEvaluationsAccumulator.add(1); partitions + } val h2oFrame = hc.asH2OFrame(finalDF) h2oFrame.delete() diff --git a/extensions/src/main/scala/ai/h2o/sparkling/extensions/rest/api/ChunkServlet.scala b/extensions/src/main/scala/ai/h2o/sparkling/extensions/rest/api/ChunkServlet.scala index ed41ebab76..fc36e91ccb 100644 --- a/extensions/src/main/scala/ai/h2o/sparkling/extensions/rest/api/ChunkServlet.scala +++ b/extensions/src/main/scala/ai/h2o/sparkling/extensions/rest/api/ChunkServlet.scala @@ -187,11 +187,7 @@ final class ChunkServlet extends ServletBase { withResource(request.getInputStream) { inputStream => withResource(Compression.decompress(parameters.compression, inputStream)) { decompressed => withResource(new ChunkAutoBufferReader(decompressed)) { reader => - reader.readChunk( - parameters.frameName, - parameters.chunkId, - parameters.expectedTypes, - parameters.maxVecSizes) + reader.readChunk(parameters.frameName, parameters.chunkId, parameters.expectedTypes, parameters.maxVecSizes) } } } diff --git a/extensions/src/main/scala/ai/h2o/sparkling/extensions/serde/ChunkAutoBufferReader.scala b/extensions/src/main/scala/ai/h2o/sparkling/extensions/serde/ChunkAutoBufferReader.scala index c6c021e0e8..9a1b523c9e 100644 --- a/extensions/src/main/scala/ai/h2o/sparkling/extensions/serde/ChunkAutoBufferReader.scala +++ b/extensions/src/main/scala/ai/h2o/sparkling/extensions/serde/ChunkAutoBufferReader.scala @@ -29,11 +29,7 @@ final class ChunkAutoBufferReader(val inputStream: InputStream) extends Closeabl private val buffer = new AutoBuffer(inputStream) private var isLastNAVar: Boolean = false - def readChunk( - frameName: String, - chunkId: Int, - expectedTypes: Array[ExpectedType], - maxVecSizes: Array[Int]): Unit = { + def readChunk(frameName: String, chunkId: Int, expectedTypes: Array[ExpectedType], maxVecSizes: Array[Int]): Unit = { val vecTypes = SerdeUtils.expectedTypesToVecTypes(expectedTypes, maxVecSizes) val elementSizes = getElementSizes(expectedTypes, maxVecSizes) val startPositions = getStartPositions(elementSizes) From 9b3ebf65a0775b8d5b4055c8ef33ba4716242ad2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Novotn=C3=BD?= Date: Tue, 5 Oct 2021 17:13:32 +0200 Subject: [PATCH 3/6] [SW-2622] Avoid Multiple Materialization of Spark DataFrame During Conversion to H2OFrame --- .../h2o/sparkling/backend/H2OAwareRDD.scala | 2 +- .../ai/h2o/sparkling/backend/H2OChunk.scala | 2 - .../ai/h2o/sparkling/backend/Writer.scala | 53 +++++++------------ .../backend/utils/H2OClientUtils.scala | 
3 +- .../DataFrameConverterTestSuite.scala | 11 ++++ .../extensions/rest/api/ChunkServlet.scala | 9 +--- .../serde/ChunkAutoBufferReader.scala | 8 +-- 7 files changed, 39 insertions(+), 49 deletions(-) diff --git a/core/src/main/scala/ai/h2o/sparkling/backend/H2OAwareRDD.scala b/core/src/main/scala/ai/h2o/sparkling/backend/H2OAwareRDD.scala index c227699576..909ef9007b 100644 --- a/core/src/main/scala/ai/h2o/sparkling/backend/H2OAwareRDD.scala +++ b/core/src/main/scala/ai/h2o/sparkling/backend/H2OAwareRDD.scala @@ -21,7 +21,7 @@ import org.apache.spark.{Partition, TaskContext} import scala.reflect.ClassTag -private[backend] class H2OAwareRDD[U: ClassTag](nodes: Array[NodeDesc], prev: RDD[U]) +private[backend] class H2OAwareRDD[U: ClassTag](val nodes: Array[NodeDesc], prev: RDD[U]) extends H2OAwareBaseRDD[U](nodes, prev) { override def compute(split: Partition, context: TaskContext): Iterator[U] = prev.compute(split, context) diff --git a/core/src/main/scala/ai/h2o/sparkling/backend/H2OChunk.scala b/core/src/main/scala/ai/h2o/sparkling/backend/H2OChunk.scala index 685be35371..8427cea404 100644 --- a/core/src/main/scala/ai/h2o/sparkling/backend/H2OChunk.scala +++ b/core/src/main/scala/ai/h2o/sparkling/backend/H2OChunk.scala @@ -58,7 +58,6 @@ private[sparkling] object H2OChunk extends RestCommunication { node: NodeDesc, conf: H2OConf, frameName: String, - numRows: Int, chunkId: Int, expectedTypes: Array[ExpectedType], maxVecSizes: Array[Int]): OutputStream = { @@ -67,7 +66,6 @@ private[sparkling] object H2OChunk extends RestCommunication { val parameters = Map( "frame_name" -> frameName, - "num_rows" -> numRows, "chunk_id" -> chunkId, "expected_types" -> expectedTypesString, "maximum_vector_sizes" -> maxVecSizesString, diff --git a/core/src/main/scala/ai/h2o/sparkling/backend/Writer.scala b/core/src/main/scala/ai/h2o/sparkling/backend/Writer.scala index 978aefe9d3..8a5f468827 100644 --- a/core/src/main/scala/ai/h2o/sparkling/backend/Writer.scala +++ b/core/src/main/scala/ai/h2o/sparkling/backend/Writer.scala @@ -33,20 +33,21 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.types._ import org.apache.spark.{ExposeUtils, TaskContext, ml, mllib} -private[backend] class Writer(nodeDesc: NodeDesc, metadata: WriterMetadata, numRows: Int, chunkId: Int) +private[backend] class Writer(nodeDesc: NodeDesc, metadata: WriterMetadata, chunkId: Int) extends Closeable { private val outputStream = H2OChunk.putChunk( nodeDesc, metadata.conf, metadata.frameId, - numRows, chunkId, metadata.expectedTypes, metadata.maxVectorSizes) private val chunkWriter = new ChunkAutoBufferWriter(outputStream) + def setHasNext(data: Boolean): Unit = put(data) + def put(data: Boolean): Unit = chunkWriter.writeBoolean(data) def put(data: Byte): Unit = chunkWriter.writeByte(data) @@ -91,13 +92,12 @@ private[backend] object Writer { def convert(rdd: H2OAwareRDD[Row], colNames: Array[String], metadata: WriterMetadata): H2OFrame = { H2OFrame.initializeFrame(metadata.conf, metadata.frameId, colNames) - val partitionSizes = getNonEmptyPartitionSizes(rdd) - val nonEmptyPartitions = getNonEmptyPartitions(partitionSizes) + val numPartitions = rdd.getNumPartitions - val uploadPlan = getUploadPlan(metadata.conf, nonEmptyPartitions.length) - val operation: SparkJob = perDataFramePartition(metadata, uploadPlan, nonEmptyPartitions, partitionSizes) - val rows = SparkSessionUtils.active.sparkContext.runJob(rdd, operation, nonEmptyPartitions) - val res = new Array[Long](nonEmptyPartitions.size) + val uploadPlan = 
getUploadPlan(metadata.conf, numPartitions) + val operation: SparkJob = perDataFramePartition(metadata, uploadPlan) + val rows = SparkSessionUtils.active.sparkContext.runJob(rdd, operation) + val res = new Array[Long](numPartitions) rows.foreach { case (chunkIdx, numRows) => res(chunkIdx) = numRows } val types = SerdeUtils.expectedTypesToVecTypes(metadata.expectedTypes, metadata.maxVectorSizes) H2OFrame.finalizeFrame(metadata.conf, metadata.frameId, res, types) @@ -106,17 +106,18 @@ private[backend] object Writer { private def perDataFramePartition( metadata: WriterMetadata, - uploadPlan: UploadPlan, - partitions: Seq[Int], - partitionSizes: Map[Int, Int])(context: TaskContext, it: Iterator[Row]): (Int, Long) = { - val chunkIdx = partitions.indexOf(context.partitionId()) - val numRows = partitionSizes(context.partitionId()) + uploadPlan: UploadPlan)(context: TaskContext, it: Iterator[Row]): (Int, Long) = { + val chunkIdx = context.partitionId() + var numRows = 0 val domainBuilder = new CategoricalDomainBuilder(metadata.expectedTypes) val h2oNode = uploadPlan(chunkIdx) - withResource(new Writer(h2oNode, metadata, numRows, chunkIdx)) { writer => + withResource(new Writer(h2oNode, metadata, chunkIdx)) { writer => it.foreach { row => + writer.setHasNext(true) sparkRowToH2ORow(row, writer, metadata, domainBuilder) + numRows += 1 } + writer.setHasNext(false) } H2OChunk.putChunkCategoricalDomains(h2oNode, metadata.conf, metadata.frameId, chunkIdx, domainBuilder.getDomains()) (chunkIdx, numRows) @@ -161,30 +162,16 @@ private[backend] object Writer { } } - private def getNonEmptyPartitionSizes[T](rdd: RDD[T]): Map[Int, Int] = { - rdd - .mapPartitionsWithIndex { - case (idx, it) => - if (it.nonEmpty) { - Iterator.single((idx, it.size)) - } else { - Iterator.empty - } - } - .collect() - .toMap - } - - private def getNonEmptyPartitions(partitionSizes: Map[Int, Int]): Seq[Int] = { - partitionSizes.keys.toSeq.sorted - } - private def getUploadPlan(conf: H2OConf, numberOfPartitions: Int): UploadPlan = { val endpoint = getClusterEndpoint(conf) val parameters = Map("number_of_chunks" -> numberOfPartitions) val rawPlan = query[UploadPlanV3](endpoint, Paths.UPLOAD_PLAN, conf, parameters) rawPlan.layout.map { chunkAssignment => - chunkAssignment.chunk_id -> NodeDesc(chunkAssignment.node_idx.toString, chunkAssignment.ip, chunkAssignment.port) + chunkAssignment.chunk_id -> NodeDesc( + chunkAssignment.node_idx.toString, + chunkAssignment.ip, + chunkAssignment.port, + chunkAssignment.cpus_allowed) }.toMap } } diff --git a/core/src/main/scala/ai/h2o/sparkling/backend/utils/H2OClientUtils.scala b/core/src/main/scala/ai/h2o/sparkling/backend/utils/H2OClientUtils.scala index 2c97e5cd21..5e81288f82 100644 --- a/core/src/main/scala/ai/h2o/sparkling/backend/utils/H2OClientUtils.scala +++ b/core/src/main/scala/ai/h2o/sparkling/backend/utils/H2OClientUtils.scala @@ -66,7 +66,7 @@ object H2OClientUtils extends SharedBackendUtils { .buildArgs() } - def startH2OClient(hc: H2OContext, conf: H2OConf, nodes: Array[NodeDesc]): NodeDesc = { + def startH2OClient(hc: H2OContext, conf: H2OConf, nodes: Array[NodeDesc]): Unit = { if (conf.runsInExternalClusterMode) { setClientIp(conf) } else { @@ -91,7 +91,6 @@ object H2OClientUtils extends SharedBackendUtils { } } } - NodeDesc(SparkEnv.get.executorId, H2O.SELF_ADDRESS.getHostAddress, H2O.API_PORT) } /** diff --git a/core/src/test/scala/ai/h2o/sparkling/backend/converters/DataFrameConverterTestSuite.scala 
b/core/src/test/scala/ai/h2o/sparkling/backend/converters/DataFrameConverterTestSuite.scala index 8b04e6014b..c76a5a95db 100644 --- a/core/src/test/scala/ai/h2o/sparkling/backend/converters/DataFrameConverterTestSuite.scala +++ b/core/src/test/scala/ai/h2o/sparkling/backend/converters/DataFrameConverterTestSuite.scala @@ -252,6 +252,17 @@ class DataFrameConverterTestSuite extends FunSuite with SharedH2OTestContext { h2oFrame.delete() } + test("DataFrame is evaluated only once during conversion to H2OFrame") { + val numberOfEvaluationsAccumulator = spark.sparkContext.longAccumulator("NumberOfEvaluationsAccumulator") + val baseDF = spark.range(1000).repartition(1) + val finalDF = baseDF.mapPartitions { partitions => numberOfEvaluationsAccumulator.add(1); partitions } + + val h2oFrame = hc.asH2OFrame(finalDF) + h2oFrame.delete() + + numberOfEvaluationsAccumulator.value shouldEqual 1 + } + test("DataFrame[FloatField] to H2OFrame[Numeric]") { val values = Seq(Float.MinValue, Float.MaxValue, -33.33.toFloat, 200.001.toFloat, -5000.34.toFloat) val df = sc.parallelize(values).map(v => FloatField(v)).toDF diff --git a/extensions/src/main/scala/ai/h2o/sparkling/extensions/rest/api/ChunkServlet.scala b/extensions/src/main/scala/ai/h2o/sparkling/extensions/rest/api/ChunkServlet.scala index b8d389657d..ed41ebab76 100644 --- a/extensions/src/main/scala/ai/h2o/sparkling/extensions/rest/api/ChunkServlet.scala +++ b/extensions/src/main/scala/ai/h2o/sparkling/extensions/rest/api/ChunkServlet.scala @@ -135,7 +135,6 @@ final class ChunkServlet extends ServletBase { private case class PUTRequestParameters( frameName: String, - numRows: Int, chunkId: Int, expectedTypes: Array[ExpectedType], maxVecSizes: Array[Int], @@ -160,8 +159,6 @@ final class ChunkServlet extends ServletBase { private object PUTRequestParameters { def parse(request: HttpServletRequest): PUTRequestParameters = { val frameName = getParameterAsString(request, "frame_name") - val numRowsString = getParameterAsString(request, "num_rows") - val numRows = numRowsString.toInt val chunkIdString = getParameterAsString(request, "chunk_id") val chunkId = chunkIdString.toInt val expectedTypesString = getParameterAsString(request, "expected_types") @@ -169,15 +166,14 @@ final class ChunkServlet extends ServletBase { val maximumVectorSizesString = getParameterAsString(request, "maximum_vector_sizes") val maxVecSizes = Base64Encoding.decodeToIntArray(maximumVectorSizesString) val compression = getParameterAsString(request, "compression") - PUTRequestParameters(frameName, numRows, chunkId, expectedTypes, maxVecSizes, compression) + PUTRequestParameters(frameName, chunkId, expectedTypes, maxVecSizes, compression) } } /* * The method handles handles PUT requests for the path /3/Chunk - * It requires 6 GET parameters + * It requires 5 GET parameters * - frame_name - a unique string identifier of H2O Frame - * - num_rows - a number of rows forming by the body of the request * - chunk_id - a unique identifier of the chunk within the H2O Frame * - expected_type - byte array encoded in Base64 encoding. The types corresponds to the `selected_columns` parameter * - maximum_vector_sizes - maximum vector sizes for each vector column encoded into Base64 encoding. 
@@ -193,7 +189,6 @@ final class ChunkServlet extends ServletBase { withResource(new ChunkAutoBufferReader(decompressed)) { reader => reader.readChunk( parameters.frameName, - parameters.numRows, parameters.chunkId, parameters.expectedTypes, parameters.maxVecSizes) diff --git a/extensions/src/main/scala/ai/h2o/sparkling/extensions/serde/ChunkAutoBufferReader.scala b/extensions/src/main/scala/ai/h2o/sparkling/extensions/serde/ChunkAutoBufferReader.scala index 149866e34b..c6c021e0e8 100644 --- a/extensions/src/main/scala/ai/h2o/sparkling/extensions/serde/ChunkAutoBufferReader.scala +++ b/extensions/src/main/scala/ai/h2o/sparkling/extensions/serde/ChunkAutoBufferReader.scala @@ -31,7 +31,6 @@ final class ChunkAutoBufferReader(val inputStream: InputStream) extends Closeabl def readChunk( frameName: String, - numRows: Int, chunkId: Int, expectedTypes: Array[ExpectedType], maxVecSizes: Array[Int]): Unit = { @@ -39,8 +38,7 @@ final class ChunkAutoBufferReader(val inputStream: InputStream) extends Closeabl val elementSizes = getElementSizes(expectedTypes, maxVecSizes) val startPositions = getStartPositions(elementSizes) val chunks = ChunkUtils.createNewChunks(frameName, vecTypes, chunkId) - var rowIdx = 0 - while (rowIdx < numRows) { + while (readHasNext()) { var typeIdx = 0 while (typeIdx < expectedTypes.length) { expectedTypes(typeIdx) match { @@ -60,7 +58,7 @@ final class ChunkAutoBufferReader(val inputStream: InputStream) extends Closeabl } typeIdx += 1 } - rowIdx += 1 + //rowIdx += 1 } ChunkUtils.closeNewChunks(chunks) } @@ -142,6 +140,8 @@ final class ChunkAutoBufferReader(val inputStream: InputStream) extends Closeabl } } + def readHasNext() = readBoolean() + def readBoolean(): Boolean = { val data = buffer.getZ isLastNAVar = isNA(data) From a1d3a4ff8c093814a818c790e0c42edce47dea34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Novotn=C3=BD?= Date: Mon, 11 Oct 2021 16:24:05 +0200 Subject: [PATCH 4/6] spotless apply --- .../src/main/scala/ai/h2o/sparkling/backend/Writer.scala | 9 ++++----- .../backend/converters/DataFrameConverterTestSuite.scala | 4 +++- .../h2o/sparkling/extensions/rest/api/ChunkServlet.scala | 6 +----- .../extensions/serde/ChunkAutoBufferReader.scala | 6 +----- 4 files changed, 9 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/ai/h2o/sparkling/backend/Writer.scala b/core/src/main/scala/ai/h2o/sparkling/backend/Writer.scala index 8a5f468827..78e3e027cc 100644 --- a/core/src/main/scala/ai/h2o/sparkling/backend/Writer.scala +++ b/core/src/main/scala/ai/h2o/sparkling/backend/Writer.scala @@ -33,8 +33,7 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.types._ import org.apache.spark.{ExposeUtils, TaskContext, ml, mllib} -private[backend] class Writer(nodeDesc: NodeDesc, metadata: WriterMetadata, chunkId: Int) - extends Closeable { +private[backend] class Writer(nodeDesc: NodeDesc, metadata: WriterMetadata, chunkId: Int) extends Closeable { private val outputStream = H2OChunk.putChunk( nodeDesc, @@ -104,9 +103,9 @@ private[backend] object Writer { H2OFrame(metadata.frameId) } - private def perDataFramePartition( - metadata: WriterMetadata, - uploadPlan: UploadPlan)(context: TaskContext, it: Iterator[Row]): (Int, Long) = { + private def perDataFramePartition(metadata: WriterMetadata, uploadPlan: UploadPlan)( + context: TaskContext, + it: Iterator[Row]): (Int, Long) = { val chunkIdx = context.partitionId() var numRows = 0 val domainBuilder = new CategoricalDomainBuilder(metadata.expectedTypes) diff --git 
a/core/src/test/scala/ai/h2o/sparkling/backend/converters/DataFrameConverterTestSuite.scala b/core/src/test/scala/ai/h2o/sparkling/backend/converters/DataFrameConverterTestSuite.scala index c76a5a95db..3d62b83319 100644 --- a/core/src/test/scala/ai/h2o/sparkling/backend/converters/DataFrameConverterTestSuite.scala +++ b/core/src/test/scala/ai/h2o/sparkling/backend/converters/DataFrameConverterTestSuite.scala @@ -255,7 +255,9 @@ class DataFrameConverterTestSuite extends FunSuite with SharedH2OTestContext { test("DataFrame is evaluated only once during conversion to H2OFrame") { val numberOfEvaluationsAccumulator = spark.sparkContext.longAccumulator("NumberOfEvaluationsAccumulator") val baseDF = spark.range(1000).repartition(1) - val finalDF = baseDF.mapPartitions { partitions => numberOfEvaluationsAccumulator.add(1); partitions } + val finalDF = baseDF.mapPartitions { partitions => + numberOfEvaluationsAccumulator.add(1); partitions + } val h2oFrame = hc.asH2OFrame(finalDF) h2oFrame.delete() diff --git a/extensions/src/main/scala/ai/h2o/sparkling/extensions/rest/api/ChunkServlet.scala b/extensions/src/main/scala/ai/h2o/sparkling/extensions/rest/api/ChunkServlet.scala index ed41ebab76..fc36e91ccb 100644 --- a/extensions/src/main/scala/ai/h2o/sparkling/extensions/rest/api/ChunkServlet.scala +++ b/extensions/src/main/scala/ai/h2o/sparkling/extensions/rest/api/ChunkServlet.scala @@ -187,11 +187,7 @@ final class ChunkServlet extends ServletBase { withResource(request.getInputStream) { inputStream => withResource(Compression.decompress(parameters.compression, inputStream)) { decompressed => withResource(new ChunkAutoBufferReader(decompressed)) { reader => - reader.readChunk( - parameters.frameName, - parameters.chunkId, - parameters.expectedTypes, - parameters.maxVecSizes) + reader.readChunk(parameters.frameName, parameters.chunkId, parameters.expectedTypes, parameters.maxVecSizes) } } } diff --git a/extensions/src/main/scala/ai/h2o/sparkling/extensions/serde/ChunkAutoBufferReader.scala b/extensions/src/main/scala/ai/h2o/sparkling/extensions/serde/ChunkAutoBufferReader.scala index c6c021e0e8..9a1b523c9e 100644 --- a/extensions/src/main/scala/ai/h2o/sparkling/extensions/serde/ChunkAutoBufferReader.scala +++ b/extensions/src/main/scala/ai/h2o/sparkling/extensions/serde/ChunkAutoBufferReader.scala @@ -29,11 +29,7 @@ final class ChunkAutoBufferReader(val inputStream: InputStream) extends Closeabl private val buffer = new AutoBuffer(inputStream) private var isLastNAVar: Boolean = false - def readChunk( - frameName: String, - chunkId: Int, - expectedTypes: Array[ExpectedType], - maxVecSizes: Array[Int]): Unit = { + def readChunk(frameName: String, chunkId: Int, expectedTypes: Array[ExpectedType], maxVecSizes: Array[Int]): Unit = { val vecTypes = SerdeUtils.expectedTypesToVecTypes(expectedTypes, maxVecSizes) val elementSizes = getElementSizes(expectedTypes, maxVecSizes) val startPositions = getStartPositions(elementSizes) From 3a459da340507f181f30654ed45654dbebdf5fd1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Novotn=C3=BD?= Date: Thu, 21 Oct 2021 12:50:47 +0200 Subject: [PATCH 5/6] add repartitioning --- .../ai/h2o/sparkling/backend/NodeDesc.scala | 5 +++-- .../sparkling/backend/SharedBackendConf.scala | 17 +++++++++++++++++ .../converters/SparkDataFrameConverter.scala | 15 ++++++++++++--- .../sparkling/backend/utils/RestApiUtils.scala | 2 +- .../h2o/backends/internal/H2ORpcEndpoint.scala | 6 +++++- .../backends/internal/InternalH2OBackend.scala | 2 +- .../rest/api/schema/UploadPlanV3.java | 4 
++++ 7 files changed, 43 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/ai/h2o/sparkling/backend/NodeDesc.scala b/core/src/main/scala/ai/h2o/sparkling/backend/NodeDesc.scala index 7cca2bca95..c06330d2f6 100644 --- a/core/src/main/scala/ai/h2o/sparkling/backend/NodeDesc.scala +++ b/core/src/main/scala/ai/h2o/sparkling/backend/NodeDesc.scala @@ -26,8 +26,9 @@ import water.nbhm.NonBlockingHashMap * is ID of Spark Executor where corresponding instance is located * @param hostname hostname of the node * @param port port of the node + * @param allowedCpus The number of cpus processing tasks on H2O node */ -case class NodeDesc(nodeId: String, hostname: String, port: Int) { +case class NodeDesc(nodeId: String, hostname: String, port: Int, allowedCpus: Int) { override def productPrefix = "" def ipPort(): String = s"$hostname:$port" @@ -40,7 +41,7 @@ object NodeDesc { private def fromH2ONode(node: H2ONode): NodeDesc = { val ipPort = node.getIpPortString.split(":") - NodeDesc(node.index().toString, ipPort(0), Integer.parseInt(ipPort(1))) + NodeDesc(node.index().toString, ipPort(0), Integer.parseInt(ipPort(1)), node._heartbeat._cpus_allowed) } private def intern(node: H2ONode): NodeDesc = { diff --git a/core/src/main/scala/ai/h2o/sparkling/backend/SharedBackendConf.scala b/core/src/main/scala/ai/h2o/sparkling/backend/SharedBackendConf.scala index ca6285ad69..f14bfe7a27 100644 --- a/core/src/main/scala/ai/h2o/sparkling/backend/SharedBackendConf.scala +++ b/core/src/main/scala/ai/h2o/sparkling/backend/SharedBackendConf.scala @@ -165,6 +165,9 @@ trait SharedBackendConf extends SharedBackendConfExtensions { def restApiTimeout: Int = sparkConf.getInt(PROP_REST_API_TIMEOUT._1, PROP_REST_API_TIMEOUT._2) + def isNumberOfPartitionsOptimizationEnabled(): Boolean = + sparkConf.getBoolean(PROP_OPTIMIZE_NUM_PARTITIONS._1, PROP_OPTIMIZE_NUM_PARTITIONS._2) + /** Setters */ def setInternalClusterMode(): H2OConf = { if (runsInExternalClusterMode) { @@ -334,6 +337,10 @@ trait SharedBackendConf extends SharedBackendConfExtensions { def setIcedDir(dir: String): H2OConf = set(PROP_ICED_DIR._1, dir) def setRestApiTimeout(timeout: Int): H2OConf = set(PROP_REST_API_TIMEOUT._1, timeout.toString) + + def setNumberOfPartitionsOptimizationEnabled(): H2OConf = set(PROP_OPTIMIZE_NUM_PARTITIONS._1, true) + + def setNumberOfPartitionsOptimizationDisabled(): H2OConf = set(PROP_OPTIMIZE_NUM_PARTITIONS._1, false) } object SharedBackendConf { @@ -698,6 +705,16 @@ object SharedBackendConf { "setSessionTimeout(Boolean)", "Timeout in milliseconds for Rest API requests.") + val PROP_OPTIMIZE_NUM_PARTITIONS: BooleanOption = ( + "spark.ext.h2o.conversions.optimize.numPartitions", + true, + """setNumberOfPartitionsOptimizationEnabled() + |setNumberOfPartitionsOptimizationDisabled() + """.stripMargin, + "If enabled, the conversion method asH2OFrame will repartition the input data frame to " + + "the optimal number of partitions for a given H2O cluster." + ) + /** Language of the connected client. 
*/ private[sparkling] val PROP_CLIENT_LANGUAGE: (String, String) = ("spark.ext.h2o.client.language", "scala") } diff --git a/core/src/main/scala/ai/h2o/sparkling/backend/converters/SparkDataFrameConverter.scala b/core/src/main/scala/ai/h2o/sparkling/backend/converters/SparkDataFrameConverter.scala index 2f48310f87..d50b31a389 100644 --- a/core/src/main/scala/ai/h2o/sparkling/backend/converters/SparkDataFrameConverter.scala +++ b/core/src/main/scala/ai/h2o/sparkling/backend/converters/SparkDataFrameConverter.scala @@ -45,7 +45,7 @@ object SparkDataFrameConverter extends Logging { // we are dealing with Dataset[Row] val flatDataFrame = flattenDataFrame(df) val schema = flatDataFrame.schema - val rdd = flatDataFrame.rdd // materialized the data frame + val rdd = flatDataFrame.rdd val elemMaxSizes = collectMaxElementSizes(rdd, schema) val vecIndices = collectVectorLikeTypes(schema).toArray @@ -56,7 +56,16 @@ object SparkDataFrameConverter extends Logging { val expectedTypes = DataTypeConverter.determineExpectedTypes(schema) val uniqueFrameId = frameKeyName.getOrElse("frame_rdd_" + rdd.id + scala.util.Random.nextInt()) - val metadata = WriterMetadata(hc.getConf, uniqueFrameId, expectedTypes, maxVecSizes, SparkTimeZone.current()) - Writer.convert(new H2OAwareRDD(hc.getH2ONodes(), rdd), colNames, metadata) + val conf = hc.getConf + val h2oNodes = hc.getNodes(hc.getClusterInfo(conf)) + val metadata = WriterMetadata(conf, uniqueFrameId, expectedTypes, maxVecSizes, SparkTimeZone.current()) + + val optimizedRdd = if (conf.isNumberOfPartitionsOptimizationEnabled()) { + val optimalSize = h2oNodes.foldLeft(0)((accumulator, node) => accumulator + node.allowedCpus) + flatDataFrame.rdd.repartition(optimalSize) + } else { + rdd + } + Writer.convert(new H2OAwareRDD(h2oNodes, optimizedRdd), colNames, metadata) } } diff --git a/core/src/main/scala/ai/h2o/sparkling/backend/utils/RestApiUtils.scala b/core/src/main/scala/ai/h2o/sparkling/backend/utils/RestApiUtils.scala index 45acca24d8..cf2996b59e 100644 --- a/core/src/main/scala/ai/h2o/sparkling/backend/utils/RestApiUtils.scala +++ b/core/src/main/scala/ai/h2o/sparkling/backend/utils/RestApiUtils.scala @@ -63,7 +63,7 @@ trait RestApiUtils extends RestCommunication { val splits = node.ip_port.split(":") val ip = splits(0) val port = splits(1).toInt - NodeDesc(idx.toString, ip, port) + NodeDesc(idx.toString, ip, port, node.cpus_allowed) } } diff --git a/core/src/main/scala/org/apache/spark/h2o/backends/internal/H2ORpcEndpoint.scala b/core/src/main/scala/org/apache/spark/h2o/backends/internal/H2ORpcEndpoint.scala index 6c26aab9da..e368e9c380 100644 --- a/core/src/main/scala/org/apache/spark/h2o/backends/internal/H2ORpcEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/h2o/backends/internal/H2ORpcEndpoint.scala @@ -61,7 +61,11 @@ class H2ORpcEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint context.reply(H2O.CLOUD.size()) case GetLeaderNodeMsg => val reply = if (H2O.SELF.isLeaderNode) { - Some(NodeDesc(SparkEnv.get.executorId, H2O.SELF_ADDRESS.getHostAddress, H2O.API_PORT)) + Some(NodeDesc( + SparkEnv.get.executorId, + H2O.SELF_ADDRESS.getHostAddress, + H2O.API_PORT, + H2O.SELF._heartbeat._cpus_allowed)) } else { None } diff --git a/core/src/main/scala/org/apache/spark/h2o/backends/internal/InternalH2OBackend.scala b/core/src/main/scala/org/apache/spark/h2o/backends/internal/InternalH2OBackend.scala index 0d63768fb7..5b98693994 100644 --- a/core/src/main/scala/org/apache/spark/h2o/backends/internal/InternalH2OBackend.scala +++ 
b/core/src/main/scala/org/apache/spark/h2o/backends/internal/InternalH2OBackend.scala @@ -152,7 +152,7 @@ object InternalH2OBackend extends InternalBackendUtils { val launcherArgs = toH2OArgs(args) initializeH2OKerberizedHiveSupport(conf) H2OStarter.start(launcherArgs, true) - NodeDesc(SparkEnv.get.executorId, H2O.SELF_ADDRESS.getHostAddress, H2O.API_PORT) + NodeDesc(SparkEnv.get.executorId, H2O.SELF_ADDRESS.getHostAddress, H2O.API_PORT, H2O.SELF._heartbeat._cpus_allowed) } private def registerNewExecutorListener(hc: H2OContext): Unit = { diff --git a/extensions/src/main/scala/ai/h2o/sparkling/extensions/rest/api/schema/UploadPlanV3.java b/extensions/src/main/scala/ai/h2o/sparkling/extensions/rest/api/schema/UploadPlanV3.java index 29fddde3ec..7b97daeb32 100644 --- a/extensions/src/main/scala/ai/h2o/sparkling/extensions/rest/api/schema/UploadPlanV3.java +++ b/extensions/src/main/scala/ai/h2o/sparkling/extensions/rest/api/schema/UploadPlanV3.java @@ -51,6 +51,9 @@ public static class ChunkAssigmentV3 extends SchemaV3 Date: Thu, 21 Oct 2021 13:34:29 +0200 Subject: [PATCH 6/6] minor improvement --- .../sparkling/backend/converters/SparkDataFrameConverter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/ai/h2o/sparkling/backend/converters/SparkDataFrameConverter.scala b/core/src/main/scala/ai/h2o/sparkling/backend/converters/SparkDataFrameConverter.scala index d50b31a389..2a730d2af8 100644 --- a/core/src/main/scala/ai/h2o/sparkling/backend/converters/SparkDataFrameConverter.scala +++ b/core/src/main/scala/ai/h2o/sparkling/backend/converters/SparkDataFrameConverter.scala @@ -62,7 +62,7 @@ object SparkDataFrameConverter extends Logging { val optimizedRdd = if (conf.isNumberOfPartitionsOptimizationEnabled()) { val optimalSize = h2oNodes.foldLeft(0)((accumulator, node) => accumulator + node.allowedCpus) - flatDataFrame.rdd.repartition(optimalSize) + rdd.repartition(optimalSize) } else { rdd }
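
The series above drops the `num_rows` request parameter and instead frames each uploaded row with a boolean "has next" flag (`Writer.setHasNext` on the Spark side, `ChunkAutoBufferReader.readHasNext` on the H2O side), so a partition is streamed in a single pass and the source DataFrame is evaluated only once instead of being materialized first to count partition sizes. The following is a minimal, self-contained Scala sketch of that framing protocol, assuming plain `java.io` data streams in place of H2O's `AutoBuffer`; the names `StreamedRowsSketch`, `writeRows` and `readRows` are illustrative only and are not part of the Sparkling Water API.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object StreamedRowsSketch extends App {

  // Writer side: emit a "has next" flag before every row instead of sending the
  // row count up front, so a partition can be streamed in a single pass.
  def writeRows(rows: Iterator[Double], out: DataOutputStream): Long = {
    var numRows = 0L
    rows.foreach { value =>
      out.writeBoolean(true)  // corresponds to writer.setHasNext(true)
      out.writeDouble(value)  // corresponds to writing one converted row
      numRows += 1
    }
    out.writeBoolean(false)   // corresponds to writer.setHasNext(false)
    numRows
  }

  // Reader side: consume rows until the terminating flag, mirroring the
  // `while (readHasNext())` loop in ChunkAutoBufferReader.
  def readRows(in: DataInputStream): Seq[Double] = {
    val values = Seq.newBuilder[Double]
    while (in.readBoolean()) {
      values += in.readDouble()
    }
    values.result()
  }

  // Round-trip the protocol through an in-memory buffer.
  val buffer = new ByteArrayOutputStream()
  val written = writeRows(Iterator(1.0, 2.0, 3.0), new DataOutputStream(buffer))
  val read = readRows(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray)))
  assert(written == read.length && read == Seq(1.0, 2.0, 3.0))
}

The trade-off is one extra byte per row on the wire, which is negligible next to removing the separate pass over every partition that `getNonEmptyPartitionSizes` previously required.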