日志收集框架 Flume 组件之Source使用
2018-09-26 19:56
1021 查看
上一篇简单介绍了Flume几个组件,今天介绍下组件其一的source,整理这些,也是二次学习的过程,也是梳理知识的过程。
Source 中文译为来源,源 作用:采集数据,然后把数据传输到channel上。 例如:监控某个文件或者某个端口或某个目录,新增数据,新增文件的变化,然后传输到channel。
常用的的source类型,也是平常用的比较多的几种类型,如下:
说明 | |
---|---|
Avro Source | 支持avro协议,内置支持 |
Thrift Source | 支持Thirft rpc ,内置支持 |
Exec Source | 基于Unix的command在标准输出上采集数据 ,如tail -F |
JMS Source | 监控JMS系统,比如Activemq,可以 |
Taildir Source | 监听目录或文件(Flume1.8版本支持) |
Spooling Directory Source | 监听目录下的新增文件 |
Kafka Source | 读取Kafka数据 |
下面不多少,简单实战,没安装的可以google一下,好多安装教程,本文是基于Flume 1.8
Exec Source,前面说过了,exec source 是以tail -F 形式来监听文件的变化的, flume-exec.conf配置:
# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # The configuration file needs to define the sources, # the channels and the sinks. # Sources, channels and sinks are defined per agent, # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # # Describe/configure the source # 配置类型为exec a1.sources.r1.type = exec # 路径是自己要监听的日志路径 a1.sources.r1.command = tail -F /usr/local/installed/tomcat/logs/system_app.log a1.sources.r1.channels = c1 # # Describe the sink # 下沉sink是以日志的形式来打印 a1.sinks.k1.type = logger # # Use a channel which buffers events in memory # channel采用以内存形式来存放上游source传递过来的数据 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
具体使用步骤: 1、启动 进入到flume安装目录,../bin下,命令如下:
./bin/flume-ng agent -n a1 -c ../conf/ -f ../conf/flume-exec.conf
缺点:agent挂了,则不会记录上次传递数据的位置,还是以tail -F为准,来重新传递数据。
Taildir Source 监听目录文件变化,记录上一次同步后的位置,实现断点续传,可以保证没有重复数据的读取。
# The configuration file needs to define the sources, # the channels and the sinks. # Sources, channels and sinks are defined per agent, # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # # Describe/configure the source a1.sources.r1.type = TAILDIR # 保存监听文件的读取位置的文件 a1.sources.r1.positionFile = /opt/flume/taildir_position.json a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /usr/local/installed/tomcat/logs/system_app.log a1.sources.r1.batchSize = 100 a1.sources.r1.backoffSleepIncrement = 1000 a1.sources.r1.maxBackoffSleep = 5000 # # # Describe the sink a1.sinks.k1.type = logger # # # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # # # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
具体测试,可以往监听的文件里写入数据,看看是否可以监听到数据。
Spooling Directory Source 监听目录文件的变化, flume-spooling.conf 配置
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # # Describe/configure the source a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /usr/local/self a1.sources.r1.deletePolicy = immediate a1.sources.r1.fileSuffix = completed a1.sources.r1.batchSize = 100 # # Describe the sink a1.sinks.k1.type = logger # # # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # # # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
备注:注意,只监听新增的文件,这个目录下有新增文件会被监听到。目录下子文件夹也不会被监听到,目录下以有的文件更新了,也不会被监听到。
其它的一些类型,可自行测试。
相关文章推荐
- Flume NG源码分析(五)使用ThriftSource通过RPC方式收集日志
- 日志收集框架flume的安装及简单使用
- 大数据日志收集框架之Flume入门
- 大数据日志收集框架之Flume入门
- 日志采集框架Flume的安装及使用
- flume学习(六):使用hive来分析flume收集的日志数据
- 日志收集系统Flume调研笔记第2篇 - Flume配置及使用实例
- logstash_forward+flume+elasticsearch+kibana日志收集框架
- Java日志框架使用技巧收集(slf4j、jcl、jul、log4j1、log4j2、logback)
- flume学习(五):使用hive来分析flume收集的日志数据
- Flume 日志收集、使用Flume收集日志到HDFS
- SparkStreaming项目实战系列——2.分布式日志收集框架Flume
- Java 流行框架(Spring/Struts2/Hibernate/iBatis)都在使用什么日志组件
- flume之Taildir Source支持变化追加文件的日志收集
- Java 流行框架(Spring/Struts2/Hibernate/iBatis)都在使用什么日志组件
- 使用hive来分析flume收集的日志数据
- flume学习(六):使用hive来分析flume收集的日志数据
- nginx日志切割并使用flume-ng收集日志
- Flume NG源码分析(四)使用ExecSource从本地日志文件中收集日志
- flume学习(六):使用hive来分析flume收集的日志数据