Spark中RDD转换成DataFrame的两种方式(分别用Java和scala实现)
2017-09-15 16:28
531 查看
转载:http://blog.csdn.net/u010592112/article/details/73730796 感谢原博主分享!!!
一:准备数据源
步骤说明:
在项目下新建一个student.txt文件,里面的内容为:
student.txt 内容如下:
1,zhangsan,20
2,lisi,21
3,wanger,19
4,fangliu,18
二:实现
Java版:
1.首先新建一个student的Bean对象,实现序列化和toString()方法,具体代码如下:
Java 代码如下:
import java.io.Serializable;

/**
 * Plain JavaBean representing one line of student.txt ("id,name,age").
 * It must be Serializable so Spark can ship instances between executors,
 * and must expose public getters/setters so Spark SQL's bean reflection
 * can derive a DataFrame schema from it.
 */
public class Student implements Serializable {
    // Explicit serialVersionUID: without it the JVM derives one from the class
    // shape, and any recompile could break deserialization of shipped instances.
    private static final long serialVersionUID = 1L;

    private int id;      // first CSV column
    private String name; // second CSV column
    private int age;     // third CSV column

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    /** Debug-friendly rendering; format kept identical to the original. */
    @Override
    public String toString() {
        return "Student [id=" + id + ", name=" + name + ", age=" + age + "]";
    }
}
2.转换,具体代码如下
Java 代码如下:
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public class RDDToDataset {

    /**
     * Entry point: builds a local SparkSession and demonstrates both
     * RDD-to-DataFrame conversion routes.
     */
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("RDDToDataset")
                .master("local[*]")
                .getOrCreate();
        try {
            reflectCreate(spark);
            dynamicCreate(spark);
        } finally {
            // Always release the local executors/UI, even if a job fails
            // (the original never stopped the session).
            spark.stop();
        }
    }

    /**
     * Bean-reflection conversion: each CSV line becomes a Student bean and
     * Spark derives the DataFrame schema from the bean's getters.
     *
     * @param spark active session used to read student.txt and run SQL
     */
    public static void reflectCreate(SparkSession spark) {
        // JDK-8 lambda replaces the anonymous Function class of the original.
        JavaRDD<Student> stuRDD = spark.read().textFile("student.txt").javaRDD()
                .map(line -> {
                    String[] parts = line.split(",");
                    Student stu = new Student();
                    // trim() tolerates stray spaces around the numeric fields
                    stu.setId(Integer.parseInt(parts[0].trim()));
                    stu.setName(parts[1]);
                    stu.setAge(Integer.parseInt(parts[2].trim()));
                    return stu;
                });
        Dataset<Row> stuDf = spark.createDataFrame(stuRDD, Student.class);
        // e.g. stuDf.select("id","name","age").write().mode(SaveMode.Append).parquet("par");
        stuDf.printSchema();
        stuDf.createOrReplaceTempView("student");
        Dataset<Row> nameDf = spark.sql("select name from student where age<20");
        nameDf.show();
    }

    /**
     * Programmatic conversion: the schema is assembled at runtime from
     * StructFields, so column names/types need not be known at compile time.
     *
     * @param spark active session used to read student.txt and run SQL
     */
    public static void dynamicCreate(SparkSession spark) {
        JavaRDD<Row> rows = spark.read().textFile("student.txt").javaRDD()
                .map(line -> {
                    String[] parts = line.split(",");
                    // Field types must line up with the schema declared below.
                    return RowFactory.create(
                            Integer.parseInt(parts[0].trim()),
                            parts[1],
                            Integer.parseInt(parts[2].trim()));
                });
        // Build the metadata dynamically.
        List<StructField> fields = new ArrayList<>();
        fields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        StructType schema = DataTypes.createStructType(fields);
        Dataset<Row> stuDf = spark.createDataFrame(rows, schema);
        stuDf.printSchema();
        stuDf.createOrReplaceTempView("student");
        Dataset<Row> nameDf = spark.sql("select name from student where age<20");
        nameDf.show();
    }
}
scala版本:
Scala 代码如下:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.IntegerType
object RDD2Dataset {

  /** One line of student.txt: "id,name,age". */
  case class Student(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("RDD2Dataset").getOrCreate()
    try {
      reflectCreate(spark)
      dynamicCreate(spark)
    } finally {
      // Release local executors/UI even if a job fails (original never stopped it).
      spark.stop()
    }
  }

