diff --git a/core/src/main/scala/ai/h2o/sparkling/backend/Writer.scala b/core/src/main/scala/ai/h2o/sparkling/backend/Writer.scala index 8a5f468827..78e3e027cc 100644 --- a/core/src/main/scala/ai/h2o/sparkling/backend/Writer.scala +++ b/core/src/main/scala/ai/h2o/sparkling/backend/Writer.scala @@ -33,8 +33,7 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.types._ import org.apache.spark.{ExposeUtils, TaskContext, ml, mllib} -private[backend] class Writer(nodeDesc: NodeDesc, metadata: WriterMetadata, chunkId: Int) - extends Closeable { +private[backend] class Writer(nodeDesc: NodeDesc, metadata: WriterMetadata, chunkId: Int) extends Closeable { private val outputStream = H2OChunk.putChunk( nodeDesc, @@ -104,9 +103,9 @@ private[backend] object Writer { H2OFrame(metadata.frameId) } - private def perDataFramePartition( - metadata: WriterMetadata, - uploadPlan: UploadPlan)(context: TaskContext, it: Iterator[Row]): (Int, Long) = { + private def perDataFramePartition(metadata: WriterMetadata, uploadPlan: UploadPlan)( + context: TaskContext, + it: Iterator[Row]): (Int, Long) = { val chunkIdx = context.partitionId() var numRows = 0 val domainBuilder = new CategoricalDomainBuilder(metadata.expectedTypes) diff --git a/core/src/test/scala/ai/h2o/sparkling/backend/converters/DataFrameConverterTestSuite.scala b/core/src/test/scala/ai/h2o/sparkling/backend/converters/DataFrameConverterTestSuite.scala index c76a5a95db..3d62b83319 100644 --- a/core/src/test/scala/ai/h2o/sparkling/backend/converters/DataFrameConverterTestSuite.scala +++ b/core/src/test/scala/ai/h2o/sparkling/backend/converters/DataFrameConverterTestSuite.scala @@ -255,7 +255,9 @@ class DataFrameConverterTestSuite extends FunSuite with SharedH2OTestContext { test("DataFrame is evaluated only once during conversion to H2OFrame") { val numberOfEvaluationsAccumulator = spark.sparkContext.longAccumulator("NumberOfEvaluationsAccumulator") val baseDF = spark.range(1000).repartition(1) - val finalDF = baseDF.mapPartitions { partitions => numberOfEvaluationsAccumulator.add(1); partitions } + val finalDF = baseDF.mapPartitions { partitions => + numberOfEvaluationsAccumulator.add(1); partitions + } val h2oFrame = hc.asH2OFrame(finalDF) h2oFrame.delete() diff --git a/extensions/src/main/scala/ai/h2o/sparkling/extensions/rest/api/ChunkServlet.scala b/extensions/src/main/scala/ai/h2o/sparkling/extensions/rest/api/ChunkServlet.scala index ed41ebab76..fc36e91ccb 100644 --- a/extensions/src/main/scala/ai/h2o/sparkling/extensions/rest/api/ChunkServlet.scala +++ b/extensions/src/main/scala/ai/h2o/sparkling/extensions/rest/api/ChunkServlet.scala @@ -187,11 +187,7 @@ final class ChunkServlet extends ServletBase { withResource(request.getInputStream) { inputStream => withResource(Compression.decompress(parameters.compression, inputStream)) { decompressed => withResource(new ChunkAutoBufferReader(decompressed)) { reader => - reader.readChunk( - parameters.frameName, - parameters.chunkId, - parameters.expectedTypes, - parameters.maxVecSizes) + reader.readChunk(parameters.frameName, parameters.chunkId, parameters.expectedTypes, parameters.maxVecSizes) } } } diff --git a/extensions/src/main/scala/ai/h2o/sparkling/extensions/serde/ChunkAutoBufferReader.scala b/extensions/src/main/scala/ai/h2o/sparkling/extensions/serde/ChunkAutoBufferReader.scala index c6c021e0e8..9a1b523c9e 100644 --- a/extensions/src/main/scala/ai/h2o/sparkling/extensions/serde/ChunkAutoBufferReader.scala +++ b/extensions/src/main/scala/ai/h2o/sparkling/extensions/serde/ChunkAutoBufferReader.scala @@ -29,11 +29,7 @@ final class ChunkAutoBufferReader(val inputStream: InputStream) extends Closeabl private val buffer = new AutoBuffer(inputStream) private var isLastNAVar: Boolean = false - def readChunk( - frameName: String, - chunkId: Int, - expectedTypes: Array[ExpectedType], - maxVecSizes: Array[Int]): Unit = { + def readChunk(frameName: String, chunkId: Int, expectedTypes: Array[ExpectedType], maxVecSizes: Array[Int]): Unit = { val vecTypes = SerdeUtils.expectedTypesToVecTypes(expectedTypes, maxVecSizes) val elementSizes = getElementSizes(expectedTypes, maxVecSizes) val startPositions = getStartPositions(elementSizes)