
# Lesson 114 (Complete Edition): Spark Streaming + Kafka + Spark SQL + TopN + MySQL + KafkaOffsetMonitor: An End-to-End E-commerce Ad Click Case Study

Tags: sparkIMF


## Business Overview

A simulated ad click stream is written to Kafka, consumed by Spark Streaming, filtered against a dynamically generated blacklist, aggregated (running totals via updateStateByKey, a per-province Top 5 via Spark SQL, and a 30-minute click trend via a sliding window), and persisted to MySQL.

## Part 1: Kafka Startup Procedure

- Start ZooKeeper (bundled with Kafka):

  `zookeeper-server-start G:/runtime/kafka_2.11-0.10.0.0/config/zookeeper.properties`

- Start the Kafka broker:

  `kafka-server-start G:/runtime/kafka_2.11-0.10.0.0/config/server.properties`

- Create the topic:

  `kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic AdClicked`

- Verify that the topic was created:

  `kafka-topics --list --zookeeper localhost:2181`

- (For testing) start a Kafka console consumer to read the generated data:

  `kafka-console-consumer --zookeeper localhost:2181 --topic AdClicked --from-beginning`
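
The lesson title also mentions KafkaOffsetMonitor for watching consumer offsets. It is not part of the code below; as a rough sketch (jar version, port and intervals are assumptions, adjust to your download), it can be launched from its assembly jar:

```
java -cp KafkaOffsetMonitor-assembly-0.2.1.jar \
  com.quantifind.kafka.offsetapp.OffsetGetterWeb \
  --zk localhost:2181 --port 8089 --refresh 10.seconds --retain 1.days
```

The web UI is then available at http://localhost:8089.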

## Part 2: Creating the MySQL Tables

Create the tables with the following SQL script:

```sql
DROP TABLE IF EXISTS `adclicked`;
CREATE TABLE `adclicked` (
  `timestamp` varchar(32) default NULL COMMENT 'date, formatted as yyyy-MM-dd',
  `ip` varchar(32) default NULL COMMENT 'IP address',
  `userID` varchar(32) default NULL COMMENT 'user ID',
  `adID` varchar(32) default NULL COMMENT 'ad ID',
  `province` varchar(64) default NULL COMMENT 'province',
  `city` varchar(64) default NULL COMMENT 'city',
  `clickedCount` int(11) default '0' COMMENT 'ad click count'
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='per-user ad clicks';


DROP TABLE IF EXISTS `adclickedtotal`;
CREATE TABLE `adclickedtotal` (
  `timestamp` varchar(32) default NULL COMMENT 'date, formatted as yyyy-MM-dd',
  `adID` varchar(32) default NULL COMMENT 'ad ID',
  `province` varchar(64) default NULL COMMENT 'province',
  `city` varchar(64) default NULL COMMENT 'city',
  `clickedCount` int(11) default NULL COMMENT 'total click count'
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='aggregate ad clicks';


DROP TABLE IF EXISTS `adprovincetopn`;
CREATE TABLE `adprovincetopn` (
  `timestamp` varchar(32) default NULL COMMENT 'date, formatted as yyyy-MM-dd',
  `adID` varchar(32) default NULL COMMENT 'ad ID',
  `province` varchar(64) default NULL COMMENT 'province',
  `clickCount` int(11) default NULL COMMENT 'ad click count'
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Top 5 ads per province';


DROP TABLE IF EXISTS `adtrend`;
CREATE TABLE `adtrend` (
  `_date` varchar(32) default NULL COMMENT 'date, formatted as yyyy-MM-dd',
  `_hour` int(2) default NULL COMMENT 'hour',
  `_minute` int(2) default NULL COMMENT 'minute',
  `adID` varchar(32) default NULL COMMENT 'ad ID',
  `clickedCount` int(11) default NULL COMMENT 'ad click count'
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='ad click trend over the past 30 minutes';


DROP TABLE IF EXISTS `blacklisttable`;
CREATE TABLE `blacklisttable` (
  `blackName` varchar(32) NOT NULL COMMENT 'blacklisted name (userID)',
  PRIMARY KEY  (`blackName`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='blacklist';
```
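
The script can then be loaded into the database that JDBCWrapper connects to below (a local MySQL database named sparkstreaming, user root). Assuming the script is saved as create_tables.sql (the file name is illustrative):

```
mysql -uroot -proot -e "CREATE DATABASE IF NOT EXISTS sparkstreaming"
mysql -uroot -proot sparkstreaming < create_tables.sql
```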

## Part 3: Start the Kafka Producer MockAdClickedStates to Send Data to Kafka

### Run MockAdClickedStates.java

```java
package com.dtspark.sparkapps.streaming.ads;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;

/**
 * Lesson 113: Spark Streaming e-commerce ad click case study: generating the mock
 * click data and creating the SQL tables.
 * Created by Limaoran on 2016/7/18.
 */
public class MockAdClickedStates implements Runnable{
    public static void main(String[] args) {
        Thread thread = new Thread(new MockAdClickedStates());
        thread.start();
        try {
            thread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    private Producer<String,String> producer ;
    private Random random = new Random();
    static final String [] provinces = new String[]{
        "ShanDong","ZheJiang","JiangSu","FuJian"
    };
    static final Map<String,String[]> cities = new HashMap<>();
    static{
        cities.put("ShanDong",new String[]{"JiNan","QingDao","DeZhou"});
        cities.put("ZheJiang",new String[]{"HangZhou","WenZhou","BingBo"});
        cities.put("JiangSu",new String[]{"NanJing","SuZhou","Wuxi"});
        cities.put("FuJian",new String[]{"FuZhou","XiaMen","SanMing"});
    }
    public MockAdClickedStates(){
        /**
         * Basic Kafka producer configuration.
         */
        Properties props = new Properties();
        props.put("serializer.class","kafka.serializer.StringEncoder");
        props.put("metadata.broker.list","localhost:9092");
        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<>(config);
    }
    @Override
    public void run() {
        System.out.println("广告点击的基本数据格式:timeatempip、userID、adID、province、city");
        while(true) {
            //广告点击的基本数据格式:timestamp、ip、userID、adID、province、city
            Long timestamp = System.currentTimeMillis();
            String ip = getIP(); //可以采用网络上免费提供的ip库
            String userID = random.nextInt(1000) + "";
            String adID = random.nextInt(100) + "";
            String province = provinces[random.nextInt(provinces.length)];
            String city = cities.get(province)[random.nextInt(cities.get(province).length)];
            String clickedAd = timestamp + "\t" + ip + "\t" + userID + "\t" + adID + "\t" + province + "\t" + city;
            System.out.println(clickedAd);
            producer.send(new KeyedMessage<String, String>("AdClicked", clickedAd));
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private String getIP() {
        return ""+(random.nextInt(20)+180)+'.'+(random.nextInt(20)+150)+'.'+random.nextInt(11)+'.'+(random.nextInt(250)+1);
    }
}
```
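
Note that MockAdClickedStates uses the old Scala producer API (kafka.javaapi.producer.Producer), which still works against the 0.10 broker started above but is deprecated there. As a minimal alternative sketch (not the lesson's code), the same record could be sent with the new Java producer that ships with Kafka 0.9+:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NewApiProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // same tab-separated record format as MockAdClickedStates
            String clickedAd = System.currentTimeMillis() + "\t188.166.5.222\t854\t7\tShanDong\tJiNan";
            producer.send(new ProducerRecord<>("AdClicked", clickedAd));
        } // close() flushes pending records
    }
}
```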

### Start a Kafka console consumer; the received data looks like:

```text
1468829152053   188.166.5.222   854     7       ShanDong        JiNan
1468829152466   183.163.5.221   748     78      ShanDong        QingDao
1468829152566   180.153.10.38   137     27      FuJian  SanMing
1468829152668   183.161.6.180   683     67      JiangSu SuZhou
1468829152772   194.159.5.203   780     41      JiangSu Wuxi
...
```

## Part 4: Start the Ad Processing Program AdClickedStreamStates

Launch script:

```
spark-submit --class com.dtspark.sparkapps.streaming.ads.AdClickedStreamStates --master local /out/sparkApp.jar
```
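
The jar needs its Kafka and MySQL client dependencies on the classpath. Based on the Spark 1.x APIs used below (HiveContext, DataFrame, com.google.common.base.Optional), a submission along these lines should work; the exact versions are assumptions, match them to your build:

```
spark-submit --class com.dtspark.sparkapps.streaming.ads.AdClickedStreamStates \
  --master local[4] \
  --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.2,mysql:mysql-connector-java:5.1.38 \
  /out/sparkApp.jar
```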

### Main program: AdClickedStreamStates.java

```java
package com.dtspark.sparkapps.streaming.ads;

import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import java.io.Serializable;
import java.text.SimpleDateFormat;
import com.google.common.base.Optional;
import java.util.*;

/**
 * Lessons 105-112: Spark Streaming e-commerce ad click case study: online click statistics.
 * Created by Limaoran on 2016/7/14.
 *
 * Processes the ad click stream online.
 * Basic ad click record format: timestamp, ip, userID, adID, province, city
 */
public class AdClickedStreamStates implements Serializable {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("AdClickedStreamStates")
                .setMaster("local[4]");
//                .setMaster("spark://localhost:7077");
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(10));
        jsc.sparkContext().setLogLevel("INFO");
        jsc.checkpoint("file:///z:/checkpoint");

        Map<String,String> kafkaParameters = new HashMap<>();
        kafkaParameters.put("metadata.broker.list", "MasterWin:9092");
        Set<String> topics = new HashSet<>();
        topics.add("AdClicked");
        JavaPairInputDStream<String,String> adClickStream = KafkaUtils.createDirectStream(jsc,
                String.class, String.class, StringDecoder.class, StringDecoder.class,
                kafkaParameters, topics);

        /**
         * Because the blacklist has to be filtered online while the data lives in RDDs,
         * we necessarily use the transform family of operators. Plain transform is enough here:
         * the records read from Kafka are Pair<String,String>, but downstream we only need the
         * String value, so the transformation maps the pairs down to a DStream<String> that the
         * later stages keep processing.
         *
         * Note again: within each Batch Duration the input data is encapsulated by one and
         * only one RDD. You can have multiple InputDStreams, but when the job is generated,
         * the different InputDStreams within a Batch Duration are just like different file
         * sources for Spark operating on HDFS data.
         */
        // 1. Filter out blacklisted users
        JavaDStream<String> filteredadClickedDStream = adClickStream.transform((JavaPairRDD<String, String> rdd) -> {
            /**
             * Online blacklist filtering:
             * 1. Load the blacklist from the database and wrap it in a new RDD.
             * 2. leftOuterJoin the RDD produced in this Batch Duration against the blacklist RDD:
             *    a record whose userID appears in the blacklist carries Optional(true) after
             *    the join; all other records carry an absent Optional.
             *
             * We keep only the records that did NOT match the blacklist.
             */
            // the data comes from the blacklist table, mapped to Tuple2<String,Boolean>
            List<Tuple2<String, Boolean>> blackListFromDB = new BlackDBOpt().blackListFromDB();
            /**
             * The blacklist table only stores userIDs, but a join needs Key-Value pairs,
             * so we build a Key-Value collection from the table rows.
             */
            // record format: Tuple2<String,Boolean>
            JavaSparkContext sc = new JavaSparkContext(rdd.context());
            JavaPairRDD<String, Boolean> blackListRDD = sc.parallelizePairs(blackListFromDB);
            /**
             * The join is keyed by userID, so the incoming rdd must first be
             * converted with mapToPair into <userID, full record> pairs.
             */
            JavaPairRDD<String, String> rdd2Pair = rdd.mapToPair(tuple2 -> {
                String userID = tuple2._2().split("\t")[2];
                return new Tuple2<>(userID, tuple2._2());
            });
            JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> joinedRDD = rdd2Pair.leftOuterJoin(blackListRDD);
            // keep only the records whose userID did not match the blacklist
            JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> filterRDD = joinedRDD.filter(tuple2 ->
                    !(tuple2._2()._2().isPresent() && tuple2._2()._2().get()));
            JavaRDD<String> resultRDD = filterRDD.map(tuple2 -> tuple2._2()._1());
            return resultRDD;
        });
//        filteredadClickedDStream.print();
        // record format: timestamp, ip, userID, adID, province, city
        JavaPairDStream<String,Long> pairDStream = filteredadClickedDStream.mapToPair(line -> {
            String[] splited = line.split("\t");
            String timestamp = splited[0];  // epoch milliseconds
            String ip = splited[1];
            String userID = splited[2];
            String adID = splited[3];
            String province = splited[4];
            String city = splited[5];
            // key: date (yyyy-MM-dd) _ ip _ userID _ adID _ province _ city
            String clickedRecord = dateFormat.format(new Date(Long.parseLong(timestamp))) + "_" + ip + "_" + userID + "_" + adID + "_" + province + "_" + city;
            return new Tuple2<>(clickedRecord, 1L);
        });
        /**
         * 2. Count ad clicks: each user's ad click count within one Batch Duration.
         */
        JavaPairDStream<String,Long> adClickedUsers = pairDStream.reduceByKey((v1,v2)->v1+v2);
        /**
         * What counts as a valid click?
         * 1. Sophisticated systems usually train a machine-learning model and filter online with it.
         * 2. A simple approach is to judge by the click count within one Batch Duration, but
         *    click-fraud programs imitate real click behaviour as closely as possible, so a single
         *    Batch is not enough; we should judge over, say, a day (or an hour) of data.
         * 3. Heuristics short of online machine learning include:
         *    e.g. many accounts visiting from the same IP (MAC address) within a short period;
         *    e.g. counting a user's clicks per day and blacklisting anyone who clicks the same
         *    ad more than 50 times a day.
         *
         * A key property of the blacklist: it is generated dynamically! Every Batch Duration must
         * consider whether new entries have joined, so the blacklist has to be persisted,
         * e.g. in a database or Redis.
         *
         * Compare the "blacklist" of a mail system: Spark Streaming can continuously monitor each
         * user's actions, and if a user sends mail more often than a configured threshold,
         * temporarily blacklist the user to stop excessively frequent sending.
         */
        JavaPairDStream<String,Long> filteredClickedInBatch = adClickedUsers.filter(tuple->{
            if(tuple._2() > 1){ // more than one click within a 10-second batch is treated as fraudulent
                // such records are dropped here; the blacklist table is updated further below
                return false;
            }else{
                return true;
            }
        });
//        filteredClickedInBatch.print();
        filteredClickedInBatch.foreachRDD(rdd -> {
            rdd.foreachPartition(record -> {
                /**
                 * Write to MySQL through a connection pool for efficient database access.
                 * Since the incoming parameter is an Iterator, we batch the work for efficiency,
                 * e.g. inserting 1000 records at a time with insertBatch/updateBatch style operations.
                 * The inserted user record could contain just userID, adID and clickedCount.
                 * One subtlety: two records may share the same key, in which case we must
                 * accumulate with an update rather than an insert.
                 */
                // insert the valid data into MySQL (format: time, ip, userID, adID, province, city)
                List<UserAdClicked> userAdClickedList = new ArrayList<UserAdClicked>();
                while (record.hasNext()) {
                    Tuple2<String, Long> tuple2 = record.next();
                    String[] splited = tuple2._1().split("_"); // the key was joined with "_" above, not "\t"
                    UserAdClicked userAdClicked = new UserAdClicked();
                    userAdClicked.setTimestamp(splited[0]);
                    userAdClicked.setIp(splited[1]);
                    userAdClicked.setUserID(splited[2]);
                    userAdClicked.setAdID(splited[3]);
                    userAdClicked.setProvince(splited[4]);
                    userAdClicked.setCity(splited[5]);
                    userAdClickedList.add(userAdClicked);
                }
                new BlackDBOpt().saveUserAds(userAdClickedList);
            });
        });
        JavaPairDStream<String,Long> blackListOnHistory = filteredClickedInBatch.filter(tuple -> {
            // key format: date _ ip _ userID _ adID _ province _ city
            String [] splited = tuple._1().split("_");
            String date = splited[0];
            String userID = splited[2];
            String adID = splited[3];
            /**
             * Next, query the user ad click table by date, userID and adID to obtain the total
             * click count, and decide from that count whether this is a blacklist-worthy clicker.
             */
            int clickedCountTotalToday = 81; // hard-coded stand-in for the database lookup (see the sketch after this listing)
            return clickedCountTotalToday > 50;
        });
        /**
         * Deduplicate the whole blacklist RDD!
         */
        JavaDStream<String> blackListUniqueUserIDBaseOnHistory = blackListOnHistory.map(tuple2 -> {
            return tuple2._1().split("_")[2];
        });
        JavaDStream<String> distinctDStream = blackListUniqueUserIDBaseOnHistory.transform((JavaRDD<String> rdd) -> {
            return rdd.distinct();
        });

        // next, write the results into the blacklist table
        distinctDStream.foreachRDD(rdd -> {
            rdd.foreachPartition(record -> {
                /**
                 * The inserted record only needs the userID,
                 * which can be written straight into the blacklist table.
                 */
                List<String> blackList = new ArrayList<String>();
                while (record.hasNext()) {
                    blackList.add(record.next());
                }
                new BlackDBOpt().saveBlackList(blackList);
            });
        });
        /**
         * Lesson 110: continuously updated ad click statistics via updateStateByKey.
         *
         * Dynamically accumulates the ad click totals.
         */
        JavaPairDStream<String,Long> adClickedDStream110 = filteredadClickedDStream.mapToPair(content -> {
            String[] splited = content.split("\t");
            String timestamp = splited[0];  // epoch milliseconds
            String ip = splited[1];
            String userID = splited[2];
            String adID = splited[3];
            String province = splited[4];
            String city = splited[5];
            // key: date (yyyy-MM-dd) _ adID _ province _ city
            String clickedRecord = dateFormat.format(new Date(Long.parseLong(timestamp))) + "_" + adID + "_" + province + "_" + city;
            return new Tuple2<>(clickedRecord, 1L);
        });
        /**
         * updateStateByKey updates the accumulated click count on top of each Batch Duration;
         * after updating, the result is usually persisted to external storage, here MySQL.
         */
        JavaPairDStream<String,Long> updateStateByKeyDStream110 = adClickedDStream110.updateStateByKey((List<Long> v1, Optional<Long> v2) -> {
            /**
             * v1: the occurrences of the current key within the current Batch Duration, e.g. {1,1,1,1}
             * v2: the result accumulated for the current key over previous Batch Durations.
             */
            Long count = 0L;
            if (v2.isPresent()) {
                count = v2.get();
            }
            for (Long one : v1) {
                count += one;
            }
            //com.google.common.base.Optional
            return Optional.of(count);
        });
        updateStateByKeyDStream110.foreachRDD(rdd -> {
            rdd.foreachPartition(record -> {
                /**
                 * Record format written out: timestamp, adID, province, city
                 */
                List<AdClicked> listAdClicked = new ArrayList<AdClicked>();
                while (record.hasNext()) {
                    Tuple2<String, Long> tuple2 = record.next();
                    String[] splited = tuple2._1().split("_");
                    AdClicked adClicked = new AdClicked();
                    adClicked.setTimestamp(splited[0]); //yyyy-MM-dd
                    adClicked.setAdID(splited[1]);
                    adClicked.setProvince(splited[2]);
                    adClicked.setCity(splited[3]);
                    adClicked.setClickedCount(tuple2._2());
                    listAdClicked.add(adClicked);
                }
                // save: insert and update
                new BlackDBOpt().saveAds(listAdClicked);
            });
        });
        /**
         * Lesson 111: compute the Top 5 ads per province online.
         *
         * TopN over the ad clicks: the Top 5 ads per province per day.
         * We operate directly on the RDD, hence the transform operator.
         */
        updateStateByKeyDStream110.transform((JavaPairRDD<String,Long> rdd)->{
            // key: timestamp _ adID _ province (the city segment is dropped)
            JavaPairRDD<String,Long> pairedRDD = rdd.mapToPair(tuple2 -> {
                String[] splited = tuple2._1().split("_");
                String timestamp = splited[0];  // yyyy-MM-dd
                String adID = splited[1];
                String province = splited[2];
                String clickedRecord = timestamp + "_" + adID + "_" + province;
                // carry the accumulated count through so reduceByKey sums clicks instead of counting cities
                return new Tuple2<>(clickedRecord, tuple2._2());
            });
            JavaPairRDD<String,Long> reducedRDD = pairedRDD.reduceByKey((Long v1, Long v2) -> v1 + v2);
            JavaRDD<Row> rowRDD = reducedRDD.map(tuple -> {
                String[] splited = tuple._1().split("_");
                return RowFactory.create(splited[0], splited[1], splited[2], tuple._2());
            });
            StructType structType = DataTypes.createStructType(Arrays.asList(
                    DataTypes.createStructField("timestamp", DataTypes.StringType, true),
                    DataTypes.createStructField("adID", DataTypes.StringType, true),
                    DataTypes.createStructField("province", DataTypes.StringType, true),
                    DataTypes.createStructField("clickedCount", DataTypes.StringType, true)
            ));
            //HiveContext
            HiveContext sqlContext = new HiveContext(rdd.context());
            DataFrame df = sqlContext.createDataFrame(rowRDD,structType);
            df.registerTempTable("topNTableSource");
            DataFrame result = sqlContext.sql("SELECT * FROM (SELECT timestamp,adID,province,clickedCount, " +
                    "ROW_NUMBER() OVER(PARTITION BY province ORDER BY clickedCount DESC) rank " +
                    "FROM topNTableSource ) subquery WHERE rank<=5");

            return result.toJavaRDD();
        }).foreachRDD(rddRow -> {
            rddRow.foreachPartition(row -> {
                List<AdProvinceTopN> adProvinceTopNList = new ArrayList<AdProvinceTopN>();
                while (row.hasNext()) {
                    Row r = row.next();
                    AdProvinceTopN item = new AdProvinceTopN();
                    item.setTimestamp(r.getString(0));  // date: yyyy-MM-dd
                    item.setAdID(r.getString(1));
                    item.setProvince(r.getString(2));
                    item.setClickedCount(r.getLong(3));
                    adProvinceTopNList.add(item);
                }
                // insert or update
                new BlackDBOpt().saveAdProvinceTopN(adProvinceTopNList);
            });
        });
        /**
         * Lesson 112: compute the ad click trend over the past 30 minutes.
         */
        JavaPairDStream<String,AdTrend> adClickedDStream112 = filteredadClickedDStream.mapToPair(content->{
            String[] splited = content.split("\t");
            String adID = splited[3];
            // TODO: refactor the timestamp-to-minute extraction; here we need the minute in which the ad was clicked
            String time = splited[0];
            // key format: date_adID; value: AdTrend
            Date date = new Date(Long.parseLong(time));
            AdTrend bean = new AdTrend();
            bean.set_date(dateFormat.format(date)); // yyyy-MM-dd
            bean.set_hour(date.getHours());         // hour
            bean.set_minute(date.getMinutes());     // minute
            bean.setAdID(adID);
            bean.setClickedCount(1L);
            return new Tuple2<>(bean.get_date()+"_"+adID, bean);
        });
        // every 5 minutes, recompute the ad clicks of the past 30 minutes
        JavaPairDStream<String,AdTrend> resultDStream112 = adClickedDStream112.reduceByKeyAndWindow(
                (AdTrend v1,AdTrend v2)->{ v1.setClickedCount(v1.getClickedCount()+v2.getClickedCount());return v1;},
                (AdTrend v1,AdTrend v2)->{ v1.setClickedCount(v1.getClickedCount()-v2.getClickedCount());return v1;},
                Durations.minutes(30),Durations.minutes(5));

        // materialize to external storage
        resultDStream112.foreachRDD(rdd->{
            rdd.foreachPartition(partition->{
                List<AdTrend> listAdTrend = new ArrayList<>();
                while(partition.hasNext()){
                    /**
                     * Which fields does the insert need? time, adID and clickedCount.
                     * Drawing the trend chart on the J2EE side needs the year/month/day, hour
                     * and minute dimensions, so the bean carries date, hour and minute.
                     */
                    AdTrend bean = partition.next()._2();
                    listAdTrend.add(bean);
                }
                // save to the database
                new BlackDBOpt().saveAdTrend(listAdTrend);
            });
        });

        jsc.start();
        jsc.awaitTermination();
    }
    static SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
}
```
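
The blacklist check above stubs the daily total with a hard-coded 81. A minimal sketch of the real lookup, written as a method that could be added to BlackDBOpt (the helper name clickedCountTotalToday is hypothetical, not part of the lesson's code):

```java
// Hypothetical helper: total clicks of (date, userID, adID) from the adclicked table.
public long clickedCountTotalToday(String date, String userID, String adID) {
    final long[] total = {0L};
    JDBCWrapper.getJDBCInstance().doQuery(
            "SELECT SUM(clickedCount) FROM adclicked WHERE timestamp=? AND userID=? AND adID=?",
            new Object[]{date, userID, adID},
            resultSet -> {
                if (resultSet.next()) {
                    total[0] = resultSet.getLong(1); // a NULL sum reads back as 0
                }
            });
    return total[0];
}
```

The filter would then call this method instead of assigning the constant.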

### SQL wrapper: BlackDBOpt.java

```java
package com.dtspark.sparkapps.streaming.ads;

import scala.Tuple2;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Lesson 109: the actual MySQL operations behind the dynamic blacklist.
 * Created by Limaoran on 2016/7/15.
 *
 * Database operations for the blacklist and the click statistics.
 */
public class BlackDBOpt {
    /**
     * Fetch the blacklist.
     * @return records of the form Tuple2<String,Boolean>
     */
    public List<Tuple2<String,Boolean>> blackListFromDB(){
        JDBCWrapper wrapper = JDBCWrapper.getJDBCInstance();
        List<Tuple2<String,Boolean>> list = new ArrayList<>();
        wrapper.doQuery("select * from blacklistTable", new Object[]{}, (ResultSet resultSet)->{
            while(resultSet.next()){
                list.add(new Tuple2<>(resultSet.getString("blackName"), true));
            }
            resultSet.close();
        });
        return list;
    }

    /**
     * Save the blacklist.
     * @param lists userIDs to add to the blacklist
     * @return JDBC batch execution results
     */
    public int[] saveBlackList(List<String> lists){
        if(lists.size()<1) return null;
        JDBCWrapper wrapper = JDBCWrapper.getJDBCInstance();
        List<Object[]> paramList = new ArrayList<>();
        for(String str: lists){
            paramList.add(new String[]{str});
        }
        return wrapper.doBatch("INSERT INTO blacklistTable VALUES (?)",paramList);
    }

    /**
     * Save per-user ad clicks.<br/>
     * Flow:<br/>
     *  Check whether the user already clicked this ad today:<br/>
     *      1. if so, update the click count;<br/>
     *      2. otherwise insert a new record.<br/>
     * @param list
     */
    public void saveUserAds(List<UserAdClicked> list){
        if(list.size()<1) return;
        JDBCWrapper wrapper = JDBCWrapper.getJDBCInstance();

        List<UserAdClicked> inserting = new ArrayList<>();
        List<UserAdClicked> updating = new ArrayList<>();
        // adclicked table columns: timestamp, ip, userID, adID, province, city, clickedCount
        for(UserAdClicked clicked:list) {
            wrapper.doQuery("SELECT count(1) FROM adClicked WHERE timestamp=? and userID=? and adID=?",
                    new Object[]{clicked.getTimestamp(),clicked.getUserID(),clicked.getAdID()},resultSet->{
                if(resultSet.next()){
                    long count = resultSet.getLong(1);
                    if(count>0){
                        // update later
                        clicked.setCount(count+1);
                        updating.add(clicked);
                    }else{  // insert later
                        clicked.setCount(1L);
                        inserting.add(clicked);
                    }
                }
            });
        }
        // insert
        wrapper.doBatch("INSERT INTO adClicked VALUES(?,?,?,?,?,?,?)",inserting,(UserAdClicked bean,PreparedStatement pst)->{
            pst.setString(1,bean.getTimestamp());
            pst.setString(2,bean.getIp());
            pst.setString(3,bean.getUserID());
            pst.setString(4,bean.getAdID());
            pst.setString(5,bean.getProvince());
            pst.setString(6,bean.getCity());
            pst.setLong(7, bean.getCount());
        });
        // update
        wrapper.doBatch("UPDATE adClicked SET clickedCount=? WHERE timestamp=? and userID=? and adID=?",updating,(UserAdClicked bean,PreparedStatement pst)->{
            pst.setLong(1,bean.getCount());
            pst.setString(2,bean.getTimestamp());
            pst.setString(3,bean.getUserID());
            pst.setString(4, bean.getAdID());
        });
    }
    /**
     * Save the aggregate ad clicks.<br/>
     * Flow:<br/>
     *  Check whether this ad was already recorded today:<br/>
     *      1. if so, update the click count;<br/>
     *      2. otherwise insert a new record.<br/>
     * @param list
     */
    public void saveAds(List<AdClicked> list){
        if(list.size()<1) return;
        JDBCWrapper wrapper = JDBCWrapper.getJDBCInstance();

        List<AdClicked> inserting = new ArrayList<>();
        List<AdClicked> updating = new ArrayList<>();
        // adclickedtotal table columns: timestamp, adID, province, city, clickedCount
        for(AdClicked clicked:list) {
            wrapper.doQuery("SELECT count(1) FROM adclickedTotal WHERE timestamp=? and adID=? and province=? and city=?",
                    new Object[]{clicked.getTimestamp(),clicked.getAdID(),clicked.getProvince(),clicked.getCity()},resultSet->{
                        if(resultSet.next()){
                            long count = resultSet.getLong(1);
                            if(count>0){
                                // update later
                                updating.add(clicked);
                            }else{  // insert later
                                inserting.add(clicked);
                            }
                        }
                    });
        }
        // insert
        wrapper.doBatch("INSERT INTO adclickedTotal VALUES(?,?,?,?,?)",inserting,(AdClicked bean,PreparedStatement pst)->{
            pst.setString(1,bean.getTimestamp());
            pst.setString(2,bean.getAdID());
            pst.setString(3,bean.getProvince());
            pst.setString(4,bean.getCity());
            pst.setLong(5,bean.getClickedCount());
        });
        // update
        wrapper.doBatch("UPDATE adclickedTotal SET clickedCount=? WHERE timestamp=? and adID=? and province=? and city=?",updating,(AdClicked bean,PreparedStatement pst)->{
            pst.setLong(1, bean.getClickedCount());
            pst.setString(2, bean.getTimestamp());
            pst.setString(3, bean.getAdID());
            pst.setString(4,bean.getProvince());
            pst.setString(5,bean.getCity());
        });
    }

    /**
     * Lesson 111: save the Top 5 ads per province.
     * @param list
     */
    public void saveAdProvinceTopN(List<AdProvinceTopN> list){
        if(list.size()<1) return;
        JDBCWrapper wrapper = JDBCWrapper.getJDBCInstance();
        // clear today's rows for the affected provinces, then insert
        // adprovinceTopN table columns: timestamp, adID, province, clickCount
        // deletion is keyed on date and province
        String sqlDelete = "DELETE FROM adprovinceTopN WHERE timestamp=? and province=? ";
        Set<String> provinces = new HashSet<>();
        for(int i=0;i<list.size();i++){
            provinces.add(list.get(i).getTimestamp()+'_'+list.get(i).getProvince());
        }
        List<Object[]> listDelete = new ArrayList<>();
        for(String province:provinces){
            listDelete.add(province.split("_")); // yields {timestamp, province}, matching the two SQL placeholders
        }
        wrapper.doBatch(sqlDelete, listDelete);
        // insert
        wrapper.doBatch("INSERT INTO adprovinceTopN VALUES(?,?,?,?)",list,(AdProvinceTopN bean,PreparedStatement pst)->{
            pst.setString(1,bean.getTimestamp());
            pst.setString(2,bean.getAdID());
            pst.setString(3,bean.getProvince());
            pst.setLong(4, bean.getClickedCount());
        });
    }
    /**
     * Lesson 112: save the ad click trend of the past 30 minutes.
     * @param list
     */
    public void saveAdTrend(List<AdTrend> list){
        if(list.size()<1) return;
        JDBCWrapper wrapper = JDBCWrapper.getJDBCInstance();
        // check whether each row already exists, then insert or update accordingly
        List<AdTrend> inserting = new ArrayList<>();
        List<AdTrend> updating = new ArrayList<>();
        // adtrend table columns: _date, _hour, _minute, adID, clickedCount
        for(AdTrend clicked:list) {
            wrapper.doQuery("SELECT count(1) FROM adtrend WHERE _date=? and _hour=? and _minute=? and adID=?",
                    new Object[]{clicked.get_date(), clicked.get_hour(), clicked.get_minute(), clicked.getAdID()}, resultSet -> {
                        if (resultSet.next()) {
                            long count = resultSet.getLong(1);
                            if (count > 0) {
                                // update later
                                updating.add(clicked);
                            } else {  // insert later
                                inserting.add(clicked);
                            }
                        }
                    });
        }
        // insert
        wrapper.doBatch("INSERT INTO adtrend VALUES(?,?,?,?,?)",inserting,(AdTrend bean,PreparedStatement pst)->{
            pst.setString(1, bean.get_date());
            pst.setInt(2, bean.get_hour());
            pst.setInt(3, bean.get_minute());
            pst.setString(4,bean.getAdID());
            pst.setLong(5,bean.getClickedCount());
        });
        // update
        wrapper.doBatch("UPDATE adtrend SET clickedCount=? WHERE _date=? and _hour=? and _minute=? and adID=?",updating,(AdTrend bean,PreparedStatement pst)->{
            pst.setLong(1, bean.getClickedCount());
            pst.setString(2, bean.get_date());
            pst.setInt(3, bean.get_hour());
            pst.setInt(4,bean.get_minute());
            pst.setString(5, bean.getAdID());
        });
    }
}
```
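
A design note on the select-then-insert-or-update pattern above: it issues one query per record and can race when two partitions write the same key. Since the store is MySQL/InnoDB, a common alternative is a single upsert per record; this requires a unique key on the lookup columns, which the lesson's schema does not define, so the following is a sketch under that assumption:

```sql
ALTER TABLE adclickedtotal
  ADD UNIQUE KEY uk_total (`timestamp`, `adID`, `province`, `city`);

INSERT INTO adclickedtotal (`timestamp`, `adID`, `province`, `city`, `clickedCount`)
VALUES (?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE `clickedCount` = VALUES(`clickedCount`);
```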

### Database helper: JDBCWrapper.java

```java
package com.dtspark.sparkapps.streaming.ads;

import java.io.Serializable;
import java.sql.*;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Lesson 107: a JDBC wrapper (for MySQL) with a simple connection pool.
 * Created by Limaoran on 2016/7/15.
 */
public class JDBCWrapper implements Serializable {
    static{
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
    private static JDBCWrapper jdbcInstance;
    public static JDBCWrapper getJDBCInstance(){
        if(jdbcInstance==null){
            synchronized (JDBCWrapper.class){
                if(jdbcInstance==null){
                    jdbcInstance = new JDBCWrapper();
                }
            }
        }
        return jdbcInstance;
    }
    private LinkedBlockingQueue<Connection> dbConnectionPool = new LinkedBlockingQueue<>();
    private JDBCWrapper(){
        try {
            //"jdbc:mysql://localhost:3306/spark?user=root&password=root"
            for(int i=0;i<5;i++) {
                Connection con = DriverManager.getConnection("jdbc:mysql://localhost:3306/sparkstreaming", "root", "root");
                dbConnectionPool.put(con);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public Connection getConnection(){
        while(dbConnectionPool.size()==0){ // if the pool is empty, wait a moment (LinkedBlockingQueue.take() would block without spinning)
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return dbConnectionPool.poll();
    }
    public void returnConnection(Connection con){
        try {
            dbConnectionPool.put(con);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public int[] doBatch(String sqlText,List<Object[]> paramsList){
        Connection con = getConnection();
        PreparedStatement pst = null;
        int[] result = null;
        try {
            con.setAutoCommit(false);
            pst = con.prepareStatement(sqlText);
            for(Object[] parameters:paramsList){
                for(int i=0;i<parameters.length;i++){
                    pst.setObject(i+1,parameters[i]);
                }
                pst.addBatch();
            }
            result = pst.executeBatch();
            con.commit();
        } catch (SQLException e) {
            e.printStackTrace();
        }finally {
            if(pst!=null){
                try {
                    pst.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if(con!=null) {
                returnConnection(con);
            }
        }
        return result;
    }
    public<T> int[] doBatch(String sqlText,List<T> paramsList,ExecuteParamCallBack<T> callBack){
        Connection con = getConnection();
        PreparedStatement pst = null;
        int[] result = null;
        try {
            con.setAutoCommit(false);
            pst = con.prepareStatement(sqlText);
            for(T t:paramsList){
                callBack.setParameters(t,pst);
                pst.addBatch();
            }
            result = pst.executeBatch();
            con.commit();
        } catch (SQLException e) {
            e.printStackTrace();
        }finally {
            if(pst!=null){
                try {
                    pst.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if(con!=null) {
                returnConnection(con);
            }
        }
        return result;
    }
    public void doQuery(String sqlText,Object[] params,ExecuteCallBack callBack){
        Connection con = getConnection();
        PreparedStatement pst = null;
        ResultSet result = null;
        try {
            pst = con.prepareStatement(sqlText);
            for(int i=0;i<params.length;i++){
                pst.setObject(i+1,params[i]);
            }
            result = pst.executeQuery();
            callBack.resultCallBack(result);
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            if(pst!=null){
                try {
                    pst.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if(con!=null) {
                returnConnection(con);
            }
        }
    }
    public interface ExecuteCallBack{
        void resultCallBack(ResultSet result) throws SQLException;
    }
    public interface ExecuteParamCallBack<T>{
        void setParameters(T params,PreparedStatement pst)throws SQLException;
    }
}
```
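
For reference, a minimal standalone use of the wrapper (the inserted values are illustrative only):

```java
public class JDBCWrapperDemo {
    public static void main(String[] args) {
        JDBCWrapper wrapper = JDBCWrapper.getJDBCInstance();
        // batch insert with positional parameters
        wrapper.doBatch("INSERT INTO blacklistTable VALUES (?)",
                java.util.Arrays.asList(new Object[]{"854"}, new Object[]{"748"}));
        // query with a result-set callback
        wrapper.doQuery("SELECT blackName FROM blacklistTable", new Object[]{},
                rs -> { while (rs.next()) System.out.println(rs.getString(1)); });
    }
}
```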

### JavaBean: AdClicked.java

```java
package com.dtspark.sparkapps.streaming.ads;

import java.io.Serializable;

/**
 * Lesson 110: JavaBean for aggregated ad clicks.
 * Created by Limaoran on 2016/7/16.
 */
public class AdClicked implements Serializable{
    private String timestamp;
    private String adID;
    private String province;
    private String city;
    private Long clickedCount = 0L;

    public Long getClickedCount() {
        return clickedCount;
    }
    public void setClickedCount(Long clickedCount) {
        this.clickedCount = clickedCount;
    }
    public String getTimestamp() {
        return timestamp;
    }
    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }
    public String getAdID() {
        return adID;
    }
    public void setAdID(String adID) {
        this.adID = adID;
    }
    public String getProvince() {
        return province;
    }
    public void setProvince(String province) {
        this.province = province;
    }
    public String getCity() {
        return city;
    }
    public void setCity(String city) {
        this.city = city;
    }
}
```

### JavaBean: AdProvinceTopN.java

```java
package com.dtspark.sparkapps.streaming.ads;

import java.io.Serializable;

/**
 * Lesson 111: JavaBean for the per-province Top 5 ad ranking.
 * Created by Limaoran on 2016/7/17.
 */
public class AdProvinceTopN implements Serializable {
    private String timestamp;
    private String adID;
    private String province;
    private Long clickedCount;
    public String getTimestamp() {
        return timestamp;
    }
    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }
    public String getAdID() {
        return adID;
    }
    public void setAdID(String adID) {
        this.adID = adID;
    }
    public String getProvince() {
        return province;
    }
    public void setProvince(String province) {
        this.province = province;
    }
    public Long getClickedCount() {
        return clickedCount;
    }
    public void setClickedCount(Long clickedCount) {
        this.clickedCount = clickedCount;
    }
}
```

### JavaBean: AdTrend.java

```java
package com.dtspark.sparkapps.streaming.ads;

import java.io.Serializable;

/**
 * Lesson 112: JavaBean for the ad click trend chart over the past 30 minutes.
 * Created by Limaoran on 2016/7/18.
 */
public class AdTrend implements Serializable{
    private String _date;
    private int _hour;
    private int _minute;
    private String adID;
    private Long clickedCount;
    public String get_date() {
        return _date;
    }
    public void set_date(String _date) {
        this._date = _date;
    }
    public int get_hour() {
        return _hour;
    }
    public void set_hour(int _hour) {
        this._hour = _hour;
    }
    public int get_minute() {
        return _minute;
    }
    public void set_minute(int _minute) {
        this._minute = _minute;
    }
    public String getAdID() {
        return adID;
    }
    public void setAdID(String adID) {
        this.adID = adID;
    }
    public Long getClickedCount() {
        return clickedCount;
    }
    public void setClickedCount(Long clickedCount) {
        this.clickedCount = clickedCount;
    }
}
```

### JavaBean: UserAdClicked.java

```java
package com.dtspark.sparkapps.streaming.ads;

import java.io.Serializable;

/**
 * Lesson 109: JavaBean for per-user ad clicks.
 * Created by Limaoran on 2016/7/15.
 */
public class UserAdClicked implements Serializable {
    private String timestamp;
    private String ip;
    private String userID;
    private String adID;
    private String province;
    private String city;
    private Long count = 0L;

    public String getTimestamp() {
        return timestamp;
    }
    public Long getCount() {
        return count;
    }
    public void setCount(Long count) {
        this.count = count;
    }
    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }
    public String getIp() {
        return ip;
    }
    public void setIp(String ip) {
        this.ip = ip;
    }
    public String getUserID() {
        return userID;
    }
    public void setUserID(String userID) {
        this.userID = userID;
    }
    public String getAdID() {
        return adID;
    }
    public void setAdID(String adID) {
        this.adID = adID;
    }
    public String getProvince() {
        return province;
    }
    public void setProvince(String province) {
        this.province = province;
    }
    public String getCity() {
        return city;
    }
    public void setCity(String city) {
        this.city = city;
    }
}
```

## Part 5: Inspecting the Results in the Database
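
Once the producer and the streaming job are running, the results can be checked directly in MySQL, for example:

```sql
SELECT * FROM adclicked      ORDER BY clickedCount DESC LIMIT 10;
SELECT * FROM adclickedtotal ORDER BY clickedCount DESC LIMIT 10;
SELECT * FROM adprovincetopn WHERE province = 'ShanDong';
SELECT * FROM adtrend        ORDER BY _date, _hour, _minute;
SELECT * FROM blacklisttable;
```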