您的位置:首页 > 编程语言 > C语言/C++

使用C++(通过Thrift)访问/操作/读写Hbase

2012-03-05 13:09 441 查看
(1)安装Thrift;(2)用Thrift 生成访问Hbase所需的C++文件;(3)在程序中通过Thrift来访问Hbase。

另外,本文只包含读写Hbase数据的例子,不包含配置Hbase的方法,如需这些内容,请自行搜索。

首先声明一下,本文基于以下环境:

操作系统:RHEL 5.3,64位

Thrift 版本:0.7.0

要访问的 Hbase 版本:0.20.6

我使用0.90.4的 Hbase 安装包来生成C++所需的Hbase.h等文件(用新版的应该能兼容旧版的)

下面开始,一步步来。

(1)安装Thrift

不是一件很轻松的事。如果你的系统比较干净,可能很顺利地就搞定了,如果有依赖库缺失问题或者库冲突问题,那么就只能根据具体情况,一个个问题去fix了,谁知道会有多少麻烦。我运气比较好,在一个干净的系统上,很快就完成了。

Thrift 至少会依赖于两个系统中一般不会带的库:libevent,boost。

libevent 到这里下载:http://monkey.org/~provos/libevent/ 我使用的版本是:2.0.12-stable

boost 到这里下载:http://www.boost.org/ 我使用的版本是:1.47.0

安装libevent:

./configure --prefix=/usr/local/libevent
make
make install


安装boost(boost不像一般的Linux源码安装包一样,它的安装不是configure,make,make install,有点怪):



./bootstrap.sh --prefix=/usr/local/boost

不出错的话接着执行以下命令开始编译(也可以通过编辑project-config.jam文件调整编译参数):



./b2

./b2 install

安装Thrift:



chmod +x configure

./configure --with-boost=/usr/local --prefix=/usr/local/thrift

make

make install

至此,安装Thrift 的工作就完成了。

(2)用Thrift 生成访问Hbase所需的C++文件

访问Hbase需要在你的程序中使用若干.h,.cpp文件,这些文件是用 Thrift 生成的。

解压Hbase源码安装包:



tar zxf hbase-0.90.4.tar.gz

cd hbase-0.90.4

在解压出来的文件中, 你可以找到一个名为 Hbase.thrift 的文件,这个文件定义了如何通过 Thrift 接口来访问Hbase。用这个Thrift文件,可以生成访问Hbase所需的C++文件:

[shel]

/usr/local/thrift/bin/thrift –gen cpp ./src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift

[/shell]

会发现生成了gen-cpp目录:



ls gen-cpp/



Hbase_constants.cpp Hbase_constants.h Hbase.cpp Hbase.h Hbase_server.skeleton.cpp Hbase_types.cpp Hbase_types.h

除了Hbase_server.skeleton.cpp之外,其余文件都是在我们的程序里要用到的,将它们拷贝到我们的工程目录下。

(3)在程序中使用Thrift来访问Hbase

要能通过 Thrift 访问Hbase,你必须首先要打开HBase的 Thrift 服务,请参考其他文档确保这一点是可用的。

下一步,我们在程序中如何读取Hbase的数据?我们先看看hbase源码安装包中自带的例子:在解压出来的安装包中的 examples/thrift/ 目录下的 DemoClient.cpp 文件,有如下代码:



boost::shared_ptr<TTransport> socket(new TSocket("localhost", 9090));

boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));

boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));

HbaseClient client(protocol);

try {

transport->open();



// do something



transport->close();

} catch (TException &tx) {

printf("ERROR: %s\n", tx.what());

}

我们就仿照这个例子来做。从DemoClient.cpp可见,我们要先创建三个指针socket,transport和protocol,后两个分别依赖于前两个,最后再创建一个client对象,我们操作Hbase就是使用这个client对象。在操作Hbase前,需要先打开到Hbase Thrift service的连接,即 transport->open(),在操作完 Hbase之后,需要关闭连接,即 transport->close(),这下就比较清楚了:我们可以写一个自己的类CHbaseOperate,它应该有一个open函数和一个close函数,分别用于打开、关闭连接,还应该有读写Hbase的基本功能。读写Hbase的方法,请参考Hbase.h中的函数,例子还是看DemoClient.cpp。

