您的位置:首页 > 编程语言 > Java开发

Spark中RDD转换成DataFrame的两种方式(分别用Java和scala实现)

2017-09-15 16:28 531 查看
转载:http://blog.csdn.net/u010592112/article/details/73730796 感谢原博主分享!!!

一:准备数据源

[java] view
plain copy

  

      在项目下新建一个student.txt文件,里面的内容为:

       

[plain] view
plain copy

1,zhangsan,20  

2,lisi,21  

3,wanger,19  

4,fangliu,18  

      二:实现

     Java版:

    1.首先新建一个student的Bean对象,实现序列化和toString()方法,具体代码如下:

    

[java] view
plain copy

import java.io.Serializable;  

  

public class Student implements Serializable{  

  

    private int id;  

    private String name;  

    private int age;  

    public int getId() {  

        return id;  

    }  

    public void setId(int id) {  

        this.id = id;  

    }  

    public String getName() {  

        return name;  

    }  

    public void setName(String name) {  

        this.name = name;  

    }  

    public int getAge() {  

        return age;  

    }  

    public void setAge(int age) {  

        this.age = age;  

    }  

    @Override  

    public String toString() {  

        return "Student [id=" + id + ", name=" + name + ", age=" + age + "]";  

    }  

      

      

      

}  

         2.转换,具体代码如下

[java] view
plain copy

import java.util.ArrayList;  

import java.util.List;  

  

import org.apache.spark.api.java.JavaRDD;  

import org.apache.spark.api.java.function.Function;  

import org.apache.spark.sql.Dataset;  

import org.apache.spark.sql.Row;  

import org.apache.spark.sql.RowFactory;  

import org.apache.spark.sql.SparkSession;  

import org.apache.spark.sql.types.DataTypes;  

import org.apache.spark.sql.types.StructField;  

import org.apache.spark.sql.types.StructType;  

  

public class RDDToDataset {  

  

    public static void main(String[] args) {  

        SparkSession spark=SparkSession.builder()  

                   .appName("RDDToDataset")  

                   .master("local[*]")  

                   .getOrCreate();  

        reflectCreate(spark);  

        dynamicCreate(spark);  

          

    }  

  

    /** 

     * 通过Java反射转换 

     * @param spark 

     */  

    public static void reflectCreate(SparkSession spark)  

    {  

        JavaRDD<Student> stuRDD= spark.read().textFile("student.txt").javaRDD().map(new Function<String, Student>() {  

  

            public Student call(String line) throws Exception {  

              

                String[] lineSplit=line.split(",");  

                Student stu=new Student();  

                stu.setId(Integer.valueOf(lineSplit[0]));  

                stu.setName(lineSplit[1]);  

                stu.setAge(Integer.valueOf(lineSplit[2]));  

                return stu;  

            }  

        });  

          

        Dataset<Row> stuDf=spark.createDataFrame(stuRDD,Student.class);  

        //stuDf.select("id","name","age").write().mode(SaveMode.Append).parquet("par");  //对文件指定列名  

        stuDf.printSchema();  

        stuDf.createOrReplaceTempView("student");  

        Dataset<Row> nameDf=spark.sql("select name from student where age<20");  

        nameDf.show();  

        //nameDf.write().mode("Append").text("name");//将查询结果写入一个文件  

    }  

    /** 

     * 动态转换 

     * @param spark 

     */  

    public static void dynamicCreate(SparkSession spark)  

    {  

        JavaRDD<Row> rows=spark.read().textFile("student.txt").javaRDD().map(new Function<String, Row>() {  

  

            public Row call(String line) throws Exception {  

                  

                String[] parts=line.split(",");  

                return RowFactory.create(  

                        Integer.valueOf(parts[0]),  

                        parts[1],  

                        Integer.valueOf(parts[2]));  

            }  

        });  

          

        //动态创建元数据  

        List<StructField> fields=new ArrayList<StructField>();  

        fields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));  

        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));  

        fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));  

        StructType schema=DataTypes.createStructType(fields);  

        Dataset<Row> stuDf=spark.createDataFrame(rows, schema);  

        //stuDf.select("id","name","age").write().mode(SaveMode.Append).parquet("par");  

        stuDf.printSchema();  

        stuDf.createOrReplaceTempView("student");  

        Dataset<Row> nameDf=spark.sql("select name from student where age<20");  

        nameDf.show();  

        //nameDf.write().mode("Append").text("age");//将查询结果写入一个文件  

    }  

}  

     scala版本:

    

[plain] view
plain copy

import org.apache.spark.sql.SparkSession  

import org.apache.spark.sql.types.StringType  

import org.apache.spark.sql.types.StructField  

import org.apache.spark.sql.types.StructType  

import org.apache.spark.sql.Row  

import org.apache.spark.sql.types.IntegerType  

  

object RDD2Dataset {  

    

  case class Student(id:Int,name:String,age:Int)  

  def main(args:Array[String])  

  {  

      

    val spark=SparkSession.builder().master("local").appName("RDD2Dataset").getOrCreate()  

    import spark.implicits._  

    reflectCreate(spark)  

    dynamicCreate(spark)  

  }  

    

 /**  

     * 通过Java反射转换  

     * @param spark  

     */  

  private def reflectCreate(spark:SparkSession):Unit={  

    import spark.implicits._  

    val stuRDD=spark.sparkContext.textFile("student2.txt")  

    //toDF()为隐式转换  

    val stuDf=stuRDD.map(_.split(",")).map(parts⇒Student(parts(0).trim.toInt,parts(1),parts(2).trim.toInt)).toDF()  

    //stuDf.select("id","name","age").write.text("result") //对写入文件指定列名  

    stuDf.printSchema()  

    stuDf.createOrReplaceTempView("student")  

    val nameDf=spark.sql("select name from student where age<20")  

    //nameDf.write.text("result") //将查询结果写入一个文件  

    nameDf.show()  

  }  

    

  /**  

     * 动态转换  

     * @param spark  

     */  

  private def dynamicCreate(spark:SparkSession):Unit={  

    val stuRDD=spark.sparkContext.textFile("student.txt")  

    import spark.implicits._  

    val schemaString="id,name,age"  

    val fields=schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true))  

    val schema=StructType(fields)  

    val rowRDD=stuRDD.map(_.split(",")).map(parts⇒Row(parts(0),parts(1),parts(2)))  

    val stuDf=spark.createDataFrame(rowRDD, schema)  

        stuDf.printSchema()  

    val tmpView=stuDf.createOrReplaceTempView("student")  

    val nameDf=spark.sql("select name from student where age<20")  

    //nameDf.write.text("result") //将查询结果写入一个文件  

    nameDf.show()  

  }  

}  

     注:1.上面代码全都已经测试通过,测试的环境为spark2.1.0,jdk1.8。

             2.此代码不适用于spark2.0以前的版本。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: