# Lesson 59: Hands-On RDD-to-DataFrame Conversion in the IDE with Java and Scala

Tags: sparkIMF


## Hands-On Code

### Java version: converting an RDD to a DataFrame

Person.java

```java
package com.dtspark.sparkapps.sql;

import java.io.Serializable;

/**
 * Lesson 59: the JavaBean used in this example. Because Spark is distributed,
 * the JavaBean must implement the Serializable interface.
 * Created by Limaoran on 2016/7/2.
 */
public class Person implements Serializable {
    private int id;
    private String name;
    private int age;
    @Override
    public String toString() {
        return "Person{ id=" + id + ", name='" + name + '\'' + ", age=" + age + '}';
    }
    public Person() {}
    public Person(int id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }
}
```
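Spark needs the bean to be Serializable because instances are serialized whenever tasks ship them between nodes. As a quick illustration (a hypothetical standalone snippet, not part of the original lesson, assuming it sits next to Person.java in the same package), the bean survives a plain Java serialization round trip:

```java
import java.io.*;

// Hypothetical helper, only to illustrate the Serializable requirement:
// write a Person to bytes and read it back, which is effectively what
// Spark does when moving objects across the cluster.
public class PersonSerializationCheck {
    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new Person(1, "Spark", 7));
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            System.out.println((Person) in.readObject());
            // prints: Person{ id=1, name='Spark', age=7}
        }
    }
}
```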

RDD2DataFrameByReflection.java

```java
package com.dtspark.sparkapps.sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

import java.util.List;

/**
 * Lesson 59: converting an RDD into a DataFrame via reflection.
 * Created by Limaoran on 2016/7/2.
 */
public class RDD2DataFrameByReflection {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("RDD2DataFrameByReflection");
        conf.setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        JavaRDD<String> lines = sc.textFile("G:\\txt\\test\\persons.txt");
//        lines.map(line -> { // Java 8 lambda equivalent of the map below
//            String[] words = line.split(",");
//            return new Person(Integer.parseInt(words[0]), words[1], Integer.parseInt(words[2]));
//        });
        JavaRDD<Person> persons = lines.map(new Function<String, Person>() {
            @Override
            public Person call(String line) throws Exception {
                String[] splited = line.split(",");
                Person p = new Person();
                p.setId(Integer.valueOf(splited[0].trim()));
                p.setName(splited[1]);
                p.setAge(Integer.valueOf(splited[2].trim()));
                return p;
            }
        });
        // createDataFrame takes two arguments: the JavaRDD and the bean class.
        // Under the hood it uses reflection to discover all of Person's fields
        // and, combined with the RDD itself, produces the DataFrame.
        DataFrame df = sqlContext.createDataFrame(persons, Person.class);

        df.registerTempTable("persons");

        DataFrame bigDataS = sqlContext.sql("select * from persons where age >= 6");
        JavaRDD<Row> bigDataRDD = bigDataS.javaRDD();
        JavaRDD<Person> resultRDD = bigDataRDD.map(row -> {
            System.out.println(row);
            // The row here prints as 1,7,Spark: the column order differs from the
            // bean's field order because the DataFrame reorders the schema during
            // its optimization passes, so the original order is not guaranteed.
            // To read values reliably, use row.fieldIndex, e.g.
            // row.getInt(row.fieldIndex("id")), or the shorter row.getAs("id"),
            // which internally calls getAs[T](fieldIndex(fieldName)).
            return new Person(row.getAs("id"), row.getString(row.fieldIndex("name")), row.getInt(row.fieldIndex("age")));
        });
        List<Person> list = resultRDD.collect();
        for (Person p : list) {
            System.out.println(p);
        }
        sc.close();
    }
}
```
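Because the reflected schema's column order is not guaranteed to match the bean's declaration order, it is worth inspecting the schema before relying on any positional access. A minimal sketch, continuing from the `df` built in the listing above (`printSchema`, `show`, `javaRDD`, and `fieldIndex` are standard Spark 1.x DataFrame/Row methods):

```java
// Continues from the Java listing above (df is the reflected DataFrame).
df.printSchema(); // column names and types, in the order Spark inferred them
df.show();        // prints the rows, making the actual column order visible

// Positional access depends on that inferred order and is brittle:
//   row.getInt(0) may or may not be the id column.
// Name-based access works regardless of ordering:
Row first = df.javaRDD().first();
int firstId = first.getInt(first.fieldIndex("id"));
System.out.println(firstId);
```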

### Scala version

RDD2DataFrameByReflection.scala

```scala
package com.dt.spark.sparkapps.sql

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}

/**
 * Lesson 59: converting an RDD into a DataFrame via reflection.
 * Created by Limaoran on 2016/7/2.
 */
case class Person(id: Int, name: String, age: Int)

object RDD2DataFrameByReflection {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("RDD2DataFrameByReflectionScala").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val lines = sc.textFile("G:\\txt\\test\\persons.txt")
    val rdd = lines.map { line =>
      val splited = line.split(",")
      Person(splited(0).toInt, splited(1), splited(2).toInt)
    }
    // createDataFrame infers the schema from the Person case class via reflection.
    val df = sqlContext.createDataFrame(rdd)
    df.registerTempTable("persons")
    val personDF = sqlContext.sql("select * from persons where age >= 6")
    val personRDD = personDF.rdd
    // Read columns by name so the result does not depend on column order.
    val resultRDD = personRDD.map(row => Person(row.getAs[Int]("id"), row.getAs[String]("name"), row.getAs[Int]("age")))
    resultRDD.collect().foreach(println)
    sc.stop()
  }
}
```
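As an aside (not shown in the lesson), Spark 1.3+ also lets you write this conversion as `rdd.toDF()` after `import sqlContext.implicits._`; it infers the schema from the case class in the same way.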

Source data file: persons.txt

```
1,Spark,7
2,Hadoop,11
3,Flink,5
```

Output (Flink, age 5, is filtered out by `age >= 6`):

```
Person{ id=1, name='Spark', age=7}
Person{ id=2, name='Hadoop', age=11}
```