大数据IMF传奇行动绝密课程第59课:使用Java和Scala在IDE中实战RDD和DataFrame转换操作
2016-09-11 23:48
453 查看
使用Java和Scala在IDE中实战RDD和DataFrame转换操作
1、RDD与DataFrame转换的重大意义
2、使用Java实战RDD与DataFrame转换
3、使用Scala实战RDD与DataFrame转换
RDD接上数据库、接上文件系统,无限想象空间~,极大加速和简化了大数据开发
通过反射来推断RDD的元数据(Schema),从而完成RDD到DataFrame的转换
case class/JavaBean(反射)方式适用于事先已经知道RDD元数据的场景;
如果事先不知道RDD的元数据,则需要在运行时动态获取元数据并构建Schema
JavaBean不支持嵌套,也不可以有复杂数据结构(List等)
Person.class传进去后,会以反射的方式创建DataFrame
遇到错误
java.lang.IllegalAccessException: Class org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1$$anonfun$apply$1 can not access a member of class com.tom.spark.SparkApps.sql.Person with modifiers "public" at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102) at java.lang.reflect.AccessibleObject.slowCheckMemberAccess(AccessibleObject.java:296) at java.lang.reflect.AccessibleObject.checkAccess(AccessibleObject.java:288) at java.lang.reflect.Method.invoke(Method.java:490) at org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1358) at org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1358) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1.apply(SQLContext.scala:1358) at org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1.apply(SQLContext.scala:1356) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 16/09/07 12:42:35 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.IllegalAccessException: Class org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1$$anonfun$apply$1 can not access a member of class com.tom.spark.SparkApps.sql.Person with modifiers "public" at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102) at java.lang.reflect.AccessibleObject.slowCheckMemberAccess(AccessibleObject.java:296) at java.lang.reflect.AccessibleObject.checkAccess(AccessibleObject.java:288) 
at java.lang.reflect.Method.invoke(Method.java:490) at org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1358) at org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1358) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1.apply(SQLContext.scala:1358) at org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1.apply(SQLContext.scala:1356) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at 
scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
需要将Person类放到一个单独的文件并标记为public
又遇到一个错误
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String at org.apache.spark.sql.Row$class.getString(Row.scala:250) at org.apache.spark.sql.catalyst.expressions.GenericRow.getString(rows.scala:192) at com.tom.spark.SparkApps.sql.RDD2DataFrameByReflection$2.call(RDD2DataFrameByReflection.java:57) at com.tom.spark.SparkApps.sql.RDD2DataFrameByReflection$2.call(RDD2DataFrameByReflection.java:1) at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927) at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
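原因是DataFrame中列的顺序与Person中字段的原有顺序不一致:通过JavaBean反射生成DataFrame时,列被按名称排序了(age、id、name),所以按下标取值时要使用排序后的位置(age为0,id为1,name为2),否则就会出现上面的类型转换错误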
Java实现
public static void ByReflection(){ SparkConf conf = new SparkConf().setAppName("RDD2DataFrameByReflection").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); JavaRDD<String> lines = sc.textFile("F:\\sparkData\\personBean.txt"); JavaRDD<Person> persons = lines.map(new Function<String, Person>() { public Person call(String line) throws Exception { // TODO Auto-generated method stub String[] arr = line.split(","); Person p = new Person(); p.setId(Integer.valueOf(arr[0].trim())); p.setName(arr[1].trim()); p.setAge(Integer.valueOf(arr[2].trim())); return p; } }); //在底层,通过反射的方式获得Person的所有fields,结合RDD本身,就生成了DataFrame DataFrame df = sqlContext.createDataFrame(persons, Person.class); //注册成临时表,在临时表上就可以写SQL df.registerTempTable("persons"); DataFrame bigDatas = sqlContext.sql("select * from persons where age >= 6"); bigDatas.show(); JavaRDD<Row> row = bigDatas.javaRDD(); JavaRDD<Person> result = row.map(new Function<Row, Person>() { public Person call(Row row) throws Exception { // TODO Auto-generated method stub Person p = new Person(); p.setId(row.getInt(1)); p.setName(row.getString(2)); p.setAge(row.getInt(0)); return p; } }); List<Person> personlist = result.collect(); for(Person p : personlist) { System.out.println(p); } }
Scala实现
/**
 * Converts an RDD of `Person` case-class instances into a DataFrame by
 * reflection, queries it through SQL, and maps the resulting rows back into
 * `Person` instances, printing each of them.
 *
 * Input is read from a local text file whose lines are comma-separated
 * records with three fields. The Spark context is always stopped before the
 * method returns, even when the job fails.
 */
def ByReflection(): Unit = {
  val conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameByReflection")
  val sc = new SparkContext(conf)
  try {
    val sqlContext = new SQLContext(sc)
    val lines = sc.textFile("F:\\sparkData\\personBean.txt")

    // Trim each field before parsing so that records written as "1, Spark, 7"
    // do not fail in toInt; this matches what the Java version of this example does.
    val persons = lines.map { line =>
      val fields = line.split(",").map(_.trim)
      Person(fields(0).toInt, fields(1), fields(2).toInt)
    }

    val df = sqlContext.createDataFrame(persons)
    // Register as a temporary table so that SQL can be run against it.
    df.registerTempTable("persons")
    df.printSchema()

    val bigDatas = sqlContext.sql("select * from persons where age >= 6")
    bigDatas.show()

    // NOTE(review): positional access assumes the DataFrame columns follow the
    // constructor order of Person (Int, String, Int) — confirm against the
    // schema printed above.
    val result = bigDatas.rdd.map { row =>
      Person(row.getInt(0), row.getString(1), row.getInt(2))
    }
    result.collect().foreach(println)
  } finally {
    // Release the Spark context even if any step above throws.
    sc.stop()
  }
}
相关文章推荐
- 大数据IMF传奇行动绝密课程第60课:使用Java和Scala在IDE中实战RDD和DataFrame动态转换操作
- 大数据IMF传奇行动绝密课程第83课:透彻讲解使用Scala和Java两种方式实战Spark Streaming开发
- 大数据IMF传奇行动绝密课程第58课:使用Java和Scala在IDE中开发DataFrame实战
- 大数据IMF传奇行动绝密课程第16课:RDD实战(RDD基本操作实战及Transformation流程图)
- 大数据IMF传奇行动绝密课程第62课:Spark SQL下的Parquet使用最佳实践和代码实战
- 大数据IMF传奇行动绝密课程第98-99课:使用Spark Streaming实战对论坛网站动态行为的多维度分析
- 大数据IMF传奇行动绝密课程第89课:SparkStreaming On Kafka之kafka解析和安装实战
- 大数据IMF传奇行动绝密课程第57课:Spark SQL on Hive配置及实战
- 大数据IMF传奇行动绝密课程第85课:基于HDFS的SparkStreaming案例实战和内幕源码解密
- 大数据IMF传奇行动绝密课程第53课:Spark性能优化第九季 Spark Tungsten内存使用彻底解密
- 大数据IMF传奇行动绝密课程第14课:Spark RDD解密
- 大数据IMF传奇行动绝密课程第61课:Spark SQL数据加载和保存内幕深度解密实战
- 大数据IMF传奇行动绝密课程第90课:SparkStreaming基于Kafka Receiver案例实战和内幕源码解密
- 大数据IMF传奇行动绝密课程第55课:60分钟从零起步驾驭Hive实战
- 大数据IMF传奇行动绝密课程第18课:RDD持久化、广播、累加器
- 大数据IMF传奇行动绝密课程第75-79课:Spark SQL基于网站Log的综合案例实战
- 第59课:使用Java和Scala在IDE中实战RDD和DataFrame转换操作’学习笔记
- 大数据IMF传奇行动绝密课程第22课:RDD的依赖关系彻底解密
- 大数据IMF传奇行动绝密课程第69课:Spark SQL通过Hive数据源实战
- 大数据IMF传奇行动绝密课程第17课:RDD案例(join、cogroup、reduceByKey、groupByKey等)