您的位置:首页 > 编程语言 > C语言/C++

Flume thrift source C++ Demo

2015-12-30 10:35 531 查看

#include <cstdio>
#include <map>
#include <string>
#include <vector>

#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/protocol/TCompactProtocol.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TTransportUtils.h>

#include "gen-cpp/flume_constants.h"
#include "gen-cpp/flume_types.h"
#include "gen-cpp/ThriftSourceProtocol.h"

using namespace std;
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
// Number of appendBatch calls issued by ThriftClient::sendEvent().
#define LOOP 200000

// Number of batches for which appendBatch returned Status::OK.
int right_num = 0;
// Number of batches for which appendBatch returned any other status.
int error_num = 0;

class ThriftClient{
public:
/* Thrift protocol needings... */
boost::shared_ptr<TTransport> socket;
boost::shared_ptr<TTransport> transport;
boost::shared_ptr<TProtocol> protocol;
ThriftSourceProtocolClient* pClient;

public:
void sendEvent();
ThriftClient();

};

ThriftClient::ThriftClient():
socket(new TSocket("10.0.6.227",9999)),
transport(new TFramedTransport(socket)),
protocol(new TCompactProtocol(transport))
{
pClient = new ThriftSourceProtocolClient(protocol);
}

//transport(new TBufferedTransport(socket)),
void ThriftClient::sendEvent()
{
std::map<std::string, std::string>  headers;
headers.insert(std::make_pair("head", "head"));
std::string sBody = "TableName:TEST_TABLE ConfigID:5555 ResponseIP:77522222 ProtoType:67 StartTime:3333 Interval:24544 AccessTimes:45 DomainLen:12\n";
if(!transport->isOpen())
{
transport->open();
}
ThriftFlumeEvent tfEvent;
tfEvent.__set_headers(headers);
tfEvent.__set_body(sBody);
Status::type res;
int i=0;
std::vector<ThriftFlumeEvent> eventbatch;
for(;i<LOOP;i++){
//tfEvent.__set_body(sBody);
eventbatch.clear();
int j=1;
for(;j<=50;j++)
eventbatch.push_back(tfEvent);
res =pClient->appendBatch(eventbatch);
if(res == Status::OK){
right_num++;
}else{
error_num++;
printf("WARNING: send event via thrift failed, return code:%d\n",res);
}
}
}

int main(int argc, char * argv[]){
//boost::shared_ptr<TTransport> socket(new TSocket("localhost", 5496));
//boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
//boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
ThriftClient *client = new ThriftClient();
client->sendEvent();
printf("RIGHT: success num:%d\n",right_num);
printf("ERROR: failed num:%d\n",error_num);
client->transport->close();
}


使用 Flume 的 Thrift source,用 C++ 编写客户端代码,通过 appendBatch 批量追加事件。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  flume thrift