您的位置:首页 > 编程语言 > C语言/C++

使用C++(通过Thrift)访问/操作/读写Hbase

2013-05-13 14:02 751 查看



要使用C++访问Hbase,可以走的途径少之又少,据说当前最好的方法就是通过Thrift来实现:http://thrift.apache.org/

所以本文分成几部分:(1)安装Thrift;(2)用Thrift
生成访问Hbase所需的C++文件;(3)在程序中通过Thrift来访问Hbase。

另外,本文只包含读写Hbase数据的例子,不包含配置Hbase的方法,如需这些内容,请自行搜索。

首先声明一下,本文基于以下环境:

操作系统:RHEL5.3,64位

Thrift版本:0.7.0

要访问的Hbase版本:0.20.6

我使用0.90.4的Hbase安装包来生成C++所需的Hbase.h等文件(用新版的应该能兼容旧版的)

文章来源:http://www.codelast.com/

下面开始,一步步来。

(1)安装Thrift

不是一件很轻松的事。如果你的系统比较干净,可能很顺利地就搞定了,如果有依赖库缺失问题或者库冲突问题,那么就只能根据具体情况,一个个问题去fix了,谁知道会有多少麻烦。

我运气比较好,在一个干净的系统上,很快就完成了。

Thrift至少会依赖于两个系统中一般不会带的库:libevent,boost。

libevent到这里下载:http://monkey.org/~provos/libevent/
我使用的版本是:2.0.12-stable

boost到这里下载:http://www.boost.org/
我使用的版本是:1.47.0

文章来源:http://www.codelast.com/

安装libevent:

1

2

3
.
/configure

--prefix=
/usr/local/libevent


make


make

install


安装boost(boost不像一般的Linux源码安装包一样,它的安装不是configure,make,makeinstall,有点怪):

1
.
/bootstrap
.sh--prefix=
/usr/local/boost


不出错的话接着执行以下命令开始编译(也可以通过编辑project-config.jam文件调整编译参数):

1

2
.
/b2


.
/b2

install


安装Thrift:

1

2

3

4
chmod

+xconfigure


.
/configure

--with-boost=
/usr/local

--prefix=
/usr/local/thrift


make


make

install


至此,安装Thrift的工作就完成了。

文章来源:http://www.codelast.com/

(2)用Thrift生成访问Hbase所需的C++文件

访问Hbase需要在你的程序中使用若干.h,.cpp文件,这些文件是用Thrift生成的。

解压Hbase源码安装包:

1

2
tar

zxfhbase-0.90.4.
tar
.gz


cd

hbase-0.90.4


在解压出来的文件中,你可以找到一个名为Hbase.thrift的文件,这个文件定义了如何通过Thrift接口来访问Hbase。用这个Thrift文件,可以生成访问Hbase所需的C++文件:

1
/usr/local/thrift/bin/thrift

--gencpp.
/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase
.thrift


会发现生成了gen-cpp目录:

1
ls

gen-cpp/


输出:

Hbase_constants.cpp
Hbase_constants.hHbase.cppHbase.hHbase_server.skeleton.cppHbase_types.cppHbase_types.h

除了Hbase_server.skeleton.cpp之外,其余文件都是在我们的程序里要用到的,将它们拷贝到我们的工程目录下。

文章来源:http://www.codelast.com/

(3)在程序中使用Thrift来访问Hbase

要能通过Thrift访问Hbase,你必须首先要打开HBase的Thrift服务,请参考其他文档确保这一点是可用的。

下一步,我们在程序中如何读取Hbase的数据?

我们先看看hbase源码安装包中自带的例子:在解压出来的安装包中的examples/thrift/目录下的DemoClient.cpp文件,有如下代码:

01

02

03

04

05

06

07

08

09

10

11

12

13
boost::shared_ptr<TTransport>
socket(
new

TSocket(
"localhost"
,
9090));


boost::shared_ptr<TTransport>
transport(
new

TBufferedTransport(socket));


boost::shared_ptr<TProtocol>
protocol(
new

TBinaryProtocol(transport));


HbaseClient
client(protocol);


try

{


transport->open();


//
dosomething


transport->close();


}

catch
(TException&tx){


printf
(
"ERROR:
%s\n"
,
tx.what());


}


我们就仿照这个例子来做。从DemoClient.cpp可见,我们要先创建三个指针socket,transport和protocol,后两个分别依赖于前两个,最后再创建一个client对象,我们操作Hbase就是使用这个client对象。在操作Hbase前,需要先打开到Hbase
Thriftservice的连接,即transport->open(),在操作完Hbase之后,需要关闭连接,即transport->close(),这下就比较清楚了:我们可以写一个自己的类CHbaseOperate,它应该有一个connect函数和一个disconnect函数,分别用于打开、关闭连接,还应该有读写Hbase的基本功能。读写Hbase的方法,请参考Hbase.h中的函数,例子还是看DemoClient.cpp。

文章来源:http://www.codelast.com/

下面上代码:

