
Inserting Data into HBase with the MapReduce Framework (Python Script)

2017-11-12 10:13
MapReduce is the distributed computing framework in the Hadoop project; its high throughput and batch-oriented data output make it the framework of choice for offline data mining.

HBase is a NoSQL database modeled on Google's internal BigTable design. It reduces data redundancy and improves query efficiency, which makes it the NoSQL database of choice for data-mining work, and its underlying data is stored in HDFS within Hadoop.

Versions used: Hadoop 1.x (1.2.1 in the scripts below) and HBase 0.98.

HBase itself is written in Java, so Java programs can call its API directly; to work with HBase from any other language, you need a bridging component.

Operating HBase from Python

Thrift is the bridging component that lets Python operate on the HBase database.

Install the Thrift server:

Before building Thrift, install its prerequisite packages:

yum -y install automake libtool bison pkgconfig gcc-c++ boost-devel libevent-devel zlib-devel python-devel ruby-devel openssl-devel boost-devel.x86_64 libevent-devel.x86_64


Next, install Thrift:

# download and extract the Thrift source
wget http://archive.apache.org/dist/thrift/0.8.0/thrift-0.8.0.tar.gz
tar zxvf thrift-0.8.0.tar.gz


Then configure and build:

./configure --with-cpp=no --with-ruby=no
make
make install
# check that the compiler was installed by running it:
thrift

Usage: thrift [options] file

Options:
-version    Print the compiler version
-o dir      Set the output directory for gen-* packages
(default: current directory)
-out dir    Set the ouput location for generated files.
(no gen-* folder will be created)
-I dir      Add a directory to the list of directories
searched for include directives
-nowarn     Suppress all compiler warnings (BAD!)
-strict     Strict compiler warnings on
-v[erbose]  Verbose mode
-r[ecurse]  Also generate included files
-debug      Parse debug trace to stdout
--allow-neg-keys  Allow negative field keys (Used to preserve protocol
compatibility with older .thrift files)
--allow-64bit-consts  Do not print warnings about using 64-bit constants
--gen STR   Generate code with a dynamically-registered generator.
STR has the form language[:key1=val1[,key2,[key3=val3]]].
Keys and values are options passed to the generator.
Many options will not require values.

Available generators (and options):
as3 (AS3):
bindable:          Add [bindable] metadata to all the struct classes.
c_glib (C, using GLib):
cocoa (Cocoa):
log_unexpected:  Log every time an unexpected field ID or type is encountered.
cpp (C++):
cob_style:       Generate "Continuation OBject"-style classes.
no_client_completion:
Omit calls to completion__() in CobClient class.
templates:       Generate templatized reader/writer methods.
pure_enums:      Generate pure enums instead of wrapper classes.
dense:           Generate type specifications for the dense protocol.
include_prefix:  Use full include paths in generated files.
csharp (C#):
delphi (delphi):
ansistr_binary:  Use AnsiString as binary properties.
erl (Erlang):
go (Go):
hs (Haskell):
html (HTML):
java (Java):
beans:           Members will be private, and setter methods will return void.
private-members: Members will be private, but setter methods will return 'this' like usual.
nocamel:         Do not use CamelCase field accessors with beans.
hashcode:        Generate quality hashCode methods.
android_legacy:  Do not use java.io.IOException(throwable) (available for Android 2.3 and above).
java5:           Generate Java 1.5 compliant code (includes android_legacy flag).
javame (Java ME):
js (Javascript):
jquery:          Generate jQuery compatible code.
node:            Generate node.js compatible code.
ocaml (OCaml):
perl (Perl):
php (PHP):
inlined:         Generate PHP inlined files
server:          Generate PHP server stubs
autoload:        Generate PHP with autoload
oop:             Generate PHP with object oriented subclasses
rest:            Generate PHP REST processors
namespace:       Generate PHP namespaces as defined in PHP >= 5.3
py (Python):
new_style:       Generate new-style classes.
twisted:         Generate Twisted-friendly RPC services.
utf8strings:     Encode/decode strings using utf8 in the generated code.
slots:           Generate code using slots for instance members.
dynamic:         Generate dynamic code, less code generated but slower.
dynbase=CLS      Derive generated classes from class CLS instead of TBase.
dynexc=CLS       Derive generated exceptions from CLS instead of TExceptionBase.
dynimport='from foo.bar import CLS'
Add an import line to generated code to find the dynbase class.
rb (Ruby):
st (Smalltalk):
xsd (XSD):


Next, download the HBase source in order to generate the Python API for HBase:

# download the HBase source, extract it, and cd into the extracted directory
wget http://mirrors.hust.edu.cn/apache/hbase/0.98.24/hbase-0.98.24-src.tar.gz
# locate the Thrift interface definition
find . -name Hbase.thrift
./hbase-thrift/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift
# enter that directory
cd ./hbase-thrift/src/main/resources/org/apache/hadoop/hbase/thrift
# generate the Python bindings; this produces a gen-py directory
thrift -gen py Hbase.thrift
# copy the generated hbase package into the working directory
cp -raf gen-py/hbase /home/badou/hbase_test
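
The gen-py directory holds the Python package for the HBase Thrift API. Its contents should look roughly like this (the exact file names come from the thrift py generator and the Hbase service definition):

ls gen-py/hbase/
# __init__.py  constants.py  ttypes.py  Hbase.py  Hbase-remote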


Then start the ThriftServer:

# from HBase's bin directory, start the Thrift service; 9090 is the Thrift port
hbase-daemon.sh start thrift
# confirm the server process is up:
jps
3489 JobTracker
5416 Jps
4539 Main
3248 NameNode
4103 HRegionServer
4003 HMaster
3399 SecondaryNameNode
3888 HQuorumPeer
5336 ThriftServer
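
To also confirm that the service is listening on its port, a quick check along these lines should work (assuming netstat is installed on the machine):

netstat -antp | grep 9090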


Now use MapReduce to insert the data into the HBase database.

map.py

#!/usr/bin/python

import os
import sys

# unpack the hbase/ and thrift/ Python packages shipped with the streaming job
os.system('tar xvzf hbase.tgz > /dev/null')
os.system('tar xvzf thrift.tgz > /dev/null')

reload(sys)
sys.setdefaultencoding('utf-8')

sys.path.append("./")

from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

from hbase import Hbase
from hbase.ttypes import *

transport = TSocket.TSocket('master', 9090)
transport = TTransport.TBufferedTransport(transport)

protocol = TBinaryProtocol.TBinaryProtocol(transport)

client = Hbase.Client(protocol)

transport.open()

tableName = 'new_music_table'

def mapper_func():
    for line in sys.stdin:
        ss = line.strip().split('\t')
        if len(ss) != 2:
            continue
        key = ss[0].strip()
        val = ss[1].strip()

        rowKey = key

        # one Mutation per column; both land in the same row
        mutations = [Mutation(column="meta-data:name", value=val),
                     Mutation(column="flags:is_valid", value="TRUE")]

        client.mutateRow(tableName, rowKey, mutations, None)

if __name__ == "__main__":
    module = sys.modules[__name__]
    # the first command-line argument names the function to run (here: mapper_func)
    func = getattr(module, sys.argv[1])
    args = sys.argv[2:]
    func(*args)
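
Before submitting the job, mapper_func can be smoke-tested locally by piping a few tab-separated lines into the script (the keys and values below are made up; this assumes hbase.tgz and thrift.tgz sit in the current directory, the target table already exists, and the ThriftServer on master is reachable):

printf '0001\tsong_a\n0002\tsong_b\n' | python map.py mapper_func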


run.sh

HADOOP_CMD="/usr/local/src/hadoop-1.2.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar"

INPUT_FILE_PATH_1="/input.data"
OUTPUT_PATH="/output_hbase"

# remove any previous output directory
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH_1 \
-output $OUTPUT_PATH \
-mapper "python map.py mapper_func" \
-file ./map.py \
-file "./hbase.tgz" \
-file "./thrift.tgz"


One thing to note here: the extracted hbase (and thrift) directories must sit at the same level as map.py, since the script appends "./" to sys.path and imports the packages from there.
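
After the archives are unpacked, each task's working directory ends up looking roughly like this:

./map.py
./hbase/     # Python package generated from Hbase.thrift
./thrift/    # Thrift Python runtime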

First, run a quick test from the local machine; this script also creates the target table:

from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

from hbase import Hbase
from hbase.ttypes import *

transport = TSocket.TSocket('master', 9090)
transport = TTransport.TBufferedTransport(transport)

protocol = TBinaryProtocol.TBinaryProtocol(transport)

client = Hbase.Client(protocol)

transport.open()

#==============================

base_info_contents = ColumnDescriptor(name='meta-data:', maxVersions=1)
other_info_contents = ColumnDescriptor(name='flags:', maxVersions=1)

client.createTable('new_music_table', [base_info_contents, other_info_contents])

print client.getTableNames()
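
Once the MapReduce job has run, the inserts can be verified by reading a row back through the same client; a minimal sketch, using a hypothetical row key '0001':

# getRow returns a list of TRowResult; the last argument is the optional attributes map
for r in client.getRow('new_music_table', '0001', None):
    for col, cell in r.columns.items():
        print col, cell.value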


# first, upload input.data into HDFS
hadoop fs -put input.data /
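
input.data is expected to contain two tab-separated columns per line, a row key and a name, matching what mapper_func parses; for example (made-up values):

0001	song_a
0002	song_b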


Another caveat: when MapReduce inserts into HBase this way, the hbase and thrift Python packages must be available on every node that runs a map task. Installing Thrift and the HBase bindings on every node is impractical, so instead the packages are distributed to the nodes as compressed archives (the -file options in run.sh), each task loads them from its local working directory, and every connection points at the machine running the ThriftServer.
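
Creating those two archives is just a matter of tarring up the package directories next to map.py (assuming the gen-py hbase package and the Thrift Python runtime library have already been copied there, as in the cp step above):

tar czf hbase.tgz hbase/
tar czf thrift.tgz thrift/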

With that, a complete run of inserting data into HBase through the MapReduce framework (with a Python script) is done.

Next time, we will insert data into HBase using Hive.
Tags: hbase, hadoop, database