flume常用属性的说明
2016-06-17 18:08
260 查看
agent
agent的名称:a2agent的source、channel、sink的名称
a2.sources = s20 s21 a2.channels = c20 c21 a2.sinks = k20 k21
source
avro source的必选属性:type、channels、bind、port注意source是channels,sink是channel
a2.sources.s20.type = avro a2.sources.s20.channels = c20 c21 #source to channel a2.sources.s20.bind = 0.0.0.0 #hostname/ip adress a2.sources.s20.port = 11471 #port
compression-type用于解压缩传入数据的压缩格式。唯一支持的数据格式是zlib,如果要接受zlib压缩的数据,设置该参数为deflate。如果该参数设置为deflate,那么传入的数据必须被压缩,否则source不能解析传入的数据
a2.sources.s20.compression-type = deflate #none/deflate
拦截器:设置在source和channel之间。source接收到的事件,在写入channel之前,拦截器都可以进行转换或者删除这些事件
a2.sources.s20.interceptors = hostint #空格隔开的拦截器列表。此处表示拦截器名为hostint a2.sources.s20.interceptors.hostint.type = org.apache.flume.interceptor.HostInterceptor$Builder #hostint拦截器的类型是Host Interceptor a2.sources.s20.interceptors.hostint.preserveExisting = true #该属性值为true时表示如果主机头已存在,应该被保留 a2.sources.s20.interceptors.hostint.useIP = true #该属性值为true时表示用ip地址,否则表示用主机名
选择器:同一个数据源分发到不同的目的地,官网提供两种类型 replicating channel selector(default) 和multiplexing channel selector ,replicating会将source过来的events发往所有的channel,而multiplexing可以选择发往哪些channel。还有一种定制类型custom channel selector ,给类型需要自己实现ChannelSelector接口
a2.sources.s20.selector.type = multiplexing a2.sources.s20.selector.header = topic #以header中的topic对应的的值作为条件 a2.sources.s20.selector.mapping.mobilegame = c20 c21 #如果header中的topic的值为mobilegame,使用c20,c21 channel a2.sources.s20.selector.mapping.webgame = c20 c21 #header中topic的值为webgame a2.sources.s20.selector.mapping.clientgame = c20 c21 #header中topic的值为clientgame a2.sources.s20.selector.mapping.monitor = c20 c21 #header中topic的值为monitor a2.sources.s20.selector.mapping.arc_services = c20 c21 #header中topic的值为啊arc_services a2.sources.s20.selector.mapping.web_analytics = c20 c21 #header中topic的值为web_analytics a2.sources.s20.selector.default = c20 #默认使用c20这个channel
scribe source 的必选属性:type
a2.sources.s21.type = org.apache.flume.source.scribe.ScribeSource a2.sources.s21.port = 11471 #Scribe连接的端口号 a2.sources.s21.workerThreads = 10 #处理线程数 a2.sources.s21.channels = c20 c21 a2.sources.s21.interceptors = hostint a2.sources.s21.interceptors.hostint.type = org.apache.flume.interceptor.HostInterceptor$Builder a2.sources.s21.interceptors.hostint.preserveExisting = true a2.sources.s21.interceptors.hostint.useIP = true a2.sources.s21.selector.type = multiplexing a2.sources.s21.selector.header = topic a2.sources.s21.selector.mapping.monitor = c20 c21 a2.sources.s20.selector.mapping.arc_services = c20 c21 a2.sources.s20.selector.mapping.web_analytics = c20 c21 a2.sources.s21.selector.default = c20
channel
channels是一个Agent上存储events的仓库,Source向其中添加events,而Sink从中取走移除events。常见的channel有:Memory Channel、File Channel 和 Spillable Memory Channel。Memory Channel:Source添加的events都暂存在内存队列中,适合高吞吐量的数据流,但是agent失败会丢数据。
file channel的必选属性:type
a2.channels.c20.type = file a2.channels.c20.checkpointDir = /export/flume/realtime/file_channel20/checkpoint #存放检查点目录,checkpointDir是一个目录 a2.channels.c20.dataDirs = /export/flume/realtime/file_channel20/data #存放数据的目录,dataDirs可以是多个目录,以逗号隔开。用独立的多个磁盘上的多个目录可以提高file channel的性能 a2.channels.c20.capacity = 200000000 #channel的最大容量 a2.channels.c20.keep-alive = 10 #Amount of time (in sec) to wait for a put operation a2.channels.c20.transactionCapacity = 100000 #The maximum size of transaction supported by the channel a2.channels.c20.checkpointInterval = 20000 #Amount of time (in millis) between checkpoints a2.channels.c21.type = file a2.channels.c21.checkpointDir = /export/flume/realtime/file_channel21/checkpoint a2.channels.c21.dataDirs = /export/flume/realtime/file_channel21/data a2.channels.c21.capacity = 200000000 a2.channels.c21.keep-alive = 10 a2.channels.c21.transactionCapacity = 100000 a2.channels.c21.checkpointInterval = 20000
sink
hdfs sink的必选属性:type、channel、hdfs.patha2.sinks.k20.type = hdfs a2.sinks.k20.channel = c20 a2.sinks.k20.hdfs.path = hdfs://CDH-cluster-main/export/gamelog/%Y-%m-%d/%{category}/%H a2.sinks.k20.hdfs.filePrefix = %{category}_%{host} #文件名前缀 a2.sinks.k20.hdfs.fileType = DataStream a2.sinks.k20.hdfs.useLocalTimeStamp = true a2.sinks.k20.hdfs.writeFormat = Text a2.sinks.k20.hdfs.batchSize = 5000 a2.sinks.k20.hdfs.rollInterval = 3600 #滚动到的当前文件所需的秒数 a2.sinks.k20.hdfs.rollCount = 0 #写入文件之前滚动的事件量,为0表示不基于事件数量滚动 a2.sinks.k20.hdfs.rollSize = 0 #出发滚动的文件大小,为0表示不基于文件大小滚动 a2.sinks.k20.hdfs.idleTimeout = 300 #超时后不活跃的文件关闭(0表示禁止关闭空闲文件) a2.sinks.k20.hdfs.retryInterval = 180 #关闭一个文件连续的时间间隔,以秒为单位。每个关闭调用多个RPC Namenode传送成本,所以设置过低会导致很 #namenode节点上的负载。如果设置为0或更少,flume不会试图关闭该文件如果第一次尝试失败 #并可能离开这个文件打开或“.tmp”的扩展。 a2.sinks.k20.hdfs.callTimeout = 120000 #允许hdfs操作如打开、刷新、关闭的毫秒数,如果很多HDFS超时时这个数字应该增加
Event Serializers:file_roll sink和hdfs sink都支持Event Serializers接口
a2.sinks.k20.serializer.appendNewline = false #换行符是否在写时附加到每个事件,默认的假定事件不包含换行。
kafka sink 的必选属性:type、brokerList
a2.sinks.k21.type = org.apache.flume.sink.kafka.KafkaSink a2.sinks.k21.channel = c21 a2.sinks.k21.brokerList = 10.14.251.155:9092,10.14.251.156:9092,10.14.251.157:9092 a2.sinks.k21.batchSize = 10 #每个批次处理的消息量,大的批量提高吞吐量,但是增加延时 a2.sinks.k21.serializer.appendNewline = false
Flume Sink Processors:对group中所有的sink提供负载均衡功能,或者是当一个sink失败时实现故障转移。目前支持default, failover 和 load_balance 三种。必选属性:sinks,processor.type。其中processor.type默认为default。
#load_balance type a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 k4 k5 k6 k7 k8 k9 a1.sinkgroups.g1.processor.type = load_balance a1.sinkgroups.g1.processor.backoff = true #失败的sinks支持指数级别 a1.sinkgroups.g1.processor.selector = round_robin #通过轮询来实现负载分发
相关文章推荐
- source命令执行SQL脚本文件
- Java IO与NIO的一些文件拷贝测试
- Go 语言 Channel 实现原理精要
- 方便的IE源代码工具Instant Source
- mysql source 命令导入大的sql文件的方法
- Mysql导入导出工具Mysqldump和Source命令用法详解
- Java中channel用法总结
- Flume环境部署和配置详解及案例大全
- go语言channel实现多核并行化运行的方法
- Windows 下Spark 快速搭建Spark源码阅读环境
- Dalian University of Technology sourcelist
- Play! Akka Flume实现的完整数据收集
- log4j + flume 1.6 集成
- 论开源<2>---开源运动的国家目标
- 源码阅读——十个C开源项目
- flume自定义Interceptor
- 使用Flume聚合Tomcat 日志
- #Note# Analyzing Twitter Data with Apache Hadoo...
- apache flume 配置存储在Linux本地服务器
- 让/etc/profile文件修改后立即生效