  /**
   * Case-class (reflection) route: Spark infers the schema from the
   * Student case class via the implicit toDF() conversion.
   *
   * @param spark active session used to read student.txt and run SQL
   */
  private def reflectCreate(spark: SparkSession): Unit = {
    import spark.implicits._
    // Original read "student2.txt" — the article's data file is student.txt.
    val stuRDD = spark.sparkContext.textFile("student.txt")
    // toDF() is provided by spark.implicits._
    val stuDf = stuRDD
      .map(_.split(","))
      .map(parts => Student(parts(0).trim.toInt, parts(1), parts(2).trim.toInt))
      .toDF()
    stuDf.printSchema()
    stuDf.createOrReplaceTempView("student")
    val nameDf = spark.sql("select name from student where age<20")
    nameDf.show()
  }

  /**
   * Programmatic route: the StructType is assembled explicitly, so column
   * names/types can be decided at runtime. id/age are IntegerType — the
   * original declared every column StringType (leaving the imported
   * IntegerType unused) and relied on implicit casts for the age<20 filter.
   *
   * @param spark active session used to read student.txt and run SQL
   */
  private def dynamicCreate(spark: SparkSession): Unit = {
    val stuRDD = spark.sparkContext.textFile("student.txt")
    val schema = StructType(Seq(
      StructField("id", IntegerType, nullable = true),
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true)))
    // Row field types must match the schema, so parse the numeric columns here.
    val rowRDD = stuRDD
      .map(_.split(","))
      .map(parts => Row(parts(0).trim.toInt, parts(1), parts(2).trim.toInt))
    val stuDf = spark.createDataFrame(rowRDD, schema)
    stuDf.printSchema()
    stuDf.createOrReplaceTempView("student")
    val nameDf = spark.sql("select name from student where age<20")
    nameDf.show()
  }
}
注:1.上面代码全都已经测试通过,测试的环境为spark2.1.0,jdk1.8。
2.此代码不适用于spark2.0以前的版本。
一:准备数据源
[java] view
plain copy
在项目下新建一个student.txt文件,里面的内容为:
[plain] view
plain copy
1,zhangsan,20
2,lisi,21
3,wanger,19
4,fangliu,18
二:实现
Java版:
1.首先新建一个student的Bean对象,实现序列化和toString()方法,具体代码如下:
[java] view
plain copy
import java.io.Serializable;

/**
 * Plain JavaBean representing one line of student.txt ("id,name,age").
 * It must be Serializable so Spark can ship instances between executors,
 * and must expose public getters/setters so Spark SQL's bean reflection
 * can derive a DataFrame schema from it.
 */
public class Student implements Serializable {
    // Explicit serialVersionUID: without it the JVM derives one from the class
    // shape, and any recompile could break deserialization of shipped instances.
    private static final long serialVersionUID = 1L;

    private int id;      // first CSV column
    private String name; // second CSV column
    private int age;     // third CSV column

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    /** Debug-friendly rendering; format kept identical to the original. */
    @Override
    public String toString() {
        return "Student [id=" + id + ", name=" + name + ", age=" + age + "]";
    }
}
2.转换,具体代码如下
[java] view
plain copy
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public class RDDToDataset {

    /**
     * Entry point: builds a local SparkSession and demonstrates both
     * RDD-to-DataFrame conversion routes.
     */
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("RDDToDataset")
                .master("local[*]")
                .getOrCreate();
        try {
            reflectCreate(spark);
            dynamicCreate(spark);
        } finally {
            // Always release the local executors/UI, even if a job fails
            // (the original never stopped the session).
            spark.stop();
        }
    }

    /**
     * Bean-reflection conversion: each CSV line becomes a Student bean and
     * Spark derives the DataFrame schema from the bean's getters.
     *
     * @param spark active session used to read student.txt and run SQL
     */
    public static void reflectCreate(SparkSession spark) {
        // JDK-8 lambda replaces the anonymous Function class of the original.
        JavaRDD<Student> stuRDD = spark.read().textFile("student.txt").javaRDD()
                .map(line -> {
                    String[] parts = line.split(",");
                    Student stu = new Student();
                    // trim() tolerates stray spaces around the numeric fields
                    stu.setId(Integer.parseInt(parts[0].trim()));
                    stu.setName(parts[1]);
                    stu.setAge(Integer.parseInt(parts[2].trim()));
                    return stu;
                });
        Dataset<Row> stuDf = spark.createDataFrame(stuRDD, Student.class);
        // e.g. stuDf.select("id","name","age").write().mode(SaveMode.Append).parquet("par");
        stuDf.printSchema();
        stuDf.createOrReplaceTempView("student");
        Dataset<Row> nameDf = spark.sql("select name from student where age<20");
        nameDf.show();
    }

    /**
     * Programmatic conversion: the schema is assembled at runtime from
     * StructFields, so column names/types need not be known at compile time.
     *
     * @param spark active session used to read student.txt and run SQL
     */
    public static void dynamicCreate(SparkSession spark) {
        JavaRDD<Row> rows = spark.read().textFile("student.txt").javaRDD()
                .map(line -> {
                    String[] parts = line.split(",");
                    // Field types must line up with the schema declared below.
                    return RowFactory.create(
                            Integer.parseInt(parts[0].trim()),
                            parts[1],
                            Integer.parseInt(parts[2].trim()));
                });
        // Build the metadata dynamically.
        List<StructField> fields = new ArrayList<>();
        fields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        StructType schema = DataTypes.createStructType(fields);
        Dataset<Row> stuDf = spark.createDataFrame(rows, schema);
        stuDf.printSchema();
        stuDf.createOrReplaceTempView("student");
        Dataset<Row> nameDf = spark.sql("select name from student where age<20");
        nameDf.show();
    }
}
scala版本:
[plain] view
plain copy
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.IntegerType
object RDD2Dataset {

