Skip to content

Commit

Permalink
[spark] Add task metrics (#2498)
Browse files Browse the repository at this point in the history
  • Loading branch information
YannByron authored Dec 13, 2023
1 parent 4107c01 commit 36d5318
Show file tree
Hide file tree
Showing 9 changed files with 290 additions and 162 deletions.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@
package org.apache.paimon.spark

import org.apache.paimon.spark.sources.PaimonMicroBatchStream
import org.apache.paimon.table.{DataTable, Table}
import org.apache.paimon.table.{DataTable, FileStoreTable, Table}
import org.apache.paimon.table.source.{ReadBuilder, Split}

import org.apache.spark.sql.connector.metric.CustomMetric
import org.apache.spark.sql.connector.read.{Batch, Scan, Statistics, SupportsReportStatistics}
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream
import org.apache.spark.sql.types.StructType

import java.util.OptionalLong

import scala.collection.JavaConverters._

abstract class PaimonBaseScan(table: Table, readBuilder: ReadBuilder, desc: String)
Expand Down Expand Up @@ -60,4 +59,17 @@ abstract class PaimonBaseScan(table: Table, readBuilder: ReadBuilder, desc: Stri
splits
}

override def supportedCustomMetrics: Array[CustomMetric] = {
val paimonMetrics: Array[CustomMetric] = table match {
case _: FileStoreTable =>
Array(
PaimonNumSplitMetric(),
PaimonSplitSizeMetric(),
PaimonAvgSplitSizeMetric()
)
case _ =>
Array.empty[CustomMetric]
}
super.supportedCustomMetrics() ++ paimonMetrics
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ case class PaimonBatch(splits: Array[Split], readBuilder: ReadBuilder) extends B
override def planInputPartitions(): Array[InputPartition] =
splits.map(new SparkInputPartition(_).asInstanceOf[InputPartition])

override def createReaderFactory(): PartitionReaderFactory = new SparkReaderFactory(readBuilder)
override def createReaderFactory(): PartitionReaderFactory = new PaimonPartitionReaderFactory(
readBuilder)

override def equals(obj: Any): Boolean = {
obj match {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.paimon.spark

import org.apache.spark.sql.Utils
import org.apache.spark.sql.connector.metric.{CustomAvgMetric, CustomSumMetric, CustomTaskMetric}

import java.text.DecimalFormat

object PaimonMetrics {

val NUM_SPLITS = "numSplits"

val SPLIT_SIZE = "splitSize"

val AVG_SPLIT_SIZE = "avgSplitSize"
}

// paimon's task metric
sealed trait PaimonTaskMetric extends CustomTaskMetric

case class PaimonNumSplitsTaskMetric(override val value: Long) extends PaimonTaskMetric {

override def name(): String = PaimonMetrics.NUM_SPLITS

}

case class PaimonSplitSizeTaskMetric(override val value: Long) extends PaimonTaskMetric {

override def name(): String = PaimonMetrics.SPLIT_SIZE

}

case class PaimonAvgSplitSizeTaskMetric(override val value: Long) extends PaimonTaskMetric {

override def name(): String = PaimonMetrics.AVG_SPLIT_SIZE

}

// paimon's sum metric
sealed trait PaimonSumMetric extends CustomSumMetric {

protected def aggregateTaskMetrics0(taskMetrics: Array[Long]): Long = {
var sum: Long = 0L
for (taskMetric <- taskMetrics) {
sum += taskMetric
}
sum
}

override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
String.valueOf(aggregateTaskMetrics0(taskMetrics))
}

}

case class PaimonNumSplitMetric() extends PaimonSumMetric {

override def name(): String = PaimonMetrics.NUM_SPLITS

override def description(): String = "number of splits read"

}

case class PaimonSplitSizeMetric() extends PaimonSumMetric {

override def name(): String = PaimonMetrics.SPLIT_SIZE

override def description(): String = "size of splits read"

override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
Utils.bytesToString(aggregateTaskMetrics0(taskMetrics))
}
}

// paimon's avg metric
sealed trait PaimonAvgMetric extends CustomAvgMetric {

protected def aggregateTaskMetrics0(taskMetrics: Array[Long]): Double = {
if (taskMetrics.length > 0) {
var sum = 0L
for (taskMetric <- taskMetrics) {
sum += taskMetric
}
sum.toDouble / taskMetrics.length
} else {
0d
}
}

override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
val average = aggregateTaskMetrics0(taskMetrics)
new DecimalFormat("#0.000").format(average)
}

}

case class PaimonAvgSplitSizeMetric() extends PaimonAvgMetric {

override def name(): String = PaimonMetrics.AVG_SPLIT_SIZE

override def description(): String = "avg size of splits read"

override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
val average = aggregateTaskMetrics0(taskMetrics).round
Utils.bytesToString(average)
}

}
Loading

0 comments on commit 36d5318

Please sign in to comment.