
Commit 9b3ebf6
[SW-2622] Avoid Multiple Materialization of Spark DataFrame During Conversion to H2OFrame
mn-mikke committed Oct 21, 2021
1 parent a651153 commit 9b3ebf6
Showing 7 changed files with 39 additions and 49 deletions.
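The gist of the change: the conversion previously ran a separate counting job over the DataFrame (see getNonEmptyPartitionSizes below) to learn how many rows each partition holds before uploading them, which materialized the DataFrame a second time. After this commit the writer streams each row behind a boolean "has next" marker and terminates the chunk with false, so the receiving side discovers the row count while reading and the num_rows parameter disappears from the protocol. The snippet below is only a minimal sketch of that framing idea, with illustrative names and plain java.io streams rather than the Sparkling Water code; a matching reader-side sketch appears after the last file.

import java.io.{ByteArrayOutputStream, DataOutputStream}

object HasNextFramingWriterSketch {
  // Streams rows without knowing their count up front:
  // `true` announces another row, `false` closes the chunk.
  def writeChunk(rows: Iterator[Long], out: DataOutputStream): Long = {
    var numRows = 0L
    rows.foreach { row =>
      out.writeBoolean(true) // "has next" marker before each row
      out.writeLong(row)     // row payload (a single numeric column here)
      numRows += 1
    }
    out.writeBoolean(false)  // end-of-chunk marker
    numRows                  // the count falls out of the single pass
  }

  def main(args: Array[String]): Unit = {
    val bytes = new ByteArrayOutputStream()
    val n = writeChunk(Iterator(1L, 2L, 3L), new DataOutputStream(bytes))
    println(s"wrote $n rows in ${bytes.size()} bytes")
  }
}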
@@ -21,7 +21,7 @@ import org.apache.spark.{Partition, TaskContext}

import scala.reflect.ClassTag

-private[backend] class H2OAwareRDD[U: ClassTag](nodes: Array[NodeDesc], prev: RDD[U])
+private[backend] class H2OAwareRDD[U: ClassTag](val nodes: Array[NodeDesc], prev: RDD[U])
extends H2OAwareBaseRDD[U](nodes, prev) {

override def compute(split: Partition, context: TaskContext): Iterator[U] = prev.compute(split, context)
2 changes: 0 additions & 2 deletions core/src/main/scala/ai/h2o/sparkling/backend/H2OChunk.scala
@@ -58,7 +58,6 @@ private[sparkling] object H2OChunk extends RestCommunication {
node: NodeDesc,
conf: H2OConf,
frameName: String,
-numRows: Int,
chunkId: Int,
expectedTypes: Array[ExpectedType],
maxVecSizes: Array[Int]): OutputStream = {
@@ -67,7 +66,6 @@ private[sparkling] object H2OChunk extends RestCommunication {

val parameters = Map(
"frame_name" -> frameName,
"num_rows" -> numRows,
"chunk_id" -> chunkId,
"expected_types" -> expectedTypesString,
"maximum_vector_sizes" -> maxVecSizesString,
53 changes: 20 additions & 33 deletions core/src/main/scala/ai/h2o/sparkling/backend/Writer.scala
@@ -33,20 +33,21 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.{ExposeUtils, TaskContext, ml, mllib}

-private[backend] class Writer(nodeDesc: NodeDesc, metadata: WriterMetadata, numRows: Int, chunkId: Int)
+private[backend] class Writer(nodeDesc: NodeDesc, metadata: WriterMetadata, chunkId: Int)
extends Closeable {

private val outputStream = H2OChunk.putChunk(
nodeDesc,
metadata.conf,
metadata.frameId,
-numRows,
chunkId,
metadata.expectedTypes,
metadata.maxVectorSizes)

private val chunkWriter = new ChunkAutoBufferWriter(outputStream)

+def setHasNext(data: Boolean): Unit = put(data)
+
def put(data: Boolean): Unit = chunkWriter.writeBoolean(data)

def put(data: Byte): Unit = chunkWriter.writeByte(data)
@@ -91,13 +92,12 @@ private[backend] object Writer {

def convert(rdd: H2OAwareRDD[Row], colNames: Array[String], metadata: WriterMetadata): H2OFrame = {
H2OFrame.initializeFrame(metadata.conf, metadata.frameId, colNames)
-val partitionSizes = getNonEmptyPartitionSizes(rdd)
-val nonEmptyPartitions = getNonEmptyPartitions(partitionSizes)
+val numPartitions = rdd.getNumPartitions

-val uploadPlan = getUploadPlan(metadata.conf, nonEmptyPartitions.length)
-val operation: SparkJob = perDataFramePartition(metadata, uploadPlan, nonEmptyPartitions, partitionSizes)
-val rows = SparkSessionUtils.active.sparkContext.runJob(rdd, operation, nonEmptyPartitions)
-val res = new Array[Long](nonEmptyPartitions.size)
+val uploadPlan = getUploadPlan(metadata.conf, numPartitions)
+val operation: SparkJob = perDataFramePartition(metadata, uploadPlan)
+val rows = SparkSessionUtils.active.sparkContext.runJob(rdd, operation)
+val res = new Array[Long](numPartitions)
rows.foreach { case (chunkIdx, numRows) => res(chunkIdx) = numRows }
val types = SerdeUtils.expectedTypesToVecTypes(metadata.expectedTypes, metadata.maxVectorSizes)
H2OFrame.finalizeFrame(metadata.conf, metadata.frameId, res, types)
@@ -106,17 +106,18 @@

private def perDataFramePartition(
metadata: WriterMetadata,
-uploadPlan: UploadPlan,
-partitions: Seq[Int],
-partitionSizes: Map[Int, Int])(context: TaskContext, it: Iterator[Row]): (Int, Long) = {
-val chunkIdx = partitions.indexOf(context.partitionId())
-val numRows = partitionSizes(context.partitionId())
+uploadPlan: UploadPlan)(context: TaskContext, it: Iterator[Row]): (Int, Long) = {
+val chunkIdx = context.partitionId()
+var numRows = 0
val domainBuilder = new CategoricalDomainBuilder(metadata.expectedTypes)
val h2oNode = uploadPlan(chunkIdx)
-withResource(new Writer(h2oNode, metadata, numRows, chunkIdx)) { writer =>
+withResource(new Writer(h2oNode, metadata, chunkIdx)) { writer =>
it.foreach { row =>
+writer.setHasNext(true)
sparkRowToH2ORow(row, writer, metadata, domainBuilder)
+numRows += 1
}
+writer.setHasNext(false)
}
H2OChunk.putChunkCategoricalDomains(h2oNode, metadata.conf, metadata.frameId, chunkIdx, domainBuilder.getDomains())
(chunkIdx, numRows)
@@ -161,30 +162,16 @@
}
}

-private def getNonEmptyPartitionSizes[T](rdd: RDD[T]): Map[Int, Int] = {
-rdd
-.mapPartitionsWithIndex {
-case (idx, it) =>
-if (it.nonEmpty) {
-Iterator.single((idx, it.size))
-} else {
-Iterator.empty
-}
-}
-.collect()
-.toMap
-}
-
-private def getNonEmptyPartitions(partitionSizes: Map[Int, Int]): Seq[Int] = {
-partitionSizes.keys.toSeq.sorted
-}
-
private def getUploadPlan(conf: H2OConf, numberOfPartitions: Int): UploadPlan = {
val endpoint = getClusterEndpoint(conf)
val parameters = Map("number_of_chunks" -> numberOfPartitions)
val rawPlan = query[UploadPlanV3](endpoint, Paths.UPLOAD_PLAN, conf, parameters)
rawPlan.layout.map { chunkAssignment =>
-chunkAssignment.chunk_id -> NodeDesc(chunkAssignment.node_idx.toString, chunkAssignment.ip, chunkAssignment.port)
+chunkAssignment.chunk_id -> NodeDesc(
+chunkAssignment.node_idx.toString,
+chunkAssignment.ip,
+chunkAssignment.port,
+chunkAssignment.cpus_allowed)
}.toMap
}
}
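With the partition-size pre-computation gone, convert submits one job over all partitions: the chunk index is simply the partition id and every task reports back how many rows it actually wrote. A standalone illustration of that pattern with plain Spark follows; the names and data are made up and this is not the Writer code itself.

import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession

object PerPartitionCountSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("per-partition-count").getOrCreate()
    val sc = spark.sparkContext
    val rdd = sc.parallelize(1 to 100, numSlices = 4)

    // One pass over the data: each partition returns (partitionId, rowCount),
    // so no separate counting job is needed beforehand.
    val counts: Array[(Int, Long)] =
      sc.runJob(rdd, (ctx: TaskContext, it: Iterator[Int]) => (ctx.partitionId(), it.size.toLong))

    val res = new Array[Long](rdd.getNumPartitions)
    counts.foreach { case (chunkIdx, numRows) => res(chunkIdx) = numRows }
    println(res.mkString(", ")) // 25, 25, 25, 25
    spark.stop()
  }
}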
@@ -66,7 +66,7 @@ object H2OClientUtils extends SharedBackendUtils {
.buildArgs()
}

-def startH2OClient(hc: H2OContext, conf: H2OConf, nodes: Array[NodeDesc]): NodeDesc = {
+def startH2OClient(hc: H2OContext, conf: H2OConf, nodes: Array[NodeDesc]): Unit = {
if (conf.runsInExternalClusterMode) {
setClientIp(conf)
} else {
@@ -91,7 +91,6 @@
}
}
}
-NodeDesc(SparkEnv.get.executorId, H2O.SELF_ADDRESS.getHostAddress, H2O.API_PORT)
}

/**
@@ -252,6 +252,17 @@ class DataFrameConverterTestSuite extends FunSuite with SharedH2OTestContext {
h2oFrame.delete()
}

test("DataFrame is evaluated only once during conversion to H2OFrame") {
val numberOfEvaluationsAccumulator = spark.sparkContext.longAccumulator("NumberOfEvaluationsAccumulator")
val baseDF = spark.range(1000).repartition(1)
val finalDF = baseDF.mapPartitions { partitions => numberOfEvaluationsAccumulator.add(1); partitions }

val h2oFrame = hc.asH2OFrame(finalDF)
h2oFrame.delete()

numberOfEvaluationsAccumulator.value shouldEqual 1
}

test("DataFrame[FloatField] to H2OFrame[Numeric]") {
val values = Seq(Float.MinValue, Float.MaxValue, -33.33.toFloat, 200.001.toFloat, -5000.34.toFloat)
val df = sc.parallelize(values).map(v => FloatField(v)).toDF
@@ -135,7 +135,6 @@ final class ChunkServlet extends ServletBase {

private case class PUTRequestParameters(
frameName: String,
-numRows: Int,
chunkId: Int,
expectedTypes: Array[ExpectedType],
maxVecSizes: Array[Int],
@@ -160,24 +159,21 @@
private object PUTRequestParameters {
def parse(request: HttpServletRequest): PUTRequestParameters = {
val frameName = getParameterAsString(request, "frame_name")
-val numRowsString = getParameterAsString(request, "num_rows")
-val numRows = numRowsString.toInt
val chunkIdString = getParameterAsString(request, "chunk_id")
val chunkId = chunkIdString.toInt
val expectedTypesString = getParameterAsString(request, "expected_types")
val expectedTypes = Base64Encoding.decode(expectedTypesString).map(ExpectedTypes(_))
val maximumVectorSizesString = getParameterAsString(request, "maximum_vector_sizes")
val maxVecSizes = Base64Encoding.decodeToIntArray(maximumVectorSizesString)
val compression = getParameterAsString(request, "compression")
-PUTRequestParameters(frameName, numRows, chunkId, expectedTypes, maxVecSizes, compression)
+PUTRequestParameters(frameName, chunkId, expectedTypes, maxVecSizes, compression)
}
}

/*
The method handles PUT requests for the path /3/Chunk
-It requires 6 GET parameters
+It requires 5 GET parameters
* - frame_name - a unique string identifier of H2O Frame
-- num_rows - a number of rows forming by the body of the request
* - chunk_id - a unique identifier of the chunk within the H2O Frame
* - expected_type - byte array encoded in Base64 encoding. The types corresponds to the `selected_columns` parameter
* - maximum_vector_sizes - maximum vector sizes for each vector column encoded into Base64 encoding.
@@ -193,7 +189,6 @@
withResource(new ChunkAutoBufferReader(decompressed)) { reader =>
reader.readChunk(
parameters.frameName,
-parameters.numRows,
parameters.chunkId,
parameters.expectedTypes,
parameters.maxVecSizes)
@@ -31,16 +31,14 @@ final class ChunkAutoBufferReader(val inputStream: InputStream) extends Closeable

def readChunk(
frameName: String,
-numRows: Int,
chunkId: Int,
expectedTypes: Array[ExpectedType],
maxVecSizes: Array[Int]): Unit = {
val vecTypes = SerdeUtils.expectedTypesToVecTypes(expectedTypes, maxVecSizes)
val elementSizes = getElementSizes(expectedTypes, maxVecSizes)
val startPositions = getStartPositions(elementSizes)
val chunks = ChunkUtils.createNewChunks(frameName, vecTypes, chunkId)
-var rowIdx = 0
-while (rowIdx < numRows) {
+while (readHasNext()) {
var typeIdx = 0
while (typeIdx < expectedTypes.length) {
expectedTypes(typeIdx) match {
@@ -60,7 +58,7 @@
}
typeIdx += 1
}
-rowIdx += 1
+//rowIdx += 1
}
ChunkUtils.closeNewChunks(chunks)
}
@@ -142,6 +140,8 @@ final class ChunkAutoBufferReader(val inputStream: InputStream) extends Closeable
}
}

+def readHasNext() = readBoolean()
+
def readBoolean(): Boolean = {
val data = buffer.getZ
isLastNAVar = isNA(data)

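For completeness, a reader-side counterpart to the while (readHasNext()) loop introduced above; again a sketch with illustrative names and plain java.io streams rather than the AutoBuffer-based reader.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object HasNextFramingReaderSketch {
  // Consumes rows until the boolean marker reports the end of the chunk,
  // so the row count is a by-product of reading rather than an input.
  def readChunk(in: DataInputStream): Seq[Long] = {
    val rows = Seq.newBuilder[Long]
    while (in.readBoolean()) {
      rows += in.readLong()
    }
    rows.result()
  }

  def main(args: Array[String]): Unit = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    Seq(10L, 20L).foreach { v => out.writeBoolean(true); out.writeLong(v) }
    out.writeBoolean(false)
    val rows = readChunk(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray)))
    assert(rows == Seq(10L, 20L))
  }
}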