Skip to content

Commit

Permalink
[spark] refine description of scan
Browse files Browse the repository at this point in the history
  • Loading branch information
YannByron committed Dec 13, 2023
1 parent c4f74c4 commit d11d476
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,15 @@
*/
package org.apache.paimon.spark

import org.apache.paimon.predicate.Predicate
import org.apache.paimon.table.Table
import org.apache.paimon.table.source.ReadBuilder

case class PaimonScan(table: Table, readBuilder: ReadBuilder, desc: String)
extends PaimonBaseScan(table, readBuilder, desc)
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

case class PaimonScan(
table: Table,
requiredSchema: StructType,
filters: Array[(Filter, Predicate)],
pushDownLimit: Option[Int])
extends PaimonBaseScan(table, requiredSchema, filters, pushDownLimit)
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,57 @@
*/
package org.apache.paimon.spark

import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
import org.apache.paimon.spark.sources.PaimonMicroBatchStream
import org.apache.paimon.table.{DataTable, FileStoreTable, Table}
import org.apache.paimon.table.source.{ReadBuilder, Split}

import org.apache.spark.sql.connector.metric.CustomMetric
import org.apache.spark.sql.connector.read.{Batch, Scan, Statistics, SupportsReportStatistics}
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

import scala.collection.JavaConverters._

abstract class PaimonBaseScan(table: Table, readBuilder: ReadBuilder, desc: String)
abstract class PaimonBaseScan(
table: Table,
requiredSchema: StructType,
filters: Array[(Filter, Predicate)],
pushDownLimit: Option[Int])
extends Scan
with SupportsReportStatistics {

private val fieldNames = table.rowType.getFieldNames

protected var runtimeFilters: Array[Filter] = Array.empty

protected var splits: Array[Split] = _

override def description(): String = desc
protected lazy val readBuilder: ReadBuilder = {
val _readBuilder = table.newReadBuilder()

val projection = requiredSchema.fieldNames.map(field => fieldNames.indexOf(field))
_readBuilder.withProjection(projection)
if (filters.nonEmpty) {
val pushedPredicate = PredicateBuilder.and(filters.map(_._2): _*)
_readBuilder.withFilter(pushedPredicate)
}
pushDownLimit.foreach(_readBuilder.withLimit)

_readBuilder
}

def getSplits: Array[Split] = {
if (splits == null) {
splits = readBuilder.newScan().plan().splits().asScala.toArray
}
splits
}

override def readSchema(): StructType = {
SparkTypeUtils.fromPaimonRowType(readBuilder.readType())
val requiredFields = requiredSchema.filter(field => fieldNames.contains(field.name))
StructType(requiredFields)
}

override def toBatch: Batch = {
Expand All @@ -52,13 +82,6 @@ abstract class PaimonBaseScan(table: Table, readBuilder: ReadBuilder, desc: Stri
PaimonStatistics(this)
}

def getSplits: Array[Split] = {
if (splits == null) {
splits = readBuilder.newScan().plan().splits().asScala.toArray
}
splits
}

override def supportedCustomMetrics: Array[CustomMetric] = {
val paimonMetrics: Array[CustomMetric] = table match {
case _: FileStoreTable =>
Expand All @@ -72,4 +95,23 @@ abstract class PaimonBaseScan(table: Table, readBuilder: ReadBuilder, desc: Stri
}
super.supportedCustomMetrics() ++ paimonMetrics
}

override def description(): String = {
val requiredFieldNameStr = readSchema().map(_.name).mkString(",")
val pushedFiltersStr = if (filters.nonEmpty) {
", PushedFilters: " + filters.map(_._1).mkString(",")
} else {
""
}
val runtimeFiltersStr = if (runtimeFilters.nonEmpty) {
", RuntimeFilters: " + runtimeFilters.mkString(",")
} else {
""
}
super.description() + "["
"Fields: " + requiredFieldNameStr + pushedFiltersStr +
runtimeFiltersStr +
pushDownLimit.map(limit => s", Limit: $limit") +
"]"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,29 +34,14 @@ abstract class PaimonBaseScanBuilder(table: Table)
with SupportsPushDownRequiredColumns
with Logging {

protected var predicates: Option[Predicate] = None
protected var requiredSchema: StructType = SparkTypeUtils.fromPaimonRowType(table.rowType())

protected var pushed: Option[Array[Filter]] = None
protected var pushed: Array[(Filter, Predicate)] = Array.empty

protected var projectedIndexes: Option[Array[Int]] = None

protected def getReadBuilder: ReadBuilder = {
val readBuilder = table.newReadBuilder()
projectedIndexes.foreach(readBuilder.withProjection)
predicates.foreach(readBuilder.withFilter)

readBuilder
}

protected def getDescription: String = {
val description = s"PaimonTable: [${table.name()}]"
description + pushed
.map(filters => s" PushedFilters: [${filters.mkString(", ")}]")
.getOrElse("")
}
protected var pushDownLimit: Option[Int] = None

override def build(): Scan = {
PaimonScan(table, getReadBuilder, getDescription)
PaimonScan(table, requiredSchema, pushed, pushDownLimit)
}

/**
Expand All @@ -65,18 +50,16 @@ abstract class PaimonBaseScanBuilder(table: Table)
* filters must be interpreted as ANDed together.
*/
override def pushFilters(filters: Array[Filter]): Array[Filter] = {
val pushable = mutable.ArrayBuffer.empty[Filter]
val pushable = mutable.ArrayBuffer.empty[(Filter, Predicate)]
val postScan = mutable.ArrayBuffer.empty[Filter]
val predicates = mutable.ArrayBuffer.empty[Predicate]

val converter = new SparkFilterConverter(table.rowType)
val visitor = new PartitionPredicateVisitor(table.partitionKeys())
filters.foreach {
filter =>
try {
val predicate = converter.convert(filter)
pushable.append(filter)
predicates.append(predicate)
pushable.append((filter, predicate))
if (!predicate.visit(visitor)) postScan.append(filter)
} catch {
case e: UnsupportedOperationException =>
Expand All @@ -85,21 +68,17 @@ abstract class PaimonBaseScanBuilder(table: Table)
}
}

if (predicates.nonEmpty) {
this.predicates = Some(PredicateBuilder.and(predicates: _*))
if (pushable.nonEmpty) {
this.pushed = pushable.toArray
}
this.pushed = Some(pushable.toArray)
postScan.toArray
}

override def pushedFilters(): Array[Filter] = {
pushed.getOrElse(Array.empty)
pushed.map(_._1)
}

override def pruneColumns(requiredSchema: StructType): Unit = {
val pruneFields = requiredSchema.fieldNames
val fieldNames = table.rowType.getFieldNames
val projected = pruneFields.map(field => fieldNames.indexOf(field))
this.projectedIndexes = Some(projected)
this.requiredSchema = requiredSchema
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,24 @@
*/
package org.apache.paimon.spark

import org.apache.paimon.predicate.Predicate
import org.apache.paimon.table.Table
import org.apache.paimon.table.source.ReadBuilder

import org.apache.spark.sql.Utils.fieldReference
import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering
import org.apache.spark.sql.sources.{Filter, In}
import org.apache.spark.sql.types.StructType

import scala.collection.JavaConverters._

case class PaimonScan(table: Table, readBuilder: ReadBuilder, desc: String)
extends PaimonBaseScan(table, readBuilder, desc)
case class PaimonScan(
table: Table,
requiredSchema: StructType,
filters: Array[(Filter, Predicate)],
pushDownLimit: Option[Int])
extends PaimonBaseScan(table, requiredSchema, filters, pushDownLimit)
with SupportsRuntimeFiltering {

override def filterAttributes(): Array[NamedReference] = {
Expand All @@ -49,6 +55,7 @@ case class PaimonScan(table: Table, readBuilder: ReadBuilder, desc: String)
case _ => None
}
if (partitionFilter.nonEmpty) {
this.runtimeFilters = filters
readBuilder.withFilter(partitionFilter.head)
splits = readBuilder.newScan().plan().splits().asScala.toArray
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,6 @@ class PaimonScanBuilder(table: Table)
extends PaimonBaseScanBuilder(table)
with SupportsPushDownLimit {

private var pushDownLimit: Option[Int] = None

override protected def getReadBuilder: ReadBuilder = {
val readBuilder = super.getReadBuilder
pushDownLimit.foreach(readBuilder.withLimit)
readBuilder
}

override protected def getDescription: String = {
super.getDescription + pushDownLimit.map(limit => s" Limit: [$limit]").getOrElse("")
}

override def pushLimit(limit: Int): Boolean = {
if (table.isInstanceOf[AppendOnlyFileStoreTable]) {
pushDownLimit = Some(limit)
Expand Down

0 comments on commit d11d476

Please sign in to comment.