HbaseOperate.h:

01

02

03

04

05

06

07

08

09

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69
#ifndef
__HBASE_OPERATE_H


#define
__HBASE_OPERATE_H


#include
<string>


#include
<protocol/TBinaryProtocol.h>


#include
<transport/TSocket.h>


#include
<transport/TTransportUtils.h>


#include
"Hbase.h"


/**


*
ClasstooperateHbase.


*


*
@authorDarranZhang(codelast.com)


*
@version11-08-24


*
@declarationThesecodesareonlyfornon-commercialuse,andaredistributedonan"ASIS"basis,WITHOUTWARRANTYOFANYKIND,eitherexpressorimplied.


*
Youmustnotremovethisdeclarationatanytime.


*/


using

namespace
apache::thrift;


using

namespace
apache::thrift::protocol;


using

namespace
apache::thrift::transport;


using

namespace
apache::hadoop::hbase::thrift;


typedef

struct

hbaseRet{


std::string
rowValue;


time_t

ts;


hbaseRet(){


ts
=0;


}


}
hbaseRet;


class

CHbaseOperate


{


public
:


CHbaseOperate();


virtual

~CHbaseOperate();


private
:


boost::shared_ptr<TTransport>
socket;


boost::shared_ptr<TTransport>
transport;


boost::shared_ptr<TProtocol>
protocol;


HbaseClient
*client;


std::string
hbaseServiceHost;


int

hbaseServicePort;


bool

isConnected;


public
:


bool

connect();


bool


connect(std::stringhost,
int
port);


bool

disconnect();


bool

putRow(
const


std::string&tableName,


const


std::string&rowKey,


const


std::string&column,


const


std::string&rowValue);


bool


getRow(hbaseRet&result,


const


std::string&tableName,


const


std::string&rowKey,


const


std::string&columnName);


};


#endif


文章来源:http://www.codelast.com/

HbaseOperate.cpp:

001

002

003

004

005

006

007

008

009

010

011

012

013

014

015

016

017

018

019

020

021

022

023

024

025

026

027

028

029

030

031

032

033

034

035

036

037

038

039

040

041

042

043

044

045

046

047

048

049

050

051

052

053

054

055

056

057

058

059

060

061

062

063

064

065

066

067

068

069

070

071

072

073

074

075

076

077

078

079

080

081

082

083

084

085

086

087

088

089

090

091

092

093

094

095

096

097

098

099

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168
#include
"HbaseOperate.h"


#include
"log4cxx/log4cxx.h"


#include
"log4cxx/propertyconfigurator.h"


static


log4cxx::LoggerPtrlogger(log4cxx::Logger::getLogger(
"HbaseOperate.cpp"
));


/**


*
ClasstooperateHbase.


*


*
@authorDarranZhang(codelast.com)


*
@version11-08-24


*
@declarationThesecodesareonlyfornon-commercialuse,andaredistributedonan"ASIS"basis,WITHOUTWARRANTYOFANYKIND,eitherexpressorimplied.


*
Youmustnotremovethisdeclarationatanytime.


*/


using

namespace
std;


CHbaseOperate::CHbaseOperate()
:


socket((TSocket*)NULL),
transport((TBufferedTransport*)NULL),protocol((TBinaryProtocol*)NULL),client(NULL),hbaseServicePort(9090),isConnected(
false
)


{


}


CHbaseOperate::~CHbaseOperate()


{


if

(isConnected){


disconnect();


}


if

(NULL!=client){


delete

client;


client
=NULL;


}


}


/**


*
ConnectHbase.


*


*/


bool

CHbaseOperate::connect()


{


if

(isConnected){


LOG4CXX_INFO(logger,


"Alreadyconnected,don'tneedtoconnectitagain"
);


return

true
;


}


try

{


socket.reset(
new

TSocket(hbaseServiceHost,hbaseServicePort));


transport.reset(
new

TBufferedTransport(socket));


protocol.reset(
new

TBinaryProtocol(transport));


client
=
new
HbaseClient(protocol);


transport->open();


}

catch
(
const


TException&tx){


LOG4CXX_ERROR(logger,


"ConnectHbaseerror:"

<<tx.what());


return

false
;


}


isConnected
=
true
;


return

isConnected;


}


/**


*
ConnectHbase.


*


*/


bool

CHbaseOperate::connect(std::stringhost,
int
port)


{


hbaseServiceHost
=host;


hbaseServicePort
=port;


return

connect();


}


/**


*
DisconnectfromHbase.


*


*/


bool

CHbaseOperate::disconnect()


{


if

(!isConnected){


LOG4CXX_ERROR(logger,


"Haven'tconnectedtoHbaseyet,can'tdisconnectfromit"
);


return

false
;


}


if

(NULL!=transport){


try

{


transport->close();


}

catch
(
const


TException&tx){


LOG4CXX_ERROR(logger,


"DisconnectHbaseerror:"

<<tx.what());


return

false
;


}


}

else
{


return

false
;


}


isConnected
=
false
;


return

true
;


}


