您的位置:首页 > 大数据 > Hadoop

kafka connect 数据写hdfs详解

2016-07-05 16:24 459 查看
曾经有位学者说过,“read the F**K code”。不多说,从TopicPartitionWriter的write方法中可以得到如下状态机:

Created with Raphaël 2.1.0WRITE_STARTEDWRITE_PARTITION_PAUSEDshouldRotate()SHOULD_ROTATETEMP_FILE_CLOSEDFILE_COMMITTEDyesno

1. 状态机中各状态说明

WRITE_STARTED

初始状态,无条件进入下一状态

WRITE_PARTITION_PAUSED

如果缓冲区中有数据,将缓冲区中数据写入hdfs tmp文件。

SHOULD_ROTATE

关闭hdfs tmp文件。

TEMP_FILE_CLOSED

数据到目标文件的准备工作,写WAL。

WAL_APPENDED

将TMP文件转化为目标文件。

FILE_COMMITTED

无条件跳转到 WRITE_PARTITION_PAUSED状态。

2. 状态机中条件说明

shouldRotate()

满足两个条件中任意一个就开始写:

1. 时间到,可配置,例如15分钟/60分钟写入一次等(非定时器实现);

2. 读取的record数量达到配置值;

代码详见,下类的shouldRotate方法:

kafka-connect-hdfs-2.0.0\src\main\java\io\confluent\connect\hdfs\TopicPartitionWriter.java


3 . 附加说明

1. TMP文件

每次kafka connector从kafka中poll数据后,先将数据写往hdfs的tmp目录。

1. 线程为每个kafka分区分配一个tmp文件;

2. 文件放在/topic/+tmp/分区/目录下;

3. 文件名为随机产生的UUID_tmp.parquet;

4. 不是每条记录都会直接和hdfs交互,parquet有内存缓存,写hdfs时机可以参见:

\org\apache\parquet\hadoop\InternalParquetRecordWriter.java


2. WAL

全名”Write Ahead Log”,当异常发生时,可基于日志进行恢复,恢复过程详细下类的recover方法:

kafka-connect-hdfs-2.0.0\src\main\java\io\confluent\connect\hdfs\TopicPartitionWriter.java


3. 写入目标文件

从Tmp目录到目标目录不会发生大规模的IO,只是做了hdfs文件的重命名,详见下类中的commit方法

kafka-connect-hdfs-2.0.0\src\main\java\io\confluent\connect\hdfs\storage\HdfsStorage.java


4. 目标文件的隐藏地图

目标文件的命名实际很讲究,看

/topics/topic名称/ hive分区列和值/ kafka分区+开始offset+结束offset


暗藏恢复处理流程:当新的thread接管分区的处理,或者thread从异常中恢复。会先根据该分区下最后写入的文件名获取最后的offset。

Kafka connect commit offset是不按套路出牌的,读了数据有个记录,然后定时commit。Image这种情况:数据已经写入了目标文件,但是还没有commit,那再读的时候,就会重复读数据。如果知道写入hdfs最后的offset就不一样了,重复的数据直接忽略掉。具体过程请参考下类的resetOffsets方法。

kafka-connect-hdfs-2.0.0\src\main\java\io\confluent\connect\hdfs\TopicPartitionWriter.java
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  kafka connect hdfs