您的位置:首页 > 编程语言 > Java开发

spark学习-SparkSQL--14-JavaRDD注册成表然后用SparkSQL查询

2017-08-20 14:20 316 查看
1.先看正确的例子

package com.lcc.spark.rdd.test;

import java.io.Serializable;

/**
 * Serializable JavaBean describing one person record: an id and a name,
 * both stored as text and exposed through conventional getters and setters.
 */
public class Person implements Serializable {

    /** Serialization version marker. */
    private static final long serialVersionUID = 1L;

    /** Identifier of the person; {@code null} until assigned. */
    private String id;

    /** Name of the person; {@code null} until assigned. */
    private String name;

    /**
     * @return the id currently stored, or {@code null} if none was set
     */
    public String getId() {
        return this.id;
    }

    /**
     * @return the name currently stored, or {@code null} if none was set
     */
    public String getName() {
        return this.name;
    }

    /**
     * Replaces the stored id.
     *
     * @param id the new id
     */
    public void setId(String id) {
        this.id = id;
    }

    /**
     * Replaces the stored name.
     *
     * @param name the new name
     */
    public void setName(String name) {
        this.name = name;
    }
}


package com.lcc.spark.rdd.test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

/**
* Hello world!
*
*/
public class App {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("Simple Application");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
// convert from other RDD
JavaRDD<String> line1 = sc.parallelize(Arrays.asList("1 aa", "2 bb", "4 cc", "3 dd"));

line1.foreach(new VoidFunction<String>(){
@Override
public void call(String num) throws Exception {
// TODO Auto-generated method stub
System.out.println("numbers;"+num);
}
});

JavaRDD<Person> stuRDD= line1.map(new Function<String, Person>() {
public Person call(String line) throws Exception {
String[] lineSplit=line.split(" ");
Person stu=new Person();
stu.setId(lineSplit[0]);
stu.setName(lineSplit[1]);
return stu;
}
});

Dataset<Row> stuDf=sqlContext.createDataFrame(stuRDD,Person.class);
//stuDf.select("id","name","age").write().mode(SaveMode.Append).parquet("par");  //对文件指定列名
stuDf.printSchema();
stuDf.createOrReplaceTempView("Person");
Dataset<Row> nameDf=sqlContext.sql("select * from Person ");
nameDf.show();


输出结果如下

numbers;1 aa
numbers;2 bb
numbers;4 cc
numbers;3 dd

root
|-- id: string (nullable = true)
|-- name: string (nullable = true)

+---+----+
| id|name|
+---+----+
|  1|  aa|
|  2|  bb|
|  4|  cc|
|  3|  dd|
+---+----+


这里有几个疑问

Dataset<Row> stuDf=sqlContext.createDataFrame(stuRDD,Person.class);
改成
DataFrame personsDF = sqlContext.createDataFrame(javaprdd, Person.class );
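是行不通的:DataFrame 这个类无法引入。原因是从 Spark 2.x 开始,Java API 中不再有独立的 DataFrame 类(在 Scala 中 DataFrame 只是 Dataset[Row] 的类型别名),所以 Java 代码里要直接写 Dataset<Row>;同时 2.x 推荐用 SparkSession 作为统一入口(见下文第 2 部分的例子)。这一点原文写作时作者标记为"疑问,未解决"。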


2.我觉得里面有段代码

// Map every "id name" line of line1 to a Person bean; a new Person is created per input line.
JavaRDD<Person> stuRDD= line1.map(new Function<String, Person>() {
public Person call(String line) throws Exception {
// Split on a single space: element 0 is used as the id, element 1 as the name.
String[] lineSplit=line.split(" ");
Person stu=new Person();
stu.setId(lineSplit[0]);
stu.setName(lineSplit[1]);
return stu;
}
});


每次都new一个对象,数据量非常大的时候,肯定会出问题,因此我就是想用DataFrame这个,看代码

package com.lcc.spark.rdd.test;

import java.sql.Array;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Demo application: converts an RDD of "id name" text lines into an RDD of
 * {@link Row}, attaches an explicitly built schema (two nullable string
 * columns, "id" and "name"), registers the resulting DataFrame as the
 * temporary view "Person" and queries it with Spark SQL.
 */
public class App3 {

    /**
     * Runs the demo against a local Spark master.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {

        SparkSession spark = SparkSession.builder()
                .appName("RDDToDataset")
                .master("local[*]")
                .getOrCreate();

        try {
            JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
            List<String> numberList = Arrays.asList("1 aa", "2 bb", "4 cc", "3 dd");
            JavaRDD<String> line1 = sc.parallelize(numberList);

            // Print each raw input line.
            line1.foreach(new VoidFunction<String>() {
                @Override
                public void call(String num) throws Exception {
                    System.out.println("numbers;" + num);
                }
            });

            // Step 1: build an RDD of Row on top of the source RDD.
            // The RDD must first become an RDD whose element type is Row;
            // a Row can be thought of as one row of a table.
            JavaRDD<Row> personsRDD = line1.map(new Function<String, Row>() {
                @Override
                public Row call(String line) throws Exception {
                    String[] splited = line.split(" ");
                    return RowFactory.create(splited[0], splited[1]);
                }
            });

            // Step 2: describe the columns; the field order matches the values put into each Row.
            List<StructField> fields = new ArrayList<StructField>();
            fields.add(DataTypes.createStructField("id", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));

            // Author's note: a DataFrame is much like a relational table; it is a
            // Spark-specific data format that can be used with the SQL functions.
            StructType schema = DataTypes.createStructType(fields);

            // Typed as Dataset<Row> instead of the raw Dataset to keep generic type checking.
            Dataset<Row> stuDf = spark.createDataFrame(personsRDD, schema);
            //stuDf.select("id","name","age").write().mode(SaveMode.Append).parquet("par");
            stuDf.printSchema();

            // Expose the DataFrame to SQL under the name "Person" and query it.
            stuDf.createOrReplaceTempView("Person");
            Dataset<Row> nameDf = spark.sql("select * from Person ");
            nameDf.show();
        } finally {
            // Always shut the session down, even if one of the steps above fails.
            spark.stop();
        }
    }
}


运行结果如下

+---+----+
| id|name|
+---+----+
|  1|  aa|
|  2|  bb|
|  4|  cc|
|  3|  dd|
+---+----+


上面的程序是可以运行的。但是看看下面这个程序

package com.lcc.spark.rdd.test;

import java.sql.Array;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
* Counter-example kept on purpose: the surrounding article states that this
* program does NOT run. It is the same flow as App3 except that the
* JavaRDD<String> is handed to createDataFrame directly, without first being
* mapped to a JavaRDD<Row>.
*/
public class App4 {
public static void main(String[] args) {

// Local session, same settings as the working example.
SparkSession spark=SparkSession.builder()
.appName("RDDToDataset")
.master("local[*]")
.getOrCreate();

JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
List<String> numberList = Arrays.asList("1 aa", "2 bb", "4 cc", "3 dd");
//JavaRDD<String> line1 = sc.parallelize(Arrays.asList("1 aa", "2 bb", "4 cc", "3 dd"));
// Element type is String ("id name" lines), not Row.
JavaRDD<String> line1 = sc.parallelize(numberList);

// Schema with two nullable string columns: "id" and "name".
List<StructField> fields=new ArrayList<StructField>();
fields.add(DataTypes.createStructField("id", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));

// Author's note: a DataFrame is much like a relational table; it is a Spark-specific
// data format, and data in this format can be used with the functions in SQLContext.

StructType schema=DataTypes.createStructType(fields);
// This is the faulty call: line1 is a JavaRDD<String>, whereas the schema-based
// createDataFrame quoted in the article takes an RDD of Row.
Dataset stuDf=spark.createDataFrame(line1, schema);
//stuDf.select("id","name","age").write().mode(SaveMode.Append).parquet("par");
stuDf.printSchema();
stuDf.createOrReplaceTempView("Person");
Dataset<Row> nameDf=spark.sql("select * from Person ");
nameDf.show();

}
}


这个是不能运行的,可以对比看出来一个是
JavaRDD<String>
一个是
JavaRDD<Row>
,而官网上是

var peopleDF = spark.createDataFrame(rowRDD,schema);
源代码是
/**
 * Creates a DataFrame from an RDD of Row using the given schema.
 * Delegates to the three-argument overload with needsConversion = true.
 *
 * @param rowRDD the rows to wrap; the element type must be Row
 * @param schema the structure describing the columns of each Row
 */
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = {
  createDataFrame(rowRDD, schema, needsConversion = true)
}


一个是字符串,一个是row,我就是因为不知道这个细节认为打印出来都是一行,认为是相同的,所以一直错,所以再次提醒我细节很重要
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: