Flume thrift source C++ Demo
2015-12-30 10:35
531 查看
#include "gen-cpp/flume_constants.h" #include "gen-cpp/flume_types.h" #include "gen-cpp/ThriftSourceProtocol.h" #include <thrift/protocol/TBinaryProtocol.h> #include <thrift/protocol/TCompactProtocol.h> #include <thrift/transport/TSocket.h> #include <thrift/transport/TTransportUtils.h> #include <vector> using namespace std; using namespace apache::thrift; using namespace apache::thrift::protocol; using namespace apache::thrift::transport; #define LOOP 200000 int right_num = 0; int error_num = 0; class ThriftClient{ public: /* Thrift protocol needings... */ boost::shared_ptr<TTransport> socket; boost::shared_ptr<TTransport> transport; boost::shared_ptr<TProtocol> protocol; ThriftSourceProtocolClient* pClient; public: void sendEvent(); ThriftClient(); }; ThriftClient::ThriftClient(): socket(new TSocket("10.0.6.227",9999)), transport(new TFramedTransport(socket)), protocol(new TCompactProtocol(transport)) { pClient = new ThriftSourceProtocolClient(protocol); } //transport(new TBufferedTransport(socket)), void ThriftClient::sendEvent() { std::map<std::string, std::string> headers; headers.insert(std::make_pair("head", "head")); std::string sBody = "TableName:TEST_TABLE ConfigID:5555 ResponseIP:77522222 ProtoType:67 StartTime:3333 Interval:24544 AccessTimes:45 DomainLen:12\n"; if(!transport->isOpen()) { transport->open(); } ThriftFlumeEvent tfEvent; tfEvent.__set_headers(headers); tfEvent.__set_body(sBody); Status::type res; int i=0; std::vector<ThriftFlumeEvent> eventbatch; for(;i<LOOP;i++){ //tfEvent.__set_body(sBody); eventbatch.clear(); int j=1; for(;j<=50;j++) eventbatch.push_back(tfEvent); res =pClient->appendBatch(eventbatch); if(res == Status::OK){ right_num++; }else{ error_num++; printf("WARNING: send event via thrift failed, return code:%d\n",res); } } } int main(int argc, char * argv[]){ //boost::shared_ptr<TTransport> socket(new TSocket("localhost", 5496)); //boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket)); //boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport)); ThriftClient *client = new ThriftClient(); client->sendEvent(); printf("RIGHT: success num:%d\n",right_num); printf("ERROR: failed num:%d\n",error_num); client->transport->close(); }
使用flume 的thrift source,用c++编写客户端的代码,批量追加信息。
相关文章推荐
- 基于HBase Thrift接口的一些使用问题及相关注意事项的详解
- Flume环境部署和配置详解及案例大全
- Play! Akka Flume实现的完整数据收集
- flume自定义Interceptor
- #Note# Analyzing Twitter Data with Apache Hadoo...
- sparksql与hive整合
- flume、kafka、storm常用命令
- 开源日志系统比较
- 用thrift实现多语言相互调用
- Thrift的required和optional源码分析
- Flume向HDFS写数据实例
- Python 通过thrift接口连接Hbase读取存储数据
- flume+log4j整合到web项目
- 详细图解 Flume介绍、安装配置-1
- flume部署
- flume实时抓取log数据,并传到kafka中
- flume NG 中文 Welcome to Apache Flume 第一页 醉了
- flume 高可用性 高可靠性 agent source
- flume介绍及扩展开发心得
- Apache Thrift原理及windows使用