
Operating HBase from PHP via Thrift

2012-06-19 10:40
Environment

Operating system: CentOS 5.8; Hadoop: Cloudera CDH3u3; HBase: hbase-0.90.4-cdh3u3; PHP: 5.2
1. Download and build Thrift
# wget http://ftp.tc.edu.tw/pub/Apache/thrift/0.8.0/thrift-0.8.0.tar.gz
Install the required dependencies:
# yum install automake libtool flex bison pkgconfig gcc-c++ boost-devel libevent-devel zlib-devel python-devel ruby-devel php php-devel
# tar zxvf thrift-0.8.0.tar.gz
# cd thrift-0.8.0
# ./configure --prefix=/home/thrift --with-php-config=/usr/bin/php-config
# make && make install
2. Generate the PHP interface files for HBase:
# cd /home/thrift/
# bin/thrift --gen php $HBASE_HOME/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift
# cd gen-php/Hbase
# ls
Hbase.php Hbase_types.php

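3. Copy the Thrift PHP library and the generated interface files to a place where the PHP program can load them:
# mkdir -p /var/www/html/hbasethrift/libs (/var/www/html is Apache's document root)
# cp -a /home/soft/thrift-0.8.0/lib/php/src/. /var/www/html/hbasethrift/libs/ (copy the contents of src, so that Thrift.php sits directly under libs)
# mkdir -p /var/www/html/hbasethrift/libs/packages
# cp -a /home/thrift/gen-php/Hbase /var/www/html/hbasethrift/libs/packages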
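Before moving on, it can help to confirm that the copied files ended up where the test script will look for them. The following is a minimal sketch: `missingThriftFiles` is a hypothetical helper name, not part of Thrift, and the list of files is simply the set that the test script in step 4 loads.

```php
<?php
// Hypothetical helper (not part of Thrift): returns the relative paths of
// files the test script needs but that are missing under $thriftRoot.
function missingThriftFiles($thriftRoot)
{
    $required = array(
        'Thrift.php',
        'transport/TSocket.php',
        'transport/TBufferedTransport.php',
        'protocol/TBinaryProtocol.php',
        'packages/Hbase/Hbase.php',
        'packages/Hbase/Hbase_types.php',
    );
    $missing = array();
    foreach ($required as $relative) {
        if (!is_file($thriftRoot . '/' . $relative)) {
            $missing[] = $relative;
        }
    }
    return $missing;
}

// The directory used in step 3; adjust it if your web root differs.
$missing = missingThriftFiles('/var/www/html/hbasethrift/libs');
if ($missing) {
    echo "Missing: " . implode(', ', $missing) . "\n";
} else {
    echo "All required files are present\n";
}
```

If the check reports missing files, the usual cause is a copy that landed one directory level too deep or too shallow.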
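4. Start the HBase Thrift server and test the connection from PHP
# ./bin/hbase-daemon.sh start thrift (run from the HBase installation directory)
The HBase Thrift server listens on port 9090 by default.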
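A quick way to tell "the Thrift server is not running" apart from "the PHP client is misconfigured" is to probe the port with a plain TCP connection first. This is only a sketch: `thriftPortReachable` is a hypothetical helper, and the host and port are the defaults mentioned above.

```php
<?php
// Hypothetical helper (not part of Thrift or HBase): returns true when a TCP
// connection to $host:$port can be opened within $timeoutSeconds.
function thriftPortReachable($host, $port, $timeoutSeconds = 2.0)
{
    $errno = 0;
    $errstr = '';
    // The @ suppresses the PHP warning that fsockopen emits on failure.
    $conn = @fsockopen($host, $port, $errno, $errstr, $timeoutSeconds);
    if ($conn === false) {
        return false;
    }
    fclose($conn);
    return true;
}

if (thriftPortReachable('localhost', 9090)) {
    echo "Something is listening on port 9090\n";
} else {
    echo "Nothing is listening on port 9090; is the Thrift server running?\n";
}
```

A successful probe only shows that the port is open; it does not prove that the listener is the HBase Thrift server.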
PHP code for testing the connection and basic HBase operations:
# vi hbasethrift.php

<?php
// Must match the directory the Thrift PHP library was copied to in step 3
$GLOBALS['THRIFT_ROOT'] = '/var/www/html/hbasethrift/libs';
require_once( $GLOBALS['THRIFT_ROOT'].'/Thrift.php' );
require_once( $GLOBALS['THRIFT_ROOT'].'/transport/TSocket.php' );
require_once( $GLOBALS['THRIFT_ROOT'].'/transport/TBufferedTransport.php' );
require_once( $GLOBALS['THRIFT_ROOT'].'/protocol/TBinaryProtocol.php' );
require_once( $GLOBALS['THRIFT_ROOT'].'/packages/Hbase/Hbase.php' );
// Connect to the HBase Thrift server
$socket = new TSocket( 'localhost', 9090 );
$socket->setSendTimeout( 10000 ); // Ten seconds (too long for production, but this is just a demo ;)
$socket->setRecvTimeout( 20000 ); // Twenty seconds
$transport = new TBufferedTransport( $socket );
$protocol = new TBinaryProtocol( $transport );
$client = new HbaseClient( $protocol );
$transport->open();
// List the existing tables
echo nl2br( "listing tables...\n" );
$tables = $client->getTableNames();
sort( $tables );
foreach ( $tables as $name ) {
    echo nl2br( "  found: {$name}\n" );
}
// Create table1 with two column families; a duplicate table only triggers a warning
$columns = array(
    new ColumnDescriptor( array(
        'name' => 'entry:',
        'maxVersions' => 10
    ) ),
    new ColumnDescriptor( array(
        'name' => 'unused:'
    ) )
);
$t = "table1";
echo( "creating table: {$t}\n" );
try {
    $client->createTable( $t, $columns );
} catch ( AlreadyExists $ae ) {
    echo( "WARN: {$ae->message}\n" );
}
// Show the column families of "test" (assumes a table with this name already exists)
$t = "test";
echo( "column families in {$t}:\n" );
$descriptors = $client->getColumnDescriptors( $t );
asort( $descriptors );
foreach ( $descriptors as $col ) {
    echo( "  column: {$col->name}, maxVer: {$col->maxVersions}\n" );
}
// Show the column families of the table created above
$t = "table1";
echo( "column families in {$t}:\n" );
$descriptors = $client->getColumnDescriptors( $t );
asort( $descriptors );
foreach ( $descriptors as $col ) {
    echo( "  column: {$col->name}, maxVer: {$col->maxVersions}\n" );
}
// Write one cell to table1
$t = "table1";
$row = "row_name";
$valid = "foobar-\xE7\x94\x9F\xE3\x83\x93";
$mutations = array(
    new Mutation( array(
        'column' => 'entry:foo',
        'value' => $valid
    ) ),
);
// Batch submission of multiple rows (in a test with small records, submitting
// 200 rows at a time reached roughly 5000 rows/s):
//
// $rows = array('timestamp'=>$timestamp, 'columns'=>array('txt:col1'=>$col1, 'txt:col2'=>$col2, 'txt:col3'=>$col3));
// $records = array(rowkey=>$rows,...);
// $batchrecord = array();
// foreach ($records as $rowkey => $rows) {
//     $timestamp = $rows['timestamp'];
//     $columns = $rows['columns'];
//     // Build the mutations for one row
//     $record = array();
//     foreach ($columns as $column => $value) {
//         $col = new Mutation(array('column'=>$column, 'value'=>$value));
//         array_push($record, $col);
//     }
//     // Add the row to the batch
//     $batchTmp = new BatchMutation(array('row'=>$rowkey, 'mutations'=>$record));
//     array_push($batchrecord, $batchTmp);
// }
// $ret = $client->mutateRows('test', $batchrecord);

$client->mutateRow( $t, $row, $mutations );
// Read the cell back; get() returns an array of TCell objects
$table_name = "table1";
$row_name = 'row_name';
$fam_col_name = 'entry:foo';
$arr = $client->get($table_name, $row_name , $fam_col_name);
foreach ( $arr as $k=>$v  ) {
    // $v is a TCell
    echo ("value = {$v->value} , <br>  ");
    echo ("timestamp = {$v->timestamp}  <br>");
}
// Read the whole row; getRow() returns an array of TRowResult objects
$table_name = "table1";
$row_name = "row_name";
$arr = $client->getRow($table_name, $row_name);
foreach ( $arr as $k=>$TRowResult  ) {
    // $k is the numeric index (unused); $TRowResult is a TRowResult
    var_dump($TRowResult);
}

// Scan a row range: scannerOpenWithStop($tableName, $startRow, $stopRow, $columns)
// (assumes a table named zTest with an "info" column family already exists)
$table_name = 'zTest';
$startRow = "9-9-20120627-";
$stopRow = "9-9-20120627_";
$columns = array('info:');
$result = $client->scannerOpenWithStop($table_name, $startRow, $stopRow, $columns);
while (true) {
    $record = $client->scannerGet($result);
    if (empty($record)) { // an empty result means the scanner is exhausted
        break;
    }

    foreach ($record as $TRowResult) {
        $row = $TRowResult->row;
        $column = $TRowResult->columns;
        foreach ($column as $family_column => $Tcell) {
            echo("$family_column={$Tcell->value}<br>");
            echo("timestamp is {$Tcell->timestamp}<br>");
        }
    }
}
$client->scannerClose($result); // release the scanner on the server
$transport->close();
?>
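The scan in the script selects rows by key prefix: it starts at "9-9-20120627-" and stops at "9-9-20120627_". Because row keys sort by byte value, that range also admits keys such as "9-9-201206275", since '5' falls between '-' and '_'. If only keys beginning with the exact prefix are wanted, a tighter stop row can be derived by incrementing the last byte of the prefix. The sketch below shows one way to do that; `prefixStopRow` is a hypothetical helper, and the comment about an empty stop row reflects my understanding of HBase scans rather than something stated in this article.

```php
<?php
// Hypothetical helper: returns the smallest key that sorts after every key
// beginning with $prefix, suitable as an exclusive stop row.
function prefixStopRow($prefix)
{
    // Walk backwards past trailing 0xFF bytes, which cannot be incremented.
    for ($i = strlen($prefix) - 1; $i >= 0; $i--) {
        $byte = ord($prefix[$i]);
        if ($byte < 0xFF) {
            return substr($prefix, 0, $i) . chr($byte + 1);
        }
    }
    // Empty prefix or all 0xFF bytes: there is no upper bound.
    // (Assumption: an empty stop row means "scan to the end of the table".)
    return '';
}

$startRow = '9-9-20120627-';
$stopRow = prefixStopRow($startRow);
echo $stopRow, "\n"; // prints 9-9-20120627.
```

The resulting pair can be passed to `scannerOpenWithStop` in place of the hand-written start and stop rows.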

Open http://localhost/hbasethrift/hbasethrift.php in a browser. If the page shows the table names in HBase and the newly created table1, the connection works.
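The batch-write note in the script mentions submitting 200 rows per call. One way to organise that, sketched here without any Thrift dependency, is to split the pending rows into fixed-size chunks and hand each chunk to a callback. `sendInBatches` and `printBatchSize` are hypothetical names; in a real script the callback would presumably build `BatchMutation` objects and call `mutateRows`, as in the commented example.

```php
<?php
// Hypothetical helper: splits $records (row key => row data) into chunks of
// at most $batchSize rows, passes each chunk to $send, and returns the
// number of chunks sent.
function sendInBatches($records, $batchSize, $send)
{
    $sent = 0;
    // The third argument keeps the row keys as array keys in every chunk.
    foreach (array_chunk($records, $batchSize, true) as $chunk) {
        call_user_func($send, $chunk);
        $sent++;
    }
    return $sent;
}

// Stand-in callback that only reports the chunk size.
function printBatchSize($chunk)
{
    echo count($chunk), " rows\n";
}

$records = array();
for ($i = 0; $i < 450; $i++) {
    $records["row-$i"] = array('txt:col1' => "value $i");
}
sendInBatches($records, 200, 'printBatchSize'); // prints 200 rows, 200 rows, 50 rows
```

The batch size of 200 comes from the note in the script; the best value for another cluster would have to be measured.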

For the HBase Thrift API, see http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/thrift/doc-files/index.html

Reference: http://www.banping.com/2011/07/08/hbase-thrift-php/

This article comes from the “东杰书屋” blog; please keep this attribution: http://jiedushi.blog.51cto.com/673653/902550