Spark之DataFrame通过编码创建
2016-06-06 22:57
302 查看
在某些情况下,我们不能通过反射的方式创建DataFrame,(反射在记录在一个字符串编码的结构和文本数据集将被解析和字段预计给不同的用户不能使用。)但是我们可以用编码的方式创建。
一,文件准备:
可以参考上一篇博客的文件。
二,过程解析:
三,代码展示:
一,文件准备:
可以参考上一篇博客的文件。
二,过程解析:
1,创建对应数据的编码. 2,对应的编码添加到格式化字段list中。 3,创建好模式类型。 4,将数据文件解析,然后以RDD行的格式返回。 5,通过指定的模式,和分割的数据创建DataFrame,创建表。 6,查询出表中所有的数据返回名字,类型DataFrame. 7.将其转化为RDD最后转成List输出名字。
三,代码展示:
import java.util.ArrayList; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; public class ProgrammaticallySparkTest2 { public static void main(String[] args) { //因为反射在记录在一个字符串编码的结构和文本数据集将被解析和字段预计给不同的用户不能使用,所以 //提供编码的形式创建DataFrame SparkConf conf=new SparkConf(); conf.set("spark.testing.memory", "269522560000"); JavaSparkContext sc=new JavaSparkContext("local","SparkrefleCttest2",conf); System.out.println("与spark的连接建立成功"+conf); SQLContext sqlContext=new org.apache.spark.sql.SQLContext(sc); System.out.println("sqlContext建立成功" + sqlContext); JavaRDD<String> people = sc.textFile("hdfs://192.168.61.128:9000/spark001/sparkreflect.txt"); //创建对应的编码 String schemaString = "name age"; //对应的编码添加到格式化字段list中 List<StructField> fields = new ArrayList<StructField>(); for(String fieldname:schemaString.split(" ")){ fields.add(DataTypes.createStructField(fieldname, DataTypes.StringType, true)); } //创建好模式类型 StructType schema=DataTypes.createStructType(fields); //将数据文件解析,然后以RDD行的格式返回。 JavaRDD<Row> rowRDD=people.map(new Function<String,Row>(){ public Row call(String record) throws Exception { String[] fields=record.split(","); return RowFactory.create(fields[0],fields[1].trim()); }}); //通过指定的模式,和分割的数据创建DataFrame DataFrame peopleDataFrame=sqlContext.createDataFrame(rowRDD, schema); //创建表 peopleDataFrame.registerTempTable("people"); //查询出表中所有的数据返回名字 DataFrame results=sqlContext.sql("select name from people"); //将其转化为RDD最后转成List输出名字 List<String> names=results.javaRDD().map(new Function<Row,String>(){ public String call(Row row) throws Exception { String name=row.getString(0); return "name:"+name; } }).collect(); for(String name:names){ System.out.println("符合条件的人:"+name); } } }
相关文章推荐
- Spark RDD API详解(一) Map和Reduce
- 使用spark和spark mllib进行股票预测
- 我是运营,我没有假期
- Spark随谈——开发指南(译)
- Spark,一种快速数据分析替代方案
- 新注册
- DB2数据库的安装
- C#实现把指定数据写入串口
- “传奇”图象数据存储方式
- Flex中对表格某列的值进行数字格式化并求百分比添加%
- 四大漏洞入侵博客
- 修复mysql数据库
- asp格式化日期时间格式的代码
- 浅析SQL数据操作语句
- SQLServer 数据导入导出的几种方法小结
- 简述MySQL分片中快速数据迁移
- MySQL数据备份之mysqldump的使用详解
- C#实现窗体间传递数据实例
- C#中的委托数据类型简介
- SQL Server删除表及删除表中数据的方法