您的位置:首页 > 数据库 > SQL

hadoop项目实战--ETL--(三)实现mysql表到HIVE表的全量导入与增量导入

2016-12-01 18:16 477 查看
一 在HIVE中创建ETL数据库

  ->create database etl;

二 在工程目录下新建MysqlToHive.py 和conf文件夹

  在conf文件夹下新建如下文件,最后的工程目录如下图

  


三 源码

  Import.xml

<?xml version="1.0" encoding="UTF-8"?>
<root>
<importtype>
<value>add</value>    <!-- 增量导入或者全导入 -->
</importtype>

<task type="all">
<table>user_all</table> <!-- 数据库中需要增量导入的第一张表名 -->
<table>oder_all</table> <!-- 数据库中需要增量导入的第一张表名 -->
</task>

<task type="add">
<table>user_add</table> <!-- 数据库中需要增量导入的第一张表名 -->
<table>oder_add</table> <!-- 数据库中需要增量导入的第一张表名 -->
</task>

</root>


  oder_add.xml

<?xml version="1.0" encoding="UTF-8"?>

<root>
<sqoop-shell type="import">
<param key="connect">jdbc:mysql://192.168.0.154:3306/db_etl</param> <!-- 数据库连接地址 -->
<param key="username">root</param> <!-- 数据库用户名 -->
<param key="password">123456</param> <!-- 数据库密码 -->
<param key="table">oderinfo</param><!-- 数据库中待导出的表名 -->
<param key="hive-database">etl</param> <!-- 指定导入到HIVE的哪个数据库中 -->
<param key="hive-partition-key">dt</param>   <!-- 通过时间分区 -->
<param key="hive-partition-value">$dt</param>
<param key="hive-import"></param>
<param key="check-column">crt_time</param> <!-- 增量导入检查的列 -->
<param key="incremental">lastmodified</param> <!-- 按照时间簇来进行增量导入 -->
<param key="last-value">23:59:59</param> <!-- 增量导入时间划分点 -->
<param key="num-mappers">1</param>   <!-- 使用map任务个数 -->
<param key="split-by">id</param> <!-- 将表按照id水平切分交给map处理  -->
</sqoop-shell>
</root>


  oder_all.xml

<?xml version="1.0" encoding="UTF-8"?>

<root>
<sqoop-shell type="import">
<param key="connect">jdbc:mysql://192.168.0.154:3306/db_etl</param>   <!-- 数据库连接地址 -->
<param key="username">root</param><!-- 数据库用户名 -->
<param key="password">123456</param><!-- 数据库密码 -->
<param key="table">oderinfo</param><!-- 数据库中待导出的表名 -->
<param key="hive-database">etl</param> <!-- 指定导入到HIVE的哪个数据库中 -->
<param key="hive-partition-key">dt</param>   <!-- 通过时间分区 -->
<param key="hive-partition-value">$dt</param>
<param key="hive-import"></param>
<param key="create-hive-table"></param>   <!-- 在hive中新建一张同名同结构的表 -->
<param key="hive-overwrite"></param> <!-- 覆盖原来以存在的表 -->
<param key="num-mappers">1</param>   <!-- 使用map任务个数 -->
<param key="split-by">id</param> <!-- 将表按照id水平切分交给map处理  -->
</sqoop-shell>
</root>


  user_add.xml

<?xml version="1.0" encoding="UTF-8"?>

<root>
<sqoop-shell type="import">
<param key="connect">jdbc:mysql://192.168.0.154:3306/db_etl</param> <!-- 数据库连接地址 -->
<param key="username">root</param> <!-- 数据库用户名 -->
<param key="password">123456</param> <!-- 数据库密码 -->
<param key="table">userinfo</param><!-- 数据库中待导出的表名 -->
<param key="hive-database">etl</param> <!-- 指定导入到HIVE的哪个数据库中 -->
<param key="hive-partition-key">dt</param>   <!-- 通过时间分区 -->
<param key="hive-partition-value">$dt</param>
<param key="hive-import"></param>
<param key="check-column">crt_time</param> <!-- 增量导入检查的列 -->
<param key="incremental">lastmodified</param> <!-- 按照时间簇来进行增量导入 -->
<param key="last-value">23:59:59</param> <!-- 增量导入时间划分点 -->
<param key="num-mappers">1</param>   <!-- 使用map任务个数 -->
<param key="split-by">id</param> <!-- 将表按照id水平切分交给map处理  -->
</sqoop-shell>
</root>


  user_all.xml

  

