标签: sparkIMF
##基础Top N算法实战
take和top的区别 在于top会进行排序,自动降序
代码实战:
package com.dt.spark.sparkapps
import org.apache.spark.{SparkContext, SparkConf}
/**
* 基础TopN案例实战
* @author DT大数据梦工厂
* 新浪微博:http://weibo.com/ilovepains
* Created by Limaoran on 2016/5/11.
*/
object TopNBasic {
def main(args: Array[String]) {
val sc = new SparkContext(new SparkConf().setAppName("TopNBasic").setMaster("local"))
sc.setLogLevel("WARN")
val lines = sc.textFile("G:\\txt\\testData\\basicTopN.txt")
val pairs = lines.map(line=>(line.toInt,line)) //生成Key-Value键值对以方便sortByKey进行排序
val sortedPairs = pairs.sortByKey(false) //降序操作
//只要是改变每一行列的数据,一般都是用map操作
val sortedData = sortedPairs.map(pair=>pair._2)
val top5 = sortedData.take(5) //获取排名前5位数据的内容,元素内容构建成一个Array
top5.foreach(println)
//take和top的区别 在于top会进行排序,自动降序
lines.map(_.toInt).top(5).foreach(println)
sc.stop()
}
}
##分组Top N算法实战
package com.dt.spark.sparkapps.cores;
import java.util.Arrays;
import java.util.Iterator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
/**
* 使用Java的方式开发分组TopN程序
* @author DT大数据梦工厂
* 新浪微博:http://weibo.com/ilovepains
*/
public class TopNGroup {
public static void main(String[] args) {
JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("TopNGroup").setMaster("local"));
JavaRDD<String> lines = sc.textFile("G:/txt/testData/topNGroup.txt");
//把每行数据变成符合要求的Key-Value的方式
JavaPairRDD<String, Integer> pairs = lines.mapToPair(line->{
String [] strs = line.split(" ");
return new Tuple2(strs[0],Integer.parseInt(strs[1]));
});
JavaPairRDD<String, Iterable<Integer>> groupPairs = pairs.groupByKey(); //对数据进行分组
JavaPairRDD<String, Iterable<Integer>> top5 = groupPairs.mapToPair(new PairFunction<Tuple2<String,Iterable<Integer>>, String, Iterable<Integer>>() {
@Override
public Tuple2<String, Iterable<Integer>> call(Tuple2<String, Iterable<Integer>> groupedData)
throws Exception {
Integer [] top5 = new Integer[5]; //保存top5的数据
String groupKey = groupedData._1; //获取分组的组名
Iterator<Integer> groupedValue = groupedData._2.iterator(); //获取每组的内容集合
while(groupedValue.hasNext()){ //查看是否有下一个元素,如果有则循环
Integer value = groupedValue.next(); //获取当前循环的元素本身的内容
//具体实现分组内部的TopN
for(int i=0;i<top5.length;i++){
if(top5[i]==null){
top5[i] = value;
break;
}else if(value>top5[i]){
for(int j=4;j>i;j--){
top5[j] = top5[j-1];
}
top5[i] = value;
break;
}
}
}
return new Tuple2<String, Iterable<Integer>>(groupKey,Arrays.asList(top5));
}
});
//打印分组后的内容
top5.collect().forEach(line->System.out.println("Group Key:"+line._1+",Group Value:"+line._2));
sc.stop();
}
}
文件内容:
Spark 100
Hadoop 65
Spark 99
Hadoop 61
Spark 95
Hadoop 60
Spark 98
Hadoop 69
Spark 91
Hadoop 64
Spark 89
Hadoop 98
Spark 88
Hadoop 99
Spark 68
Hadoop 60
Spark 79
Hadoop 97
Spark 69
Hadoop 96
结果:
Group Key:Spark,Group Value:[100, 99, 98, 95, 91]
Group Key:Hadoop,Group Value:[99, 98, 97, 96, 69]
###Scala代码实现
package com.dt.spark.sparkapps
import org.apache.spark.{SparkContext, SparkConf}
/**
* 分组Top N
* Created by Limaoran on 2016/5/12.
*/
object GroupTopN {
def main(args: Array[String]) {
val sc = new SparkContext(new SparkConf().setAppName("GroupTopN").setMaster("local"))
sc.setLogLevel("WARN")
val lines = sc.textFile("G:\\txt\\testData\\topNGroup.txt")
val groupRDD = lines.map(line=>(line.split(" ")(0),line.split(" ")(1).toInt)).groupByKey()
val top5 = groupRDD.map(pair=>(pair._1,pair._2.toList.sortWith(_>_).take(5))).sortByKey()
top5.collect().foreach(pair=>{
println(pair._1+":"+pair._2)
})
sc.stop()
}
}
##排序算法RangePartitioner内幕解密
RangePartitioner主要是把依赖的RDD的数据分成不同的范围,关键的地方是不同的范围是有序的。
Google的面试题:如何在一个不确定数据规模的范围内进行排序?
水塘抽样:目的是从一个集合中选取具体个数的样本,特别适合内存容纳不下数据。
RangePartitioner内部使用了水塘抽样算法。
HashPartitioner的弊端是数据倾斜!!! 极端情况下某(几)个分区拥有RDD的所有数据!!!
RangePartitioner除了是结果有序的基石以外,最为重要的是尽量保证每个Partition中的数据量是均匀的!!!
源码解析:
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt
乘3的目的保证数据量特别小的分区能够抽取到足够的数据,同时保证数据量特别大的分区能够二次采样。
def sketch[K : ClassTag](
rdd: RDD[K],
sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {...}
Array[(Int, Long, Array[K])]:
- Int:分区的编号
- Long:分区中总元素具体有多少个
- Array[K]:重复RDD每个分区中采样到的数据
SamplingUtils.scala
/**
* Reservoir sampling implementation that also returns the input size.
*
* @param input input size
* @param k reservoir size
* @param seed random seed
* @return (samples, input size)
*/
def reservoirSampleAndCount[T: ClassTag](
input: Iterator[T],
k: Int,
seed: Long = Random.nextLong())
: (Array[T], Long) = {
val reservoir = new Array[T](k)
// Put the first k elements in the reservoir.
var i = 0
while (i < k && input.hasNext) {
val item = input.next()
reservoir(i) = item
i += 1
}
// If we have consumed all the elements, return them. Otherwise do the replacement.
if (i < k) {
// If input size < k, trim the array to return only an array of input size.
val trimReservoir = new Array[T](i)
System.arraycopy(reservoir, 0, trimReservoir, 0, i)
(trimReservoir, i)
} else {
// If input size > k, continue the sampling process.
var l = i.toLong
val rand = new XORShiftRandom(seed)
while (input.hasNext) {
val item = input.next()
val replacementIndex = (rand.nextDouble() * l).toLong
if (replacementIndex < k) {
reservoir(replacementIndex.toInt) = item
}
l += 1
}
(reservoir, l)
}
}
(reservoir, l)中的l记录的是该分区中元素的总和
至此,标志Spark零基础入门实战阶段结束。