您的位置:首页 > 大数据

【大数据学习之路】SparkSQL,mapreduce(大数据离线计算)方向学习(三)

2019-06-02 14:03 507 查看
版权声明:本文为博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/qq_41945793/article/details/90703430

【大数据学习之路】SparkSQL,mapreduce(大数据离线计算)方向学习(三)

spark SQL编写一些小demo

任务如下:
1、用 SQL 语句的方式统计男性中身高超过 180cm 的人数。
2、用 SQL 语句的方式统计女性中身高超过 170cm 的人数。
3、对人群按照性别分组并统计男女人数。
4、用类 RDD 转换的方式对 DataFrame 操作来统计并打印身高大于 210cm 的前 50名男性。
5、对所有人按身高进行排序并打印前 50 名的信息。

任务数据

文件总共包含三列,第一列是 ID,第二列是性别信息 (F -> 女,M -> 男),第三列是人口的身高信息,单位是 cm,具体格式如下。

生成数据代码:

package demo3;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Random;

/**
* args[0] : 文件路径
* args[1] : 生成数据量条数
*/
public class sss {

public static void main(String[] args) {
//        if (args.length < 2) {
//            System.out.println("file path or num is null");
//            System.exit(1);
//        }
String filePath = "D:/file/samplePeopleInfomin.txt";
int peopleNum = Integer.valueOf(300);
File file = new File(filePath);
FileWriter fw = null;
BufferedWriter writer = null;
Random rand = new Random();
int age = 0;
int se = 0;
String sex = null;
try {
fw = new FileWriter(file);
writer = new BufferedWriter(fw);
for(int i = 1;i<= peopleNum ;i++){
age =  rand.nextInt(100)+120;
se = rand.nextInt(2);
if(se!=0){
sex = "F";
}else {
sex = "M";
}
writer.write(i+","+sex+","+age);
writer.newLine();//换行
writer.flush();
}

} catch (IOException e) {
e.printStackTrace();
}finally {
try {
writer.close();
fw.close();
} catch (IOException e) {
e.printStackTrace();
}

}
}
}

在这里我生成了300条数据来进行任务

spark SQL任务代码:

package SparkSQLs

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

/**
* args(0) : HDFS数据文件地址
*
*统计任务如下:
用 SQL 语句的方式统计男性中身高超过 180cm 的人数。
用 SQL 语句的方式统计女性中身高超过 170cm 的人数。
对人群按照性别分组并统计男女人数。
用类 RDD 转换的方式对 DataFrame 操作来统计并打印身高大于 210cm 的前 50 名男性。
对所有人按身高进行排序并打印前 50 名的信息。
*/
object PeopleDataStatistics {
private val schemaString = "id,gender,height"

def main(args: Array[String]): Unit = {

val conf = new SparkConf().setAppName("PeopleDataStatistics")
val sc = new SparkContext(conf)
val peopleDateRdd = sc.textFile(args(0).trim);
val sqlCtx = new SQLContext(sc)

val schemaArr = schemaString.split(",")
val schema = StructType(schemaArr.map(fieldName => StructField(fieldName,StringType,true)))
val rowRdd : RDD[Row] = peopleDateRdd
.map(_.split(","))
.map(eachRow => Row(eachRow(0),eachRow(1),eachRow(2)))
val peopleDF = sqlCtx.createDataFrame(rowRdd,schema)

//注意DF缓存在内存中,会根据数据量节省很多时间
peopleDF.persist(StorageLevel.MEMORY_ONLY_SER)
peopleDF.registerTempTable("people")

//找身高超过180的男性
val higherMale180 = sqlCtx.sql("" +
"select *" +
" from people" +
" where height > 180 and gender = 'F'")
println("Men whose height are more than 180: " + higherMale180.count())
println("<Display #1>")

//找身高超过170的女性
val higherMale170 = sqlCtx.sql("" +
"select *" +
" from people" +
" where height > 170 and gender = 'M'")
println("female whose height are more than 170: " + higherMale170.count())
println("<Display #2>")

//按性别分组并计算人数
peopleDF.groupBy(peopleDF("gender")).count().show()
println("People Count Grouped By Gender")
println("<Display #3>")

//数数并打印前50名身高超过210厘米的男子
peopleDF.filter(peopleDF("gender").equalTo("M")).filter(peopleDF("height") > 210).show(50)
println("Men whose height is more than 210")
println("<Display #4>")

//按身高对所有人排序并打印前50名
peopleDF.sort(peopleDF("height").desc).take(50).foreach { println }
println("Sorted the people by height in descend order,Show top 50 people")
println("<Display #5>")

println("All the statistics actions are finished on structured People data.")
}

}

运行结果:



内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