/**


*
PutarowtoHbase.


*


*
@paramtableName[IN]Thetablename.


*
@paramrowKey[IN]Therowkey.


*
@paramcolumn[IN]The"columnfamily:qualifier".


*
@paramrowValue[IN]Therowvalue.


*
@returnTrueforsuccessfullyputtherow,falseotherwise.


*/


bool

CHbaseOperate::putRow(
const


string&tableName,
const

string&rowKey,
const

string&column,
const

string&rowValue)


{


if

(!isConnected){


LOG4CXX_ERROR(logger,


"Haven'tconnectedtoHbaseyet,can'tputrow"
);


return

false
;


}


try

{


std::vector<Mutation>
mutations;


mutations.push_back(Mutation());


mutations.back().column
=column;


mutations.back().value
=rowValue;


client->mutateRow(tableName,
rowKey,mutations);


}

catch
(
const


TException&tx){


LOG4CXX_ERROR(logger,


"OperateHbaseerror:"

<<tx.what());


return

false
;


}


return

true
;


}


/**


*
GetaHbaserow.


*


*
@paramresult[OUT]Theobjectwhichcontainsthereturneddata.


*
@paramtableName[IN]TheHbasetablename,e.g."MyTable".


*
@paramrowKey[IN]TheHbaserowkey,e.g."kdr23790".


*
@paramcolumnName[IN]The"columnfamily:qualifier".


*
@returnTrueforsuccessfullygettherow,falseotherwise.


*/


bool

CHbaseOperate::getRow(hbaseRet&result,
const

std::string&tableName,
const

std::string&rowKey,
const

std::string&columnName)


{


if

(!isConnected){


LOG4CXX_ERROR(logger,


"Haven'tconnectedtoHbaseyet,can'treaddatafromit"
);


return

false
;


}


std::vector<std::string>
columnNames;


columnNames.push_back(columnName);


std::vector<TRowResult>
rowResult;


try

{


client->getRowWithColumns(rowResult,
tableName,rowKey,columnNames);


}

catch
(
const


TException&tx){


LOG4CXX_ERROR(logger,


"OperateHbaseerror:"

<<tx.what());


return

false
;


}


if

(0==rowResult.size()){


LOG4CXX_WARN(logger,


"Gotnorecordwiththekey:["

<<rowKey<<
"]"
);


return

false
;


}


std::map<std::string,
TCell>::const_iteratorit=rowResult[rowResult.size()-1].columns.begin();


result.rowValue
=it->second.value;


result.ts
=it->second.timestamp;


return

true
;


}


注意我在程序中使用了Apache
log4cxx这个记录日志的库来打印/保存程序运行日志,使用方法可参考【此链接】。如果你不想用,可以自己改为std::cout。

代码有了,使用方法为:可以在你的程序中创建一个全局对象:

1
CHbaseOperate
g_ho;


在需要操作Hbase之前:

1
g_ho.connect(
"192.168.55.66"
,
9090);


其中,“192.168.55.66”和9090分别是你的HbaseThriftservice的服务器地址和端口号,你需要正确地配置好,才能使用。本文开头已经说了,本文不讨论这方面的问题。

在操作完Hbase之后:

1
g_ho.disconnect();


文章来源:http://www.codelast.com/

现在再来说一下读写操作Hbase的两个函数:putRow()和getRow()。putRow():

1

2

3

4
bool

putRow(
const


std::string&tableName,


const


std::string&rowKey,


const


std::string&column,


const


std::string&rowValue);


这是向Hbase写入一条记录的函数,参数tableName为Hbase表名,即你要将记录写到哪个Hbase表中;参数rowKey为待写入的记录的key;参数column为待写入的记录的“column
family:qualifier”组合,参数rowValue为待写入的记录的value。

getRow():

1

2

3

4
bool


getRow(hbaseRet&result,


const


std::string&tableName,


const


std::string&rowKey,


const


std::string&columnName);


这是从Hbase中读取一条记录的函数,参数tableName为Hbase表名,即你要从哪个Hbase表中读取记录;参数rowKey为你要查询的记录的key;参数columnName为你要查询的记录的“column
family:qualifier”组合;参数result为返回的Hbase的数据,它包含记录的value和记录的时间戳:

1

2

3

4

5

6

7

8

9
typedef

struct

hbaseRet{


std::string
rowValue;


time_t

ts;


hbaseRet(){


ts
=0;


}


}
hbaseRet;


文章来源:http://www.codelast.com/

至于操作的结果对不对,可以在Hbase
shell中用get,scan等命令来验证,具体方法请看Hbaseshell的help。另外,最好再写一些unittest来测试。

如果你要为CHbaseOperate类添加功能,可以参考Hbase.h文件中的函数定义。如你所见,CHbaseOperate类主要也是调用了里面的函数,只不过这个类可以让一些不太熟悉Hbase概念的人可以更方便地操作Hbase罢了。

原文地址:http://www.codelast.com/?p=3303

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: