Support native scan hive paimon cow table (#708)
harveyyue authored Dec 20, 2024
1 parent 2425c4b commit 0a4ad53
Showing 8 changed files with 540 additions and 2 deletions.
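
This commit adds a Blaze-native scan path for Hive tables backed by Apache Paimon in copy-on-write (COW) mode. The conversion is gated behind a new spark.blaze.enable.paimon.scan flag (default false, see BlazeConverters below), and the converter only accepts COW tables, i.e. tables whose options include 'full-compaction.delta-commits' = '1'. A minimal sketch of enabling it, assuming a Hive-enabled Spark session; the application and table names below are placeholders, not taken from this commit:

import org.apache.spark.sql.SparkSession

// Illustrative only: turn on the native Paimon scan introduced by this commit.
// Both flag names come from BlazeConverters; the app and table names are placeholders.
val spark = SparkSession
  .builder()
  .appName("blaze-paimon-cow-scan")
  .config("spark.blaze.enable.scan", "true")        // existing flag, default true
  .config("spark.blaze.enable.paimon.scan", "true") // new flag, default false
  .enableHiveSupport()
  .getOrCreate()

// The scanned Hive table must be a Paimon copy-on-write table,
// i.e. created with 'full-compaction.delta-commits' = '1'.
spark.sql("SELECT * FROM some_db.some_paimon_cow_table").show()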
6 changes: 6 additions & 0 deletions pom.xml
@@ -19,6 +19,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<arrowVersion>16.0.0</arrowVersion>
<protobufVersion>3.21.9</protobufVersion>
<paimonVersion>0.9.0</paimonVersion>
</properties>

<dependencyManagement>
@@ -43,6 +44,11 @@
<artifactId>spark-sql_${scalaVersion}</artifactId>
<version>${sparkVersion}</version>
</dependency>
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-core</artifactId>
<version>${paimonVersion}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-c-data</artifactId>
5 changes: 5 additions & 0 deletions spark-extension/pom.xml
@@ -38,6 +38,11 @@
<artifactId>spark-sql_${scalaVersion}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-c-data</artifactId>
BlazeConvertStrategy.scala
@@ -42,6 +42,7 @@ import org.apache.spark.sql.execution.blaze.plan.BuildSide
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
import org.apache.spark.sql.hive.blaze.BlazeHiveConverters

object BlazeConvertStrategy extends Logging {
import BlazeConverters._
@@ -123,6 +124,8 @@ object BlazeConvertStrategy extends Logging {
e.setTagValue(convertStrategyTag, AlwaysConvert)
case e: FileSourceScanExec =>
e.setTagValue(convertStrategyTag, AlwaysConvert)
case e if BlazeHiveConverters.isNativePaimonTableScan(e) =>
e.setTagValue(convertStrategyTag, AlwaysConvert)
case e: ProjectExec if isNative(e.child) =>
e.setTagValue(convertStrategyTag, AlwaysConvert)
case e: FilterExec if isNative(e.child) =>
BlazeConverters.scala
@@ -76,12 +76,16 @@ import org.apache.spark.sql.execution.blaze.plan.ConvertToNativeBase
import org.apache.spark.sql.execution.blaze.plan.NativeOrcScanBase
import org.apache.spark.sql.execution.blaze.plan.NativeParquetScanBase
import org.apache.spark.sql.execution.blaze.plan.NativeSortBase
import org.apache.spark.sql.hive.blaze.BlazeHiveConverters
import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
import org.apache.spark.sql.hive.execution.blaze.plan.NativeHiveTableScanBase
import org.apache.spark.sql.types.LongType

object BlazeConverters extends Logging {
val enableScan: Boolean =
SparkEnv.get.conf.getBoolean("spark.blaze.enable.scan", defaultValue = true)
val enablePaimonScan: Boolean =
SparkEnv.get.conf.getBoolean("spark.blaze.enable.paimon.scan", defaultValue = false)
val enableProject: Boolean =
SparkEnv.get.conf.getBoolean("spark.blaze.enable.project", defaultValue = true)
val enableFilter: Boolean =
@@ -152,6 +156,9 @@ object BlazeConverters extends Logging {
case e: BroadcastExchangeExec => tryConvert(e, convertBroadcastExchangeExec)
case e: FileSourceScanExec if enableScan => // scan
tryConvert(e, convertFileSourceScanExec)
case e
if enablePaimonScan && BlazeHiveConverters.isNativePaimonTableScan(e) => // scan paimon
tryConvert(e, BlazeHiveConverters.convertPaimonTableScanExec)
case e: ProjectExec if enableProject => // project
tryConvert(e, convertProjectExec)
case e: FilterExec if enableFilter => // filter
@@ -787,7 +794,7 @@ object BlazeConverters extends Logging {
} catch {
case e @ (_: NotImplementedError | _: AssertionError | _: Exception) =>
logWarning(
s"Error projecting resultExpressions, failback to non-native projection: " +
s"Error projecting resultExpressions, fallback to non-native projection: " +
s"${e.getMessage}")
val proj = ProjectExec(exec.resultExpressions, nativeAggr)
proj.setTagValue(convertToNonNativeTag, true)
@@ -878,7 +885,9 @@ object BlazeConverters extends Logging {
return false
}
plan match {
- case _: NativeParquetScanBase | _: NativeOrcScanBase | _: NativeUnionBase => true
+ case _: NativeParquetScanBase | _: NativeOrcScanBase | _: NativeHiveTableScanBase |
+ _: NativeUnionBase =>
+ true
case _: ConvertToNativeBase => needRenameColumns(plan.children.head)
case exec if NativeHelper.isNative(exec) =>
NativeHelper.getUnderlyingNativePlan(exec).output != plan.output
BlazeHiveConverters.scala (new file)
@@ -0,0 +1,57 @@
/*
* Copyright 2022 The Blaze Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.blaze

import org.apache.spark.internal.Logging
import org.apache.spark.sql.blaze.BlazeConverters.addRenameColumnsExec
import org.apache.spark.sql.blaze.Shims
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.hive.execution.HiveTableScanExec
import org.apache.spark.sql.hive.execution.blaze.plan.NativePaimonTableScanExec

object BlazeHiveConverters extends Logging {

def isNativePaimonTableScan(exec: SparkPlan): Boolean = {
exec match {
case e: HiveTableScanExec
if e.relation.tableMeta.storage.serde.isDefined
&& e.relation.tableMeta.storage.serde.get.contains("Paimon") =>
true
case _ => false
}
}

def convertPaimonTableScanExec(exec: SparkPlan): SparkPlan = {
val hiveExec = exec.asInstanceOf[HiveTableScanExec]
assert(
PaimonUtil.isPaimonCowTable(
PaimonUtil.loadTable(hiveExec.relation.tableMeta.location.toString)),
"paimon MOR/MOW mode is not supported")
val (relation, output, requestedAttributes, partitionPruningPred) = (
hiveExec.relation,
hiveExec.output,
hiveExec.requestedAttributes,
hiveExec.partitionPruningPred)
logDebug(s"Converting HiveTableScanExec: ${Shims.get.simpleStringWithNodeId(exec)}")
logDebug(s" relation: ${relation.getClass}")
logDebug(s" relation.location: ${relation.tableMeta.location}")
logDebug(s" output: $output")
logDebug(s" requestedAttributes: $requestedAttributes")
logDebug(s" partitionPruningPred: $partitionPruningPred")

addRenameColumnsExec(NativePaimonTableScanExec(hiveExec))
}
}
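
The detection above keys off the Hive table's serde: a HiveTableScanExec is treated as a Paimon scan when the table's serde class name contains "Paimon". The concrete class name (typically org.apache.paimon.hive.PaimonSerDe) is an assumption here rather than something this diff pins down; a tiny sketch of the same rule:

// Illustrative sketch of the rule used by isNativePaimonTableScan above.
// The serde strings are assumed examples, not taken from this commit.
def looksLikePaimonSerde(serde: Option[String]): Boolean =
  serde.exists(_.contains("Paimon"))

println(looksLikePaimonSerde(Some("org.apache.paimon.hive.PaimonSerDe")))                          // true
println(looksLikePaimonSerde(Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))) // false
println(looksLikePaimonSerde(None))                                                                // false

convertPaimonTableScanExec additionally asserts copy-on-write mode through PaimonUtil (next file) before wrapping the native scan in addRenameColumnsExec; note that needRenameColumns in BlazeConverters now also returns true for NativeHiveTableScanBase.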
PaimonUtil.scala (new file)
@@ -0,0 +1,59 @@
/*
* Copyright 2022 The Blaze Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.blaze

import java.util.{HashMap => JHashMap}

import org.apache.paimon.CoreOptions
import org.apache.paimon.catalog.CatalogContext
import org.apache.paimon.options.Options
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.FileStoreTableFactory
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession

object PaimonUtil extends Logging {
private val paimonCowOptionKey = "full-compaction.delta-commits"
private val paimonFileFormatOptionKey = "file.format"
val parquetFormat = "parquet"
val orcFormat = "orc"

def loadTable(path: String): FileStoreTable = {
val parameters = new JHashMap[String, String]()
parameters.put(CoreOptions.PATH.key, path)
val catalogContext =
CatalogContext.create(
Options.fromMap(parameters),
SparkSession.active.sessionState.newHadoopConf())
FileStoreTableFactory.create(catalogContext)
}

def isPaimonCowTable(table: FileStoreTable): Boolean = {
// https://paimon.apache.org/docs/master/primary-key-table/table-mode/
// Paimon COW mode: 'full-compaction.delta-commits' = '1'
table
.options()
.get(paimonCowOptionKey) != null && table.options().get(paimonCowOptionKey).equals("1")
}

def paimonFileFormat(table: FileStoreTable): String = {
if (table.options().get(paimonFileFormatOptionKey) != null) {
table.options().get(paimonFileFormatOptionKey)
} else {
parquetFormat
}
}
}
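
PaimonUtil loads the Paimon table directly from its filesystem location and inspects the table options: 'full-compaction.delta-commits' = '1' marks copy-on-write mode (the only mode the new native scan accepts), and 'file.format' selects the data file format, falling back to parquet when unset. A hedged usage sketch; the warehouse path is a placeholder, whereas in the commit itself the path comes from the Hive table metadata (relation.tableMeta.location):

// Illustrative only: exercising the helpers above. The path is a placeholder.
val table = PaimonUtil.loadTable("hdfs://nameservice1/warehouse/db.db/paimon_cow_table")
if (PaimonUtil.isPaimonCowTable(table)) {
  // "parquet" unless the table sets 'file.format' (e.g. to "orc")
  val format = PaimonUtil.paimonFileFormat(table)
  println(s"Paimon COW table, data file format = $format")
} else {
  // convertPaimonTableScanExec asserts COW mode, so MOR/MOW tables are rejected
  println("Paimon MOR/MOW table: not supported by the native scan")
}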
NativeHiveTableScanBase.scala (new file)
@@ -0,0 +1,158 @@
/*
* Copyright 2022 The Blaze Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.execution.blaze.plan

import scala.collection.immutable.SortedMap
import scala.collection.JavaConverters._
import scala.collection.Seq

import org.apache.hadoop.fs.FileSystem
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.blaze.JniBridge
import org.apache.spark.sql.blaze.NativeConverters
import org.apache.spark.sql.blaze.NativeHelper
import org.apache.spark.sql.blaze.NativeSupports
import org.apache.spark.sql.blaze.Shims
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.hive.execution.HiveTableScanExec
import org.apache.spark.sql.types.NullType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
import org.blaze.{protobuf => pb}

import java.net.URI
import java.security.PrivilegedExceptionAction

abstract class NativeHiveTableScanBase(basedHiveScan: HiveTableScanExec)
extends LeafExecNode
with NativeSupports {

override lazy val metrics: Map[String, SQLMetric] = SortedMap[String, SQLMetric]() ++ Map(
NativeHelper
.getDefaultNativeMetrics(sparkContext)
.filterKeys(Set("stage_id", "output_rows", "elapsed_compute"))
.toSeq :+
("predicate_evaluation_errors", SQLMetrics
.createMetric(sparkContext, "Native.predicate_evaluation_errors")) :+
("row_groups_pruned", SQLMetrics
.createMetric(sparkContext, "Native.row_groups_pruned")) :+
("bytes_scanned", SQLMetrics.createSizeMetric(sparkContext, "Native.bytes_scanned")) :+
("io_time", SQLMetrics.createNanoTimingMetric(sparkContext, "Native.io_time")) :+
("io_time_getfs", SQLMetrics
.createNanoTimingMetric(sparkContext, "Native.io_time_getfs")): _*)

override val output: Seq[Attribute] = basedHiveScan.output
override val outputPartitioning: Partitioning = basedHiveScan.outputPartitioning

protected val relation: HiveTableRelation = basedHiveScan.relation
protected val partitionSchema: StructType = relation.tableMeta.partitionSchema
protected val tableName: String = relation.tableMeta.identifier.unquotedString

protected lazy val partitions: Array[FilePartition] = getFilePartitions()
private lazy val fileSizes = partitions
.flatMap(_.files)
.groupBy(_.filePath)
.mapValues(_.map(_.length).sum)
.map(identity) // make this map serializable

// should not include partition columns
protected def nativeFileSchema: pb.Schema =
NativeConverters.convertSchema(StructType(relation.tableMeta.dataSchema.map {
case field if basedHiveScan.requestedAttributes.exists(_.name == field.name) =>
field.copy(nullable = true)
case field =>
// avoid converting unsupported type in non-used fields
StructField(field.name, NullType, nullable = true)
}))

protected def nativePartitionSchema: pb.Schema =
NativeConverters.convertSchema(partitionSchema)

protected def nativeFileGroups: FilePartition => pb.FileGroup = (partition: FilePartition) => {
// list input file statuses
val nativePartitionedFile = (file: PartitionedFile) => {
val nativePartitionValues = partitionSchema.zipWithIndex.map { case (field, index) =>
NativeConverters.convertValue(
file.partitionValues.get(index, field.dataType),
field.dataType)
}
pb.PartitionedFile
.newBuilder()
.setPath(s"${file.filePath}")
.setSize(fileSizes(file.filePath))
.addAllPartitionValues(nativePartitionValues.asJava)
.setLastModifiedNs(0)
.setRange(
pb.FileRange
.newBuilder()
.setStart(file.start)
.setEnd(file.start + file.length)
.build())
.build()
}
pb.FileGroup
.newBuilder()
.addAllFiles(partition.files.map(nativePartitionedFile).toList.asJava)
.build()
}

// check whether native converting is supported
nativeFileSchema
nativePartitionSchema
nativeFileGroups

protected def putJniBridgeResource(
resourceId: String,
broadcastedHadoopConf: Broadcast[SerializableConfiguration]): Unit = {
val sharedConf = broadcastedHadoopConf.value.value
JniBridge.resourcesMap.put(
resourceId,
(location: String) => {
val getFsTimeMetric = metrics("io_time_getfs")
val currentTimeMillis = System.currentTimeMillis()
val fs = NativeHelper.currentUser.doAs(new PrivilegedExceptionAction[FileSystem] {
override def run(): FileSystem = {
FileSystem.get(new URI(location), sharedConf)
}
})
getFsTimeMetric.add((System.currentTimeMillis() - currentTimeMillis) * 1000000)
fs
})
}

protected def broadcastedHadoopConf: Broadcast[SerializableConfiguration] = {
val sparkSession = Shims.get.getSqlContext(basedHiveScan).sparkSession
val hadoopConf =
sparkSession.sessionState.newHadoopConfWithOptions(Map.empty)
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
}

def getFilePartitions(): Array[FilePartition]

override protected def doCanonicalize(): SparkPlan = basedHiveScan.canonicalized

override def simpleString(maxFields: Int): String =
s"$nodeName (${basedHiveScan.simpleString(maxFields)})"
}