Skip to content

Latest commit

 

History

History
134 lines (110 loc) · 4.22 KB

File metadata and controls

134 lines (110 loc) · 4.22 KB

第72课:Spark SQL UDF和UDAF解密与实战

标签: sparkIMF


##代码实战

SparkSQLUDFUDAF.scala

package com.dt.spark.sparkapps.sql

import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}

/**
 * 第72课:Spark SQL UDF和UDAF解密与实战
 * Created by Limaoran on 2016/7/6.
 * 通过案例实战Spark SQL下的UDF和UDAF的具体使用
 * UDF:User Defined Function,用户自定义的函数,函数的输入是一条具体的数据记录,实现上讲就是普通的Scala函数
 * UDAF:User Defined Aggregation Function,用户自定义的聚合函数,函数本身作用于数据集合,能够在聚合操作的基础上进行自定义操作。
 *
 * 实质上讲,例如说UDF会被Spark SQL中的Catalyst封装成为Expression,
 *    最终会通过eval方法来计算输入的数据Row(此处的Row和DataFrame中的Row没有任何关系)
 */
object SparkSQLUDFUDAF {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SparkSQLUDFUDAF").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    //模拟实际使用的数据
    val bigData = Array( "Spark","Hadoop","Spark","Hadoop","Spark","Spark","Spark","Spark","Hadoop","Hadoop")

    /**
     * 基于提供的数据创建DataFrame
     */
    val bigDataRDD = sc.parallelize(bigData)
    val bigDataRowRDD = bigDataRDD.map(item=>Row(item))

    val structType = StructType(Array(StructField("word",StringType,true)))
    val bigDataDF = sqlContext.createDataFrame(bigDataRowRDD,structType)
    bigDataDF.registerTempTable("bigDataTable") //注册为临时表
    /**
     * 通过SQLContext注册UDF,在Scala 2.10.x版本UDF函数最多可以接受22个输入参数,2.11没有这个限制!
     */
    sqlContext.udf.register("computeLength", (input:String)=>input.length)

    //直接在SQL语句中使用UDF,就像使用SQL自带的内部函数一样
    sqlContext.sql("select word,computeLength(word) as length from bigDataTable").show()

    sqlContext.udf.register("wordCount",new MyUDAF)

    //
    sqlContext.sql("select word,wordCount(word) as count,computeLength(word) as length from bigDataTable group by word").show()

    //这是为了进入WebUI查看详细流程:http://localhost:4040
    Thread.sleep(20*60*1000)
  }
}

/**
 * 按照模板实现UDAF
 */
class MyUDAF extends UserDefinedAggregateFunction{
  /**
   * 该方法指定具体输入数据的类型
   * @return
   */
  override def inputSchema: StructType = StructType(Array(StructField("name",StringType,true)))

  /**
   * 在进行聚合操作的时候所要处理的结果的类型
   * @return
   */
  override def bufferSchema: StructType = StructType(Array(StructField("count",IntegerType,true)))

  /**
   * 指定UDAF函数计算后返回的结果类型
   * @return
   */
  override def dataType: DataType = IntegerType

  /**
   * 确保一致性,一般都用true
   * @return
   */
  override def deterministic: Boolean = true

  /**
   * 在Aggregate之前每组数据的初始化结果
   * @param buffer
   */
  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0)=0

  /**
   * 在进行聚合的时候,每当有新的值进来,对分组后的聚合如何进行计算
   * 本地的聚合操作,相当于Hadoop MapReduce模型中的Combiner
   * @param buffer
   * @param input
   */
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Int](0)+1
  }

  /**
   * 最后在分布式节点进行Local Reduce完成后需要进行全局级别的Merge操作
   * @param buffer1
   * @param buffer2
   */
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)
  }

  /**
   * 返回UDAF最后的计算结果
   * @param buffer
   * @return
   */
  override def evaluate(buffer: Row): Any = buffer.getAs[Int](0)
}

##扩展源码

  • SparkSQLParser
  • AbstractSparkSQLParser
  • UDFRegistration
  • FunctionRegistry:内置函数也可以在这里查到!
  • SimpleFunctionRegistry
  • ScalaUDF