
# Lesson 17: RDD Examples (join, cogroup, reduceByKey, groupByKey, etc.)

Tags: sparkIMF


## The most commonly used operators, and the building blocks of complex algorithms: map, filter, flatMap, reduceByKey, groupByKey, join, cogroup

## map

map works on elements of any type: it iterates over every element of the collection it is applied to and invokes the function passed as its argument to transform each element.

```scala
  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }
```

```scala
nums.map(item => item * 2)
```
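The one-liner above assumes an RDD named `nums` already exists. As a minimal, hypothetical sketch (assuming `sc` is an existing SparkContext, as in spark-shell or the full program at the end of this lesson):

```scala
// Minimal sketch (assumes `sc` is an existing SparkContext).
val nums = sc.parallelize(1 to 10)        // create an RDD from the collection 1..10
val doubled = nums.map(item => item * 2)  // the function is applied to every element
println(doubled.collect().mkString(", ")) // expected: 2, 4, 6, ..., 20
```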

## filter

filter uses the Boolean result of the predicate function passed as its argument to select the elements that satisfy the condition, and builds a new MapPartitionsRDD from those elements.

```scala
  /**
   * Return a new RDD containing only the elements that satisfy a predicate.
   */
  def filter(f: T => Boolean): RDD[T] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[T, T](
      this,
      (context, pid, iter) => iter.filter(cleanF),
      preservesPartitioning = true)
  }
```

```scala
nums.filter(item => item % 2 == 0)
```
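Note in the source above that filter passes `preservesPartitioning = true`. A hedged sketch of what that means in practice (assuming `sc`; the data and the `HashPartitioner(2)` choice are illustrative assumptions):

```scala
import org.apache.spark.HashPartitioner

// Sketch: filtering a keyed RDD keeps its existing partitioner,
// so a later reduceByKey/join on the same keys can avoid another shuffle.
val keyed = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c"), (4, "d")))
  .partitionBy(new HashPartitioner(2))
val evenKeys = keyed.filter { case (k, _) => k % 2 == 0 }

println(keyed.partitioner)    // Some(org.apache.spark.HashPartitioner@...)
println(evenKeys.partitioner) // the same partitioner is preserved
```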

## flatMap

First, the function passed as an argument is applied to every string in the RDD to split it into words (each split result is itself a collection); the split results are then merged (flattened) into one large collection.

```scala
  /**
   *  Return a new RDD by first applying a function to all elements of this
   *  RDD, and then flattening the results.
   */
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
  }
```

```scala
bigDataString.flatMap(line => line.split(" "))
```
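A short sketch contrasting flatMap with map on the same input may make the "flattening" step clearer (assuming `sc`; the data mirrors the full program below):

```scala
// Sketch: map keeps one output element per input element, flatMap flattens the collections.
val lines = sc.parallelize(Array("Scala Spark", "Java Hadoop", "Java Tachyon"))

val arrays = lines.map(line => line.split(" "))     // RDD[Array[String]] with 3 elements
val words  = lines.flatMap(line => line.split(" ")) // RDD[String] with 6 elements

println(arrays.count()) // 3
println(words.count())  // 6
```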

## groupByKey

groupByKey groups the values that share the same key; after grouping, the values for each key form a collection.

```scala
  /**
   * Group the values for each key in the RDD into a single sequence. Hash-partitions the
   * resulting RDD with the existing partitioner/parallelism level. The ordering of elements
   * within each group is not guaranteed, and may even differ each time the resulting RDD is
   * evaluated.
   *
   * Note: This operation may be very expensive. If you are grouping in order to perform an
   * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
   * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
   */
  def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
    groupByKey(defaultPartitioner(self))
  }
```

```scala
dataRDD.groupByKey()
```
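The Spark doc comment quoted above warns that groupByKey can be very expensive when the goal is an aggregation. A hedged sketch of the two ways to count people per score (assuming `sc`; the data mirrors the full program below):

```scala
// Sketch: counting values per key with groupByKey versus reduceByKey.
val data = sc.parallelize(Seq((100, "Spark"), (100, "Tachyon"), (80, "Kafka"), (70, "Hadoop"), (80, "HBase")))

val viaGroup  = data.groupByKey().mapValues(_.size)       // ships every value across the shuffle
val viaReduce = data.mapValues(_ => 1).reduceByKey(_ + _) // combines partial counts map-side first

viaGroup.collect().foreach(println)  // (100,2), (80,2), (70,1) in some order
viaReduce.collect().foreach(println) // the same counts, usually with far less shuffle traffic
```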

## reduceByKey

reduceByKey applies a function to the values that share the same key and returns the results as a new RDD.

```scala
  /**
   * Merge the values for each key using an associative reduce function. This will also perform
   * the merging locally on each mapper before sending results to a reducer, similarly to a
   * "combiner" in MapReduce.
   */
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  }
```

```scala
pairs.reduceByKey((v1, v2) => v1 + v2)
```
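As the doc comment above says, the reduce function is also applied locally on each mapper, so it must be associative. A cautionary sketch (assuming `sc`; the data and the two-partition split are illustrative assumptions):

```scala
// Sketch: associative functions such as + are safe; non-associative ones are not.
val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("a", 3), ("a", 4)), numSlices = 2)

val summed = pairs.reduceByKey((v1, v2) => v1 + v2) // always ("a", 10), regardless of partitioning
println(summed.collect().mkString(", "))

// (v1, v2) => v1 - v2 is not associative, so its result can depend on how the
// values happen to be split across partitions -- avoid such functions here.
```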

## join

join and cogroup are content that every Spark learner must master, without exception. The most important operator in big data is join!

```scala
  /**
   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
   * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
   */
  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
    this.cogroup(other, partitioner).flatMapValues( pair =>
      for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
    )
  }
```

```scala
names.join(scores)
```
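The quoted implementation shows that join is just cogroup followed by a per-key Cartesian product of the two value collections. A sketch that reproduces join by hand (assuming `sc`; data as in the full program below):

```scala
// Sketch: join versus the equivalent cogroup + flatMapValues pairing.
val names  = sc.parallelize(Seq((1, "Spark"), (2, "Tachyon"), (3, "Hadoop")))
val scores = sc.parallelize(Seq((1, 100), (2, 95), (3, 65)))

val joined = names.join(scores)
val byHand = names.cogroup(scores).flatMapValues { case (vs, ws) =>
  for (v <- vs; w <- ws) yield (v, w) // the same pairing join performs internally
}

joined.collect().foreach(println) // (1,(Spark,100)), (2,(Tachyon,95)), (3,(Hadoop,65))
byHand.collect().foreach(println) // identical pairs
```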

## cogroup

cogroup is the second most important operator in big data!

```scala
  /**
   * For each key k in `this` or `other1` or `other2` or `other3`,
   * return a resulting RDD that contains a tuple with the list of values
   * for that key in `this`, `other1`, `other2` and `other3`.
   */
  def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
      other2: RDD[(K, W2)],
      other3: RDD[(K, W3)],
      partitioner: Partitioner)
      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
    if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
      throw new SparkException("Default partitioner cannot partition array keys.")
    }
    val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
    cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
       (vs.asInstanceOf[Iterable[V]],
         w1s.asInstanceOf[Iterable[W1]],
         w2s.asInstanceOf[Iterable[W2]],
         w3s.asInstanceOf[Iterable[W3]])
    }
  }
```

```scala
names.cogroup(scores)
```
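One difference worth seeing in code: join only returns keys present in both RDDs, while cogroup also returns keys present in just one of them, with an empty collection on the other side. A hedged sketch (assuming `sc`; the extra keys are made-up illustrative data):

```scala
// Sketch: cogroup keeps keys that join would drop.
val names  = sc.parallelize(Seq((1, "Spark"), (2, "Tachyon"), (4, "Flink"))) // key 4 has no score
val scores = sc.parallelize(Seq((1, 100), (2, 95), (3, 65)))                 // key 3 has no name

names.join(scores).collect().foreach(println)    // only keys 1 and 2 appear
names.cogroup(scores).collect().foreach(println) // keys 1, 2, 3 and 4 all appear,
                                                 // e.g. (4,(CompactBuffer(Flink),CompactBuffer()))
```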

To understand cogroup better, here is the same example in Java:

```java
package com.dt.spark.sparkapps.cores;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

/**
 * Demonstration of the cogroup operator
 * @author lmr
 *
 */
public class CogroupOps {
	public static void main(String[] args) {
		SparkConf conf = new SparkConf();
		conf.setAppName("CogroupOps");
		conf.setMaster("local");

		JavaSparkContext sc = new JavaSparkContext(conf);

		List<Tuple2<Integer, String>> listNames = Arrays.asList(
				new Tuple2<Integer, String>(1, "Spark"),
				new Tuple2<Integer, String>(2, "Tachyon"),
				new Tuple2<Integer, String>(3, "Hadoop")
				);
		List<Tuple2<Integer, Integer>> listScores = Arrays.asList(
				new Tuple2<Integer, Integer>(1, 100),
				new Tuple2<Integer, Integer>(2, 90),
				new Tuple2<Integer, Integer>(3, 70),
				new Tuple2<Integer, Integer>(1, 110),
				new Tuple2<Integer, Integer>(2, 95),
				new Tuple2<Integer, Integer>(3, 60)
				);
		JavaPairRDD<Integer, String> names = sc.parallelizePairs(listNames);
		JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(listScores);

		JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> result = names.cogroup(scores);

		result.collect().forEach(pair -> {
			System.out.print("ID:" + pair._1);
			System.out.print(",Name:" + pair._2._1);
			System.out.println(",Score:" + pair._2._2);
		});

		sc.stop();
	}
}
```

Output:

```
ID:1,Name:[Spark],Score:[100, 110]
ID:3,Name:[Hadoop],Score:[70, 60]
ID:2,Name:[Tachyon],Score:[90, 95]
```

## Complete source code

```scala
package com.dt.spark.sparkapps

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Hands-on examples of the most commonly used and most important Spark transformations
 * @author 李茂然
 * Created by Limaoran on 2016/5/10.
 */
object Transformations {

  def main(args: Array[String]) {
    val sc = sparkContext("Transformations Operations") // Create the SparkContext

    mapTransformation(sc)     // map example
    filterTransformation(sc)  // filter example
    flatMapTransformation(sc) // flatMap example

    groupByKeyTransformation(sc)  // groupByKey example
    reduceByKeyTransformation(sc) // reduceByKey example
    joinTransformation(sc)        // join example
    cogroupTransformation(sc)     // cogroup example

    sc.stop()   // Stop the SparkContext, destroy the Driver-side objects and release resources
  }

  def sparkContext(name:String) = {
    // Create SparkConf and initialize the application configuration
    val conf = new SparkConf().setAppName(name).setMaster("local")
    // Create the SparkContext: the only entry point for creating the first RDD, the soul of the Driver, and the only channel to the cluster
    new SparkContext(conf)
  }
  def mapTransformation(sc : SparkContext ): Unit ={
    val nums = sc.parallelize(1 to 10) // Create an RDD from a Scala collection

    val mapped = nums.map(item => item * 2) // map iterates over every element of the collection and applies the function passed as its argument to transform each element

    mapped.collect().foreach(pair => println(pair)) // Collect the results and print them with foreach
  }

  def filterTransformation(sc:SparkContext): Unit ={
    val nums = sc.parallelize(1 to 10)

    val filtered = nums.filter(item => item % 2 == 0) // Select the elements for which the predicate returns true; they form a new MapPartitionsRDD

    filtered.collect().foreach(pair => println(pair))
  }

  def flatMapTransformation(sc:SparkContext):Unit = {
    val bigData = Array("Scala Spark","Java Hadoop","Java Tachyon") // An Array of strings
    val bigDataString = sc.parallelize(bigData)                     // Create a ParallelCollectionRDD of strings
    val words = bigDataString.flatMap(line => line.split(" "))      // Split every string into words (each split is itself a collection), then flatten the results into one large collection: { Scala Spark Java Hadoop Java Tachyon }
    words.collect().foreach(println)
  }

  def groupByKeyTransformation(sc: SparkContext) = {
    val data = Array(Tuple2(100,"Spark"),Tuple2(100,"Tachyon"),Tuple2(80,"Kafka"),Tuple2(70,"Hadoop"),Tuple2(80,"HBase")) // Prepare the data
    val dataRDD = sc.parallelize(data)  // Create the RDD
    val result = dataRDD.groupByKey()   // Group values by their key; the grouped values form a collection
    result.collect().foreach(pair => println("Score: " + pair._1 + ", count: " + pair._2.size))
  }

  def reduceByKeyTransformation(sc:SparkContext) = {
    val lines = sc.textFile("G:/runtime/spark-1.6.0/README.md")  // Read a local file and split it into partitions
    val words = lines.flatMap { line => line.split(" ") } // Split every line into words and flatten all lines into one large collection of words
    val pairs = words.map { word => (word, 1) }
    val wordCount = pairs.reduceByKey((v1, v2) => v1 + v2) // Sum the values for each key (reducing both locally on the map side and at the reducer level)
    val wordOrdered = wordCount.map(pair => (pair._2, pair._1)).sortByKey(false).map(pair => (pair._2, pair._1))
    wordOrdered.collect().foreach(wordNumberPair => println(wordNumberPair._1 + " : " + wordNumberPair._2)) // Print the results computed by reduceByKey
  }

  def joinTransformation(sc:SparkContext): Unit ={
    val studentNames = Array(
      Tuple2(1,"Spark"),
      Tuple2(2,"Tachyon"),
      Tuple2(3,"Hadoop")
    )
    val studentScores = Array(
      Tuple2(1,100),
      Tuple2(2,95),
      Tuple2(3,65)
    )
    val names = sc.parallelize(studentNames)
    val scores = sc.parallelize(studentScores)
    val studentNameAndScores = names.join(scores)
    studentNameAndScores.collect.foreach(println)
  }

  def cogroupTransformation(sc:SparkContext): Unit ={
    val studentNames = Array(
      Tuple2(1,"Spark"),
      Tuple2(2,"Tachyon"),
      Tuple2(3,"Hadoop")
    )
    val studentScores = Array(
      Tuple2(1,100),
      Tuple2(2,95),
      Tuple2(3,65),
      Tuple2(1,110),
      Tuple2(2,90),
      Tuple2(3,60)
    )
    val names = sc.parallelize(studentNames)
    val scores = sc.parallelize(studentScores)
    val studentNameAndScores = names.cogroup(scores)
    studentNameAndScores.collect.foreach(println)
  }
}
```

Output:

```
map:
    2
    4
    6
    8
    10
    12
    14
    16
    18
    20
filter:
    2
    4
    6
    8
    10
flatMap:
    Scala
    Spark
    Java
    Hadoop
    Java
    Tachyon
groupByKey:
    Score: 100, count: 2
    Score: 80, count: 2
    Score: 70, count: 1
reduceByKey:
    Spark : 13
    for : 11
    and : 10
join:
    (1,(Spark,100))
    (3,(Hadoop,65))
    (2,(Tachyon,95))
cogroup:
    (1,(CompactBuffer(Spark),CompactBuffer(100, 110)))
    (3,(CompactBuffer(Hadoop),CompactBuffer(65, 60)))
    (2,(CompactBuffer(Tachyon),CompactBuffer(95, 90)))
```

Every piece of functionality invoked from the main method must be modular; each module can be encapsulated in its own function.