Log Collection Framework Flume: Using the Source Component

2018-09-26 19:56

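The previous post gave a brief introduction to Flume's components. This one covers one of them, the source. Writing it up is also a second pass at learning the material and a way to organize what I know.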

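A source is where data enters Flume. Its job is to collect data and pass it on to a channel. For example, it can watch a file, a port, or a directory, pick up newly written data or newly added files, and forward them to the channel.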

The source types used most often in practice are:

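Source type                 Description
Avro Source                 Receives events over the Avro protocol; built in
Thrift Source               Receives events over Thrift RPC; built in
Exec Source                 Runs a Unix command and collects data from its standard output, e.g. tail -F
JMS Source                  Reads messages from a JMS system such as ActiveMQ
Taildir Source              Watches directories or files and tails them (available since Flume 1.7)
Spooling Directory Source   Watches a directory for newly added files
Kafka Source                Reads data from Kafka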

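Enough introduction; on to some hands-on examples. If Flume is not installed yet, installation guides are easy to find online. This article is based on Flume 1.8.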

Exec Source: as noted above, the exec source watches a file for changes by running tail -F. The flume-exec.conf configuration:

# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# # Describe/configure the source
# Set the source type to exec
a1.sources.r1.type = exec
# Path of the log file to watch; change it to your own
a1.sources.r1.command = tail -F /usr/local/installed/tomcat/logs/system_app.log

# # Describe the sink
# The logger sink prints each event to the Flume log
a1.sinks.k1.type = logger

# # Use a channel which buffers events in memory
# The memory channel buffers the events handed over by the upstream source in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# # Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

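Usage: 1. Start the agent. From the Flume installation directory, run the command below. The -Dflume.root.logger=INFO,console option sends Flume's log, including the output of the logger sink, to the terminal.

./bin/flume-ng agent -n a1 -c conf -f conf/flume-exec.conf -Dflume.root.logger=INFO,console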

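Drawback: the exec source does not record how far it has read. If the agent dies, that position is lost, and after a restart reading resumes from wherever tail -F starts, so data can be missed or sent twice.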
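To make this drawback concrete, recall that tail without -n begins by printing the last 10 lines of the file before it starts following it. The sketch below assumes a POSIX shell and does not start Flume; the helper name lines_resent_on_restart and the sample file are hypothetical and only mimic what a restarted tail -F would emit first.

```shell
# Hypothetical helper: print the lines that a freshly started `tail -F <file>`
# emits before it begins following the file (by default, the last 10 lines).
lines_resent_on_restart() {
  tail -n 10 "$1"
}

# Build a sample log with 25 numbered lines in a temporary file.
sample_log="$(mktemp)"
i=1
while [ "$i" -le 25 ]; do
  echo "event $i" >> "$sample_log"
  i=$((i + 1))
done

# Prints "event 16" through "event 25". After a restart these lines are sent
# again even if they were already delivered, while older lines that were
# written during the downtime are not replayed at all.
lines_resent_on_restart "$sample_log"
```

Whether a restart causes duplicates or gaps therefore depends only on how many lines were written while the agent was down, not on what was actually delivered.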

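Taildir Source watches files for changes and records the position reached after each sync. After a restart it resumes from that position, which avoids re-reading data that was already delivered. A sample configuration: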

# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# # Describe/configure the source
a1.sources.r1.type = TAILDIR
# File in which the source stores the last read position of each tailed file
a1.sources.r1.positionFile = /opt/flume/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /usr/local/installed/tomcat/logs/system_app.log
a1.sources.r1.batchSize = 100
a1.sources.r1.backoffSleepIncrement = 1000
a1.sources.r1.maxBackoffSleep = 5000

#
# # Describe the sink
a1.sinks.k1.type = logger
#
# # Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#
# # Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

To test it, append data to the watched file and check whether the agent picks it up.
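One way to run this test is sketched below, assuming a POSIX shell and write access to the watched file. The helper name append_test_lines and the event text are hypothetical; the log path in the example call is the one configured in filegroups.f1.

```shell
# Hypothetical helper: append <count> numbered test lines to <log_file>.
append_test_lines() {
  log_file="$1"
  count="$2"
  i=1
  while [ "$i" -le "$count" ]; do
    printf 'taildir test event %d\n' "$i" >> "$log_file"
    i=$((i + 1))
  done
}

# Example call against the file configured in filegroups.f1:
# append_test_lines /usr/local/installed/tomcat/logs/system_app.log 5
```

With the agent running, the logger sink should print one event per appended line. The position file (/opt/flume/taildir_position.json in this configuration) should then show a larger read offset for the file. Stopping the agent, appending more lines, and starting it again is a simple check that reading resumes from the recorded position.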

Spooling Directory Source watches a directory for newly added files. The flume-spooling.conf configuration:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# # Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/self
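# Delete each file as soon as it has been fully ingested (the default is never)
a1.sources.r1.deletePolicy = immediate
# Suffix used to rename ingested files; renaming only happens when deletePolicy = never
a1.sources.r1.fileSuffix = completed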
a1.sources.r1.batchSize = 100

# # Describe the sink
a1.sinks.k1.type = logger
#
# # Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#
# # Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

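Note: only newly added files are picked up. With this configuration, files in subdirectories are not watched, and changes to files that already exist in the directory are not picked up either. Flume expects a file to be complete and unchanged once it is placed in the spooling directory.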
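Because files must not change once they are in the spooling directory, a common approach is to finish writing a file elsewhere and then move it in. The following is a minimal sketch under stated assumptions: a POSIX shell, a staging directory on the same filesystem as the spooling directory (so that mv is a rename), and a hypothetical helper name deliver_to_spool.

```shell
# Hypothetical helper: hand a finished file to a Flume spooling directory.
# Usage: deliver_to_spool <source_file> <staging_dir> <spool_dir>
deliver_to_spool() {
  src="$1"
  staging_dir="$2"
  spool_dir="$3"
  # Add a timestamp and the shell's PID to the name, since Flume expects
  # file names in the spooling directory not to be reused.
  name="$(basename "$src").$(date +%s).$$"
  # Copy into the staging directory first, so the spooling directory never
  # contains a file that is still being written.
  cp "$src" "$staging_dir/$name" || return 1
  # Move the complete file into the spooling directory in one step.
  mv "$staging_dir/$name" "$spool_dir/$name"
}

# Example call for the spoolDir configured in this article
# (the source file and staging directory are assumptions):
# deliver_to_spool /tmp/app.log /usr/local/self-staging /usr/local/self
```

The staging directory is kept outside the spooling directory on purpose, so the source never sees the partially copied file.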

The other source types can be tried out in the same way.
