Importing data into a database with Hadoop Streaming & Python
2015-08-25 15:14
I needed to import 40 million rows of data into a database, and R was too slow, so I decided to do the import with Hadoop Streaming.
The example is based on:
http://blog.csdn.net/zhaoyl03/article/details/8657031/
How to run:
${HADOOP_HOME}/bin/hadoop \
    jar ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
    -file mapper.py \
    -file reducer.py \
    -mapper mapper.py \
    -reducer reducer.py \
    -input /testdata.txt \
    -output /streaminput3 \
    -jobconf mapred.reduce.tasks=2
Results:
[root@master codes]# /usr/hadoop/bin/hadoop fs -cat /streaminput3/part-00000
15/08/23 06:45:06 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
(The	1
(which	1
2008).	1
Actually,	1
Alchemy	1
Art	1
But	1
Csárdi	1
GDB,	1
GitHub.	1
I	7
I’ll	1
Matloff	1
NSP,	1
R’s	2
Software	2
Stanford	1
The	2
a	10
across	1
add	1
adding	2
also	3
and	3
another	1
around	2
at	1
before	1
before,	1
beginning,	1
book	1
but	6
by	2
cleaning	1
code	2
different	1
do	3
does	1
dump.frames().	1
enhancements	1
especially	1
establishes	1
estimation,	1
facilities	1
finally	1
for	7
fun	1
general).	1
give	1
gotten	2
had	3
hadn’t	2
hard,	1
having	1
higher	1
huge	1
idea,	1
in	7
it	3
it.	1
kernel	1
know,	1
latest	1
lot	1
main	1
might	1
more	1
much	1
nice	1
nodes	1
nonparametric	2
not	1
of	7
on	4
parallel	3
place	1
planning	1
portion	1
problem	1
properties	1
ran	1
regression	1
regression,	1
regression.	1
researchers	1
ridge	1
running	1
than	1
that	4
the	10
time	1
to	6
to.	2
tools	2
up	1
various	1
was	2
way	1
which	3
while	1
work	1
wrote	1
you	2
your	1
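The output above is a word count, but the post does not show mapper.py and reducer.py. Below is a minimal sketch of what such a pair usually looks like, assuming the classic streaming word count: the mapper emits tab-separated "word, 1" lines and the reducer sums them, relying on Hadoop's shuffle to deliver its input sorted by key. For brevity both stages live in one hypothetical file, wordcount.py, and the stage is picked by a command-line argument ("map" or "reduce"); that layout is an assumption of this sketch, since the original job ships two separate files.

```python
#!/usr/bin/env python
# wordcount.py (hypothetical): word-count mapper and reducer for Hadoop Streaming
import sys
from itertools import groupby


def mapper(lines):
    """Emit 'word<TAB>1' for every whitespace-separated token."""
    for line in lines:
        for word in line.split():
            yield "%s\t%d" % (word, 1)


def reducer(lines):
    """Sum the counts per word; input must be sorted by word (Hadoop's shuffle does this)."""
    # skip blank lines, then split each 'word<TAB>count' line into its two fields
    pairs = (line.rstrip("\n").split("\t", 1) for line in lines if line.strip())
    # groupby only merges adjacent equal keys, which is why sorted input matters
    for word, group in groupby(pairs, key=lambda kv: kv[0]):
        yield "%s\t%d" % (word, sum(int(count) for _, count in group))


if __name__ == "__main__" and len(sys.argv) > 1:
    # "map" runs the mapper stage; any other argument runs the reducer stage
    stage = mapper if sys.argv[1] == "map" else reducer
    for out in stage(sys.stdin):
        print(out)
```

Locally, `cat testdata.txt | python wordcount.py map | sort | python wordcount.py reduce` mimics the job, with `sort` standing in for the shuffle. Splitting the two functions into mapper.py and reducer.py works just as well and matches the command above without extra arguments.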
Testing on the virtual machine:
${HADOOP_HOME}/bin/hadoop jar \
    ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
    -file mapperSQL.py -mapper mapperSQL.py \
    -input /foreastdatatest.csv \
    -output /streaminput/ \
    -jobconf mapred.reduce.tasks=2
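#!/usr/bin/env python
# mapperSQL.py: Hadoop Streaming mapper that inserts every CSV line into MySQL
import sys

import MySQLdb


def insert_row(cur, value):
    # value = ['1029418', '2014-07-31 23:45:00', '575.914531958545']
    cur.execute("insert into DataPowerPrediction values(%s,%s,%s)", value)


def main():
    # open one connection per mapper task instead of one per input line
    conn = MySQLdb.connect(host='master', user='root', db='test', port=3306)
    cur = conn.cursor()
    # input comes from STDIN (standard input)
    for line in sys.stdin:
        # remove leading/trailing whitespace and the quotes around each field
        line = line.strip().replace('"', '')
        if not line:
            continue
        # split the line into fields
        words = line.split(',')
        # echo the parsed row as mapper output
        print(words)
        try:
            insert_row(cur, words)
        except MySQLdb.Error as e:
            # log failed rows to stderr (task logs) instead of silently dropping them
            sys.stderr.write("insert failed for %r: %s\n" % (words, e))
    # autocommit is off by default, so InnoDB inserts are discarded without commit()
    conn.commit()
    cur.close()
    conn.close()


if __name__ == '__main__':
    main()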
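With 40 million rows, sending one INSERT per line is still slow. A minimal sketch of an alternative, assuming any DB-API driver: parse the lines with the standard csv module, which strips the surrounding quotes and keeps commas inside quoted fields intact, then insert in batches with cursor.executemany, committing once per batch (MySQLdb rewrites a simple INSERT ... VALUES passed to executemany into a multi-row insert). The names parse_rows, batched_insert and demo and the batch size of 1000 are hypothetical; sqlite3 stands in for MySQL so the sketch runs without a server, and with MySQLdb the placeholders would be %s instead of ?.

```python
import csv
import sqlite3


def parse_rows(lines):
    """Yield field lists from CSV lines such as "1029418","2014-07-31 23:45:00","575.9".

    csv.reader drops the surrounding double quotes and keeps commas that sit
    inside a quoted field, which replace('"', '') plus split(',') would break.
    """
    for fields in csv.reader(lines):
        if fields:  # csv.reader yields [] for blank lines
            yield fields


def batched_insert(conn, sql, rows, batch_size=1000):
    """Insert rows with executemany, committing once per batch; return the row count."""
    cur = conn.cursor()
    batch, total = [], 0
    for row in rows:
        batch.append(row)
        if len(batch) >= batch_size:
            cur.executemany(sql, batch)
            conn.commit()
            total += len(batch)
            batch = []
    if batch:  # flush the final, partially filled batch
        cur.executemany(sql, batch)
        conn.commit()
        total += len(batch)
    cur.close()
    return total


def demo(lines, batch_size=1000):
    """Load lines into an in-memory sqlite3 table standing in for DataPowerPrediction."""
    conn = sqlite3.connect(":memory:")
    conn.execute("create table DataPowerPrediction (id text, ts text, value text)")
    # sqlite3 uses ? placeholders; with MySQLdb this would be values(%s,%s,%s)
    n = batched_insert(conn, "insert into DataPowerPrediction values(?,?,?)",
                       parse_rows(lines), batch_size)
    count = conn.execute("select count(*) from DataPowerPrediction").fetchone()[0]
    conn.close()
    return n, count
```

Inside a mapper, the same helpers would be driven by `batched_insert(conn, "insert into DataPowerPrediction values(%s,%s,%s)", parse_rows(sys.stdin))` with a MySQLdb connection. Committing per batch keeps transactions small, but a retried or speculatively executed map task inserts its committed batches again, so a primary key on the table helps catch duplicates.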
Testing on the cluster:
hadoop \
    jar /usr/lib/hadoop-mapreduce/hadoop-streaming-2.5.2-transwarp.jar \
    -file mapper.py -mapper mapper.py \
    -input ./test/forecastdata \
    -output ./test/forcastdatatosqloutputfile2 \
    -jobconf mapred.map.tasks=8 \
    -jobconf mapred.reduce.tasks=2