标签: sparkIMF
##代码实战
SparkSQLWindowFunction.scala
package com.dt.spark.sparkapps.sql
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkContext, SparkConf}
/**
* 第71课:Spark SQL窗口函数解密与实战
* 运行sh:spark-submit --class com.dt.spark.sparkapps.sql.SparkSQLWindowFunction --master local /out/sparkApp.jar
* Created by Limaoran on 2016/7/6.
*/
object SparkSQLWindowFunction {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("SparkSQLWindowFunction").setMaster("local")
val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)
hiveContext.sql("use hive") //使用名称为hive的数据库,我们接下来所有的表的操作都在这个库中
/**
* 如果要创建的表存在的话就删除,然后创建我们要导入数据的表
*/
hiveContext.sql("DROP TABLE IF EXISTS scores")
hiveContext.sql("CREATE TABLE IF NOT EXISTS scores(name STRING,score INT) " +
"ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' LINES TERMINATED BY '\n' ")
//把要处理的数据导入到Hive的表中
hiveContext.sql("LOAD DATA LOCAL INPATH 'G:/source/sparkApp/src/main/resources/topNGroup.txt' INTO TABLE scores")
/**
* 使用子查询的方式完成目标数据的提取,在目标数据内部使用窗口函数row_number来进行分组排序:
* PARTITION BY:指定窗口函数分组的KEY;
* ORDER BY:分组后进行排序
*/
val result = hiveContext.sql("SELECT name,score " +
"FROM (" +
"SELECT name,score,row_number() over(PARTITION BY name ORDER BY score DESC) rank " +
"FROM scores" +
") sub_scores " +
"WHERE rank <=4 ").repartition(1)
//repartition把并行度设为1,只生成一个文件,不然默认有200个
result.show()
//把数据保存到数据仓库中
// hiveContext.sql("DROP TABLE IF EXISTS sortedresultscores")
result.write.mode(SaveMode.Overwrite).saveAsTable("sortedresultscores")
}
}
###测试源数据
Spark 100
Hadoop 65
Spark 99
Hadoop 61
Spark 95
Hadoop 60
Spark 98
Hadoop 69
Spark 91
Hadoop 64
Spark 89
Hadoop 98
Spark 88
Hadoop 99
Spark 68
Hadoop 60
Spark 79
Hadoop 97
Spark 69
Hadoop 96
###测试结果:
+------+-----+
| name|score|
+------+-----+
| Spark| 100|
| Spark| 99|
| Spark| 98|
| Spark| 95|
|Hadoop| 99|
|Hadoop| 98|
|Hadoop| 97|
|Hadoop| 96|
+------+-----+