2017-06-09 16:22
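Flume 1.7.0 introduced the TaildirSource component. It tails every file in the monitored directories whose name matches a regular expression, and it can resume from where it left off after a restart.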
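Resuming from where it left off comes down to remembering a byte offset per file and seeking to it on the next pass. The Java sketch below is a hypothetical, simplified model of that idea, not Flume's TaildirSource code; the names OffsetTailer and Batch are made up for illustration. It returns only complete lines and reports the offset that should be saved for the next pass.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch (not Flume code): resume reading a file from a saved byte offset.
public class OffsetTailer {

    /** Lines read in one pass plus the offset to persist for the next pass. */
    public static final class Batch {
        public final List<String> lines;
        public final long nextPos;

        Batch(List<String> lines, long nextPos) {
            this.lines = lines;
            this.nextPos = nextPos;
        }
    }

    /** Reads complete '\n'-terminated lines from startPos; a trailing partial line is left for later. */
    public static Batch readFrom(Path file, long startPos) throws IOException {
        List<String> lines = new ArrayList<>();
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            long len = raf.length();
            if (startPos >= len) {
                return new Batch(lines, startPos); // nothing new since the last pass
            }
            byte[] buf = new byte[(int) (len - startPos)];
            raf.seek(startPos); // jump straight to the saved offset
            raf.readFully(buf);
            int lineStart = 0;
            for (int i = 0; i < buf.length; i++) {
                if (buf[i] == '\n') {
                    lines.add(new String(buf, lineStart, i - lineStart, StandardCharsets.UTF_8));
                    lineStart = i + 1;
                }
            }
            return new Batch(lines, startPos + lineStart); // offset just after the last full line
        }
    }

    public static void main(String[] args) throws IOException {
        Path f = Files.createTempFile("tail", ".txt");
        Files.write(f, "hello world hehe\n".getBytes(StandardCharsets.UTF_8));
        Batch first = readFrom(f, 0);
        Files.write(f, "ni hao world\n".getBytes(StandardCharsets.UTF_8), StandardOpenOption.APPEND);
        Batch second = readFrom(f, first.nextPos); // only the appended line comes back
        System.out.println(first.lines + " " + second.lines);
        Files.delete(f);
    }
}
```

Running main prints [hello world hehe] [ni hao world]: the second pass starts at the saved offset, so the first line is not sent again. Reading the whole unread tail into memory keeps the sketch short; a real reader would work in bounded chunks.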

[hadoop@h71 conf]$ cat taildir.conf 

a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Describe/configure the source
a1.sources.r1.type = taildir
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /home/hadoop/hui/taildir_position.json
# File groups to tail, separated by spaces
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /home/hadoop/hui/test1/hehe.txt
a1.sources.r1.filegroups.f2 = /home/hadoop/hui/test2/.*
# '.' matches any single character except the newline \n; '*' matches the preceding subexpression zero or more times. messages.* would also work here.
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /home/hadoop/hui
a1.sinks.k1.sink.rollInterval = 0

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
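In a filegroup, the regular expression applies to the file names inside the given directory. The Java sketch below assumes the whole name must match the pattern (as with java.util.regex.Matcher.matches()); FileGroupRegexDemo and accepted are hypothetical names. It shows why .* and messages.* select the same files in test2/, and that messages.* would reject a file such as hehe.txt.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Sketch: which file names a filegroup regex would accept,
// assuming full-name matching (Matcher.matches()) rather than a substring search.
public class FileGroupRegexDemo {

    public static List<String> accepted(String regex, List<String> fileNames) {
        Pattern pattern = Pattern.compile(regex);
        List<String> result = new ArrayList<>();
        for (String name : fileNames) {
            if (pattern.matcher(name).matches()) { // the whole name must match
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("messages.3", "messages.4", "hehe.txt");
        System.out.println(accepted(".*", names));         // [messages.3, messages.4, hehe.txt]
        System.out.println(accepted("messages.*", names)); // [messages.3, messages.4]
    }
}
```

Note that the '.' in messages.* is itself a wildcard, so that pattern would also accept a name like messagesX.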
[hadoop@h71 hui]$ tree
|-- messages.1
|-- qiang
|   `-- hui.txt
|-- test1
|   |-- hehe.txt
|   `-- messages.2
`-- test2
    |-- messages.3
    `-- messages.4

3 directories, 6 files
[hadoop@h71 hui]$ cat test1/hehe.txt 

hello world hehe

[hadoop@h71 hui]$ cat test1/messages.2 

hello world 2

[hadoop@h71 hui]$ cat test2/messages.3 

hello world 3

[hadoop@h71 hui]$ cat test2/messages.4

hello world 4


[hadoop@h71 apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c . -f conf/taildir.conf -n a1 -Dflume.root.logger=INFO,console

[hadoop@h71 hui]$ ls   (the agent has created 1489881718232-1 and taildir_position.json in the hui/ directory)

1489881718232-1  messages.1  qiang  taildir_position.json  test1  test2

[hadoop@h71 hui]$ cat 1489881718232-1 

hello world hehe

hello world 3

hello world 4

[hadoop@h71 hui]$ cat taildir_position.json 



[hadoop@h71 hui]$ echo "ni hao world" >> test1/hehe.txt 


[hadoop@h71 hui]$ cat 1489881718232-1 

hello world hehe

hello world 3

hello world 4

ni hao world

[hadoop@h71 hui]$ cat taildir_position.json 









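By default, TaildirSource identifies a tailed file by its path as well as its inode, so a file that is renamed (for example with mv) is treated as a new file and read again from the beginning. To support renamed files, modify the updateTailFiles method of the ReliableTaildirEventReader class.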

Remove the tf.getPath().equals(f.getAbsolutePath()) condition from it. It is enough to check whether tf is null; the file name no longer needs to be compared.

//        if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
if (tf == null) { // file not yet recorded in the position file: read it in full
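The effect of dropping the path comparison can be modelled in isolation. The Java sketch below is a hypothetical, stripped-down version of the check, not Flume's code: RenameCheckDemo, Tracked, the inode 42 and the offset 17 are invented for illustration, and in this model a newly opened file always starts at offset 0. Files are tracked by inode, and the only question is where reading restarts when a known inode shows up under a new path.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the updateTailFiles decision (not Flume's implementation).
// Tracked files are keyed by inode; each entry remembers its path and read offset.
public class RenameCheckDemo {

    public static final class Tracked {
        final String path;
        final long pos;

        public Tracked(String path, long pos) {
            this.path = path;
            this.pos = pos;
        }
    }

    /** Original rule: start over when the inode is unknown OR the path has changed. */
    public static long startPosOriginal(Map<Long, Tracked> tracked, long inode, String currentPath) {
        Tracked tf = tracked.get(inode);
        if (tf == null || !tf.path.equals(currentPath)) {
            return 0L; // treated as a brand-new file: read from the beginning
        }
        return tf.pos; // keep reading where we left off
    }

    /** Modified rule: only an unknown inode counts as a new file. */
    public static long startPosModified(Map<Long, Tracked> tracked, long inode, String currentPath) {
        Tracked tf = tracked.get(inode);
        if (tf == null) {
            return 0L;
        }
        return tf.pos; // same inode, even under a new name: resume
    }

    public static void main(String[] args) {
        Map<Long, Tracked> tracked = new HashMap<>();
        tracked.put(42L, new Tracked("/home/hadoop/hui/test2/test1/hehe.txt", 17L));
        String renamed = "/home/hadoop/hui/test2/haha.txt"; // same inode after mv
        System.out.println(startPosOriginal(tracked, 42L, renamed)); // 0
        System.out.println(startPosModified(tracked, 42L, renamed)); // 17
    }
}
```

With the original rule the renamed file restarts at offset 0, which would re-send lines that were already delivered; with the modified rule it keeps offset 17.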
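Next, modify the updatePos method of the TailFile class.

The reasoning is the same: the inode already identifies the file uniquely, so the path does not need to be part of the check. With this condition removed, renamed files are handled correctly.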

//     if (this.inode == inode && this.path.equals(path)) {
if (this.inode == inode) {
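updatePos can be modelled the same way. This sketch assumes, which the text above does not spell out, that updatePos is what applies a saved position-file entry (inode, offset, path) to a file being tailed, for example when the agent restarts. UpdatePosDemo and TailState are hypothetical names, not Flume classes, and the inode and offset values are invented.

```java
// Hypothetical model of restoring a saved offset (not Flume's TailFile class).
public class UpdatePosDemo {

    public static final class TailState {
        final String path; // the path the file has now
        final long inode;
        long pos;          // current read offset

        public TailState(String path, long inode, long pos) {
            this.path = path;
            this.inode = inode;
            this.pos = pos;
        }

        /** Original rule: the saved entry must match both inode and path. */
        public boolean updatePosOriginal(String savedPath, long savedInode, long savedPos) {
            if (this.inode == savedInode && this.path.equals(savedPath)) {
                this.pos = savedPos;
                return true;
            }
            return false;
        }

        /** Modified rule: the inode alone identifies the file. */
        public boolean updatePosModified(String savedPath, long savedInode, long savedPos) {
            if (this.inode == savedInode) {
                this.pos = savedPos;
                return true;
            }
            return false;
        }
    }

    public static void main(String[] args) {
        // Entry saved before the agent stopped; the file was renamed afterwards.
        String savedPath = "/home/hadoop/hui/test2/test1/hehe.txt";
        String currentPath = "/home/hadoop/hui/test2/haha.txt";
        TailState a = new TailState(currentPath, 42L, 0L);
        TailState b = new TailState(currentPath, 42L, 0L);
        System.out.println(a.updatePosOriginal(savedPath, 42L, 17L) + " " + a.pos); // false 0
        System.out.println(b.updatePosModified(savedPath, 42L, 17L) + " " + b.pos); // true 17
    }
}
```

Under the original rule the saved entry is rejected for the renamed file and its offset stays at 0; the inode-only rule restores offset 17.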



[hadoop@h71 conf]$ cat taildir.conf

a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Describe/configure the source
a1.sources.r1.type = com.urey.flume.source.taildir.TaildirSource
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /home/hadoop/q1/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/hadoop/hui/.*
a1.sources.r1.batchSize = 100
a1.sources.r1.backoffSleepIncrement = 1000
a1.sources.r1.maxBackoffSleep = 5000

# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /home/hadoop/q1
a1.sinks.k1.sink.rollInterval = 0

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
[hadoop@h71 ~]$ mkdir q1

[hadoop@h71 hui]$ tree
|-- messages.1
`-- test2
    |-- messages.3
    |-- messages.4
    `-- test1
        |-- hehe.txt
        `-- messages.2

2 directories, 5 files
[hadoop@h71 hui]$ cat messages.1 

hello world 1

[hadoop@h71 hui]$ cat test2/messages.3

hello world 3

[hadoop@h71 hui]$ cat test2/messages.4 

hello world 4

[hadoop@h71 hui]$ cat test2/test1/hehe.txt 

hello world hehe

[hadoop@h71 hui]$ cat test2/test1/messages.2 

hello world 2


[hadoop@h71 apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c . -f conf/taildir.conf -n a1 -Dflume.root.logger=INFO,console

[hadoop@h71 q1]$ ls

1489910670584-1  taildir_position.json

[hadoop@h71 q1]$ cat 1489910670584-1 

hello world 2

hello world 4

hello world 1

hello world hehe

hello world 3

[hadoop@h71 q1]$ cat taildir_position.json 


[hadoop@h71 hui]$ mv test2/test1/hehe.txt test2/haha.txt

[hadoop@h71 hui]$ cat ../q1/1489910670584-1 

hello world 2

hello world 4

hello world 1

hello world hehe

hello world 3

[hadoop@h71 hui]$ cat ../q1/taildir_position.json 


[hadoop@h71 hui]$ echo "hello world haha" >> test2/haha.txt 

[hadoop@h71 hui]$ cat ../q1/1489910670584-1 

hello world 2

hello world 4

hello world 1

hello world hehe

hello world 3

hello world haha

[hadoop@h71 hui]$ cat ../q1/taildir_position.json 


[hadoop@h71 hui]$ echo "hello china" >> test2/test1/hehe.txt

[hadoop@h71 hui]$ cat ../q1/1489910670584-1 

hello world 2

hello world 4

hello world 1

hello world hehe

hello world 3

hello world haha

hello china

[hadoop@h71 hui]$ cat ../q1/taildir_position.json 

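The rename test relies on mv keeping the same inode, which holds for a rename within one filesystem (a move across filesystems copies the data into a new inode). A quick way to check this from Java, as a sketch assuming a Unix-like system running OpenJDK, where Files.getAttribute(path, "unix:ino") returns the inode number (the "unix" attribute view is not available on Windows). InodeRenameCheck is a hypothetical helper name.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Sketch: check that renaming a file within one directory keeps its inode.
// Assumes a Unix-like OS where the "unix" file attribute view is supported.
public class InodeRenameCheck {

    public static long inodeOf(Path p) throws IOException {
        return (Long) Files.getAttribute(p, "unix:ino");
    }

    public static boolean inodeSurvivesRename(Path dir) throws IOException {
        Path original = Files.createFile(dir.resolve("hehe.txt"));
        long before = inodeOf(original);
        // Same directory, so this is a plain rename, like mv on one filesystem.
        Path renamed = Files.move(original, dir.resolve("haha.txt"), StandardCopyOption.ATOMIC_MOVE);
        long after = inodeOf(renamed);
        Files.delete(renamed);
        return before == after;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("inode-demo");
        System.out.println("inode unchanged after rename: " + inodeSurvivesRename(dir));
        Files.delete(dir);
    }
}
```

On Linux this should print inode unchanged after rename: true, which is why the modified source can keep tailing haha.txt at its old offset and still treat the new test2/test1/hehe.txt as a new file.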
