Spark日志分析项目Demo(3)--Spark入口和DataFrame
2017-09-01 20:21
477 查看
我们先从入口main函数说起
具体解释一下上面的代码
(1)通过SparkConf创建JavaSparkContext,SparkConf默认去读取Spark.*的配置文件,也可以通过调用set的方法配置属性,例如上述的setMaster和setAppName。通过set方法配置的属性会覆盖读取的配置文件属性,SparkConf里面的所有set方法都支持链式调用chaining,例如上述的setAppName(“Constants.SPARK_APP_NAME_SESSION”).setMaster(“local”).
setAppName:设置应用名字,此名字会在Spark web UI显示
setMaster:设置主节点URL,本例使用“local”是指本机单线程,另外还有以下选项:
local[K]:本机K线程
local[*]:本机多线程,线程数与服务器核数相同
spark://HOST:PORT:Spark集群地址和端口,默认端口为7077
mesos://HOST:PORT:Mesos集群地址和端口,默认端口为5050
yarn:YARN集群
(2)SparkSQL有两个分支,sqlContext和hiveContext,sqlContext现在只支持SQL语法解析器(SQL-92语法);hiveContext现在支持SQL语法解析器和hivesql语法解析器,默认为hiveSQL语法解析器,用户可以通过配置切换成SQL语法解析器,来运行hiveSQL不支持的语法,
Hive on Spark和Spark SQL是不同的东西
Hive on Spark: 是除了DataBricks之外的其他几个公司搞的,想让Hive跑在Spark上;
Spark SQL: Shark的后继产品, 解除了不少Hive的依赖,且让SQL更加抽象通用化, 支持json,parquet等格式;
SqlContext: 应该是对应spark-sql这个project; 与hive解耦,不支持hql查询;
HiveContext:应该是对应spark-hive这个项目; 与hive有部分耦合, 支持hql,是SqlContext的子类,也就是说兼容SqlContext;
(3)我们需要一些测试数据作为数据驱动,希望可以快速让spark跑起来。
一个简单的方法是代码创建DataFrame,另外也可以从csv,hive,sql数据源,或一个存在的RDD创建。
下面介绍一下DataFrame的来历:
Spark RDD是分布式弹性数据集,一个比较核心的是粗粒度的分布式计算,粗粒度是指用户不用关心太多的分布式细节,用声明式的API就能完成分布式计算,比如Word Count用一行就能写完。RDD易用性很好,那Spark为啥还要有Dataframe呢?
DataFrame的从API上借鉴了R和pandas的DataFRame的概念,是业界标准结化数据处理API。DataFrame的数据抽象是命名元组,代码里是Row类型,Dataframe结合了过程化编程和声名式的API,让用户能用过程化编程的方法处理结构化数据。
Dataframe比RDD多了限制,带来了更多的优化,基于Spark Catalyst优化器,提供如列裁剪,谓词下推,map join等优化。同时,采用code generation ,动态编译表达式,提升性能,比用rdd的自定义函数性能高5倍左右。
举个例子,
用rdd读结构化文本要用map函数,需要按位置获取数据,没有schema,性能和可读性都不好。而用dataframe可以直接读取结构化数据,性能比RDD高2到3倍左右,比MR高5倍左右,同时,具有结构化的数据,可读性更好。
DataFrame具有很好的易用性,支持多种语言,在一个上下文可以写udf,具有部署一致性,以前写HQL Transform的用户可以试试Dataframe,在复杂统计分析中,有dataframe可以过程化编程,模块化会更好,可读性强。
Dataframe可以用df.rdd等方式转化为RDD,处理更多灵活的操作。
这个项目要处理的是日志,所以测试数据要包括时间,用户的sessionId,用户的userId,还有其他的自定义的信息字段。团购网的用户一般行为是”search”, “click”, “order”, “pay”,这些相关信息可以在数据里作为自定义字段。
下面是创建DataFrame的代码
可以看到生成了两个内存临时表user_visit_action,user_info.
public static void main(String[] args) { // 构建Spark上下文 SparkConf conf = new SparkConf() .setAppName(Constants.SPARK_APP_NAME_SESSION) .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = getSQLContext(sc.sc()); // 生成模拟测试数据 mockData(sc, sqlContext); // 关闭Spark上下文 sc.close(); } /* 获取SQLContext * 如果是在本地测试环境的话,那么就生成SQLContext对象 * 如果是在生产环境运行的话,那么就生成HiveContext对象 * @param sc SparkContext * @return SQLContext */ private static SQLContext getSQLContext(SparkContext sc) { boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL); if(local) { return new SQLContext(sc); } else { return new HiveContext(sc); } } /** * 生成模拟数据(只有本地模式,才会去生成模拟数据) * @param sc * @param sqlContext */ private static void mockData(JavaSparkContext sc, SQLContext sqlContext) { boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL); if(local) { MockData.mock(sc, sqlContext); } }
具体解释一下上面的代码
(1)通过SparkConf创建JavaSparkContext,SparkConf默认去读取Spark.*的配置文件,也可以通过调用set的方法配置属性,例如上述的setMaster和setAppName。通过set方法配置的属性会覆盖读取的配置文件属性,SparkConf里面的所有set方法都支持链式调用chaining,例如上述的setAppName(“Constants.SPARK_APP_NAME_SESSION”).setMaster(“local”).
setAppName:设置应用名字,此名字会在Spark web UI显示
setMaster:设置主节点URL,本例使用“local”是指本机单线程,另外还有以下选项:
local[K]:本机K线程
local[*]:本机多线程,线程数与服务器核数相同
spark://HOST:PORT:Spark集群地址和端口,默认端口为7077
mesos://HOST:PORT:Mesos集群地址和端口,默认端口为5050
yarn:YARN集群
(2)SparkSQL有两个分支,sqlContext和hiveContext,sqlContext现在只支持SQL语法解析器(SQL-92语法);hiveContext现在支持SQL语法解析器和hivesql语法解析器,默认为hiveSQL语法解析器,用户可以通过配置切换成SQL语法解析器,来运行hiveSQL不支持的语法,
Hive on Spark和Spark SQL是不同的东西
Hive on Spark: 是除了DataBricks之外的其他几个公司搞的,想让Hive跑在Spark上;
Spark SQL: Shark的后继产品, 解除了不少Hive的依赖,且让SQL更加抽象通用化, 支持json,parquet等格式;
SqlContext: 应该是对应spark-sql这个project; 与hive解耦,不支持hql查询;
HiveContext:应该是对应spark-hive这个项目; 与hive有部分耦合, 支持hql,是SqlContext的子类,也就是说兼容SqlContext;
(3)我们需要一些测试数据作为数据驱动,希望可以快速让spark跑起来。
一个简单的方法是代码创建DataFrame,另外也可以从csv,hive,sql数据源,或一个存在的RDD创建。
下面介绍一下DataFrame的来历:
Spark RDD是分布式弹性数据集,一个比较核心的是粗粒度的分布式计算,粗粒度是指用户不用关心太多的分布式细节,用声明式的API就能完成分布式计算,比如Word Count用一行就能写完。RDD易用性很好,那Spark为啥还要有Dataframe呢?
DataFrame的从API上借鉴了R和pandas的DataFRame的概念,是业界标准结化数据处理API。DataFrame的数据抽象是命名元组,代码里是Row类型,Dataframe结合了过程化编程和声名式的API,让用户能用过程化编程的方法处理结构化数据。
Dataframe比RDD多了限制,带来了更多的优化,基于Spark Catalyst优化器,提供如列裁剪,谓词下推,map join等优化。同时,采用code generation ,动态编译表达式,提升性能,比用rdd的自定义函数性能高5倍左右。
举个例子,
rdd.map(lambda line: line.split("\t")) .map(lambda items: (items[0], items[1], items[2], items[3])) .filter(lambda items: int(items[2]) >= 19) .select(lambda items: (items[0], items[1])) sqlContext.table("people") .filter(col("age") >= 19) .select("id", "name")
用rdd读结构化文本要用map函数,需要按位置获取数据,没有schema,性能和可读性都不好。而用dataframe可以直接读取结构化数据,性能比RDD高2到3倍左右,比MR高5倍左右,同时,具有结构化的数据,可读性更好。
DataFrame具有很好的易用性,支持多种语言,在一个上下文可以写udf,具有部署一致性,以前写HQL Transform的用户可以试试Dataframe,在复杂统计分析中,有dataframe可以过程化编程,模块化会更好,可读性强。
Dataframe可以用df.rdd等方式转化为RDD,处理更多灵活的操作。
这个项目要处理的是日志,所以测试数据要包括时间,用户的sessionId,用户的userId,还有其他的自定义的信息字段。团购网的用户一般行为是”search”, “click”, “order”, “pay”,这些相关信息可以在数据里作为自定义字段。
下面是创建DataFrame的代码
public static void mock(JavaSparkContext sc, SQLContext sqlContext) { List<Row> rows = new ArrayList<Row>(); String[] searchKeywords = new String[] {"火锅", "蛋糕", "重庆辣子鸡", "重庆小面", "呷哺呷哺", "新辣道鱼火锅", "国贸大厦", "太古商场", "日本料理", "温泉"}; String date = DateUtils.getTodayDate(); String[] actions = new String[]{"search", "click", "order", "pay"}; Random random = new Random(); for(int i = 0; i < 100; i++) { long userid = random.nextInt(100); for(int j = 0; j < 10; j++) { String sessionid = UUID.randomUUID().toString().replace("-", ""); String baseActionTime = date + " " + random.nextInt(23); Long clickCategoryId = null; for(int k = 0; k < random.nextInt(100); k++) { long pageid = random.nextInt(10); String actionTime = baseActionTime + ":" + StringUtils.fulfuill(String.valueOf(random.nextInt(59))) + ":" + StringUtils.fulfuill(String.valueOf(random.nextInt(59))); String searchKeyword = null; Long clickProductId = null; String orderCategoryIds = null; String orderProductIds = null; String payCategoryIds = null; String payProductIds = null; String action = actions[random.nextInt(4)]; if("search".equals(action)) { searchKeyword = searchKeywords[random.nextInt(10)]; } else if("click".equals(action)) { if(clickCategoryId == null) { clickCategoryId = Long.valueOf(String.valueOf(random.nextInt(100))); } clickProductId = Long.valueOf(String.valueOf(random.nextInt(100))); } else if("order".equals(action)) { orderCategoryIds = String.valueOf(random.nextInt(100)); orderProductIds = String.valueOf(random.nextInt(100)); } else if("pay".equals(action)) { payCategoryIds = String.valueOf(random.nextInt(100)); payProductIds = String.valueOf(random.nextInt(100)); } Row row = RowFactory.create(date, userid, sessionid, pageid, actionTime, searchKeyword, clickCategoryId, clickProductId, orderCategoryIds, orderProductIds, payCategoryIds, payProductIds, Long.valueOf(String.valueOf(random.nextInt(10)))); rows.add(row); } } } JavaRDD<Row> rowsRDD = sc.parallelize(rows); StructType schema = DataTypes.createStructType(Arrays.asList( DataTypes.createStructField("date", DataTypes.StringType, true), DataTypes.createStructField("user_id", DataTypes.LongType, true), DataTypes.createStructField("session_id", DataTypes.StringType, true), DataTypes.createStructField("page_id", DataTypes.LongType, true), DataTypes.createStructField("action_time", DataTypes.StringType, true), DataTypes.createStructField("search_keyword", DataTypes.StringType, true), DataTypes.createStructField("click_category_id", DataTypes.LongType, true), DataTypes.createStructField("click_product_id", DataTypes.LongType, true), DataTypes.createStructField("order_category_ids", DataTypes.StringType, true), DataTypes.createStructField("order_product_ids", DataTypes.StringType, true), DataTypes.createStructField("pay_category_ids", DataTypes.StringType, true), DataTypes.createStructField("pay_product_ids", DataTypes.StringType, true), DataTypes.createStructField("city_id", DataTypes.LongType, true))); DataFrame df = sqlContext.createDataFrame(rowsRDD, schema); //生成内存临时表 df.registerTempTable("user_visit_action"); for(Row _row : df.take(1)) { System.out.println(_row); } } rows.clear(); String[] sexes = new String[]{"male", "female"}; for(int i = 0; i < 100; i ++) { long userid = i; String username = "user" + i; String name = "name" + i; int age = random.nextInt(60); String professional = "professional" + random.nextInt(100); String city = "city" + random.nextInt(100); String sex = sexes[random.nextInt(2)]; Row row = RowFactory.create(userid, username, name, age, professional, city, sex); rows.add(row); } rowsRDD = sc.parallelize(rows); StructType schema2 = DataTypes.createStructType(Arrays.asList( DataTypes.createStructField("user_id", DataTypes.LongType, true), DataTypes.createStructField("username", DataTypes.StringType, true), DataTypes.createStructField("name", DataTypes.StringType, true), DataTypes.createStructField("age", DataTypes.IntegerType, true), DataTypes.createStructField("professional", DataTypes.StringType, true), DataTypes.createStructField("city", DataTypes.StringType, true), DataTypes.createStructField("sex", DataTypes.StringType, true))); DataFrame df2 = sqlContext.createDataFrame(rowsRDD, schema2); for(Row _row : df2.take(1)) { System.out.println(_row); } df2.registerTempTable("user_info"); rows.clear();
可以看到生成了两个内存临时表user_visit_action,user_info.
相关文章推荐
- Spark日志分析项目Demo(4)--RDD使用,用户行为统计分析
- Spark日志分析项目Demo(7)--临时表查询,各区域top3热门商品统计
- Spark日志分析项目Demo(5)--自定义Accumulator
- Spark日志分析项目Demo(8)--SparkStream,广告点击流量实时统计
- Spark日志分析项目Demo(6)--页面单跳转化率分析
- Spark日志分析项目Demo(1)--Flume-ng的安装
- 日志采集分析项目Demo
- Spark-项目中分析日志的核心代码
- Spring or SpringBoot项目整合spark日志冲突解决
- Hadoop项目实战---黑马论坛日志分析
- PYTHON上海分享活动小记---SQUID日志分析项目开发
- THINKPHP项目开发中的日志记录实例分析
- Hadoop学习笔记—20.网站日志分析项目案例(一)项目介绍
- Spark Streaming揭秘 Day30 集群模式下SparkStreaming日志分析
- 大数据日志分析系统-spark进行日志计算
- 基于Hadoop+Hive+Sqoop+HBase+Zookeeper+MySql日志统计分析项目
- 使用Spark进行流式实时日志分析系统的实现
- Spark 基于pyspark下的实时日志分析
- Spark大型项目实战:电商用户行为分析大数据平台
- 基于spark之上的即席分析-日志分析场景