Processing Business-System JSON Logs with Python and Loading Them into MySQL
2015-12-19 11:47
I. Environment preparation and notes
1. Program directory layout
2. MySQL target table structure
3. Stored procedure for maintaining the target table's partitions
II. The Python and shell scripts
1. Python script that connects to the MySQL database
2. Python script that processes the JSON data
3. Shell script for scheduled runs
4. Shell script for manual runs
III. Additional notes
1. Program directory layout

[spark@Master Log_Data]$ pwd
/home/spark/opt/Log_Data
[spark@Master Log_Data]$ ls -LGR
.:
Py_logproc  yemao  ymlog_proc_manual.sh  ymlog_proc.sh

./Py_logproc:
date.list  db.py  db.pyc  json2mysql_python_recordasarray.py  log_tmpdir  nohup.out  Py_logproc_manual.sh  Py_logproc.sh

./Py_logproc/log_tmpdir:
yemaopythonlog

./yemao:
yemao1_20151109.tar.gz  yemao1_20151117.tar.gz  yemao1_20151125.tar.gz  yemao1_20151203.tar.gz  yemao1_20151211.tar.gz  yemao2_20151106-08.tar.gz  yemao2_20151116.tar.gz  yemao2_20151124.tar.gz  yemao2_20151202.tar.gz  yemao2_20151210.tar.gz  yemao2_20151218.tar.gz
yemao1_20151110.tar.gz  yemao1_20151118.tar.gz  yemao1_20151126.tar.gz  yemao1_20151204.tar.gz  yemao1_20151212.tar.gz  yemao2_20151109.tar.gz  yemao2_20151117.tar.gz  yemao2_20151125.tar.gz  yemao2_20151203.tar.gz  yemao2_20151211.tar.gz
yemao1_20151111.tar.gz  yemao1_20151119.tar.gz  yemao1_20151127.tar.gz  yemao1_20151205.tar.gz  yemao1_20151213.tar.gz  yemao2_20151110.tar.gz  yemao2_20151118.tar.gz  yemao2_20151126.tar.gz  yemao2_20151204.tar.gz  yemao2_20151212.tar.gz
yemao1_20151112.tar.gz  yemao1_20151120.tar.gz  yemao1_20151128.tar.gz  yemao1_20151206.tar.gz  yemao1_20151214.tar.gz  yemao2_20151111.tar.gz  yemao2_20151119.tar.gz  yemao2_20151127.tar.gz  yemao2_20151205.tar.gz  yemao2_20151213.tar.gz
yemao1_20151113.tar.gz  yemao1_20151121.tar.gz  yemao1_20151129.tar.gz  yemao1_20151207.tar.gz  yemao1_20151215.tar.gz  yemao2_20151112.tar.gz  yemao2_20151120.tar.gz  yemao2_20151128.tar.gz  yemao2_20151206.tar.gz  yemao2_20151214.tar.gz
yemao1_20151114.tar.gz  yemao1_20151122.tar.gz  yemao1_20151130.tar.gz  yemao1_20151208.tar.gz  yemao1_20151216.tar.gz  yemao2_20151113.tar.gz  yemao2_20151121.tar.gz  yemao2_20151129.tar.gz  yemao2_20151207.tar.gz  yemao2_20151215.tar.gz
yemao1_20151115.tar.gz  yemao1_20151123.tar.gz  yemao1_20151201.tar.gz  yemao1_20151209.tar.gz  yemao1_20151217.tar.gz  yemao2_20151114.tar.gz  yemao2_20151122.tar.gz  yemao2_20151130.tar.gz  yemao2_20151208.tar.gz  yemao2_20151216.tar.gz
yemao1_20151116.tar.gz  yemao1_20151124.tar.gz  yemao1_20151202.tar.gz  yemao1_20151210.tar.gz  yemao1_20151218.tar.gz  yemao2_20151115.tar.gz  yemao2_20151123.tar.gz  yemao2_20151201.tar.gz  yemao2_20151209.tar.gz  yemao2_20151217.tar.gz
[spark@Master Log_Data]$
2. MySQL target table structure

CREATE TABLE `yemao_logpy` (
  `id` varchar(80) DEFAULT NULL,
  `time` varchar(80) DEFAULT NULL,
  `url_from` varchar(800) DEFAULT NULL,
  `url_current` varchar(800) DEFAULT NULL,
  `url_to` varchar(800) DEFAULT NULL,
  `options` varchar(800) DEFAULT NULL,
  `ip` varchar(100) DEFAULT NULL,
  `uid` varchar(80) DEFAULT NULL,
  `new_visitor` varchar(8) DEFAULT NULL,
  `province` varchar(8) DEFAULT NULL,
  `city` varchar(8) DEFAULT NULL,
  `site` varchar(80) DEFAULT NULL,
  `device` varchar(80) DEFAULT NULL,
  `browser` varchar(800) DEFAULT NULL,
  `phone` varchar(80) DEFAULT NULL,
  `token` varchar(800) DEFAULT NULL,
  `dorm` varchar(800) DEFAULT NULL,
  `order_phone` varchar(80) DEFAULT NULL,
  `order_dormitory` varchar(80) DEFAULT NULL,
  `order_amount` varchar(80) DEFAULT NULL,
  `order_id` varchar(80) DEFAULT NULL,
  `uname` varchar(80) DEFAULT NULL,
  `site_id` varchar(80) DEFAULT NULL,
  `address` varchar(800) DEFAULT NULL,
  `dorm_id` varchar(80) DEFAULT NULL,
  `dormentry_id` varchar(80) DEFAULT NULL,
  `tag` varchar(800) DEFAULT NULL,
  `rid` varchar(80) DEFAULT NULL,
  `cart_quantity` varchar(80) DEFAULT NULL,
  `response` text,
  `paytype` varchar(80) DEFAULT NULL,
  `data` text,
  `info` varchar(800) DEFAULT NULL,
  `status` varchar(80) DEFAULT NULL,
  `log_date` int(8) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8
/*!50100 PARTITION BY LIST (log_date)
(PARTITION p0 VALUES IN (0) ENGINE = InnoDB,
 PARTITION p20151109 VALUES IN (20151109) ENGINE = InnoDB,
 PARTITION p20151110 VALUES IN (20151110) ENGINE = InnoDB,
 PARTITION p20151111 VALUES IN (20151111) ENGINE = InnoDB,
 PARTITION p20151112 VALUES IN (20151112) ENGINE = InnoDB,
 PARTITION p20151113 VALUES IN (20151113) ENGINE = InnoDB,
 PARTITION p20151114 VALUES IN (20151114) ENGINE = InnoDB,
 PARTITION p20151115 VALUES IN (20151115) ENGINE = InnoDB,
 PARTITION p20151116 VALUES IN (20151116) ENGINE = InnoDB,
 PARTITION p20151117 VALUES IN (20151117) ENGINE = InnoDB) */
3. Stored procedure for maintaining the target table's partitions

CREATE DEFINER=`datahouse`@`%` PROCEDURE `p_ymlogpy_maintain`(IN `v_log_date` int)
BEGIN
  DECLARE v_partition_exists INT;
  SELECT count(1) INTO v_partition_exists
    FROM information_schema.`PARTITIONS`
   WHERE TABLE_SCHEMA = 'logdata'
     AND table_name = 'yemao_logpy'
     AND partition_name = concat('p', v_log_date);
  IF v_partition_exists = 1 THEN
    SET @exec_sql = concat("ALTER TABLE logdata.yemao_logpy DROP PARTITION p", v_log_date);
    PREPARE stmt FROM @exec_sql;
    EXECUTE stmt;
  END IF;
  SET @exec_sql = concat("ALTER TABLE logdata.yemao_logpy ADD PARTITION (PARTITION p", v_log_date, " VALUES IN (", v_log_date, "));");
  PREPARE stmt FROM @exec_sql;
  EXECUTE stmt;
END
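To make the procedure's flow easier to trace, the dynamic statements it prepares can be mirrored in plain Python. This is an illustrative sketch only; the helper name is invented here and is not part of the deployment:

```python
def partition_maintain_sql(v_log_date, partition_exists):
    """Mirror the ALTER statements p_ymlogpy_maintain builds for one log date."""
    stmts = []
    if partition_exists:
        # an existing partition for the day is dropped first, discarding that day's rows
        stmts.append("ALTER TABLE logdata.yemao_logpy DROP PARTITION p%d" % v_log_date)
    # then the day's LIST partition is (re)created, ready for a clean reload
    stmts.append(
        "ALTER TABLE logdata.yemao_logpy ADD PARTITION "
        "(PARTITION p%d VALUES IN (%d));" % (v_log_date, v_log_date)
    )
    return stmts

print(partition_maintain_sql(20151109, True))
```

The drop-then-add pattern is what makes reruns idempotent: loading the same date twice replaces that day's partition instead of duplicating rows.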
II. The Python and shell scripts

1. Python script that connects to the MySQL database

[spark@Master Py_logproc]$ cat db.py
import MySQLdb

db_config = {
    'host': '120.55.189.188',
    'user': 'datahouse',
    'passwd': 'DTHS2016',
    'port': 3306,
    'db': 'logdata',
    'charset': 'utf8'
}

def getDB():
    try:
        conn = MySQLdb.connect(host=db_config['host'], user=db_config['user'],
                               passwd=db_config['passwd'], port=db_config['port'],
                               charset=db_config['charset'])
        conn.autocommit(True)
        curr = conn.cursor()
        curr.execute("SET NAMES utf8")
        curr.execute("USE %s" % db_config['db'])
        return conn, curr
    except MySQLdb.Error, e:
        print "Mysql Error %d: %s" % (e.args[0], e.args[1])
        return None, None
2. Python script that processes the JSON data

[spark@Master Py_logproc]$ cat json2mysql_python_recordasarray.py
# -*- encoding:utf-8 -*-
from db import getDB
import json
import warnings
warnings.filterwarnings("ignore")

conn, curr = getDB()

if __name__ == "__main__":
    import sys
    reload(sys)
    sys.setdefaultencoding("utf-8")
    if len(sys.argv) == 1:
        print "need argv"
    else:
        print sys.argv
    i = 0
    for json_array in open('/home/spark/opt/Log_Data/Py_logproc/log_tmpdir/yemaopythonlog'):
        yemao_array = json.loads(json_array)
        for yemao in yemao_array:
            if not yemao.has_key('_reason'):
                id = yemao['id']
                time = yemao['time']
                url_from = yemao['url_from']
                url_current = yemao['url_current']
                url_to = yemao['url_to']
                options = yemao['options']
                ip = yemao['ip']
                uid = yemao['uid']
                new_visitor = yemao['new_visitor']
                province = yemao['province']
                city = yemao['city']
                site = yemao['site']
                device = yemao['device']
                browser = yemao['browser']
                phone = yemao['phone']
                token = yemao['token']
                dorm = yemao['dorm']
                order_phone = yemao['order_phone']
                order_dormitory = yemao['order_dormitory']
                order_amount = yemao['order_amount']
                order_id = yemao['order_id']
                uname = yemao['uname']
                site_id = yemao['site_id']
                address = yemao['address']
                dorm_id = yemao['dorm_id']
                dormentry_id = yemao['dormentry_id']
                tag = yemao['tag']
                rid = yemao['rid']
                cart_quantity = yemao['cart_quantity']
                response = yemao['response']
                paytype = yemao['paytype']
                if yemao.has_key('data'):
                    data = yemao['data']
                else:
                    data = '0'
                data = '"' + str(data) + '"'
                if yemao.has_key('info'):
                    info = yemao['info']
                else:
                    info = '0'
                if yemao.has_key('status'):
                    status = yemao['status']
                else:
                    status = '0'
                log_date = int(sys.argv[1])
                sql = "insert into yemao_logpy(id,time,url_from,url_current,url_to,options,ip,uid,new_visitor,province,city,site,device,browser,phone,token,dorm,order_phone,order_dormitory,order_amount,order_id,uname,site_id,address,dorm_id,dormentry_id,tag,rid,cart_quantity,response,paytype,data,info,status,log_date) values ('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', %s, '%s', '%s', %d)" % (id,time,url_from,url_current,url_to,options,ip,uid,new_visitor,province,city,site,device,browser,phone,token,dorm,order_phone,order_dormitory,order_amount,order_id,uname,site_id,address,dorm_id,dormentry_id,tag,rid,cart_quantity,response,paytype,data,info,status,log_date)
                print sql
                curr.execute(sql)
                print i
                i += 1
            else:
                print i
                i += 1
    print i
    curr.close()
    conn.close()
    print 'yemao_array_python done'
[spark@Master Py_logproc]$
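One weakness of the insert above is that values are spliced into the SQL string with %, so a stray quote inside a URL or browser string breaks the statement (and opens an injection risk). A hedged sketch of the parameterized alternative, demonstrated against an in-memory sqlite3 table purely for illustration (with MySQLdb the placeholder would be %s rather than ?):

```python
import sqlite3

# stand-in table with a few of the real columns, for illustration only
conn = sqlite3.connect(":memory:")
curr = conn.cursor()
curr.execute("CREATE TABLE yemao_logpy (id TEXT, url_from TEXT, log_date INTEGER)")

# a record whose url_from contains quotes that would break string splicing
record = {"id": "42", "url_from": "http://example.com/?q='quoted'"}

# placeholders let the driver escape the value instead of breaking the statement
curr.execute(
    "INSERT INTO yemao_logpy (id, url_from, log_date) VALUES (?, ?, ?)",
    (record["id"], record["url_from"], 20151109),
)
conn.commit()
```

With placeholders the driver also handles the quoting of the data/info/status defaults, so the manual '"' + str(data) + '"' wrapping above would become unnecessary.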
3. Shell script for scheduled runs

[spark@Master Py_logproc]$ cat Py_logproc.sh
#!/bin/bash
export yesterday=`date -d last-day +%Y%m%d`
cd /home/spark/opt/Log_Data/Py_logproc
for tar in /home/spark/opt/Log_Data/yemao/yemao*$yesterday.tar.gz; do
    tar zxvf $tar -C /home/spark/opt/Log_Data/Py_logproc/log_tmpdir
    grep -h "\[{.*}\]" /home/spark/opt/Log_Data/Py_logproc/log_tmpdir/*.log >> ./log_tmpdir/yemaopythonlog
    rm -rf /home/spark/opt/Log_Data/Py_logproc/log_tmpdir/*.log
done
#sed -i 's/^.//' yemaolog
#sed -i 's/..$//' yemaolog
/usr/local/mysql/bin/mysql -h120.55.189.188 -udatahouse -pDTHS2016 -e "call logdata.p_ymlogpy_maintain($yesterday);"
python /home/spark/opt/Log_Data/Py_logproc/json2mysql_python_recordasarray.py $yesterday
#/home/spark/opt/hive-1.2.1/bin/hive -e "alter table yemao_log drop if exists partition (log_date=$yesterday);alter table yemao_log add if not exists partition (log_date=$yesterday);load data local inpath '/home/spark/opt/Log_Data/yemao/yemao.dat' into table yemao_log partition (log_date=$yesterday);"
rm -rf /home/spark/opt/Log_Data/Py_logproc/log_tmpdir/yemaopythonlog
[spark@Master Py_logproc]$
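The script derives its date from the shell via `date -d last-day +%Y%m%d`. Should the date ever need to be produced inside Python instead, the equivalent with the standard datetime module looks like this (a sketch; the helper name is mine, not part of the original scripts):

```python
from datetime import date, timedelta

def yesterday_str(today=None):
    """Return yesterday's date formatted like `date -d last-day +%Y%m%d`."""
    today = today or date.today()
    return (today - timedelta(days=1)).strftime("%Y%m%d")

print(yesterday_str(date(2015, 12, 19)))  # -> 20151218
```

timedelta handles month and year boundaries, so e.g. the run on 2016-01-01 correctly yields 20151231.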
4. Shell script for manual runs

[spark@Master Py_logproc]$ cat Py_logproc_manual.sh
#!/bin/bash
#export yesterday=`date -d last-day +%Y%m%d`
while read yesterday
do
    cd /home/spark/opt/Log_Data/Py_logproc
    for tar in /home/spark/opt/Log_Data/yemao/yemao*$yesterday.tar.gz; do
        tar zxvf $tar -C /home/spark/opt/Log_Data/Py_logproc/log_tmpdir
        grep -h "\[{.*}\]" /home/spark/opt/Log_Data/Py_logproc/log_tmpdir/*.log >> ./log_tmpdir/yemaopythonlog
        rm -rf /home/spark/opt/Log_Data/Py_logproc/log_tmpdir/*.log
    done
    #sed -i 's/^.//' yemaolog
    #sed -i 's/..$//' yemaolog
    /usr/local/mysql/bin/mysql -h120.55.189.188 -udatahouse -pDTHS2016 -e "call logdata.p_ymlogpy_maintain($yesterday);"
    python /home/spark/opt/Log_Data/Py_logproc/json2mysql_python_recordasarray.py $yesterday
    #/home/spark/opt/hive-1.2.1/bin/hive -e "alter table yemao_log drop if exists partition (log_date=$yesterday);alter table yemao_log add if not exists partition (log_date=$yesterday);load data local inpath '/home/spark/opt/Log_Data/yemao/yemao.dat' into table yemao_log partition (log_date=$yesterday);"
    rm -rf /home/spark/opt/Log_Data/Py_logproc/log_tmpdir/yemaopythonlog
done < /home/spark/opt/Log_Data/Py_logproc/date.list
Contents of the date list:

[spark@Master Py_logproc]$ cat date.list
20151109
20151110
20151111
20151112
20151113
20151114
20151115
20151116
20151117
20151118
20151119
[spark@Master Py_logproc]$
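Rather than typing the backfill dates by hand, a contiguous range like the one above can be generated. A sketch (the original date.list was maintained manually; the helper name is mine):

```python
from datetime import date, timedelta

def date_range(start, end):
    """Yield YYYYMMDD strings from start to end inclusive, one per date.list line."""
    d = start
    while d <= end:
        yield d.strftime("%Y%m%d")
        d += timedelta(days=1)

# regenerate the list shown above
print("\n".join(date_range(date(2015, 11, 9), date(2015, 11, 19))))
```

Redirecting that output to date.list would give Py_logproc_manual.sh the same input as the hand-written file.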
III. Additional notes

1. json2mysql_python_recordasarray.py takes the run date as a command-line argument; Py_logproc.sh passes it in through its yesterday variable.
2. json2mysql_python_recordasarray.py calls sys.setdefaultencoding("utf-8") to force the default character set to UTF-8.
3. The scheduled script is run from crontab for the daily batch; the manual script takes its dates from the date list and is used to backfill historical log data.
4. For a manual run, start the job with "nohup sh Py_logproc_manual.sh &" so that it keeps running after the Xshell client disconnects; check whether it is still running with "ps -ef | grep Py_logproc_manual"; inspect its output in the newly created nohup.out in the current directory, or follow it live with "tail -n 100 -f nohup.out".
5. The log_tmpdir directory holds temporary data only; the scripts delete the temporary files when they finish.