下面上代码:

HbaseOperate.h:



#ifndef __HBASE_OPERATE_H

#define __HBASE_OPERATE_H



#include <string>

#include <protocol/TBinaryProtocol.h>

#include <transport/TSocket.h>

#include <transport/TTransportUtils.h>

#include "Hbase.h"



/**

* Class to operate Hbase.

*

* @author Darran Zhang (codelast.com)

* @version 11-08-24

* @declaration These codes are only for non-commercial use, and are distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or implied.

* You must not remove this declaration at any time.

*/



using namespace apache::thrift;

using namespace apache::thrift::protocol;

using namespace apache::thrift::transport;

using namespace apache::hadoop::hbase::thrift;



typedef struct hbaseRet {

std::string rowValue;

time_t ts;



hbaseRet() {

ts = 0;

}



} hbaseRet;



class CHbaseOperate

{

public:

CHbaseOperate();

virtual ~CHbaseOperate();



private:

boost::shared_ptr<TTransport> socket;

boost::shared_ptr<TTransport> transport;

boost::shared_ptr<TProtocol> protocol;



HbaseClient *client;



std::string hbaseServiceHost;

int hbaseServicePort;

bool isConnected;



public:

bool connect();



bool connect(std::string host, int port);



bool disconnect();



bool putRow(const std::string &tableName,

const std::string &rowKey,

const std::string &column,

const std::string &rowValue);



bool getRow(hbaseRet &result,

const std::string &tableName,

const std::string &rowKey,

const std::string &columnName);

};



#endif

HbaseOperate.cpp:



#include "HbaseOperate.h"

#include "log4cxx/log4cxx.h"

#include "log4cxx/propertyconfigurator.h"



static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("HbaseOperate.cpp"));



/**

* Class to operate Hbase.

*

* @author Darran Zhang (codelast.com)

* @version 11-08-24

* @declaration These codes are only for non-commercial use, and are distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or implied.

* You must not remove this declaration at any time.

*/



using namespace std;



CHbaseOperate::CHbaseOperate() :

socket((TSocket*)NULL), transport((TBufferedTransport*)NULL), protocol((TBinaryProtocol*)NULL), client(NULL), hbaseServicePort(9090), isConnected(false)

{

}



CHbaseOperate::~CHbaseOperate()

{

if (isConnected) {

disconnect();

}

if (NULL != client) {

delete client;

client = NULL;

}

}



/**

* Connect Hbase.

*

*/

bool CHbaseOperate::connect()

{

if (isConnected) {

LOG4CXX_INFO(logger, "Already connected, don't need to connect it again");

return true;

}



try {

socket.reset(new TSocket(hbaseServiceHost, hbaseServicePort));

transport.reset(new TBufferedTransport(socket));

protocol.reset(new TBinaryProtocol(transport));



client = new HbaseClient(protocol);



transport->open();

} catch (const TException &tx) {

LOG4CXX_ERROR(logger, "Connect Hbase error : " << tx.what());

return false;

}



isConnected = true;

return isConnected;

}



/**

* Connect Hbase.

*

*/

bool CHbaseOperate::connect(std::string host, int port)

{

hbaseServiceHost = host;

hbaseServicePort = port;



return connect();

}



/**

* Disconnect from Hbase.

*

*/

bool CHbaseOperate::disconnect()

{

if (!isConnected) {

LOG4CXX_ERROR(logger, "Haven't connected to Hbase yet, can't disconnect from it");

return false;

}



if (NULL != transport) {

try {

transport->close();

} catch (const TException &tx) {

LOG4CXX_ERROR(logger, "Disconnect Hbase error : " << tx.what());

return false;

}

} else {

return false;

}



isConnected = false;

return true;

}



/**

* Put a row to Hbase.

*

* @param tableName [IN] The table name.

* @param rowKey [IN] The row key.

* @param column [IN] The "column family : qualifier".

* @param rowValue [IN] The row value.

* @return True for successfully put the row, false otherwise.

*/

