spark学习-SparkSQL--14-JavaRDD注册成表然后用SparkSQL查询
2017-08-20 14:20
316 查看
1.先看正确的例子
输出结果如下
这里有几个疑问
2.我觉得里面有段代码
每次都new一个对象,数据量非常大的时候,肯定会出问题,因此我就是想用DataFrame这个,看代码
运行结果如下
上面的程序是可以运行的。但是看看下面这个程序
这个是不能运行的,可以对比看出来一个是
一个是字符串,一个是row,我就是因为不知道这个细节认为打印出来都是一行,认为是相同的,所以一直错,所以再次提醒我细节很重要
package com.lcc.spark.rdd.test; import java.io.Serializable; public class Person implements Serializable { /** * */ private static final long serialVersionUID = 1L; private String id; private String name; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } }
package com.lcc.spark.rdd.test; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import scala.Tuple2; /** * Hello world! * */ public class App { public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local").setAppName("Simple Application"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); // convert from other RDD JavaRDD<String> line1 = sc.parallelize(Arrays.asList("1 aa", "2 bb", "4 cc", "3 dd")); line1.foreach(new VoidFunction<String>(){ @Override public void call(String num) throws Exception { // TODO Auto-generated method stub System.out.println("numbers;"+num); } }); JavaRDD<Person> stuRDD= line1.map(new Function<String, Person>() { public Person call(String line) throws Exception { String[] lineSplit=line.split(" "); Person stu=new Person(); stu.setId(lineSplit[0]); stu.setName(lineSplit[1]); return stu; } }); Dataset<Row> stuDf=sqlContext.createDataFrame(stuRDD,Person.class); //stuDf.select("id","name","age").write().mode(SaveMode.Append).parquet("par"); //对文件指定列名 stuDf.printSchema(); stuDf.createOrReplaceTempView("Person"); Dataset<Row> nameDf=sqlContext.sql("select * from Person "); nameDf.show();
输出结果如下
numbers;1 aa numbers;2 bb numbers;4 cc numbers;3 dd root |-- id: string (nullable = true) |-- name: string (nullable = true) +---+----+ | id|name| +---+----+ | 1| aa| | 2| bb| | 4| cc| | 3| dd| +---+----+
这里有几个疑问
Dataset<Row> stuDf=sqlContext.createDataFrame(stuRDD,Person.class); 改成 DataFrame personsDF = sqlContext.createDataFrame(javaprdd, Person.class ); 是行不通的,这个包会无法引入,因为DataFrame好像在spark2.x开始没有这个类型了,想使用这个类型需要使用SparkSession,这是一个疑问,未解决
2.我觉得里面有段代码
JavaRDD<Person> stuRDD= line1.map(new Function<String, Person>() { public Person call(String line) throws Exception { String[] lineSplit=line.split(" "); Person stu=new Person(); stu.setId(lineSplit[0]); stu.setName(lineSplit[1]); return stu; } });
每次都new一个对象,数据量非常大的时候,肯定会出问题,因此我就是想用DataFrame这个,看代码
package com.lcc.spark.rdd.test; import java.sql.Array; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache< 12020 /span>.spark.api.java.function.VoidFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; /** * Hello world! * */ public class App3 { public static void main(String[] args) { SparkSession spark=SparkSession.builder() .appName("RDDToDataset") .master("local[*]") .getOrCreate(); JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); List<String> numberList = Arrays.asList("1 aa", "2 bb", "4 cc", "3 dd"); //JavaRDD<String> line1 = sc.parallelize(Arrays.asList("1 aa", "2 bb", "4 cc", "3 dd")); JavaRDD<String> line1 = sc.parallelize(numberList); line1.foreach(new VoidFunction<String>(){ @Override public void call(String num) throws Exception { System.out.println("numbers;"+num); } }); /** * 第一步:在RDD的基础上创建类型为Row的RDD */ //首先,必须将RDD变成以Row为类型的RDD。Row可以简单理解为Table的一行数据 JavaRDD<Row> personsRDD = line1.map(new Function<String,Row>(){ @Override public Row call(String line) throws Exception { String[] splited = line.split(" "); return RowFactory.create(splited[0],splited[1]); } }); List<StructField> fields=new ArrayList<StructField>(); fields.add(DataTypes.createStructField("id", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("name", DataTypes.StringType, true)); //dataframe更像是一张关系型数据表,是一种spark独有的数据格式吧,这种格式的数据可以使用sqlcontext里面的函数 StructType schema=DataTypes.createStructType(fields); Dataset stuDf=spark.createDataFrame(personsRDD, schema); //stuDf.select("id","name","age").write().mode(SaveMode.Append).parquet("par"); stuDf.printSchema(); stuDf.createOrReplaceTempView("Person"); Dataset<Row> nameDf=spark.sql("select * from Person "); nameDf.show(); } }
运行结果如下
+---+----+ | id|name| +---+----+ | 1| aa| | 2| bb| | 4| cc| | 3| dd| +---+----+
上面的程序是可以运行的。但是看看下面这个程序
package com.lcc.spark.rdd.test; import java.sql.Array; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; /** * Hello world! * */ public class App4 { public static void main(String[] args) { SparkSession spark=SparkSession.builder() .appName("RDDToDataset") .master("local[*]") .getOrCreate(); JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); List<String> numberList = Arrays.asList("1 aa", "2 bb", "4 cc", "3 dd"); //JavaRDD<String> line1 = sc.parallelize(Arrays.asList("1 aa", "2 bb", "4 cc", "3 dd")); JavaRDD<String> line1 = sc.parallelize(numberList); List<StructField> fields=new ArrayList<StructField>(); fields.add(DataTypes.createStructField("id", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("name", DataTypes.StringType, true)); //dataframe更像是一张关系型数据表,是一种spark独有的数据格式吧,这种格式的数据可以使用sqlcontext里面的函数 StructType schema=DataTypes.createStructType(fields); Dataset stuDf=spark.createDataFrame(line1, schema); //stuDf.select("id","name","age").write().mode(SaveMode.Append).parquet("par"); stuDf.printSchema(); stuDf.createOrReplaceTempView("Person"); Dataset<Row> nameDf=spark.sql("select * from Person "); nameDf.show(); } }
这个是不能运行的,可以对比看出来一个是
JavaRDD<String>一个是
JavaRDD<Row>,而官网上是
var peopleDF = spark.createDataFrame(rowRDD,schema); 源代码是 def createDataFrame(rowRDD:RDD[row],schema:StructType):DataFrame={ createDataFrame(rowRDD,schema,needsConversion=true) }
一个是字符串,一个是row,我就是因为不知道这个细节认为打印出来都是一行,认为是相同的,所以一直错,所以再次提醒我细节很重要
相关文章推荐
- spark学习-SparkSQL--13-java版JavaRDD与JavaPairRDD的互相转换
- spark学习-17-Java版SparkSQL程序读取Hbase表注册成表SQL查询
- 第67课:Spark SQL下采用Java和Scala实现Join的案例综合实战(巩固前面学习的Spark SQL知识)
- spark学习-SparkSQL--11-scala版写的SparkSQL程序读取Hbase表注册成表SQL查询
- Spark SQL概念学习系列之DataFrame与RDD的区别
- 学习笔记_Java_day14—编码实战___一个注册页面的完整流程
- geotools学习1从postgressql取shp并且展示(java postgis)
- 找个学习sql java的师傅
- Oracle常用命令14(.net / java代码调用(sql代码、程序包过程))
- JAVA面试题jsp ejb sql 数据库 等面试题 Oracle Sql server MySQL 经典数据库学习资料
- java 设计模式 学习笔记 (14) 备忘录模式
- java学习日记(第11—14 篇)———关于java高新知识的学习
- Java学习笔记33:mybatis动态SQL语句
- Java与Flex学习笔记(14)----Flex中实现倒计时的效果
- Java学习札记14:一个比较String、StringBuffer和StringBuilder之间效率差别的简单例子
- Java 学习笔记14:Spring 数据库数据源DBCP配置说明
- 黑马程序员--SQL注册漏洞 学习日志
- Effective Java 学习笔记(14)
- 黑马程序员_Java学习日记14_IO流3
- T-Sql学习(14) - 事务处理