标签: sparkIMF
Spark适合做在线个性化交互!
未来的世界所有的系统一定是实时在线交互!
##代码实战
###AdClickedStreamStates.java
/**
* 第112课:Spark Streaming电商广告点击综合案例实战实现广告点击Trend趋势计算实战
*
* 计算过去30分钟内广告点击趋势
*/
JavaPairDStream<String,Long> adClickedDStream112 = filteredadClickedDStream.mapToPair(content->{
String[]splited = content.split("\t");
String adID = splited[3];
//TODO 后续需要重构代码实现 时间戳和分钟的转换提取,此处需要提取出该广告点击的分钟单位
String time = splited[0];
return new Tuple2(time+"_"+adID,1L);
});
//每隔5分钟计算30分钟内广告的点击
JavaPairDStream<String,Long> resultDStream112 = adClickedDStream112.reduceByKeyAndWindow((v1,v2)->v1+v2,(v1,v2)->v1-v2,Durations.minutes(30),Durations.minutes(5));
//物化到外部存储系统
resultDStream112.foreachRDD(rdd->{
rdd.foreachPartition(partition->{
List<AdTrend> listAdTrend = new ArrayList();
while(partition.hasNext()){
Tuple2<String,Long> record = partition.next();
String [] splited = record._1().split("_");
String time = splited[0]; //TODO time:格式应该保存的是System.currentTimeMillis()
String adID = splited[1];
/**
* 在插入数据到数据库的时候具体需要哪些字段?time、adID、clickedCount
* 而我们通过J2EE技术进行趋势绘图的时候肯定是需要年、月、日、时、分这几个维度的,
* 所以我们在这里需要年月日、小时、分钟这些时间维度。
*/
Date date = new Date(Long.parseLong(time));
AdTrend bean = new AdTrend();
bean.set_date(time); //年月日
bean.set_hour(date.getHours()); //小时
bean.set_minite(date.getMinutes()); //分钟
bean.setAdID(adID);
bean.setClickedCount(record._2());
listAdTrend.add(bean);
}
//保存数据库
new BlackDBOpt().saveAdTrend(listAdTrend);
});
});
###BlackDBOpt.java
/**
* 第112课:保存过去30分钟的广告点击趋势图
* @param list
*/
public void saveAdTrend(List<AdTrend> list){
JDBCWrapper wrapper = JDBCWrapper.getJDBCInstance();
//清空今天的数据,然后插入
//先判断是否有数据
List<AdTrend> inserting = new ArrayList<>();
List<AdTrend> updating = new ArrayList<>();
//adtrend 表的字段:timestamp、_hour、_minite、adID、clickedCount
for(AdTrend clicked:list) {
wrapper.doQuery("SELECT count(1) FROM adtrend WHERE _date=? and _hour=? and _minute=? and adID=?",
new Object[]{clicked.get_date(), clicked.get_hour(), clicked.get_minite(), clicked.getAdID()}, resultSet -> {
if (resultSet.next()) {
long count = resultSet.getLong(1);
if (count > 0) {
//后续更新
updating.add(clicked);
} else { //后续插入
inserting.add(clicked);
}
}
});
}
//插入
wrapper.doBatch("INSERT INTO adtrend VALUES(?,?,?,?,?)",inserting,(AdTrend bean,PreparedStatement pst)->{
pst.setString(1, bean.get_date());
pst.setInt(2, bean.get_hour());
pst.setInt(3, bean.get_minite());
pst.setString(4,bean.getAdID());
pst.setLong(5,bean.getClickedCount());
});
//更新
wrapper.doBatch("UPDATE adtrend SET clickedCount=? WHERE _date=? and _hour=? and _minute=? and adID=?",updating,(AdTrend bean,PreparedStatement pst)->{
pst.setLong(1, bean.getClickedCount());
pst.setString(2, bean.get_date());
pst.setInt(3, bean.get_hour());
pst.setInt(4,bean.get_minite());
pst.setString(5, bean.getAdID());
});
}
###AdTrend.java
package com.dtspark.sparkapps.streaming.ads;
/**
* 第112课 用于保存过去30分钟内的广告点击趋势图使用的Bean
* Created by Limaoran on 2016/7/18.
*/
public class AdTrend {
private String _date;
private int _hour;
private int _minite;
private String adID;
private Long clickedCount;
public String get_date() {
return _date;
}
public void set_date(String _date) {
this._date = _date;
}
public int get_hour() {
return _hour;
}
public void set_hour(int _hour) {
this._hour = _hour;
}
public int get_minite() {
return _minite;
}
public void set_minite(int _minite) {
this._minite = _minite;
}
public String getAdID() {
return adID;
}
public void setAdID(String adID) {
this.adID = adID;
}
public Long getClickedCount() {
return clickedCount;
}
public void setClickedCount(Long clickedCount) {
this.clickedCount = clickedCount;
}
}