
Accessing HBase with Python

2009-04-19 12:06
With Thrift, we can access HBase from Python.

About Thrift

Thrift is a software framework for scalable cross-language services development.

Its official website is http://incubator.apache.org/thrift/

Download Thrift

svn co http://svn.apache.org/repos/asf/incubator/thrift/trunk thrift

[b]Install Thrift (Linux)[/b]

cd thrift

./bootstrap.sh

./configure

make

make install

Generate the HBase client code

cd $HBASE_HOME/src/java/org/apache/hadoop/hbase/thrift

thrift --gen py Hbase.thrift

Then copy the hbase folder inside the generated gen-py directory to

/usr/lib/python2.5/site-packages/
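The path above is specific to one Python 2.5 installation. A minimal sketch, assuming Python 3 and that `gen_py_dir` is the gen-py directory that `thrift --gen py` writes into the current directory, which asks the running interpreter for its site-packages directory through `sysconfig.get_paths()["purelib"]` instead of hard-coding it. The helper name `install_generated_package` is hypothetical; writing into a system site-packages may require root, and adding gen-py to PYTHONPATH avoids the copy altogether.

```python
import shutil
import sysconfig
from pathlib import Path


def install_generated_package(gen_py_dir, package="hbase", dest=None):
    """Copy gen-py/<package> into site-packages (or into dest) and return the new path."""
    src = Path(gen_py_dir) / package
    if dest is None:
        # Pure-Python site-packages directory of the interpreter running this code
        dest = sysconfig.get_paths()["purelib"]
    target = Path(dest) / package
    if target.exists():
        # Replace a stale copy left over from an earlier code generation
        shutil.rmtree(target)
    shutil.copytree(src, target)
    return target
```

Run from the directory where `thrift --gen py Hbase.thrift` was executed, `install_generated_package("gen-py")` performs the copy described above.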

Prepare HBase

First make sure HBase itself is running correctly, then start HBase's Thrift service:

$HBASE_HOME/bin/hbase-daemon.sh start thrift
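Before writing the client, it can help to confirm that something is listening on the Thrift port. A minimal standard-library sketch, assuming port 9090 (the port the unit tests below connect to); it only proves that a TCP listener exists, not that it speaks HBase's Thrift protocol. `thrift_server_reachable` is a hypothetical helper name.

```python
import socket


def thrift_server_reachable(host="localhost", port=9090, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        # create_connection raises OSError (refused, unreachable, timeout) on failure
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

For example, `thrift_server_reachable("192.168.1.103")` checks the server address used in the tests.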

OK, that completes the preparation. Let's start writing the Python client.

Suppose we need a table to store web pages crawled from the web.

The table is named "webpages".

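Each row is keyed by the page URL with its host name reversed (for example, http://www.a.com/foo becomes com.a.www/foo), and the column family "contents:" (note the trailing colon) stores the page content.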
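Reversing the host matters because HBase keeps rows sorted by row key in byte order, so reversed hosts place the pages of one domain (com.a.mail, com.a.www, ...) next to each other. A minimal sketch of the same key built with Python 3's `urllib.parse` rather than the string splitting in the complete code below; `row_key` is a hypothetical name, and unlike that code it lowercases the host and drops the port and fragment while keeping the query string.

```python
from urllib.parse import urlsplit


def row_key(url):
    """Turn 'http://www.a.com/foo' into the row key 'com.a.www/foo'."""
    # urlsplit only recognizes a host after "//", so add it for bare hosts
    if "//" not in url:
        url = "//" + url
    parts = urlsplit(url)
    # Reverse the host labels: www.a.com -> com.a.www (hostname is lowercased)
    host = ".".join(reversed(parts.hostname.split(".")))
    path = parts.path.strip("/")
    key = host + "/" + path if path else host
    if parts.query:
        key += "?" + parts.query
    return key


urls = ["http://www.b.com/foo", "http://www.a.com/x", "http://mail.a.com"]
print(sorted(row_key(u) for u in urls))
# ['com.a.mail', 'com.a.www/x', 'com.b.www/foo']
```

After sorting, both a.com pages sit together ahead of b.com, which is the ordering a table scan returns.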

Import the required modules:

from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

from hbase import Hbase
from hbase.ttypes import ColumnDescriptor, Mutation, BatchMutation, NotFound


Connect to HBase:

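# Address of the HBase Thrift server (9090 is the port the tests below use)
netloc = "localhost"
port = 9090

transport = TTransport.TBufferedTransport(
    TSocket.TSocket(netloc, port))
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = Hbase.Client(protocol)
transport.open()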
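The connection code never closes the transport. A minimal sketch of a context manager, using only `contextlib`, that opens a transport and closes it even if a call inside the block fails; it assumes only that the transport has `open()` and `close()` methods, as Thrift transports such as `TBufferedTransport` do. `opened` is a hypothetical helper name.

```python
from contextlib import contextmanager


@contextmanager
def opened(transport):
    """Open a Thrift transport for the duration of a with-block."""
    transport.open()
    try:
        yield transport
    finally:
        # Runs both on normal exit and when the block raises
        transport.close()


# Usage with the objects created above (requires a running Thrift server):
# transport = TTransport.TBufferedTransport(TSocket.TSocket(netloc, port))
# with opened(transport):
#     client = Hbase.Client(TBinaryProtocol.TBinaryProtocol(transport))
#     print(client.getTableNames())
```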


Create the table:

# Keep only one version per cell and use BLOCK compression
# See the HBase API for the other parameters
contents = ColumnDescriptor(name="contents:", maxVersions=1, compression="BLOCK")
client.createTable("webpages", [contents])
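Calling createTable for a table that already exists is reported as an error, which is why the complete code below checks getTableNames() first. The same check as a standalone sketch; `ensure_table` is a hypothetical name, and `client` is assumed to be the Hbase.Client created above (only getTableNames and createTable, both used in this article, are called).

```python
def ensure_table(client, name, descriptors):
    """Create table `name` with the given column descriptors unless it already exists.

    Returns True if the table was created, False if it was already present.
    """
    if name in client.getTableNames():
        return False
    client.createTable(name, descriptors)
    return True


# Usage with the client and descriptor created above:
# ensure_table(client, "webpages", [contents])
```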


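Write data (this is the write method of the HbaseWriter class in the complete code below; reverseUrl turns the URL into the row key):

def write(self, url, content):
    row = self.reverseUrl(url)
    mutations = [Mutation(column="contents:", value=content)]
    self.client.mutateRow(self.tableName, row, mutations)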
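mutateRow sends one request per page. The imports above also bring in BatchMutation, which the article never uses; the Hbase.thrift interface of this era pairs it with a mutateRows(tableName, rowBatches) call that writes several rows in one request. A sketch under that assumption (check that mutateRows and BatchMutation(row, mutations) appear in your generated Hbase.py and ttypes.py); `write_many` is hypothetical, and the two classes are passed in so the function can be exercised without a server.

```python
def write_many(client, table, pages, reverse_url, mutation_cls, batch_cls):
    """Write (url, content) pairs to `table` with a single mutateRows() request.

    Pass hbase.ttypes.Mutation and hbase.ttypes.BatchMutation as mutation_cls and
    batch_cls; reverse_url turns a URL into its row key (e.g. HbaseWriter.reverseUrl).
    Returns the number of rows sent.
    """
    batches = [
        batch_cls(row=reverse_url(url),
                  mutations=[mutation_cls(column="contents:", value=content)])
        for url, content in pages
    ]
    client.mutateRows(table, batches)
    return len(batches)


# Usage (assumes the generated client exposes mutateRows; writer is an HbaseWriter):
# from hbase.ttypes import Mutation, BatchMutation
# write_many(client, "webpages", [("http://www.a.com", "<html>...</html>")],
#            writer.reverseUrl, Mutation, BatchMutation)
```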


The complete code and unit tests:

from unittest import TestCase, main

from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

from hbase import Hbase
from hbase.ttypes import ColumnDescriptor, Mutation, BatchMutation, NotFound

class HbaseWriter:

    def __init__(self, netloc, port, table="webpages"):
        self.tableName = table

        # Connect to the HBase Thrift server
        self.transport = TTransport.TBufferedTransport(
            TSocket.TSocket(netloc, port))
        self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
        self.client = Hbase.Client(self.protocol)
        self.transport.open()

        # Create the table on first use
        tables = self.client.getTableNames()
        if self.tableName not in tables:
            self.__createTable()

    def __del__(self):
        self.transport.close()

    def __createTable(self):
        # Keep one version per cell and use BLOCK compression
        self.client.createTable(
            self.tableName,
            [ColumnDescriptor(name="contents:", maxVersions=1, compression="BLOCK")])

    def reverseUrl(self, url):
        # Drop the scheme ("http:"), keeping what follows "//"
        link = [part for part in url.split("//") if part][-1]
        # Split into host and path segments, skipping empty segments
        hops = [hop for hop in link.split("/") if hop]
        # Reverse the host labels: www.a.com -> com.a.www
        domain = hops[0].split(".")
        domain.reverse()
        hops[0] = '.'.join(domain)
        return '/'.join(hops)

    def write(self, url, content):
        row = self.reverseUrl(url)
        mutations = [Mutation(column="contents:", value=content)]
        self.client.mutateRow(self.tableName, row, mutations)

class TestHbaseWriter(TestCase):
    def setUp(self):
        # Address of the HBase Thrift server; "test" is a scratch table
        self.writer = HbaseWriter("192.168.1.103", 9090, "test")

    def tearDown(self):
        # A table must be disabled before it can be deleted
        name = self.writer.tableName
        client = self.writer.client
        client.disableTable(name)
        client.deleteTable(name)

    def testReverseUrl(self):
        self.assertEqual(self.writer.reverseUrl("http://www.a.com"), "com.a.www")
        self.assertEqual(self.writer.reverseUrl("http://www.a.com/"), "com.a.www")
        self.assertEqual(self.writer.reverseUrl("http://a.com"), "com.a")
        self.assertEqual(self.writer.reverseUrl("http://www.b.com/foo"), "com.b.www/foo")
        self.assertEqual(self.writer.reverseUrl("aaa.bbb.ccc.com.cn/foo1/foo2"), "cn.com.ccc.bbb.aaa/foo1/foo2")

    def testCreate(self):
        tableName = self.writer.tableName
        client = self.writer.client
        self.assertTrue(tableName in client.getTableNames())
        # Column family names keep their trailing colon, as in __createTable
        columns = dict()
        columns["contents:"] = ColumnDescriptor(name="contents:", maxVersions=1, compression="BLOCK")
        cds = client.getColumnDescriptors(tableName)
        for name, column in cds.items():
            self.assertTrue(column.name in columns)

    def testWrite(self):
        tableName = self.writer.tableName
        client = self.writer.client
        data = {"http://www.a.com": "com.a.www",
                "http://www.a.com/bbb": "com.a.www/bbb",
                "http://www.foo.com/foo": "foo"}
        for url, content in data.items():
            self.writer.write(url, content)

        # Scan all rows; scannerGet raises NotFound once the scanner is exhausted
        scannerId = client.scannerOpen(tableName, "", ["contents:"])
        while True:
            try:
                result = client.scannerGet(scannerId)
            except NotFound:
                break
            row = result.row
            contents = result.columns["contents:"].value
            # Reversing the row key again recovers the original URL
            url = "http://" + self.writer.reverseUrl(row)
            self.assertTrue(url in data)
            self.assertEqual(data[url], contents)
        client.scannerClose(scannerId)


if __name__ == "__main__":
    main()
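The scan loop in testWrite calls scannerGet until it raises NotFound and closes the scanner at the end, but skips scannerClose if an assertion fails midway. A sketch of the same loop as a generator whose finally clause always closes the scanner; it assumes, as the test does, that scannerGet signals the end by raising NotFound, which is passed in as `not_found`. `iter_rows` is a hypothetical name.

```python
def iter_rows(client, scanner_id, not_found):
    """Yield scanner results until `not_found` is raised, then close the scanner."""
    try:
        while True:
            try:
                result = client.scannerGet(scanner_id)
            except not_found:
                # The scanner is exhausted
                break
            yield result
    finally:
        # Runs on exhaustion, on errors, and when the caller closes the generator early
        client.scannerClose(scanner_id)


# Usage inside testWrite:
# scannerId = client.scannerOpen(tableName, "", ["contents:"])
# for result in iter_rows(client, scannerId, NotFound):
#     contents = result.columns["contents:"].value
```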