您的位置:首页 > 其它

flume采集日志数据到kafka缓存数据

2020-06-06 05:19 120 查看

===============
一 前言
在一个完整的大数据处理系统中,除了hdfs+mapreduce+hive组成分析系统的核心之外,还需要数据采集、
结果数据导出、任务调度等不可或缺的辅助系统,而这些辅助工具在hadoop生态体系中都有便捷的开源框架,

flume是由cloudera软件公司产出的可分布式日志收集系统,后与2009年被捐赠了apache软件基金会,为hadoop相关组件之一。
尤其近几年随着flume的不断被完善以及升级版本的逐一推出,特别是flume-ng;同时flume内部的各种组件不断丰富,
用户在开发的过程中使用的便利性得到很大的改善,现已成为apache top项目之一.

二 概述
2.1 什么是Flume
  Apache Flume 是一个从可以收集例如日志,事件等数据资源,并将这些数量庞大的数据从各项数据资源中集中起来存储的工具/服务,
或者数集中机制。flume具有高可用,分布式,配置工具,其设计的原理也是基于将数据流,如日志数据从各种网站服务器上汇集起来存储到HDFS,
HBase等集中存储器中。

2.2 Flume特性
1.Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。
2.Flume可以采集文件,socket数据包等各种形式源数据,又可以将采集到的数据输出到HDFS、hbase、hive、kafka等众多外部存储系统中
3.一般的采集需求,通过对flume的简单配置即可实现
4.Flume针对特殊场景也具备良好的自定义扩展能力,因此,flume可以适用于大部分的日常数据采集场景

Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。
支持在日志系统中定制各类数据发送方,用于收集数据;
同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力。
名词介绍:
Flume OG:Flume original generation,即Flume0.9x版本
Flume NG:Flume next generation,即Flume1.x版本

三 Flume原理

3.1 Flume组件详解
对于每一个Agent来说,它就是一共独立的守护进程(JVM),它从客户端接收数据,如下图所示flume的基本模型

1.Flume分布式系统中最核心的角色是agent,flume采集系统就是由一个个agent所连接起来形成
2.每一个agent相当于一个数据(被封装成Event对象)传递员,内部有三个组件:

   1.Source:采集组件,用于跟数据源对接,以获取数据
   2.Sink:下沉组件,用于往下一级agent传递数据或者往最终存储系统传递数据
   3.Channel:传输通道组件,用于从source将数据传递到sink

1、Flume有一个简单、灵活的基于流的数据流结构
2、Flume具有故障转移机制和负载均衡机制
3、Flume使用一个简单可扩展的数据模型(source、channel、sink)

目前,flume-ng处理数据有两种方式:avro-client、agent
avro-client:一次性将数据传输到指定的avro服务的客户端
agent:一个持续传输数据的服务
Agent主要的组件包括:Source、Channel、Sink
Source:完成对日志数据的手机,分成transtion和event打入到channel之中。
Channel:主要提供一个队列的功能,对source提供的数据进行简单的缓存。
Sink:取出Channel中的数据,进行相应的存储文件系统,数据库或是提交到远程服务器。
数据在组件传输的单位是Event。
————————————————
===source 
 source意为来源、源头。
主要作用:从外界采集各种类型的数据,将数据传递给Channel。
   比如:监控某个文件只要增加数据就立即采集新增的数据、监控某个目录一旦有新文件产生就采集新文件的内容、监控某个端口等等。
常见采集的数据类型:
   Exec Source、Avro Source、NetCat Source、Spooling Directory Source等
详细查看:
  http://flume.apache.org/FlumeUserGuide.html#flume-sources
  或者自带的文档查看。
 
Source具体作用:
AvroSource:监听一个avro服务端口,采集Avro数据序列化后的数据;
Thrift Source:监听一个Thrift 服务端口,采集Thrift数据序列化后的数据;
Exec Source:基于Unix的command在标准输出上采集数据;
tail -F 和tail -f 区别。基于log4j切割文件时的能否读取问题。
JMS Source:Java消息服务数据源,Java消息服务是一个与具体平台无关的API,这是支持jms规范的数据源采集;
Spooling Directory Source:通过文件夹里的新增的文件作为数据源的采集;
Kafka Source:从kafka服务中采集数据。
NetCat Source: 绑定的端口(tcp、udp),将流经端口的每一个文本行数据作为Event输入
HTTP Source:监听HTTP POST和 GET产生的数据的采集
————————————————

=====channel

Channel
    一个数据的存储池,中间通道。
主要作用
    接受source传出的数据,向sink指定的目的地传输。Channel中的数据直到进入到下一个channel中或者进入终端才会被删除。
当sink写入失败后,可以自动重写,不会造成数据丢失,因此很可靠。
channel的类型很多比如:内存中、jdbc数据源中、文件形式存储等。
常见采集的数据类型:
    Memory Channel
    File Channel
    Spillable Memory Channel等
    
详细查看:
    http://flume.apache.org/FlumeUserGuide.html#flume-channels
 
Channel具体作用:

Memory Channel:使用内存作为数据的存储,速度快
File Channel:使用文件来作为数据的存储,安全可靠
Spillable Memory Channel:使用内存和文件作为数据的存储,即:先存在内存中,如果内存中数据达到阀值则flush到文件中。
JDBC Channel:使用jdbc数据源来作为数据的存储。
Kafka Channel:使用kafka服务来作为数据的存储。

===sink

Sink:数据的最终的目的地。
主要作用:接受channel写入的数据以指定的形式表现出来(或存储或展示)。
sink的表现形式很多比如:打印到控制台、hdfs上、avro服务中、文件中等。
常见采集的数据类型:
      HDFS Sink
      Hive Sink
      Logger Sink
      Avro Sink
      Thrift Sink
      File Roll Sink
      HBaseSink
      Kafka Sink等
详细查看:
      http://flume.apache.org/FlumeUserGuide.html#flume-sinks
HDFS Sink需要有hdfs的配置文件和类库。一般采取多个sink汇聚到一台采集机器负责推送到hdfs。
 
Sink具体作用:

Logger Sink:将数据作为日志处理(根据flume中的设置的日志的级别显示)。
HDFS Sink:将数据传输到hdfs集群中。
Avro Sink:数据被转换成Avro Event,然后发送到指定的服务端口上。
Thrift Sink:数据被转换成Thrift Event,然后发送到指定的的服务端口上。
File Roll Sink:数据传输到本地文件中。
Hive Sink:将数据传输到hive的表中。
IRC Sink:数据向指定的IRC服务和端口中发送。
Null Sink:取消数据的传输,即不发送到任何目的地。
HBaseSink:将数据发往hbase数据库中。
MorphlineSolrSink:数据发送到Solr搜索服务器(集群)。
ElasticSearchSink:数据发送到Elastic Search搜索服务器(集群)。
Kafka Sink:将数据发送到kafka服务中。(注意依赖类库)。


===event

event是Flume NG传输的数据的基本单位,也是事务的基本单位。
在文本文件,通常是一行记录就是一个event。
网络消息传输系统中,一条消息就是一个event。
event里有header、body
Event里面的header类型:Map<String, String>
我们可以在source中自定义header的key:value,在某些channel和sink中使用header。

=================================================

实例:通过flume将本地的日志文件,采集出来缓存在kafka.

1.我hadoop环境用的是伪分布,首先要启动zookeeper。

[root@hadoop153 ~]# zkServer.sh start
JMX enabled by default
Using config: /usr/local/src/zookeeper-3.4.5/bin/../conf/zoo.cfg
Starting zookeeper ... /usr/local/src/zookeeper-3.4.5/bin/zkServer.sh: 第 103 行:[: /tmp/zookeeper: 期待二元表达式
STARTED
[root@hadoop153 ~]# jps
120775 QuorumPeerMain
120839 Jps
通过jps命令查看,zookeeper进程已启动。

2.创建flume配置文件,flume_kafka_spooldir_topic40.conf

# 定义这个agent中各组件的名字
  2 a1.sources = r1
  3 a1.sinks = k1
  4 a1.channels = c1
  5 
  6 # 描述和配置source组件:r1
  7 a1.sources.r1.type = spooldir

# home/database/flume_testdata/flume目录为存放日志文件的目录
  8 a1.sources.r1.spoolDir = /home/database/flume_testdata/flume 
  9 a1.sources.r1.fileHeader = true
 10 
 11 # 描述和配置sink组件:k1
 12 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
 13 a1.sinks.k1.kafka.topic = topic40
 14 a1.sinks.k1.kafka.bootstrap.servers = hadoop153:9092
 15 a1.sinks.k1.kafka.flumeBatchSize = 20
 16 a1.sinks.k1.kafka.producer.acks = 1
 17 a1.sinks.k1.kafka.producer.linger.ms = 1
 18 a1.sinks.ki.kafka.producer.compression.type = snappy
 19 
 20 # 描述和配置channel组件,此处使用是内存缓存的方式
 21 a1.channels.c1.type = memory
 22 a1.channels.c1.capacity = 1000
 23 a1.channels.c1.transactionCapacity = 100
 24 a1.channels.c1.keep-alive = 60
 25 # 描述和配置source  channel   sink之间的连接关系
 26 a1.sources.r1.channels = c1

3.启动kafka 

[root@hadoop153 kafka_2.11-0.10.2.1]# bin/kafka-server-start.sh config/server.properties &
4.启动flume任务

[root@hadoop153 apache-flume-1.7.0-bin]# bin/flume-ng agent --name a1 -c conf -f flumetest/flume_kafka_spooldir_topic40.conf -Dflume.root.logger=INFO,console

5.开启kafka消费

[root@hadoop153 kafka_2.11-0.10.2.1]# bin/kafka-console-consumer.sh --zookeeper Hadoop153:2181 --from-beginning --topic topic40

注意:topic40 要与flume配置文件中的指定topic要统一。

6.往保存日志文件的目录,写数据

[root@hadoop153 flume_testdata]# cp SogouQ.log flume/
7.监控kafka消费数据

[root@hadoop153 kafka_2.11-0.10.2.1]# bin/kafka-console-consumer.sh --zookeeper hadoop153:2181 --from-beginning --topic topic40
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
1,201.105.101.102,http://mystore.jsp/?productid=1,2017020020,1,1
2,201.105.101.103,http://mystore.jsp/?productid=2,2017020022,1,1
3,201.105.101.105,http://mystore.jsp/?productid=3,2017020023,1,1
4,201.105.101.107,http://mystore.jsp/?productid=1,2017020025,1,1
1,201.105.101.102,http://mystore.jsp/?productid=4,2017020021,3,1
1,201.105.101.102,http://mystore.jsp/?productid=1,2017020029,2,1
 

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: