Skip to content

Latest commit

 

History

History
201 lines (183 loc) · 9.99 KB

File metadata and controls

201 lines (183 loc) · 9.99 KB

第108课:Spark Streaming电商广告点击综合案例动态黑名单过滤真正的实现代码

标签: sparkIMF


大数据的开发和传统的JavaEE开发、桌面版本的软件开发或者说Android移动开发有很大的不同点:

  • 大数据开发有大约40%的时间放在性能调优,例如Shuffle等。

实际编码最多占30%的时间,需求分析、数据模型、业务场景、技术架构占大约30%的时间,而性能调优、数据倾斜至少占40%的时间。

##代码实战

AdClickedStreamStates.java

package com.dtspark.sparkapps.streaming.ads;

import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import java.util.*;

/**
 * 第105-108课:Spark Streaming电商广告点击综合案例在线点击统计实战
 * Created by Limaoran on 2016/7/14.
 *
 * 在线处理广告点击流
 * 广告点击的基本数据格式:timestemp、ip、userID、adID、province、city
 */
public class AdClickedStreamStates {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("AdClickedStreamStates")
                .setMaster("local[4]");
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));

        Map<String,String> kafkaParameters = new HashMap<>();
        kafkaParameters.put("metadata.broker.list", "MasterWin:9092");
        Set<String> topics = new HashSet<>();
        topics.add("AdClicked");
        JavaPairInputDStream<String,String> adClickStream = KafkaUtils.createDirectStream(jsc,
                String.class, String.class, StringDecoder.class, StringDecoder.class,
                kafkaParameters, topics);
        /**
         * 因为要对黑名单进行在线过滤,而数据是在RDD中的,所以必然使用transform这个函数。
         * 但是在这里我们必须使用transformToPair,原因是读取进来的Kafka的数据是Pair<String,String>类型的;
         * 另外一个原因是过滤后的数据要进行进一步处理,所以必须是读进来的Kafka数据的原始类型DStream<String,String>
         *
         * 在此:再次说明每个Batch Duration中实际上讲输入的数据就是被一个且仅被一个RDD封装的,
         *  你可以有多个InputStream,但其实在产生Job的时候,这些不同的InputDStream在Batch Duration中
         *  就相当于Spark基于HDFS数据操作的不同文件来源而已罢了。
         */
        JavaDStream<String> filteredadClickedDStream = adClickStream.transform((JavaPairRDD<String, String> rdd) -> {
            /**
             * 在线黑名单过滤思路:
             * 1,从数据库中获取黑名单转换成RDD,即用新的RDD实例封装黑名单数据。
             * 2,然后把代表黑名单的RDD的实例和Batch Duration产生的rdd进行Join操作,
             *      准确的说是进行leftOuterJoin操作,也就是说Batch Duration产生的rdd和代表黑名单的rdd实例
             *      进行leftOuterJoin操作,如果两者都有内容的话,就会使true,否则的话就是false;
             *
             * 我们要留下的是leftOuterJoin操作结果为false;
             */
            //TODO 数据来自于查询的黑名单的表,并且映射为<String,Boolean>
            List blackListFromDB = null;
            /**
             * 黑名单的表中只有userID,但是如果要进行join操作的话,就必须是Key-Value,所以
             *  在这里我们需要基于数据表中的数据产生Key-Value类型的数据集合。
             */
            JavaPairRDD blackListRDD = jsc.sparkContext().parallelizePairs(blackListFromDB);

            /**
             * 进行操作的时候肯定是基于userID进行join的,所以必须把传入的rdd进行mapToPair操作
             * 转换成为符合格式的rdd
             */
            JavaPairRDD<String, String> rdd2Pair = rdd.mapToPair(tuple2 -> {
                String userID = tuple2._2().split("\t")[2];
                return new Tuple2(userID, tuple2._2());
            });
            JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> joinedRDD = rdd2Pair.leftOuterJoin(blackListRDD);
            JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> filterRDD = joinedRDD.filter(tuple2 -> {
                if (true == tuple2._2()._2().orElse(false)) {
                    return false;
                } else {
                    return true;
                }
            });
            JavaRDD<String> resultRDD = filterRDD.map(tuple2 -> {
                return tuple2._2()._1();
            });
            return resultRDD;
        });

        //数据格式:timestemp、ip、userID、adID、province、city
        JavaPairDStream<String,Long> pairDStream = filteredadClickedDStream.mapToPair(line -> {
            String[] splited = line.split("\t");
            String timestamp = splited[0];  //yyyy-MM-dd
            String ip = splited[1];
            String userID = splited[2];
            String adID = splited[3];
            String province = splited[4];
            String city = splited[5];
            String clickedRecord = timestamp + "_" + ip + "_" + userID + "_" + adID + "_" + province + "_" + city;
            return new Tuple2(clickedRecord, 1L);
        });
        /**
         * 计算每个Batch Duration中每个User的广告点击量
         */
        JavaPairDStream<String,Long> adClickedUsers = pairDStream.reduceByKey((v1,v2)->v1+v2);
        /**
         * 计算出什么叫有效的点击?
         * 1,复杂化的一般都是采用机器学习训练好模型直接在线进行过滤;
         * 2,简单的可以通过一个Batch Duration中的点击次数来判断是不是非法广告点击,但是
         *  实际上讲非法广告点击程序会尽可能的模拟真实的广告点击行为,所以通过一个Batch来判断
         *  是不完整的,我们需要对例如一天(也可以是每一小时)的数据进行判断!
         * 3,比在线机器学习退而求其次的做法如下:
         *  例如:一段时间内,同一个IP(MAC地址)有多个账号访问
         *  例如:可以统计一天内一个用户点击广告的次数,如果一天点击同样的广告超过50次的话,
         *      就列入黑名单。
         *
         * 黑名单有一个重要的特征:动态生成!所以每一个Batch Duration都要考虑是否有新的黑名单加入,此时黑名单需要存储起来。
         *  具体存储在什么地方呢,存储在DB/Redis中即可!
         *
         * 例如邮件系统中的“黑名单”,可以采用Spark Streaming不断的监控每个用户的操作,
         *  如果用户发送邮件的频率超过了设定的值,可以暂时把用户列入“黑名单”,从而阻止用户过度频繁的发送邮件。
         */
        JavaPairDStream<String,Long> filteredClickedInBatch = adClickedUsers.filter(tuple->{
            if(tuple._2() > 1){ //每10秒钟,点击超过1次,判为非法点击
                //更新一下黑名单的数据表
                return false;
            }else{
                return true;
            }
        });
//        filteredClickedInBatch.print();
        filteredClickedInBatch.foreachRDD(rdd -> {
            rdd.foreachPartition(record -> {
                /**
                 * 在这里我们使用数据库连接池的高效读写数据库的方式把数据写入数据库MySQL;
                 * 由于传入的参数是一个Iterator类型的集合,所以为了更加高效的操作,我们需要批量处理;
                 * 例如说一次性插入1000条Record,使用insertBatch或者updateBatch类型的操作
                 * 插入的用户信息可以只包含:userID、adID、clickedCount
                 * 这里面有一个问题:可能出现两条记录的Key是一样的,此时就需要更新累加操作
                 */
            });
        });
        JavaPairDStream<String,Long> blackListOnHistory = filteredClickedInBatch.filter(tuple -> {
            //广告点击的基本数据格式:timestemp、ip、userID、adID、province、city
            String [] splited = tuple._1().split("_");
            String date = splited[0];
            String userID = splited[2];
            String adID = splited[3];
            /**
             * 接下来根据date、userID、adID等条件去查询用户点击广告的数据表,获得总的点击次数。
             * 这个时候基于点击次数判断是否属于黑名单点击
             */
            int clickedCountTotalToday = 81;
            if(clickedCountTotalToday>50){
                return true;
            }else{
                return false;
            }
        });

        /**
         * 对黑名单的整个RDD进行去重操作!!!
         */
        JavaDStream<String> blackListUniqueUserIDBaseOnHistory = blackListOnHistory.map(tuple2 -> {
            return tuple2._1().split("_")[2];
        });
        JavaDStream<String> blackList = blackListUniqueUserIDBaseOnHistory.transform((JavaRDD<String> rdd) -> {
            return rdd.distinct();
        });

        //下一步写入黑名单数据表中
        blackListUniqueUserIDBaseOnHistory.foreachRDD(rdd->{
            rdd.foreachPartition(record->{
                /**
                * 插入的用户信息可以只包含:userID
                 * 此时直接插入黑名单数据表即可。
                */

            });
        });

        jsc.start();
        jsc .awaitTermination();
    }
}