spark-DataFrame之RDD和DataFrame之间的转换
2016-07-18 15:45
369 查看
package cn.spark.study.core.mycode_dataFrame;
import java.io.Serializable;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
public class RDD2DataFrameReflection implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("RDD2DataFrameReflection").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> lines = sc.textFile("D:/students.txt");
JavaRDD<student> students = lines.map(new Function<String, student>() {
@Override
public student call(String line) throws Exception {
String[] lineSplit = line.split(",");
student stu = new student();
stu.setId(Integer.valueOf(lineSplit[0].trim()));
stu.setName(String.valueOf(lineSplit[1].trim()));
stu.setAge(Integer.valueOf(lineSplit[2].trim()));
return stu;
}
});
// 使用反射方式,将RDD转换为DataFrame
DataFrame studentDF = sqlContext.createDataFrame(students, student.class);
// 拿到了一个DataFrame之后,就可以将其注册为一个临时表,然后针对其中的数据执行SQL语句
studentDF.registerTempTable("students");
DataFrame teenagerDF = sqlContext.sql("select * from students where age <= 18");
// 将查询出来的DataFrame,再次转换为RDD(中间查询 结果转换RDD)
JavaRDD<Row> teenagerRDD = teenagerDF.javaRDD();
JavaRDD<student> teenagerStudentRDD = teenagerRDD.map(new Function<Row, student>() {
@Override
public student call(Row row) throws Exception {
// row中的数据的顺序,可能是跟我们期望的是不一样的!
student stu = new student();
stu.setAge(row.getInt(0));
stu.setId(row.getInt(1));
stu.setName(row.getString(2));
return stu;
}
});
List<student> studentList = teenagerStudentRDD.collect();
for(student stu : studentList)
{
System.out.println(stu);
}
}
}
import java.io.Serializable;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
public class RDD2DataFrameReflection implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("RDD2DataFrameReflection").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> lines = sc.textFile("D:/students.txt");
JavaRDD<student> students = lines.map(new Function<String, student>() {
@Override
public student call(String line) throws Exception {
String[] lineSplit = line.split(",");
student stu = new student();
stu.setId(Integer.valueOf(lineSplit[0].trim()));
stu.setName(String.valueOf(lineSplit[1].trim()));
stu.setAge(Integer.valueOf(lineSplit[2].trim()));
return stu;
}
});
// 使用反射方式,将RDD转换为DataFrame
DataFrame studentDF = sqlContext.createDataFrame(students, student.class);
// 拿到了一个DataFrame之后,就可以将其注册为一个临时表,然后针对其中的数据执行SQL语句
studentDF.registerTempTable("students");
DataFrame teenagerDF = sqlContext.sql("select * from students where age <= 18");
// 将查询出来的DataFrame,再次转换为RDD(中间查询 结果转换RDD)
JavaRDD<Row> teenagerRDD = teenagerDF.javaRDD();
JavaRDD<student> teenagerStudentRDD = teenagerRDD.map(new Function<Row, student>() {
@Override
public student call(Row row) throws Exception {
// row中的数据的顺序,可能是跟我们期望的是不一样的!
student stu = new student();
stu.setAge(row.getInt(0));
stu.setId(row.getInt(1));
stu.setName(row.getString(2));
return stu;
}
});
List<student> studentList = teenagerStudentRDD.collect();
for(student stu : studentList)
{
System.out.println(stu);
}
}
}
相关文章推荐
- xxx is not in the sudoers file.This incident will be reported.的解决方法
- Oracle EBS新建一个账簿(1)
- 25. Reverse Nodes in k-Group
- BestCoder 2nd Anniversary
- Dynamics CRM 2016交互服务中心体验
- Android中的多点触摸交互处理,可以达到缩放图片的效果
- combobox输入中文而对应的hidden域的value值没有及时修改的问题
- ROS学习之 cpp服务
- 23种设计模式之观察者模式(Observer)
- 安卓 并行计算 renderscript
- 每天一个设计模式(4):简单工厂“模式”
- Ubuntu 12 修改环境变量
- sqlserver 数据库索引建立原则
- 23 是时候学点面向对象的东东了
- JavaScript中定时器的暂停和继续
- ArcGIS教程:图形处理概述
- android毕业设计-注册界面
- Linux循环设备/dev/loop解析
- 利用MemoryAnalyzer进行OutOfMemoryError的诊断分析
- 取得类的全部的方法