  /** One line of student.txt: "id,name,age". */
  case class Student(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("RDD2Dataset").getOrCreate()
    try {
      reflectCreate(spark)
      dynamicCreate(spark)
    } finally {
      // Release local executors/UI even if a job fails (original never stopped it).
      spark.stop()
    }
  }

  /**
   * Case-class (reflection) route: Spark infers the schema from the
   * Student case class via the implicit toDF() conversion.
   *
   * @param spark active session used to read student.txt and run SQL
   */
  private def reflectCreate(spark: SparkSession): Unit = {
    import spark.implicits._
    // Original read "student2.txt" — the article's data file is student.txt.
    val stuRDD = spark.sparkContext.textFile("student.txt")
    // toDF() is provided by spark.implicits._
    val stuDf = stuRDD
      .map(_.split(","))
      .map(parts => Student(parts(0).trim.toInt, parts(1), parts(2).trim.toInt))
      .toDF()
    stuDf.printSchema()
    stuDf.createOrReplaceTempView("student")
    val nameDf = spark.sql("select name from student where age<20")
    nameDf.show()
  }

  /**
   * Programmatic route: the StructType is assembled explicitly, so column
   * names/types can be decided at runtime. id/age are IntegerType — the
   * original declared every column StringType (leaving the imported
   * IntegerType unused) and relied on implicit casts for the age<20 filter.
   *
   * @param spark active session used to read student.txt and run SQL
   */
  private def dynamicCreate(spark: SparkSession): Unit = {
    val stuRDD = spark.sparkContext.textFile("student.txt")
    val schema = StructType(Seq(
      StructField("id", IntegerType, nullable = true),
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true)))
    // Row field types must match the schema, so parse the numeric columns here.
    val rowRDD = stuRDD
      .map(_.split(","))
      .map(parts => Row(parts(0).trim.toInt, parts(1), parts(2).trim.toInt))
    val stuDf = spark.createDataFrame(rowRDD, schema)
    stuDf.printSchema()
    stuDf.createOrReplaceTempView("student")
    val nameDf = spark.sql("select name from student where age<20")
    nameDf.show()
  }
}
注:1.上面代码全都已经测试通过,测试的环境为spark2.1.0,jdk1.8。
2.此代码不适用于spark2.0以前的版本。
相关文章推荐
- Spark中RDD转换成DataFrame的两种方式(分别用Java和scala实现)
- Java和scala实现 Spark RDD转换成DataFrame的两种方法小结
- Spark将RDD转换成DataFrame的两种方式
- Spark将RDD转换成DataFrame的两种方式
- 3.Spark SQL:使用反射方式、编程方式,将RDD转换为DataFrame
- 3.Spark SQL:使用反射方式、编程方式,将RDD转换为DataFrame
- 大数据IMF传奇行动绝密课程第83课:透彻讲解使用Scala和Java两种方式实战Spark Streaming开发
- 3.Spark SQL:使用反射方式、编程方式,将RDD转换为DataFrame
- 3.Spark SQL:使用反射方式、编程方式,将RDD转换为DataFrame
- spark基础之RDD和DataFrame的转换方式
- 3.Spark SQL:使用反射方式、编程方式,将RDD转换为DataFrame
- 第83课:使用Scala和Java两种方式实战Spark Streaming开发 本地webui小技巧
- 3.Spark SQL:使用反射方式、编程方式,将RDD转换为DataFrame
- 3.Spark SQL:使用反射方式、编程方式,将RDD转换为DataFrame
- 3.Spark SQL:使用反射方式、编程方式,将RDD转换为DataFrame
- 3.Spark SQL:使用反射方式、编程方式,将RDD转换为DataFrame
- sparkrdd转dataframe的两种方式
- 3.Spark SQL:使用反射方式、编程方式,将RDD转换为DataFrame
- Java接入Spark之创建RDD的两种方式和操作RDD
- 3.Spark SQL:使用反射方式、编程方式,将RDD转换为DataFrame