
Importing data into a database with Hadoop Streaming & Python

2015-08-25 15:14
Because 40 million rows of data had to be loaded into the database and doing it from R was too slow, I decided to use Hadoop Streaming for the import.

Example reference:

http://blog.csdn.net/zhaoyl03/article/details/8657031/

How to run:

${HADOOP_HOME}/bin/hadoop \
jar ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
-file mapper.py \
-file reducer.py \
-mapper mapper.py  \
-reducer reducer.py \
-input /testdata.txt  \
-output /streaminput3  \
-jobconf mapred.reduce.tasks=2
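
The post linked above runs the classic word-count example, which is what produced the output below. Its mapper.py and reducer.py are not reproduced here; a minimal sketch of what such scripts typically look like (Python 2, tab-separated key/value pairs on stdout) is:

#!/usr/bin/env python
# mapper.py -- emit "word<TAB>1" for every word read from stdin
import sys

for line in sys.stdin:
    for word in line.strip().split():
        print '%s\t%s' % (word, 1)

#!/usr/bin/env python
# reducer.py -- sum the counts per word; Hadoop feeds the reducer lines sorted by key
import sys

current_word = None
current_count = 0

for line in sys.stdin:
    word, count = line.strip().split('\t', 1)
    count = int(count)
    if word == current_word:
        current_count += count
    else:
        if current_word is not None:
            print '%s\t%s' % (current_word, current_count)
        current_word = word
        current_count = count

if current_word is not None:
    print '%s\t%s' % (current_word, current_count)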


Result:

[root@master codes]# /usr/hadoop/bin/hadoop fs -cat /streaminput3/part-00000
15/08/23 06:45:06 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
(The    1
(which  1
2008).  1
Actually,       1
Alchemy 1
Art     1
But     1
Csárdi  1
GDB,    1
GitHub. 1
I       7
I'll    1
Matloff 1
NSP,    1
R's     2
Software        2
Stanford        1
The     2
a       10
across  1
add     1
adding  2
also    3
and     3
another 1
around  2
at      1
before  1
before, 1
beginning,      1
book    1
but     6
by      2
cleaning        1
code    2
different       1
do      3
does    1
dump.frames().  1
enhancements    1
especially      1
establishes     1
estimation,     1
facilities      1
finally 1
for     7
fun     1
general).       1
give    1
gotten  2
had     3
hadn't  2
hard,   1
having  1
higher  1
huge    1
idea,   1
in      7
it      3
it.     1
kernel  1
know,   1
latest  1
lot     1
main    1
might   1
more    1
much    1
nice    1
nodes   1
nonparametric   2
not     1
of      7
on      4
parallel        3
place   1
planning        1
portion 1
problem 1
properties      1
ran     1
regression      1
regression,     1
regression.     1
researchers     1
ridge   1
running 1
than    1
that    4
the     10
time    1
to      6
to.     2
tools   2
up      1
various 1
was     2
way     1
which   3
while   1
work    1
wrote   1
you     2
your    1


Test on the virtual machine

${HADOOP_HOME}/bin/hadoop jar \
${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
-file mapperSQL.py -mapper mapperSQL.py  \
-input /foreastdatatest.csv \
-output /streaminput/  \
-jobconf mapred.reduce.tasks=2


#!/usr/bin/env python

import sys

import MySQLdb


# insert one parsed CSV line into MySQL
def input(value):
    try:
        conn = MySQLdb.connect(host='master', user='root', db='test', port=3306)
        cur = conn.cursor()
        # value looks like ['1029418', '2014-07-31 23:45:00', '575.914531958545']
        cur.execute("insert into DataPowerPrediction values(%s,%s,%s)", value)
        conn.commit()   # commit so the insert is not rolled back when the connection closes
        cur.close()
        conn.close()
    except MySQLdb.Error, e:
        # ignore bad rows so one malformed line does not kill the map task
        pass


# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading/trailing whitespace and the quotes around CSV fields
    line = line.strip()
    line = line.replace('"', '')
    # split the CSV line into fields
    words = line.split(',')
    print words          # echoed to the job's output directory
    input(words)
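
Note that this mapper opens and closes a MySQL connection for every input line, which is expensive over 40 million rows. One possible refinement (my own sketch, not from the original post) is to open a single connection per map task and commit in batches; the batch size of 10000 below is an arbitrary choice:

#!/usr/bin/env python
# Sketch of a batched variant of mapperSQL.py: one connection per map task,
# committing every 10000 rows instead of reconnecting for each line.
import sys

import MySQLdb

conn = MySQLdb.connect(host='master', user='root', db='test', port=3306)
cur = conn.cursor()

rows = 0
for line in sys.stdin:
    fields = line.strip().replace('"', '').split(',')
    try:
        cur.execute("insert into DataPowerPrediction values(%s,%s,%s)", fields)
    except MySQLdb.Error:
        continue          # skip malformed rows
    rows += 1
    if rows % 10000 == 0:
        conn.commit()

conn.commit()
cur.close()
conn.close()

Either version can be smoke-tested locally before submitting the job, e.g. head -n 100 foreastdatatest.csv | python mapperSQL.py.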


Test on the cluster

hadoop \
jar /usr/lib/hadoop-mapreduce/hadoop-streaming-2.5.2-transwarp.jar \
-file mapper.py -mapper mapper.py  \
-input ./test/forecastdata \
-output ./test/forcastdatatosqloutputfile2 \
-jobconf mapred.map.tasks=8 \
-jobconf mapred.reduce.tasks=2
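
After the job finishes, the import can be verified directly in MySQL; a quick row count (a sketch, reusing the connection parameters and table name from mapperSQL.py above) is enough:

#!/usr/bin/env python
# Count how many rows reached the DataPowerPrediction table (sketch)
import MySQLdb

conn = MySQLdb.connect(host='master', user='root', db='test', port=3306)
cur = conn.cursor()
cur.execute("select count(*) from DataPowerPrediction")
print cur.fetchone()[0]
cur.close()
conn.close()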