## Create the Data Table in Hive
- use hive;
- create table userLogs(logdate String,time bigint,userID bigint,pageID bigint,channel String,action String) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n';
- desc userLogs;
- load data local inpath 'E:/HiveData/data.txt' into table userLogs;
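The `load data` statement expects each line of data.txt to contain the six columns above, tab-separated. The course's data file is not reproduced here; as a purely hypothetical illustration of the expected format, here is a minimal sketch that writes a few rows matching the schema (the class name, sample values, and row count are all assumptions, not the course's generator):

```java
import java.io.FileWriter;
import java.io.IOException;
import java.util.Random;

// Hypothetical generator: writes tab-delimited rows matching the userLogs schema
// (logdate, time, userID, pageID, channel, action).
public class UserLogsDataGenerator {
    public static void main(String[] args) throws IOException {
        String[] channels = {"Spark", "Hadoop", "Kafka"};   // assumed channel names
        String[] actions = {"View", "Register"};            // "View" matches the PV query below
        Random random = new Random();
        try (FileWriter writer = new FileWriter("E:/HiveData/data.txt")) {
            for (int i = 0; i < 1000; i++) {
                writer.write("2016-07-06" + "\t"
                        + System.currentTimeMillis() + "\t"
                        + random.nextInt(10000) + "\t"   // userID
                        + random.nextInt(10000) + "\t"   // pageID
                        + channels[random.nextInt(channels.length)] + "\t"
                        + actions[random.nextInt(actions.length)] + "\n");
            }
        }
    }
}
```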
## Hands-On Code
SparkSQLUserLogsOps.java
package com.dtspark.sparkapps.sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;

/**
 * Lesson 76: Spark SQL in action -- loading user logs into Hive and computing PV (page views) with SQL.
 * Submit script: spark-submit --class com.dtspark.sparkapps.sql.SparkSQLUserLogsOps --master local /out/sparkApp.jar
 * Created by Limaoran on 2016/7/2.
 */
public class SparkSQLUserLogsOps {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SparkSQLUserLogsOps");
        conf.setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // HiveContext (Spark 1.x API) gives Spark SQL access to tables in the Hive metastore.
        HiveContext sqlContext = new HiveContext(sc);

        String date = "2016-07-06";
        pvStatistic(sqlContext, date);
        sc.stop();
    }

    /** Computes PV per page for the given date, ordered by PV descending. */
    static void pvStatistic(HiveContext hiveContext, String date) {
        hiveContext.sql("use hive");
        // Each 'View' row in userLogs is one page view; count them per (logdate, pageID).
        String sql = "SELECT logdate, pageID, count(*) pv " +
                "FROM userLogs " +
                "WHERE action='View' AND logdate='" + date + "' " +
                "GROUP BY logdate, pageID " +
                "ORDER BY pv DESC";
        DataFrame logDF = hiveContext.sql(sql);
        logDF.show();
    }
}
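HiveContext and DataFrame are Spark 1.x APIs; in Spark 2.x they were folded into SparkSession and Dataset&lt;Row&gt;. As a minimal sketch, the same PV query on Spark 2.x might look like this (assuming a Hive-enabled Spark build with hive-site.xml on the classpath; the class name is an assumption):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Spark 2.x sketch: SparkSession with Hive support replaces HiveContext.
public class SparkSQLUserLogsOps2x {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SparkSQLUserLogsOps")
                .master("local")
                .enableHiveSupport()   // connects Spark SQL to the Hive metastore
                .getOrCreate();
        spark.sql("use hive");
        Dataset<Row> logDF = spark.sql(
                "SELECT logdate, pageID, count(*) pv FROM userLogs " +
                "WHERE action='View' AND logdate='2016-07-06' " +
                "GROUP BY logdate, pageID ORDER BY pv DESC");
        logDF.show();
        spark.stop();
    }
}
```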
## Execution Result
+----------+------+---+
|   logdate|pageID| pv|
+----------+------+---+
|2016-07-06|   714|  4|
|2016-07-06|  1779|  4|
|2016-07-06|  5000|  3|
|2016-07-06|  3358|  3|
|2016-07-06|  8190|  3|
|2016-07-06|  9731|  3|
|2016-07-06|   378|  3|
|2016-07-06|  4179|  3|
|2016-07-06|  8584|  3|
|2016-07-06|  5708|  3|
|2016-07-06|  2155|  3|
|2016-07-06|  2172|  3|
|2016-07-06|  6977|  3|
|2016-07-06|  4914|  3|
|2016-07-06|  1982|  3|
|2016-07-06|  4984|  3|
|2016-07-06|  2128|  3|
|2016-07-06|  1736|  3|
|2016-07-06|  6745|  3|
|2016-07-06|  1564|  3|
+----------+------+---+
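Note that `show()` prints at most 20 rows by default, which is why the listing stops at exactly 20; calling `logDF.show(100)` would print more, and `logDF.count()` returns the total number of result rows.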