<?xml version="1.0" encoding="UTF-8"?>

<root>
<sqoop-shell type="import">
<param key="connect">jdbc:mysql://192.168.0.154:3306/db_etl</param>   <!-- 数据库连接地址 -->
<param key="username">root</param><!-- 数据库用户名 -->
<param key="password">123456</param><!-- 数据库密码 -->
<param key="table">userinfo</param><!-- 数据库中待导出的表名 -->
<param key="hive-database">etl</param> <!-- 指定导入到HIVE的哪个数据库中 -->
<param key="hive-partition-key">dt</param>   <!-- 通过时间分区 -->
<param key="hive-partition-value">$dt</param>
<param key="hive-import"></param>
<param key="create-hive-table"></param>   <!-- 在hive中新建一张同名同结构的表 -->
<param key="hive-overwrite"></param> <!-- 覆盖原来以存在的表 -->
<param key="num-mappers">1</param>   <!-- 使用map任务个数 -->
<param key="split-by">id</param> <!-- 将表按照id水平切分交给map处理  -->
</sqoop-shell>
</root>


  MysqlToHive.py

# _*_ coding:UTF-8 _*_
'''
Created on 2016��12��1��

@author: duking
'''
import datetime
import os
import xml.etree.ElementTree as ET
import collections

#获取昨天时间
def getYesterday():
today=datetime.date.today()
oneday=datetime.timedelta(days=1)
yesterday=today-oneday
return yesterday

def Resolve_Conf(dt):

#获取当前工程目录
PROJECT_DIR = os.getcwd()
#获得配置文件名
conf_file = PROJECT_DIR + "\conf\Import.xml"
#解析配置文件
xml_tree = ET.parse(conf_file)

#提取出本次导入的类型  全导入或者增量导入  通过配置import.xml中的plan标签的value值设定
import_types = xml_tree.findall('./importtype')
for import_type in import_types:
aim_types = import_type.findall('./value')
for i in range(len(aim_types)):
aim_type = aim_types[i].text

#获得task元素
tasks = xml_tree.findall('./task')

#用来保存待执行的sqoop命令的集合
cmds = []

for task in tasks:
#获得导入类型,增量导入或者全量导入
import_type = task.attrib["type"]

#如果task的标签导入类型与设定类型不同,结束本次循环
if(import_type != aim_type):
continue

#获得表名集合
tables = task.findall('./table')

#迭代表名集合,解析表配置文件
for i in range(len(tables)):
#表名
table_name = tables[i].text
#表配置文件名
table_conf_file = PROJECT_DIR + "\conf\\" + table_name + ".xml"

#解析表配置文件
xmlTree = ET.parse(table_conf_file)

#获取sqoop-shell 节点
sqoopNodes = xmlTree.findall("./sqoop-shell")
#获取sqoop 命令类型
sqoop_cmd_type = sqoopNodes[0].attrib["type"]

#首先组装成sqoop命令头
command = "sqoop " + sqoop_cmd_type

#获取
praNodes = sqoopNodes[0].findall("./param")

#用来保存param的信息的有序字典
cmap = collections.OrderedDict()
#将所有param中的key-value存入字典中
for i in range(len(praNodes)):
#获取key的属性值
key = praNodes[i].attrib["key"]
#获取param标签中的值
value = praNodes[i].text
#保存到字典中
cmap[key] = value

#迭代字典将param的信息拼装成字符串
for key in cmap:

value = cmap[key]

#如果不是键值对形式的命令 或者值为空,跳出此次循环
if(value == None or value == "" or value == " "):
value = ""

if(key == "hive-partition-value"):
value = value.replace('$dt',str(dt))
#合成前一天的时间
if(key == "last-value"):
value = '"' + str(dt) + " " + value + '"'

#拼装为命令
command += " --" + key + " " + value + " "

#将命令加入至待执行的命令集合
cmds.append(command)

return cmds

#python 模块的入口:main函数
if __name__ == '__main__':

dt = getYesterday();

#解析配置文件,生成相应的HQL语句
cmds = Resolve_Conf(dt)

#迭代集合,执行命令
for i in range(len(cmds)):
cmd = cmds[i]
print cmd
#执行导入过秤
os.system(cmd)


  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: