An Introduction to Flume's TaildirSource and How to Extend It
2017-06-09 16:22
Flume 1.7.0 introduced the TaildirSource component. It tails every file under the monitored directories that matches a configured regular expression, and records read positions so that ingestion can resume where it left off after a restart.
Later, though, I found that the CDH build of Flume 1.6.0 already ships this component, while the official Apache apache-flume-1.6.0-bin does not. Compared with CDH Flume 1.6.0, Apache Flume 1.7.0 has an extra TaildirMatcher.java in the org.apache.flume.source.taildir package, and the rest of the code differs as well, so Apache Flume 1.7.0 is presumably the more polished implementation; the exact differences are something to dig into when time allows. Since my machine already had apache-flume-1.6.0-cdh5.5.2-bin installed, I used it directly instead of installing Apache Flume 1.7.0; it has been sufficient so far.
[hadoop@h71 conf]$ cat taildir.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Describe/configure the source
# The source type is TAILDIR (the value is case-insensitive)
a1.sources.r1.type = taildir
a1.sources.r1.channels = c1
# Where to store the last read position of each tailed file
a1.sources.r1.positionFile = /home/hadoop/hui/taildir_position.json
# Space-separated list of tailed file groups
a1.sources.r1.filegroups = f1 f2
# Absolute path (or pattern) for each group
a1.sources.r1.filegroups.f1 = /home/hadoop/hui/test1/hehe.txt
a1.sources.r1.filegroups.f2 = /home/hadoop/hui/test2/.*
# '.' matches any single character except newline; '*' matches the preceding
# expression zero or more times. 'messages.*' would also work here.
a1.sources.r1.fileHeader = true
# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /home/hadoop/hui
a1.sinks.k1.sink.rollInterval = 0
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
[hadoop@h71 hui]$ tree
.
|-- messages.1
|-- qiang
|   `-- hui.txt
|-- test1
|   |-- hehe.txt
|   `-- messages.2
`-- test2
    |-- messages.3
    `-- messages.4

3 directories, 6 files
[hadoop@h71 hui]$ cat test1/hehe.txt
hello world hehe
[hadoop@h71 hui]$ cat test1/messages.2
hello world 2
[hadoop@h71 hui]$ cat test2/messages.3
hello world 3
[hadoop@h71 hui]$ cat test2/messages.4
hello world 4
Start the Flume agent:
[hadoop@h71 apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c . -f conf/taildir.conf -n a1 -Dflume.root.logger=INFO,console
[hadoop@h71 hui]$ ls    (1489881718232-1 and taildir_position.json have been created under hui/)
1489881718232-1 messages.1 qiang taildir_position.json test1 test2
[hadoop@h71 hui]$ cat 1489881718232-1
hello world hehe
hello world 3
hello world 4
[hadoop@h71 hui]$ cat taildir_position.json
[{"inode":1532721,"pos":17,"file":"/home/hadoop/hui/test1/hehe.txt"},{"inode":1532719,"pos":14,"file":"/home/hadoop/hui/test2/messages.3"},{"inode":1532720,"pos":14,"file":"/home/hadoop/hui/test2/messages.4"}]
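A note on the numbers above: pos is a byte offset into the file, not a line count. A quick sanity check (assuming UTF-8 content and one trailing newline per file, as in this walkthrough):

```java
import java.nio.charset.StandardCharsets;

public class PosCheck {
    public static void main(String[] args) {
        // The pos of 17 recorded for hehe.txt is exactly the byte length of
        // its single line "hello world hehe" plus the trailing newline.
        System.out.println("hello world hehe\n".getBytes(StandardCharsets.UTF_8).length); // 17
        // Likewise pos 14 for messages.3 and messages.4 ("hello world 3\n").
        System.out.println("hello world 3\n".getBytes(StandardCharsets.UTF_8).length); // 14
    }
}
```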
Append some data to test1/hehe.txt:
[hadoop@h71 hui]$ echo "ni hao world" >> test1/hehe.txt
Then check 1489881718232-1 and taildir_position.json again:
[hadoop@h71 hui]$ cat 1489881718232-1
hello world hehe
hello world 3
hello world 4
ni hao world
[hadoop@h71 hui]$ cat taildir_position.json
[{"inode":1532721,"pos":30,"file":"/home/hadoop/hui/test1/hehe.txt"},{"inode":1532719,"pos":14,"file":"/home/hadoop/hui/test2/messages.3"},{"inode":1532720,"pos":14,"file":"/home/hadoop/hui/test2/messages.4"}]
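This is the resume mechanism in miniature: after the append, pos moved from 17 to 30, i.e. 13 bytes, "ni hao world" plus a newline. On the next poll, or after a restart, the source only needs to seek each file to its stored offset and read what follows. A minimal stand-alone sketch of that step, using temp files rather than Flume's actual classes:

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ResumeDemo {
    // Read everything appended to `file` after byte offset `pos`,
    // mirroring how a tail source resumes from its position file.
    static String readFrom(Path file, long pos) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            raf.seek(pos);                                // skip already-delivered bytes
            byte[] rest = new byte[(int) (raf.length() - pos)];
            raf.readFully(rest);
            return new String(rest, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        Path f = Files.createTempFile("hehe", ".txt");
        Files.write(f, "hello world hehe\n".getBytes(StandardCharsets.UTF_8));
        long pos = Files.size(f);                         // 17, like the position file
        // New data arrives after the checkpoint:
        Files.write(f, "ni hao world\n".getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.APPEND);
        System.out.print(readFrom(f, pos));               // only the appended line
    }
}
```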
Limitations:
1. A monitored directory covers the files directly inside it, but files in nested subdirectories are not picked up. Supporting them requires a source-code change: recursively monitor every file in all subdirectories of the configured directory.
2. TaildirSource does not support renamed files. A renamed file is treated as a new file and read again from the beginning, which duplicates log data. (This matters in practice because Linux log rotation works by renaming files.)
Fix for limitation 1:
A modified version of the source is already available online: https://github.com/qwurey/flume-source-taildir-recursive
Download the code and import it into MyEclipse.
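The essence of the recursive change is to walk the whole directory tree on each poll instead of listing a single level. The following is a rough sketch of that idea, not the actual code from the repository above:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class RecursiveScan {
    // Collect every regular file under `root` (at any depth) whose file
    // name matches `regex` -- what a recursive TaildirSource does on each
    // poll, versus the stock source, which lists only one level.
    static List<Path> matchingFiles(Path root, String regex) throws IOException {
        Pattern p = Pattern.compile(regex);
        try (Stream<Path> walk = Files.walk(root)) {
            return walk.filter(Files::isRegularFile)
                       .filter(f -> p.matcher(f.getFileName().toString()).matches())
                       .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // Build a tiny tree like the article's: one file at the top,
        // another nested two levels deep.
        Path root = Files.createTempDirectory("hui");
        Files.createFile(root.resolve("messages.1"));
        Path nested = Files.createDirectories(root.resolve("test2/test1"));
        Files.createFile(nested.resolve("hehe.txt"));
        // The stock source would see only messages.1; the recursive scan sees both.
        System.out.println(matchingFiles(root, ".*").size()); // 2
    }
}
```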
Fix for limitation 2:
First, modify the updateTailFiles method of the ReliableTaildirEventReader class: remove the tf.getPath().equals(f.getAbsolutePath()) condition. Checking that the TailFile is null is enough; the file name does not need to match:

// if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
if (tf == null) { // file not found in the position file, so read it from the start

Second, modify the updatePos method of the TailFile class for the same reason: the inode already identifies the file uniquely, so the path no longer needs to be part of the check. Dropping it is what makes renamed files work:

// if (this.inode == inode && this.path.equals(path)) {
if (this.inode == inode) {

With these two changes, file renames are handled correctly, and the source supports monitoring multiple files under a directory with resume-on-restart.
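The reason the inode alone is a safe key is that on POSIX file systems a rename does not change a file's inode. That can be checked directly with java.nio (the "unix:ino" attribute is only available on POSIX platforms; file names here are made up for illustration):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class InodeDemo {
    // Return the inode number of a file (POSIX file systems only).
    static long inodeOf(Path p) throws IOException {
        return (Long) Files.getAttribute(p, "unix:ino");
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("taildir");
        Path before = Files.createFile(dir.resolve("hehe.txt"));
        long ino = inodeOf(before);
        Path after = Files.move(before, dir.resolve("haha.txt")); // rename
        // Same underlying file, same inode -- which is why dropping the
        // path check lets the modified source keep its read position.
        System.out.println(ino == inodeOf(after)); // true on POSIX systems
    }
}
```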
To plug in the custom source, copy the modified source code into your project, package it as a custom taildir jar, and drop taildir.jar into Flume's lib directory before starting Flume.
I have uploaded taildir.jar here: http://download.csdn.net/download/m0_37739193/9968369
[hadoop@h71 conf]$ cat taildir.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Describe/configure the source
a1.sources.r1.type = com.urey.flume.source.taildir.TaildirSource
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /home/hadoop/q1/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/hadoop/hui/.*
a1.sources.r1.batchSize = 100
a1.sources.r1.backoffSleepIncrement = 1000
a1.sources.r1.maxBackoffSleep = 5000
# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /home/hadoop/q1
a1.sinks.k1.sink.rollInterval = 0
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
[hadoop@h71 ~]$ mkdir q1
[hadoop@h71 hui]$ tree
.
|-- messages.1
`-- test2
    |-- messages.3
    |-- messages.4
    `-- test1
        |-- hehe.txt
        `-- messages.2

2 directories, 5 files
[hadoop@h71 hui]$ cat messages.1
hello world 1
[hadoop@h71 hui]$ cat test2/messages.3
hello world 3
[hadoop@h71 hui]$ cat test2/messages.4
hello world 4
[hadoop@h71 hui]$ cat test2/test1/hehe.txt
hello world hehe
[hadoop@h71 hui]$ cat test2/test1/messages.2
hello world 2
Start Flume:
[hadoop@h71 apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c . -f conf/taildir.conf -n a1 -Dflume.root.logger=INFO,console
[hadoop@h71 q1]$ ls
1489910670584-1 taildir_position.json
[hadoop@h71 q1]$ cat 1489910670584-1
hello world 2
hello world 4
hello world 1
hello world hehe
hello world 3
[hadoop@h71 q1]$ cat taildir_position.json
[{"inode":1532727,"pos":14,"file":"/home/hadoop/hui/test2/test1/messages.2"},{"inode":1532720,"pos":14,"file":"/home/hadoop/hui/test2/messages.4"},{"inode":1532722,"pos":14,"file":"/home/hadoop/hui/messages.1"},{"inode":1532718,"pos":17,"file":"/home/hadoop/hui/test2/test1/hehe.txt"},{"inode":1532712,"pos":14,"file":"/home/hadoop/hui/test2/messages.3"}]
[hadoop@h71 hui]$ mv test2/test1/hehe.txt test2/haha.txt
[hadoop@h71 hui]$ cat ../q1/1489910670584-1
hello world 2
hello world 4
hello world 1
hello world hehe
hello world 3
[hadoop@h71 hui]$ cat ../q1/taildir_position.json
[{"inode":1532727,"pos":14,"file":"/home/hadoop/hui/test2/test1/messages.2"},{"inode":1532720,"pos":14,"file":"/home/hadoop/hui/test2/messages.4"},{"inode":1532722,"pos":14,"file":"/home/hadoop/hui/messages.1"},{"inode":1532718,"pos":17,"file":"/home/hadoop/hui/test2/test1/hehe.txt"},{"inode":1532712,"pos":14,"file":"/home/hadoop/hui/test2/messages.3"}]
[hadoop@h71 hui]$ echo "hello world haha" >> test2/haha.txt
[hadoop@h71 hui]$ cat ../q1/1489910670584-1
hello world 2
hello world 4
hello world 1
hello world hehe
hello world 3
hello world haha
[hadoop@h71 hui]$ cat ../q1/taildir_position.json
[{"inode":1532727,"pos":14,"file":"/home/hadoop/hui/test2/test1/messages.2"},{"inode":1532720,"pos":14,"file":"/home/hadoop/hui/test2/messages.4"},{"inode":1532722,"pos":14,"file":"/home/hadoop/hui/messages.1"},{"inode":1532712,"pos":14,"file":"/home/hadoop/hui/test2/messages.3"},{"inode":1532718,"pos":34,"file":"/home/hadoop/hui/test2/haha.txt"}]
[hadoop@h71 hui]$ echo "hello china" >> test2/test1/hehe.txt
[hadoop@h71 hui]$ cat ../q1/1489910670584-1
hello world 2
hello world 4
hello world 1
hello world hehe
hello world 3
hello world haha
hello china
[hadoop@h71 hui]$ cat ../q1/taildir_position.json
[{"inode":1532727,"pos":14,"file":"/home/hadoop/hui/test2/test1/messages.2"},{"inode":1532720,"pos":14,"file":"/home/hadoop/hui/test2/messages.4"},{"inode":1532722,"pos":14,"file":"/home/hadoop/hui/messages.1"},{"inode":1532712,"pos":14,"file":"/home/hadoop/hui/test2/messages.3"},{"inode":1532718,"pos":34,"file":"/home/hadoop/hui/test2/haha.txt"},{"inode":1532714,"pos":12,"file":"/home/hadoop/hui/test2/test1/hehe.txt"}]
References:
http://blog.csdn.net/u012373815/article/details/62241528
http://www.2cto.com/net/201703/616085.html
http://blog.csdn.net/yeruby/article/details/51812759