Spark日志清洗一般流程(Python版)
2017-05-16 19:48
183 查看
Spark 1.6.1
Python 2.7.11
整理了一下使用spark来进行日志清洗及数据处理的套路,这里以pyspark为例
对于使用spark作为查询清洗工具而言,启动spark的套路主要使用sh文件进行终端带参数启动,启动后开始调用sh传递处理参数,并且构造好sparkconf后传递提交(spark-submit) python文件,当然最主要的函数逻辑都是在python的文件中处理的。对于工程类而非工具类,就稍微复杂点,而且会显得更规范点,以下为一个例子的套路
对于参数输入启动的方法主要有两种第一种是构造启动的sh文件,第二种是构造启动的python文件,启动文件的作用就是在终端执行该文件,然后输入参数,参数会被启动文件捕获并传递
方法一:start.sh
2
3
4
5
6
7
8
9
10
11
12
13
1
2
3
4
5
6
7
8
9
10
11
12
13
方法二:start.py
2
3
4
5
6
7
8
9
10
11
1
2
3
4
5
6
7
8
9
10
11
从脚本中获取外界传入的参数后,开始执行核心的逻辑操作,这里以pyspark为例,新建deal.py
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
对于一般的任务,这样差不多就可以结束了,然而对于日志清洗和存储来说,必不可少的是解压和存储过程,日志的量非常多,所以即使存在hdfs上也是压缩过的,比如文件格式是xxx.lzo样式的,即使取cat,没有解压也将看不到任何有用的信息,这里就要用到newAPIHadoopFile函数了,当然,textFile仍然可用
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
关于alais一些技巧,在终端各个目录内都可直接启动对应文件,工具化最方便的地方
Python 2.7.11
前言
整理了一下使用spark来进行日志清洗及数据处理的套路,这里以pyspark为例
pyspark的启动任务套路
对于使用spark作为查询清洗工具而言,启动spark的套路主要使用sh文件进行终端带参数启动,启动后开始调用sh传递处理参数,并且构造好sparkconf后传递提交(spark-submit) python文件,当然最主要的函数逻辑都是在python的文件中处理的。对于工程类而非工具类,就稍微复杂点,而且会显得更规范点,以下为一个例子的套路对于参数输入启动的方法主要有两种第一种是构造启动的sh文件,第二种是构造启动的python文件,启动文件的作用就是在终端执行该文件,然后输入参数,参数会被启动文件捕获并传递
方法一:start.sh
# start.sh read -p "Please input info as (year/month/day) and split by space:" raw_info year=$(echo $raw_info | awk -F'/' '{print $1}') month=$(echo $raw_info | awk -F'/' '{print $2}') day=$(echo $raw_info | awk -F'/' '{print $3}') echo $year echo $month echo $day # 进行spark任务的提交,这里需要注意的是提交的时候需要设置队列等 spark-submit --driver-memory 6G --queue 自己队列的名字 --conf "spark.scheduler.executorTaskBlacklistTime=30000" deal.py $year $month $day1
2
3
4
5
6
7
8
9
10
11
12
13
1
2
3
4
5
6
7
8
9
10
11
12
13
方法二:start.py
# start.py info=raw_input("Please input info as (year/month/day/):") # raw_input make input as string year=info.split("/")[0] month=info.split("/")[1] day=info.split("/")[2] print "year={year},month={month},day={day}".format(year=year,month=month,day=day) # 进行spark任务的提交,这里需要注意的是提交的时候需要设置队列等 spark-submit --driver-memory 6G --queue 自己队列的名字 --conf "spark.scheduler.executorTaskBlacklistTime=30000" deal.py year month day1
2
3
4
5
6
7
8
9
10
11
1
2
3
4
5
6
7
8
9
10
11
从脚本中获取外界传入的参数后,开始执行核心的逻辑操作,这里以pyspark为例,新建deal.py
#deal.py #-*-coding:utf-8-*- import sys from pyspark import * from pyspark.sql import * import time # 核心操作计算 def func_1(): funct... def dealfunc(sc): rdd=sc.textFile(hdfspath).map(func_1)... # 构建rdd方式可以从hdfs上把文件加载进来处理,之后进行各种Transformation操作,作为清洗数据的第一步 # 方法一:如果清洗完后数据量不大,完全可以加载到内存中然后当做流来处理,但是数据量非常大,那请不要作死使用collect() for data in rdd.collect(): func... ================== # 方法二:直接进行多次map,filter,等各种操作,凡事都可以靠函数解决,一个不够就写两个,函数传递进去的时候就当字符串流处理就可以了 data=rdd.map(func_1).filter(func_2).reduceByKey(func_3).map(func_4)... # 如果需要把清洗好的数据上传hdfs, data.repartition(5).saveAsTextFile('/user/test/restore') # 获取上层传递进来的参数 year = sys.argv[1] month = sys.argv[2] day = sys.argv[3] #在pysaprk中初始化spark sparkconf = (SparkConf().setAppName("Wifi-God wants to see air quality").set("spark.akka.frameSize","2000")) sc = (conf=sparkconf) # 强烈建议使用try,finally来处理 try: dealfunc(sc) except Exceptions as ex: print ex finally: sc.stop() # necessay1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
对于一般的任务,这样差不多就可以结束了,然而对于日志清洗和存储来说,必不可少的是解压和存储过程,日志的量非常多,所以即使存在hdfs上也是压缩过的,比如文件格式是xxx.lzo样式的,即使取cat,没有解压也将看不到任何有用的信息,这里就要用到newAPIHadoopFile函数了,当然,textFile仍然可用
# 读取 file = sc.newAPIHadoopFile(hadfspath, "com.hadoop.mapreduce.LzoTextInputFormat", "org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text") # 用法见官方文档 # 上传至hdfs,使用saveAsTextFile # repartition的作用是重新设定rdd分区数(关系到存入hdfs) rdd_dealed.repartition(5).saveAsTextFile('/user/test/rdd_restore') # 存在hdfs上rdd_restore文件夹中有几个片段就是有几个区 # $ hadoop fs -ls /user/test/rdd_restore # Found 5 items # -rw-r--r-- 3 owntest 0 2017-04-14 16:42 /user/test/rdd_restore/_SUCCESS # drwxr-xr-x - owntest 0 2017-04-14 16:42 /user/test/rdd_restore/_temporary # -rw-r--r-- 3 owntest 51 2017-04-14 16:42 /user/test/rdd_restore/part-00000 # -rw-r--r-- 3 owntest 150 2017-04-14 16:42 /user/rdd_restore/part-00001 # -rw-r--r-- 3 owntest 251 2017-04-14 16:42 /user/rdd_restore/part-000021
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
关于alais一些技巧,在终端各个目录内都可直接启动对应文件,工具化最方便的地方
#第一步,修改.bashrc文件,添加上以下信息,然后保存退出 $ vi ~/.bashrc alias test='cd ~/user/test; sh start.sh' #这句话的意思是,当在终端输入test的时候,执行的命令是先切换到user/test文件夹,然后,执行start.sh #第二步,使配置生效,或者重新再启动终端都可 $source ~/.bashrc #第三步,启动 $test #这样,使用别名的方法就可以再任何路径下启动自己对应的工具,值得注意的是,别名也不可用的太泛滥,不然不好管理
相关文章推荐
- Spark日志清洗一般流程
- 小Y的Python学习日志--流程控制(逻辑符)
- Spark日志清洗
- Hadoop兮,杀鸡别用牛刀,python+shell实现一般日志文件的查询、统计
- Python Pandas、Spark数据清洗
- hive 的日志处理统计网站的 PV 、UV案例 与 给合 python的数据清洗数据案例
- python scala kafka 集成一个流程项目 spark
- 小Y的Python学习日志--流程控制(if 条件判断)
- 数据挖掘一般流程(数据清洗,特征提取,建模,调参)
- ERP系统一般业务流程
- [项目管理入门系列] 师傅领进门-------项目运作的一般流程(一)
- python 日志记录 用于debug 【copy 一位老兄的】
- Google优化一般流程和一些常识
- Web项目开发的一般流程
- 黑客攻击的一般流程
- 一般软件具体测试流程
- ERP一般实施流程/步骤
- [项目管理入门系列] 师傅领进门-------项目运作的一般流程(三)
- 面试一般流程
- Python实现博客日志自动提交程序