bool CHbaseOperate::putRow(const string &tableName, const string &rowKey, const string &column, const string &rowValue)

{

if (!isConnected) {

LOG4CXX_ERROR(logger, "Haven't connected to Hbase yet, can't put row");

return false;

}



try {

std::vector<Mutation> mutations;

mutations.push_back(Mutation());

mutations.back().column = column;

mutations.back().value = rowValue;

client->mutateRow(tableName, rowKey, mutations);



} catch (const TException &tx) {

LOG4CXX_ERROR(logger, "Operate Hbase error : " << tx.what());

return false;

}



return true;

}



/**

* Get a Hbase row.

*

* @param result [OUT] The object which contains the returned data.

* @param tableName [IN] The Hbase table name, e.g. "MyTable".

* @param rowKey [IN] The Hbase row key, e.g. "kdr23790".

* @param columnName [IN] The "column family : qualifier".

* @return True for successfully get the row, false otherwise.

*/

bool CHbaseOperate::getRow(hbaseRet &result, const std::string &tableName, const std::string &rowKey, const std::string &columnName)

{

if (!isConnected) {

LOG4CXX_ERROR(logger, "Haven't connected to Hbase yet, can't read data from it");

return false;

}



std::vector<std::string> columnNames;

columnNames.push_back(columnName);



std::vector<TRowResult> rowResult;

try {

client->getRowWithColumns(rowResult, tableName, rowKey, columnNames);

} catch (const TException &tx) {

LOG4CXX_ERROR(logger, "Operate Hbase error : " << tx.what());

return false;

}



if (0 == rowResult.size()) {

LOG4CXX_WARN(logger, "Got no record with the key : [" << rowKey << "]");

return false;

}



std::map<std::string, TCell>::const_iterator it = rowResult[rowResult.size() -1].columns.begin();

result.rowValue = it->second.value;

result.ts = it->second.timestamp;



return true;

}

注意我在程序中使用了Apache log4cxx这个记录日志的库来打印/保存程序运行日志 。如果你不想用,可以自己改为std::cout。

代码有了,使用方法为:可以在你的程序中创建一个全局对象:

CHbaseOperate g_ho;

在需要操作Hbase之前:



g_ho.connect("192.168.55.66", 9090);

其中,“192.168.55.66”和9090分别是你的Hbase Thrift service的服务器地址和端口号,你需要正确地配置好,才能使用。本文开头已经说了,本文不讨论这方面的问题。

在操作完Hbase之后:



g_ho.disconnect();

现在再来说一下读写操作Hbase的两个函数:putRow()和getRow()。

putRow():



bool putRow(const std::string &tableName,

const std::string &rowKey,

const std::string &column,

const std::string &rowValue);

这是向Hbase写入一条记录的函数,参数tableName为Hbase表名,即你要将记录写到哪个Hbase表中;参数rowKey为待写入的记录的key;参数column为待写入的记录的“column family:qualifier”组合,参数rowValue为待写入的记录的value。

getRow():



bool getRow(hbaseRet &result,

const std::string &tableName,

const std::string &rowKey,

const std::string &columnName);

这是从Hbase中读取一条记录的函数,参数tableName为Hbase表名,即你要从哪个Hbase表中读取记录;参数rowKey为你要查询的记录的key;参数columnName为你要查询的记录的“column family:qualifier”组合;参数result为返回的Hbase的数据,它包含记录的value和记录的时间戳:



typedef struct hbaseRet {

std::string rowValue;

time_t ts;



hbaseRet() {

ts = 0;

}



} hbaseRet;

至于操作的结果对不对,可以在Hbase shell中用get, scan等命令来验证,具体方法请看Hbase shell的help。另外,最好再写一些unit test来测试。

如果你要为CHbaseOperate类添加功能,可以参考Hbase.h文件中的函数定义。如你所见,CHbaseOperate类主要也是调用了里面的函数,只不过这个类可以让一些不太熟悉Hbase概念的人可以更方便地操作Hbase罢了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: