您的位置:首页 > 其它

Spark之DataFrame通过编码创建

2016-06-06 22:57 302 查看
在某些情况下,我们不能通过反射的方式创建DataFrame,(反射在记录在一个字符串编码的结构和文本数据集将被解析和字段预计给不同的用户不能使用。)但是我们可以用编码的方式创建。

一,文件准备:

可以参考上一篇博客的文件。

二,过程解析:

1,创建对应数据的编码.
2,对应的编码添加到格式化字段list中。
3,创建好模式类型。
4,将数据文件解析,然后以RDD行的格式返回。
5,通过指定的模式,和分割的数据创建DataFrame,创建表。
6,查询出表中所有的数据返回名字,类型DataFrame.
7.将其转化为RDD最后转成List输出名字。


三,代码展示:

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class ProgrammaticallySparkTest2 {

public static void main(String[] args) {
//因为反射在记录在一个字符串编码的结构和文本数据集将被解析和字段预计给不同的用户不能使用,所以
//提供编码的形式创建DataFrame
SparkConf conf=new SparkConf();
conf.set("spark.testing.memory", "269522560000");
JavaSparkContext sc=new JavaSparkContext("local","SparkrefleCttest2",conf);
System.out.println("与spark的连接建立成功"+conf);
SQLContext sqlContext=new org.apache.spark.sql.SQLContext(sc);
System.out.println("sqlContext建立成功" + sqlContext);
JavaRDD<String> people = sc.textFile("hdfs://192.168.61.128:9000/spark001/sparkreflect.txt");

//创建对应的编码
String schemaString = "name age";

//对应的编码添加到格式化字段list中
List<StructField> fields = new ArrayList<StructField>();
for(String fieldname:schemaString.split(" ")){
fields.add(DataTypes.createStructField(fieldname, DataTypes.StringType, true));
}
//创建好模式类型
StructType schema=DataTypes.createStructType(fields);
//将数据文件解析,然后以RDD行的格式返回。
JavaRDD<Row> rowRDD=people.map(new Function<String,Row>(){
public Row call(String record) throws Exception {
String[] fields=record.split(",");
return RowFactory.create(fields[0],fields[1].trim());
}});
//通过指定的模式,和分割的数据创建DataFrame
DataFrame peopleDataFrame=sqlContext.createDataFrame(rowRDD, schema);
//创建表
peopleDataFrame.registerTempTable("people");
//查询出表中所有的数据返回名字
DataFrame results=sqlContext.sql("select name from people");

//将其转化为RDD最后转成List输出名字
List<String> names=results.javaRDD().map(new Function<Row,String>(){
public String call(Row row) throws Exception {
String name=row.getString(0);
return "name:"+name;
}

}).collect();

for(String name:names){
System.out.println("符合条件的人:"+name);
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息