您的位置:首页 > 数据库

spark sql 基本用法

2016-04-04 16:49 357 查看
一、通过结构化数据创建DataFrame:

publicstaticvoid main(String[] args) {
SparkConf conf = new SparkConf() .setAppName("DataFrameCreate").setMaster("local");

JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);

DataFrame df = sqlContext.read().json("hdfs://spark1:9000/students.json"); //结构化数据直接加载为DataFrame

df.show();
}

二、通过RDD创建DataFrame的两种创建方式


(数据源students.txt的数据截图)

2.1通过已知类型的schema创建DataFrame,代码如下:

public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setMaster("local")
.setAppName("RDD2DataFrameReflection");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);

JavaRDD<String> lines = sc.textFile("D://students.txt");

//将lines转换成 JavaRDD<Student>
JavaRDD<Student> students = lines.map(new Function<String, Student>() {

private static final long serialVersionUID = 1L;

@Override
public Student call(String line) throws Exception {
// TODO Auto-generated method stub
String[] strPlits = line.split(",");
Student stu = new Student();

stu.setId(Integer.valueOf(strPlits[0]));
stu.setName(strPlits[1]);
stu.setAge(Integer.valueOf(strPlits[2]));

return stu;
}

});

// 使用反射方式,将RDD转换为DataFrame
// 这里要求,JavaBean必须实现Serializable接口,是可序列化的

//根据student的schema 和 RDD创建DataFrame
DataFrame studentsDF = sqlContext.createDataFrame(students, Student.class);
studentsDF.show();
}

2.2手动创建schema的方式创建DataFrame

public static void main(String[] args) {

//... 省略创建sqlContext的过程

JavaRDD<String> lines = sc.textFile("D://students.txt");

//将普通RDD装换成JavaRDD<Row>
JavaRDD<Row> rowRDD = lines.map(new Function<String, Row>() {

private static final long serialVersionUID = 1L;

@Override
public Row call(String line) throws Exception {
String[] strArray = line.split(",");
Row row= RowFactory.create(
Integer.valueOf(strArray[0]), //id
strArray[1], //name
Integer.valueOf(strArray[2])); //age

return row;
}
});

//第二步 创建元类型, 即创建schema
List<StructField> structFields = new ArrayList<StructField>();
structFields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
StructType structType = DataTypes.createStructType(structFields);

//根据元数据类型将JavaRDD<Row>转化成DataFrame
DataFrame studentDF = sqlCotnext.createDataFrame(rowRDD, structType);

studentDF.show();
}

-》DataFrame、RDD、List互转

JavaRDD<Row> rows = studentDF.javaRDD();List<Row> studentList = rows.collect();

三、DataFrame基本用法

// 打印DataFrame中所有的数据(select * from ...) df.show();
// 打印DataFrame的元数据(Schema)
df.printSchema();
// 查询某列所有的数据
df.select("name").show();
// 查询某几列所有的数据,并对列进行计算
df.select(df.col("name"), df.col("age").plus(1)).show();
// 根据某一列的值进行过滤
df.filter(df.col("age").gt(18)).show();
// 根据某一列进行分组,然后进行聚合
df.groupBy(df.col("age")).count().show();

DataFrame studentDF = sqlCotnext.createDataFrame(rowRDD, structType);
studentDF.show();

studentDF.registerTempTable("students"); //将DataFrame注册为零时表,取名students

//对students零时表做sql查询
DataFrame oldStudentDF = sqlCotnext.sql("select * from students where age>18");

oldStudentDF.show();
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: