HadoopConsumer——消费kafka中若干topic的消息,追加存储至hdfs的不同文件内
2015-11-18 21:49
615 查看
在kafka源码提供的hadoopconsumer的基础上进行开发,该程序可消费多个topic的消息,追加至hdfs文件中。
间间隔Interval、每次消费的上限limit、以及三个输入输出文件的父目录
Offset.dat:输入文件,存储leader broker以及offset,根目录:input
Message.txt:输出文件,将消费的消费追加写入,根目录:outputfilepath
Output:输出文件夹,mapreduce的输出文件夹,不使用
1) 创建offset.dat存储topic、partition、leader broker IP、Port、以及offset值;
offset初始值为-1,之后保存上一次消费过的offset,下次hadoop-consumer被调用则继
续消费;
2) 为map-reduce的输出创建文件夹,他存储的文件名默认为part-00000;
3) 程序实现了直接向hdfs的文件追加消息的功能,所以还需创建进行追加的文件message.txt;
增加的功能:
首先依据配置文件获取topic列表,以及broker列表,然后获取每个topic的分区数,为每个分区创建以上的三个文件(文件夹);
创建offset.dat时要先获取该partition此刻的leader,然后写入文件;
使用线程池,依据topic的个数创建线程,一个topic开启一个线程去消费消息;
每个线程内的run函数里面sleep一定时间间隔后,无限次地调用SimpleKafkaETLJob的excute方法,不断地消费消息;
之后就是一个个job在执行;
runningJob.isSuccessful()会判断此job是否执行成功,如果不成功,会输出信息;
在KafkaETLJob的createJobConf()函数中给JobConf增加一项配置:partition_id,原来只有topic;
增加的功能:
Next()函数消费消息,通过limit指定消费的数据量;
消费完之后输出消费的总量System.out.println("_readBytes"+_readBytes);
在本地新建文件,将context中得到消息的ArrayList一条一条写入其中,之后将整个文件追加至hdfs的message.txt文件;
而不是之前的一条一条消息追加;
最后重写offset.dat文件,记录本次消费过的offset值,便于下次继续消费;
增加的功能:
the_last_offset:创建一个变量,记录上一次被调用消费到的offset值;
message_list:消费的消息先存储于此,之后将list中的消息写入本地文件,再批量追加至hdfs;
获取此刻该partition的leader broker,使用新的leader创建request和consumer,开始消费;
_limit = props.getInt("kafka.request.limit", -1);
_offset = _offsetRange[0];
if((_offsetRange[0]+_limit) < _offsetRange[1])
the_last_offset = _offsetRange[0]+_limit;
else
the_last_offset = _offsetRange[1];
如果消息量大于limit,则只消费limit条数据,如果不足,则全部消费完;
if (response.hasError()),表明有错,打印错误,返回-1,
2) 获取offset,决定本次消费的范围getOffsetRange:
range[0]:最早的消息的offset,应该是0,
range[1]:此刻为止最新消息的offset,
由于我们是接着上一次继续消费,所以基本不会使用range[0],而是消费offset.dat中的offset和range[1]之间的消息;所以最重要的是防止range[1]的值出错;
所以:if (range[1] == -1) ,说明offset值有误,那么再次调用getLastOffset函数,重新尝试一次;
一般情况下不会,因为创建context对象,在构造函数中,实时获取了此刻的leader,之后就调用getoffset,所以短时间内应该不会变化,或有异常;
3) fetchMore:
增加了容错,
_response = _consumer.fetch(fetchRequest);
/* simpleconsumer自己没有处理leader broker坏掉的情况,所以需要自己处理 */
if (_response.hasError()) {
Offset无效的处理,选择最多消费一次(出现错误则丢掉部分数据,从最新的开始消费),或者至少消费一次(从offset.dat文件中的offset处继续消费,可能重复消费)的机制:
4) Get()获取每一条消息
//add to message list
message_list.add(new String(messagebytes, "UTF-8") + "\n");
添加换行符,加入message list中;
本程序的输入输出文件有:
配置文件:topics.properties,指定要消费的topic列表,broker列表,以及程序被调用的时间间隔Interval、每次消费的上限limit、以及三个输入输出文件的父目录
Offset.dat:输入文件,存储leader broker以及offset,根目录:input
Message.txt:输出文件,将消费的消费追加写入,根目录:outputfilepath
Output:输出文件夹,mapreduce的输出文件夹,不使用
DateGenerator
初始化操作,为每一个topic的每一个partition创建相关的文件夹以及文件,仅调用一次:1) 创建offset.dat存储topic、partition、leader broker IP、Port、以及offset值;
offset初始值为-1,之后保存上一次消费过的offset,下次hadoop-consumer被调用则继
续消费;
2) 为map-reduce的输出创建文件夹,他存储的文件名默认为part-00000;
3) 程序实现了直接向hdfs的文件追加消息的功能,所以还需创建进行追加的文件message.txt;
增加的功能:
首先依据配置文件获取topic列表,以及broker列表,然后获取每个topic的分区数,为每个分区创建以上的三个文件(文件夹);
创建offset.dat时要先获取该partition此刻的leader,然后写入文件;
KafkatoHDFSMain
增加的功能:使用线程池,依据topic的个数创建线程,一个topic开启一个线程去消费消息;
每个线程内的run函数里面sleep一定时间间隔后,无限次地调用SimpleKafkaETLJob的excute方法,不断地消费消息;
SimpleKafkaETLJob
构造函数中需指定topic,然后为该topic的每一个分区创建一个hadoop的job,消费消息并存储;之后就是一个个job在执行;
runningJob.isSuccessful()会判断此job是否执行成功,如果不成功,会输出信息;
在KafkaETLJob的createJobConf()函数中给JobConf增加一项配置:partition_id,原来只有topic;
KafkaETLRecordReader
通过KafkaETLInputFormat BytesWritable获取offset.dat文件的记录作为输入,消费指定topic、指定partition、指定offset的消息,然后先写入本地文件,之后将文件追加至message.txt增加的功能:
Next()函数消费消息,通过limit指定消费的数据量;
消费完之后输出消费的总量System.out.println("_readBytes"+_readBytes);
在本地新建文件,将context中得到消息的ArrayList一条一条写入其中,之后将整个文件追加至hdfs的message.txt文件;
而不是之前的一条一条消息追加;
最后重写offset.dat文件,记录本次消费过的offset值,便于下次继续消费;
KafkaETLContext
使用simpleconsumer的API,获取offset值,实时获取leader broker,一条一条消费消息,并存储至ArrayList中。增加的功能:
the_last_offset:创建一个变量,记录上一次被调用消费到的offset值;
message_list:消费的消息先存储于此,之后将list中的消息写入本地文件,再批量追加至hdfs;
获取此刻该partition的leader broker,使用新的leader创建request和consumer,开始消费;
_limit = props.getInt("kafka.request.limit", -1);
_offset = _offsetRange[0];
if((_offsetRange[0]+_limit) < _offsetRange[1])
the_last_offset = _offsetRange[0]+_limit;
else
the_last_offset = _offsetRange[1];
如果消息量大于limit,则只消费limit条数据,如果不足,则全部消费完;
容错处理:
1) 获取offset值getLastOffset:if (response.hasError()),表明有错,打印错误,返回-1,
2) 获取offset,决定本次消费的范围getOffsetRange:
range[0]:最早的消息的offset,应该是0,
range[1]:此刻为止最新消息的offset,
由于我们是接着上一次继续消费,所以基本不会使用range[0],而是消费offset.dat中的offset和range[1]之间的消息;所以最重要的是防止range[1]的值出错;
所以:if (range[1] == -1) ,说明offset值有误,那么再次调用getLastOffset函数,重新尝试一次;
一般情况下不会,因为创建context对象,在构造函数中,实时获取了此刻的leader,之后就调用getoffset,所以短时间内应该不会变化,或有异常;
3) fetchMore:
增加了容错,
_response = _consumer.fetch(fetchRequest);
/* simpleconsumer自己没有处理leader broker坏掉的情况,所以需要自己处理 */
if (_response.hasError()) {
Offset无效的处理,选择最多消费一次(出现错误则丢掉部分数据,从最新的开始消费),或者至少消费一次(从offset.dat文件中的offset处继续消费,可能重复消费)的机制:
4) Get()获取每一条消息
//add to message list
message_list.add(new String(messagebytes, "UTF-8") + "\n");
添加换行符,加入message list中;
总结:
至此,此程序可以消费任意多个topic的消息,并追加至以topic+partition命名的hdfs文件内。相关文章推荐
- 详解HDFS Short Circuit Local Reads
- Hadoop_2.1.0 MapReduce序列图
- 使用Hadoop搭建现代电信企业架构
- Kafka 之 中级
- 单机版搭建Hadoop环境图文教程详解
- hadoop常见错误以及处理方法详解
- hadoop 单机安装配置教程
- hadoop的hdfs文件操作实现上传文件到hdfs
- hadoop实现grep示例分享
- Apache Hadoop版本详解
- linux下搭建hadoop环境步骤分享
- java连接hdfs ha和调用mapreduce jar示例
- java实现将ftp和http的文件直接传送到hdfs
- hadoop client与datanode的通信协议分析
- hadoop中一些常用的命令介绍
- Hadoop单机版和全分布式(集群)安装
- 用PHP和Shell写Hadoop的MapReduce程序
- hadoop map-reduce中的文件并发操作
- Hadoop1.2中配置伪分布式的实例