您的位置:首页 > 编程语言 > Python开发

php/perl/python , 通过thrift 连接 hbase,进行条件过滤选择

2013-02-01 18:28 573 查看
云计算nosql中的棋手级产品,hbase应用越来越广。所以我也用到了,咨询了相关同事,为了将来存取海量数据,rowkey 设计成 uid_<bissid> 之类的形式。但是产品中,肯定会有根据uid取得 bissid列表的需求,根据文档,用scan,filter 可以方便的取出来。同时,我还记录了thrift 部署及环境的相关问题,网上诸多文档千篇一律,很多都是有问题的。或者没说明什么原因。1 安装thrift, 我这里是用的0.8版本的;0.9 很早以前用过,没有弄成功,所以就以这个为基础了。安装步骤就不说了,只要你在linux上装过软件,基本都一样的。thrift的功能很强大,其中之一是异构系统数据交换的一个媒介。想比于rest,他的效率更好。相比于rpc,又没那么复杂,总之是个不错的东西。 如果你有类似的服务提供给别人,可以考虑用thrift; 而hbase的除了native方式的,所以想提供给其他语言访问,hbase就是用了thrift接口。所以,在hbase的源码里会包含一个 Hbase.thrift的定义文件(这个我们是需要的)2 用thrift 命令行接口生成各语言客户端包文件。这个就要用到Hbase.thrift 这个定义文件了。可以生成各种主流语言的 gen-[lang] 之类的文件夹了。在哪个目录生成不重要,我稍后会说一下文件部署结构。3 假设我现在在 ~/test/ 这个目录下,以下所有的说明及命令,我的pwd都是这里。 不加特殊说明,我及不列绝对路径了。4 把thrfit源码包里对应语言的 lib包,copy到当前目录(~/test/ ) , 以php为例# mkdir php-src   perl-src  thrift (本来我想做成py-src的,但是调试时有点问题,就姑且把python的包先放入到这个文件夹吧)# cp -r xxx/thrift0.8.0/lib/php/src/*  php-src# cp -r xx/thrift0.8.0/lib/php/lib/perl/lib/* perl-src# cp -r xxx/thrift0.8.0/lib/py/src/*  thrift5 # 把第2 步骤生成的 gen-[lang] 对应的文件mv 到 php-src/packages/ 里(这里的packages文件夹需要你先建立好,当然用其他名字也行;比如 pkg)6 python perl的也这样mv 过来。但是语言要对应。 perl 到 perl-src/packages. python 到 thrift/packages/7 接下来是每种语言的脚本php:
$GLOBALS['THRIFT_ROOT'] = './php-src';require_once( $GLOBALS['THRIFT_ROOT'] . '/Thrift.php' );require_once( $GLOBALS['THRIFT_ROOT'] . '/transport/TSocket.php' );require_once( $GLOBALS['THRIFT_ROOT'] . '/transport/TBufferedTransport.php');require_once( $GLOBALS['THRIFT_ROOT'] . '/protocol/TBinaryProtocol.php' );// thrift 文件生成的require_once( $GLOBALS['THRIFT_ROOT'] . '/packages/Hbase/Hbase.php' );  // 注意这里的packages,就是第5 步里建立的那个文件夹名$table  = 'your_table';$host   = '192.168.1.10';$port   = '9090';$filter ="PrefixFilter('123_')";$socket = new TSocket($host, $port);$socket->setSendTimeout(10000); //$socket->setRecvTimeout(20000); // Twenty seconds$transport = new TBufferedTransport($socket);$protocol  = new TBinaryProtocol($transport);$client    = new HbaseClient($protocol);try{$transport->open();$scan = new TScan();$scan->filterString=$filter;$scanner = $client->scannerOpenWithScan($table, $scan);for($i=0; $i<100; $i++) {echo "============= $i =============== \n";$get_arr = $client->scannerGetList($scanner,1);if(!$get_arr) break;foreach ( $get_arr as $rowresult ){foreach ($rowresult as $k=>$item) {echo "row: ".$rowresult->{'row'}."\n";echo "cols: ";print_r($rowresult->{'columns'} );echo "-----\n";}}}$client->scannerClose($scan );$transport->close();}catch (Exception $e) {echo "";}
perl:
#!/bin/env perluse strict;use warnings;use Data::Dumper;use lib './perl-src';use lib './perl-src/packages';use Thrift;use Thrift::BinaryProtocol;use Thrift::Socket;use Thrift::BufferedTransport;use Hbase::Hbase;my $host   = '192.168.1.10';my $port   = '9090';my $table  = 'your_table';my $filter ="PrefixFilter('123_')";my $socket    = new Thrift::Socket($host,$port);my $transport = new Thrift::BufferedTransport($socket,1024,1024);my $protocol  = new Thrift::BinaryProtocol($transport);my $client    = new Hbase::HbaseClient($protocol);eval{$transport->open();my $scan = new Hbase::TScan();$scan->{filterString} = $filter;my $scanner = $client->scannerOpenWithScan($table, $scan);for(1..100) {print "============ $_ ========\n";my $get_arr = $client->scannerGetList($scanner,1);last unless @$get_arr;#print Dumper ($get_arr);foreach my $rowresult ( @$get_arr) {foreach my $k (keys %$rowresult) {print "row: ",$rowresult->{row}, $/;print "cols: ";print Dumper $rowresult->{columns};print "-----\n";}}}$client->scannerClose( $scan );$transport->close();};if($@){warn(Dumper($@));}
python:
#!/bin/env python#-*- coding: utf-8 -*-import syssys.path.append('./thrift/packages')sys.path.append('./thrift')from thrift import Thriftfrom thrift.transport import TSocketfrom thrift.transport import TTransportfrom thrift.protocol import TBinaryProtocolfrom thrift.packages.hbase import Hbasefrom thrift.packages.hbase.ttypes import *host    = '192.168.1.10'port    = 9090table   = 'your_table'filter  = "PrefixFilter('123_')"transport = TSocket.TSocket(host, port)transport = TTransport.TBufferedTransport(transport)protocol = TBinaryProtocol.TBinaryProtocol(transport)client = Hbase.Client(protocol)try:transport.open()scan = TScan()scan.filterString=filterscanner = client.scannerOpenWithScan(table, scan)for i in range(1,100):print "============%d============" %(i)get_arr = client.scannerGetList(scanner,1)if not get_arr :break;for  rowresult in get_arr:print "row: %s" % (rowresult.row)for k in rowresult.columns:print "colomns: %s" % (k)print '-----'client.scannerClose( scan )transport.close()except AlreadyExists, tx:print "Thrift exception"print '%s' % (tx.message)except:print "error"
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: