Skip to content

Commit

Permalink
spotless apply
Browse files Browse the repository at this point in the history
  • Loading branch information
mn-mikke committed Oct 21, 2021
1 parent 9b3ebf6 commit a1d3a4f
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 16 deletions.
9 changes: 4 additions & 5 deletions core/src/main/scala/ai/h2o/sparkling/backend/Writer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.{ExposeUtils, TaskContext, ml, mllib}

private[backend] class Writer(nodeDesc: NodeDesc, metadata: WriterMetadata, chunkId: Int)
extends Closeable {
private[backend] class Writer(nodeDesc: NodeDesc, metadata: WriterMetadata, chunkId: Int) extends Closeable {

private val outputStream = H2OChunk.putChunk(
nodeDesc,
Expand Down Expand Up @@ -104,9 +103,9 @@ private[backend] object Writer {
H2OFrame(metadata.frameId)
}

private def perDataFramePartition(
metadata: WriterMetadata,
uploadPlan: UploadPlan)(context: TaskContext, it: Iterator[Row]): (Int, Long) = {
private def perDataFramePartition(metadata: WriterMetadata, uploadPlan: UploadPlan)(
context: TaskContext,
it: Iterator[Row]): (Int, Long) = {
val chunkIdx = context.partitionId()
var numRows = 0
val domainBuilder = new CategoricalDomainBuilder(metadata.expectedTypes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,9 @@ class DataFrameConverterTestSuite extends FunSuite with SharedH2OTestContext {
test("DataFrame is evaluated only once during conversion to H2OFrame") {
val numberOfEvaluationsAccumulator = spark.sparkContext.longAccumulator("NumberOfEvaluationsAccumulator")
val baseDF = spark.range(1000).repartition(1)
val finalDF = baseDF.mapPartitions { partitions => numberOfEvaluationsAccumulator.add(1); partitions }
val finalDF = baseDF.mapPartitions { partitions =>
numberOfEvaluationsAccumulator.add(1); partitions
}

val h2oFrame = hc.asH2OFrame(finalDF)
h2oFrame.delete()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,7 @@ final class ChunkServlet extends ServletBase {
withResource(request.getInputStream) { inputStream =>
withResource(Compression.decompress(parameters.compression, inputStream)) { decompressed =>
withResource(new ChunkAutoBufferReader(decompressed)) { reader =>
reader.readChunk(
parameters.frameName,
parameters.chunkId,
parameters.expectedTypes,
parameters.maxVecSizes)
reader.readChunk(parameters.frameName, parameters.chunkId, parameters.expectedTypes, parameters.maxVecSizes)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,7 @@ final class ChunkAutoBufferReader(val inputStream: InputStream) extends Closeabl
private val buffer = new AutoBuffer(inputStream)
private var isLastNAVar: Boolean = false

def readChunk(
frameName: String,
chunkId: Int,
expectedTypes: Array[ExpectedType],
maxVecSizes: Array[Int]): Unit = {
def readChunk(frameName: String, chunkId: Int, expectedTypes: Array[ExpectedType], maxVecSizes: Array[Int]): Unit = {
val vecTypes = SerdeUtils.expectedTypesToVecTypes(expectedTypes, maxVecSizes)
val elementSizes = getElementSizes(expectedTypes, maxVecSizes)
val startPositions = getStartPositions(elementSizes)
Expand Down

0 comments on commit a1d3a4f

Please sign in to comment.