kafka connect 数据写hdfs详解
2016-07-05 16:24
459 查看
曾经有位学者说过,“read the F**K code”。不多说,从TopicPartitionWriter的write方法中可以得到如下状态机:
Created with Raphaël 2.1.0WRITE_STARTEDWRITE_PARTITION_PAUSEDshouldRotate()SHOULD_ROTATETEMP_FILE_CLOSEDFILE_COMMITTEDyesno
1. 时间到,可配置,例如15分钟/60分钟写入一次等(非定时器实现);
2. 读取的record数量达到配置值;
代码详见,下类的shouldRotate方法:
1. 线程为每个kafka分区分配一个tmp文件;
2. 文件放在/topic/+tmp/分区/目录下;
3. 文件名为随机产生的UUID_tmp.parquet;
4. 不是每条记录都会直接和hdfs交互,parquet有内存缓存,写hdfs时机可以参见:
暗藏恢复处理流程:当新的thread接管分区的处理,或者thread从异常中恢复。会先根据该分区下最后写入的文件名获取最后的offset。
Kafka connect commit offset是不按套路出牌的,读了数据有个记录,然后定时commit。Image这种情况:数据已经写入了目标文件,但是还没有commit,那再读的时候,就会重复读数据。如果知道写入hdfs最后的offset就不一样了,重复的数据直接忽略掉。具体过程请参考下类的resetOffsets方法。
Created with Raphaël 2.1.0WRITE_STARTEDWRITE_PARTITION_PAUSEDshouldRotate()SHOULD_ROTATETEMP_FILE_CLOSEDFILE_COMMITTEDyesno
1. 状态机中各状态说明
WRITE_STARTED
初始状态,无条件进入下一状态WRITE_PARTITION_PAUSED
如果缓冲区中有数据,将缓冲区中数据写入hdfs tmp文件。SHOULD_ROTATE
关闭hdfs tmp文件。TEMP_FILE_CLOSED
数据到目标文件的准备工作,写WAL。WAL_APPENDED
将TMP文件转化为目标文件。FILE_COMMITTED
无条件跳转到 WRITE_PARTITION_PAUSED状态。2. 状态机中条件说明
shouldRotate()
满足两个条件中任意一个就开始写:1. 时间到,可配置,例如15分钟/60分钟写入一次等(非定时器实现);
2. 读取的record数量达到配置值;
代码详见,下类的shouldRotate方法:
kafka-connect-hdfs-2.0.0\src\main\java\io\confluent\connect\hdfs\TopicPartitionWriter.java
3 . 附加说明
1. TMP文件
每次kafka connector从kafka中poll数据后,先将数据写往hdfs的tmp目录。1. 线程为每个kafka分区分配一个tmp文件;
2. 文件放在/topic/+tmp/分区/目录下;
3. 文件名为随机产生的UUID_tmp.parquet;
4. 不是每条记录都会直接和hdfs交互,parquet有内存缓存,写hdfs时机可以参见:
\org\apache\parquet\hadoop\InternalParquetRecordWriter.java
2. WAL
全名”Write Ahead Log”,当异常发生时,可基于日志进行恢复,恢复过程详细下类的recover方法:kafka-connect-hdfs-2.0.0\src\main\java\io\confluent\connect\hdfs\TopicPartitionWriter.java
3. 写入目标文件
从Tmp目录到目标目录不会发生大规模的IO,只是做了hdfs文件的重命名,详见下类中的commit方法kafka-connect-hdfs-2.0.0\src\main\java\io\confluent\connect\hdfs\storage\HdfsStorage.java
4. 目标文件的隐藏地图
目标文件的命名实际很讲究,看/topics/topic名称/ hive分区列和值/ kafka分区+开始offset+结束offset
暗藏恢复处理流程:当新的thread接管分区的处理,或者thread从异常中恢复。会先根据该分区下最后写入的文件名获取最后的offset。
Kafka connect commit offset是不按套路出牌的,读了数据有个记录,然后定时commit。Image这种情况:数据已经写入了目标文件,但是还没有commit,那再读的时候,就会重复读数据。如果知道写入hdfs最后的offset就不一样了,重复的数据直接忽略掉。具体过程请参考下类的resetOffsets方法。
kafka-connect-hdfs-2.0.0\src\main\java\io\confluent\connect\hdfs\TopicPartitionWriter.java
相关文章推荐
- KDE Connect —— 让 Android 手机成为 KDE 桌面的一部分
- Kafka 之 中级
- Can't connect to MySQL server on 'localhost' (10048)问题解决方法
- Oracle中的Connect/session和process的区别及关系介绍
- hadoop的hdfs文件操作实现上传文件到hdfs
- php运行提示Can't connect to MySQL server on 'localhost'的解决方法
- MySQL错误ERROR 2002 (HY000): Can't connect to local MySQL server through socket
- 分析Node.js connect ECONNREFUSED错误
- NodeJS学习笔记之Connect中间件模块(二)
- NodeJS学习笔记之Connect中间件模块(一)
- NodeJS学习笔记之Connect中间件应用实例
- Linux下Kafka单机安装配置方法(图文)
- java连接hdfs ha和调用mapreduce jar示例
- java实现将ftp和http的文件直接传送到hdfs
- Java访问Hadoop分布式文件系统HDFS的配置说明
- Kafka使用入门教程第1/2页
- Can't connect to MySQL server on localhost (10061)解决方法
- 在Hadoop2.5.0下利用Java读写HDFS
- HDFS 文件操作
- Spark中将对象序列化存储到hdfs