[spark] Avoid unnecessary get splits in spark scan (apache#3645)
Zouxxyy authored Jul 1, 2024
1 parent 37fb8f5 · commit 1fde4a4
Showing 3 changed files with 8 additions and 2 deletions.
PaimonBaseScan.scala

@@ -19,6 +19,7 @@
 package org.apache.paimon.spark

 import org.apache.paimon.{stats, CoreOptions}
+import org.apache.paimon.annotation.VisibleForTesting
 import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
 import org.apache.paimon.spark.metric.SparkMetricRegistry
 import org.apache.paimon.spark.schema.PaimonMetadataColumn

@@ -92,6 +93,7 @@ abstract class PaimonBaseScan(
     _readBuilder
   }

+  @VisibleForTesting
   def getOriginSplits: Array[Split] = {
     readBuilder
       .newScan()
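The @VisibleForTesting annotation signals that getOriginSplits is now kept for tests: each call builds a fresh scan via readBuilder.newScan() (the body continues past the visible hunk, presumably through split planning), so production callers that invoke it repeatedly re-plan the splits every time. A minimal sketch of the cost being avoided, where scan is a hypothetical PaimonBaseScan instance, not code from this commit:

    // Each call re-runs readBuilder.newScan() and plans splits again,
    // which is exactly the duplicated work this commit removes.
    val forStatistics = scan.getOriginSplits
    val forExecution = scan.getOriginSplits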
PaimonInputPartition.scala

@@ -22,7 +22,11 @@ import org.apache.paimon.table.source.Split

 import org.apache.spark.sql.connector.read.InputPartition

-case class PaimonInputPartition(splits: Seq[Split]) extends InputPartition {}
+case class PaimonInputPartition(splits: Seq[Split]) extends InputPartition {
+  def rowCount(): Long = {
+    splits.map(_.rowCount()).sum
+  }
+}

 object PaimonInputPartition {
   def apply(split: Split): PaimonInputPartition = {
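The new helper sums Split.rowCount() across the partition's splits, so a row-count estimate can be read off partitions that were already planned. A quick usage sketch (splitA and splitB stand in for planned Split instances; they are not from this commit):

    // Group two planned splits into one Spark input partition.
    val partition = PaimonInputPartition(Seq(splitA, splitB))
    // Total rows covered by this partition.
    val totalRows: Long = partition.rowCount()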
PaimonStatistics.scala

@@ -34,7 +34,7 @@
 import scala.collection.JavaConverters._

 case class PaimonStatistics[T <: PaimonBaseScan](scan: T) extends Statistics {

-  private lazy val rowCount: Long = scan.getOriginSplits.map(_.rowCount).sum
+  private lazy val rowCount: Long = scan.getInputPartitions.map(_.rowCount()).sum

   private lazy val scannedTotalSize: Long = rowCount * scan.readSchema().defaultSize
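This hunk is the point of the commit: PaimonStatistics previously called scan.getOriginSplits, triggering an extra round of split planning just to estimate row counts; it now reuses the input partitions the scan already produces. The definition of getInputPartitions lies outside this diff, but the change only saves work if those partitions are computed once and cached, so here is a sketch under that assumption (not the actual Paimon source):

    // Hypothetical shape of the scan-side member: partitions are planned
    // once and then shared by partition planning and statistics alike.
    lazy val getInputPartitions: Seq[PaimonInputPartition] =
      getOriginSplits.map(PaimonInputPartition(_)).toSeq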
