
10. Spark SQL: Sales Statistics Case Study in Practice

2017-11-02 17:16
Spark SQL built-in functions and a daily UV / sales statistics case study in practice. The example below takes raw sale log lines of the form "date, amount, userId", filters out malformed lines that are missing the user id, and then aggregates the total sales per day with the DataFrame API.

Java version:

package cn.spark.study.sql;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

/**
 * Spark SQL built-in functions: daily sales statistics case study
 */

public class DailySale {

    public static void main(String[] args) {
        // Create the SparkConf, running locally
        SparkConf conf = new SparkConf()
                .setMaster("local")
                .setAppName("DailySale");

        // Create the JavaSparkContext and SQLContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // A note on the business scenario: we only want to count sales made by logged-in users.
        // Log reporting sometimes produces errors and anomalies, e.g. records missing the user id;
        // such records are simply excluded from the statistics.

        // Mock data
        List<String> userSaleLog = Arrays.asList(
                "2015-10-01, 55.05,1122",
                "2015-10-01, 23.15,1133",
                "2015-10-01, 15.20",
                "2015-10-02, 56.05,1144",
                "2015-10-02, 78.87,1155",
                "2015-10-02, 113.02,1123");

        // Create an RDD by parallelizing the in-memory collection:
        // call the parallelize() method on the SparkContext (or one of its subclasses)
        JavaRDD<String> userSaleLogRDD = sc.parallelize(userSaleLog, 5);

        // Data cleaning: filter out malformed records (those missing the user id field)
        JavaRDD<String> userSaleLogFilterRDD = userSaleLogRDD.filter(new Function<String, Boolean>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Boolean call(String line) throws Exception {
                return line.split(",").length >= 3;
            }
        });

        // Map each valid log line to a (date, saleAmount) pair
        JavaPairRDD<String, Double> dailySalesRDD = userSaleLogFilterRDD.mapToPair(new PairFunction<String, String, Double>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Double> call(String line) throws Exception {
                String[] fields = line.split(",");
                return new Tuple2<String, Double>(fields[0], Double.valueOf(fields[1]));
            }
        });

        // Convert the pair RDD into a JavaRDD<Row>, in preparation for creating a DataFrame
        JavaRDD<Row> dateSalesRowRDD = dailySalesRDD.map(new Function<Tuple2<String, Double>, Row>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Row call(Tuple2<String, Double> t) throws Exception {
                return RowFactory.create(t._1, t._2);
            }
        });

        // Dynamically build the schema (metadata) for the DataFrame
        List<StructField> structFields = Arrays.asList(
                DataTypes.createStructField("date", DataTypes.StringType, true),
                DataTypes.createStructField("sale_amount", DataTypes.DoubleType, true));
        StructType structType = DataTypes.createStructType(structFields);

        // Convert to a DataFrame
        DataFrame dateSalesRowDF = sqlContext.createDataFrame(dateSalesRowRDD, structType);

        // Aggregate the DataFrame to compute the total sales per day, then convert back to a JavaRDD
        JavaRDD<Tuple2<String, Double>> salesRDD = dateSalesRowDF
                .groupBy("date")
                .sum("sale_amount")
                .javaRDD()
                .map(new Function<Row, Tuple2<String, Double>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Double> call(Row row) throws Exception {
                        // column 0 is the date, column 1 is sum(sale_amount)
                        return new Tuple2<String, Double>(row.getString(0), row.getDouble(1));
                    }
                });

        // Print each (date, totalSales) result
        salesRDD.foreach(new VoidFunction<Tuple2<String, Double>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Double> tuple) throws Exception {
                System.out.println(tuple._1 + " total sales: " + tuple._2);
            }
        });

        // Close the JavaSparkContext
        sc.close();
    }
}
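
The listing above uses the GroupedData.sum() shorthand. The built-in aggregate functions referred to in the title can also be invoked explicitly through org.apache.spark.sql.functions. Below is a minimal sketch, not part of the original article, of the same per-day total written that way, reusing the dateSalesRowDF built above:

// Hypothetical alternative to groupBy("date").sum("sale_amount"):
// requires: import static org.apache.spark.sql.functions.sum;
DataFrame dailyTotalDF = dateSalesRowDF
        .groupBy("date")
        .agg(sum("sale_amount").as("total_sales"));
dailyTotalDF.show();

Calling agg() with explicit function columns makes it easy to compute several aggregates (sum, avg, countDistinct, ...) over the same grouping in a single pass.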

Scala version:

package cn.spark.study.sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.types.StructField

// Import the built-in SQL aggregate functions (e.g. sum)
import org.apache.spark.sql.functions._

/**
 * Spark SQL built-in functions: daily sales statistics case study
 */

object DailySale {
  def main(args: Array[String]) {
    // Create the SparkConf, running locally
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("DailySale")

    // Create the SparkContext and SQLContext
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    import sqlContext.implicits._

    // A note on the business scenario: we only want to count sales made by logged-in users.
    // Log reporting sometimes produces errors and anomalies, e.g. records missing the user id;
    // such records are simply excluded from the statistics.

    // Mock data
    val userSaleLog = Array(
      "2015-10-01, 55.05,1122",
      "2015-10-01, 23.15,1133",
      "2015-10-01, 15.20",
      "2015-10-02, 56.05,1144",
      "2015-10-02, 78.87,1155",
      "2015-10-02, 113.02,1123")

    // Create an RDD by parallelizing the in-memory collection via SparkContext's parallelize() method
    val userSaleLogRDD = sc.parallelize(userSaleLog, 5)

    // Keep only valid sale log lines (those that still contain the user id)
    val filteredUserSaleLogRDD = userSaleLogRDD
      .filter(log => log.split(",").length == 3)

    // Convert to an RDD[Row]
    val userSaleLogRowRDD = filteredUserSaleLogRDD
      .map { log =>
        val fields = log.split(",")
        Row(fields(0), fields(1).toDouble)
      }

    // Dynamically construct the schema
    val structType = StructType(Array(
      StructField("date", StringType, true),
      StructField("sale_amount", DoubleType, true)))

    // Use the dynamically constructed schema to turn the RDD[Row] into a DataFrame
    val userSaleLogDF = sqlContext.createDataFrame(userSaleLogRowRDD, structType)

    // Compute the total sales per day with the built-in sum() function
    userSaleLogDF.groupBy("date")
      .agg('date, sum('sale_amount))
      .map(row => Row(row(1), row(2)))  // row(1) is the date, row(2) is sum(sale_amount)
      .collect()
      .foreach(println)
  }
}
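
With the mock data above, the line "2015-10-01, 15.20" has no user id and is dropped by the filter, so both versions should report a total of 78.20 for 2015-10-01 (55.05 + 23.15) and 247.94 for 2015-10-02 (56.05 + 78.87 + 113.02), allowing for floating-point rounding in the printed values.

The title also mentions daily UV (unique visitor) statistics, which neither listing computes. A purely hypothetical Java sketch of that companion statistic, assuming an access-log DataFrame named userAccessLogDF with "date" and "userid" columns and using the built-in countDistinct() function, might look like this:

// Hypothetical: daily UV via the built-in countDistinct() aggregate
// requires: import static org.apache.spark.sql.functions.countDistinct;
DataFrame dailyUVDF = userAccessLogDF      // assumed DataFrame, not defined in the article
        .groupBy("date")
        .agg(countDistinct("userid").as("uv"));
dailyUVDF.show();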
Tags: spark_sql