Flume NG Source Code Analysis (7): ChannelSelector
2015-06-18 17:07
The previous articles covered the basics of Flume NG's Source component. Next, let's turn to the Channel-related components, which are:
1. Channel
2. ChannelSelector
3. Interceptor / InterceptorChain
4. ChannelProcessor
5. Transaction
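This article covers ChannelSelector. A ChannelSelector chooses the downstream Channel(s) for a Source. There are two selection strategies: replicating and multiplexing. Replicating copies each Event delivered by the Source to every downstream Channel attached to it. Multiplexing routes each Event from the Source to different downstream Channels according to the Event's attributes.
The figure below shows how ReplicatingChannelSelector works: an Event coming from the Source is copied into Channel1, Channel2 and Channel3, and each Channel then hands it on to its own downstream Sink. ChannelSelector thus gives the Source a flexible mechanism for distributing its events. The default ChannelSelector is ReplicatingChannelSelector.
A typical configuration is shown below: Events from <Source1> are copied to the downstream <Channel1> and <Channel2>.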
# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>

# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>

<Agent>.sources.<Source1>.selector.type = replicating
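Here is the definition of the ChannelSelector interface:
1. setChannels registers all the Channels associated with the Source.
2. getAllChannels returns all the Channels associated with the Source.
3. getRequiredChannels returns the Channels an Event must be delivered to; if delivery fails, the Source must be notified.
4. getOptionalChannels returns the Channels where delivery is not mandatory; delivery failures can be ignored.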
public interface ChannelSelector extends NamedComponent, Configurable {

  public void setChannels(List<Channel> channels);

  public List<Channel> getRequiredChannels(Event event);

  public List<Channel> getOptionalChannels(Event event);

  public List<Channel> getAllChannels();
}
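To make the required/optional contract concrete, here is a minimal sketch of what a caller might do with the two lists: a failure on a required channel reaches the caller (the Source side), while a failure on an optional channel is swallowed. SimpleChannel and DeliverySketch are hypothetical names, events are plain strings, and the transactions that real Flume channels use are left out entirely, so treat this as an illustration of the contract rather than of Flume's actual delivery code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Channel; put() throws when the channel is "broken".
class SimpleChannel {
    final String name;
    final boolean broken;
    final List<String> events = new ArrayList<>();

    SimpleChannel(String name, boolean broken) {
        this.name = name;
        this.broken = broken;
    }

    void put(String event) {
        if (broken) {
            throw new IllegalStateException("put failed on " + name);
        }
        events.add(event);
    }
}

class DeliverySketch {
    // Failures on required channels propagate to the caller;
    // failures on optional channels are caught and ignored.
    static void deliver(String event, List<SimpleChannel> required, List<SimpleChannel> optional) {
        for (SimpleChannel c : required) {
            c.put(event);
        }
        for (SimpleChannel c : optional) {
            try {
                c.put(event);
            } catch (RuntimeException ignored) {
                // optional channel: the failure is deliberately dropped
            }
        }
    }

    public static void main(String[] args) {
        SimpleChannel ok = new SimpleChannel("c1", false);
        SimpleChannel flaky = new SimpleChannel("c2", true);

        // c2 fails, but it is optional, so no exception reaches us.
        deliver("e1", List.of(ok), List.of(flaky));
        System.out.println("c1 holds " + ok.events);

        // The same failing channel used as a required channel: the exception surfaces.
        try {
            deliver("e2", List.of(flaky), List.of());
        } catch (IllegalStateException e) {
            System.out.println("required failure: " + e.getMessage());
        }
    }
}
```

In this sketch the required channels are written first, so a required failure means the optional channels never see that event.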
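The ChannelSelector class hierarchy is shown below.
AbstractChannelSelector is an abstract class that implements the ChannelSelector interface. Its role is to give the ChannelSelector class hierarchy its shared properties: the list of Channels and the component name.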
public abstract class AbstractChannelSelector implements ChannelSelector {

  private List<Channel> channels;
  private String name;

  @Override
  public List<Channel> getAllChannels() {
    return channels;
  }

  @Override
  public void setChannels(List<Channel> channels) {
    this.channels = channels;
  }

  @Override
  public synchronized void setName(String name) {
    this.name = name;
  }

  @Override
  public synchronized String getName() {
    return name;
  }

  // ......
}
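The division of labor can be seen in a trimmed-down mirror of this hierarchy: the abstract base stores the channel list, and a concrete selector only has to decide routing. All names here (MiniSelector, MiniAbstractSelector, MiniReplicatingSelector, MiniSelectorDemo) are hypothetical; channels are plain strings, and Event, Context and NamedComponent are omitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified counterpart of the ChannelSelector interface.
interface MiniSelector {
    void setChannels(List<String> channels);
    List<String> getAllChannels();
    List<String> getRequiredChannels(String event);
    List<String> getOptionalChannels(String event);
}

// Like AbstractChannelSelector: owns the shared state, leaves routing to subclasses.
abstract class MiniAbstractSelector implements MiniSelector {
    private List<String> channels = new ArrayList<>();

    @Override
    public List<String> getAllChannels() {
        return channels;
    }

    @Override
    public void setChannels(List<String> channels) {
        this.channels = channels;
    }
}

// Concrete selector: only the routing decision lives here.
// Every event goes to every channel, and nothing is optional.
class MiniReplicatingSelector extends MiniAbstractSelector {
    @Override
    public List<String> getRequiredChannels(String event) {
        return getAllChannels();
    }

    @Override
    public List<String> getOptionalChannels(String event) {
        return Collections.emptyList();
    }
}

class MiniSelectorDemo {
    public static void main(String[] args) {
        MiniSelector selector = new MiniReplicatingSelector();
        selector.setChannels(Arrays.asList("Channel1", "Channel2", "Channel3"));
        System.out.println(selector.getRequiredChannels("some-event"));
    }
}
```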
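ReplicatingChannelSelector copies the Source's event stream to every downstream Channel. It maintains two lists, requiredChannels and optionalChannels: Channels named in the optional configuration property go into optionalChannels, and all the others go into requiredChannels.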
public class ReplicatingChannelSelector extends AbstractChannelSelector {

  /**
   * Configuration to set a subset of the channels as optional.
   */
  public static final String CONFIG_OPTIONAL = "optional";
  List<Channel> requiredChannels = null;
  List<Channel> optionalChannels = new ArrayList<Channel>();

  @Override
  public List<Channel> getRequiredChannels(Event event) {
    /*
     * Seems like there are lot of components within flume that do not call
     * configure method. It is conceiveable that custom component tests too
     * do that. So in that case, revert to old behavior.
     */
    if(requiredChannels == null) {
      return getAllChannels();
    }
    return requiredChannels;
  }

  @Override
  public List<Channel> getOptionalChannels(Event event) {
    return optionalChannels;
  }

  @Override
  public void configure(Context context) {
    String optionalList = context.getString(CONFIG_OPTIONAL);
    requiredChannels = new ArrayList<Channel>(getAllChannels());
    Map<String, Channel> channelNameMap = getChannelNameMap();
    if(optionalList != null && !optionalList.isEmpty()) {
      for(String optional : optionalList.split("\\s+")) {
        Channel optionalChannel = channelNameMap.get(optional);
        requiredChannels.remove(optionalChannel);
        if (!optionalChannels.contains(optionalChannel)) {
          optionalChannels.add(optionalChannel);
        }
      }
    }
  }
}
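The list-splitting logic in configure() is easy to check in isolation. This sketch reproduces it with channel names as strings and the value of the optional property passed in directly, instead of going through Context and getChannelNameMap(); ReplicatingSplitSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of ReplicatingChannelSelector.configure():
// start with every channel required, then move each name listed in
// the whitespace-separated "optional" value over to the optional list.
class ReplicatingSplitSketch {
    final List<String> required;
    final List<String> optional = new ArrayList<>();

    ReplicatingSplitSketch(List<String> allChannels, String optionalList) {
        required = new ArrayList<>(allChannels);
        if (optionalList != null && !optionalList.isEmpty()) {
            for (String name : optionalList.split("\\s+")) {
                required.remove(name);
                if (!optional.contains(name)) { // avoid duplicates, as the original does
                    optional.add(name);
                }
            }
        }
    }

    public static void main(String[] args) {
        ReplicatingSplitSketch s =
                new ReplicatingSplitSketch(Arrays.asList("c1", "c2", "c3"), "c3");
        System.out.println("required=" + s.required + " optional=" + s.optional);
    }
}
```

One difference from the quoted code: there, each optional name is looked up in channelNameMap, so a name that matches no channel yields null and that null is what gets added to optionalChannels; the sketch simply keeps the string.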
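MultiplexingChannelSelector selects the downstream Channel(s) based on attributes of the events coming from the Source.
Here is an example MultiplexingChannelSelector configuration:
1. Routing is based on the State attribute in the Event header (a <State, Value> pair).
2. If State is CA, the Event goes to mem-channel-1; if AZ, to file-channel-2; if NY, to both mem-channel-1 and file-channel-2. Any other value (including null) goes to mem-channel-1.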
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
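Because the routing rules in this example are small, they can be traced by hand. The sketch below hard-codes the mapping from the configuration above into a plain map and applies the same lookup order as the selector's getRequiredChannels(): a missing or blank header, or a value with no mapping, falls back to the default channel. MultiplexRoutingSketch and route() are hypothetical names; channels are strings and the Event is reduced to its header map.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hand-built equivalent of the multiplexing example: header "State",
// CA/AZ/NY mappings, default mem-channel-1.
class MultiplexRoutingSketch {
    static final String HEADER = "State";
    static final Map<String, List<String>> MAPPING = new HashMap<>();
    static final List<String> DEFAULT = Collections.singletonList("mem-channel-1");

    static {
        MAPPING.put("CA", Arrays.asList("mem-channel-1"));
        MAPPING.put("AZ", Arrays.asList("file-channel-2"));
        MAPPING.put("NY", Arrays.asList("mem-channel-1", "file-channel-2"));
    }

    // Missing/blank header -> default; unmapped value -> default; else the mapped list.
    static List<String> route(Map<String, String> headers) {
        String value = headers.get(HEADER);
        if (value == null || value.trim().isEmpty()) {
            return DEFAULT;
        }
        List<String> channels = MAPPING.get(value);
        return channels != null ? channels : DEFAULT;
    }

    public static void main(String[] args) {
        for (String state : new String[] {"CA", "AZ", "NY", "TX"}) {
            Map<String, String> headers = new HashMap<>();
            headers.put(HEADER, state);
            System.out.println(state + " -> " + route(headers));
        }
        System.out.println("(no header) -> " + route(new HashMap<>()));
    }
}
```

Running main prints CA -> [mem-channel-1], AZ -> [file-channel-2], NY -> [mem-channel-1, file-channel-2], TX -> [mem-channel-1] and (no header) -> [mem-channel-1].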
MultiplexingChannelSelector keeps several fields that correspond to the configuration above:
private String headerName;
private Map<String, List<Channel>> channelMapping;
private Map<String, List<Channel>> optionalChannels;
private List<Channel> defaultChannels;
The configure() method reads the configuration and populates headerName along with the channelMapping, optionalChannels and defaultChannels data structures:
public void configure(Context context) {
  this.headerName = context.getString(CONFIG_MULTIPLEX_HEADER_NAME,
      DEFAULT_MULTIPLEX_HEADER);

  Map<String, Channel> channelNameMap = getChannelNameMap();

  defaultChannels = getChannelListFromNames(
      context.getString(CONFIG_DEFAULT_CHANNEL), channelNameMap);

  Map<String, String> mapConfig =
      context.getSubProperties(CONFIG_PREFIX_MAPPING);

  channelMapping = new HashMap<String, List<Channel>>();

  for (String headerValue : mapConfig.keySet()) {
    List<Channel> configuredChannels = getChannelListFromNames(
        mapConfig.get(headerValue),
        channelNameMap);

    //This should not go to default channel(s)
    //because this seems to be a bad way to configure.
    if (configuredChannels.size() == 0) {
      throw new FlumeException("No channel configured for when "
          + "header value is: " + headerValue);
    }

    if (channelMapping.put(headerValue, configuredChannels) != null) {
      throw new FlumeException("Selector channel configured twice");
    }
  }
  //If no mapping is configured, it is ok.
  //All events will go to the default channel(s).
  Map<String, String> optionalChannelsMapping =
      context.getSubProperties(CONFIG_PREFIX_OPTIONAL + ".");

  optionalChannels = new HashMap<String, List<Channel>>();
  for (String hdr : optionalChannelsMapping.keySet()) {
    List<Channel> confChannels = getChannelListFromNames(
        optionalChannelsMapping.get(hdr), channelNameMap);
    if (confChannels.isEmpty()) {
      confChannels = EMPTY_LIST;
    }
    //Remove channels from optional channels, which are already
    //configured to be required channels.

    List<Channel> reqdChannels = channelMapping.get(hdr);
    //Check if there are required channels, else defaults to default channels
    if(reqdChannels == null || reqdChannels.isEmpty()) {
      reqdChannels = defaultChannels;
    }
    for (Channel c : reqdChannels) {
      if (confChannels.contains(c)) {
        confChannels.remove(c);
      }
    }

    if (optionalChannels.put(hdr, confChannels) != null) {
      throw new FlumeException("Selector channel configured twice");
    }
  }
}
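The second loop in configure() is the least obvious part: optional channels that are already required for the same header value (or, when that value has no mapping, already among the default channels) are removed, so no channel ends up in both lists. Below is a sketch of just that step, with a hypothetical OptionalDedupSketch class, string channel names, and plain maps standing in for the parsed configuration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OptionalDedupSketch {
    // For each header value, copy its optional list and drop every channel that is
    // already required for that value, falling back to the defaults when the value
    // has no required mapping (the same rule as the loop in configure()).
    static Map<String, List<String>> buildOptional(Map<String, List<String>> optionalConfig,
                                                   Map<String, List<String>> required,
                                                   List<String> defaults) {
        Map<String, List<String>> result = new HashMap<>();
        for (Map.Entry<String, List<String>> e : optionalConfig.entrySet()) {
            List<String> confChannels = new ArrayList<>(e.getValue()); // copy; input untouched
            List<String> reqd = required.get(e.getKey());
            if (reqd == null || reqd.isEmpty()) {
                reqd = defaults;
            }
            for (String c : reqd) {
                confChannels.remove(c); // no-op if c is not in the optional list
            }
            result.put(e.getKey(), confChannels);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> required = new HashMap<>();
        required.put("CA", Arrays.asList("mem-channel-1"));
        required.put("AZ", Arrays.asList("file-channel-2"));

        Map<String, List<String>> optional = new HashMap<>();
        optional.put("CA", Arrays.asList("mem-channel-1", "file-channel-2"));
        optional.put("AZ", Arrays.asList("mem-channel-1", "file-channel-2"));
        optional.put("TX", Arrays.asList("mem-channel-1", "file-channel-2")); // TX has no mapping

        Map<String, List<String>> result =
                buildOptional(optional, required, Arrays.asList("mem-channel-1"));
        for (String key : new String[] {"CA", "AZ", "TX"}) {
            System.out.println(key + " -> " + result.get(key));
        }
    }
}
```

Here CA keeps only file-channel-2, AZ keeps only mem-channel-1, and TX, having no mapping, is compared against the default and also keeps only file-channel-2.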
Once these data structures are populated, getRequiredChannels() and getOptionalChannels() are straightforward:
public List<Channel> getRequiredChannels(Event event) {
  String headerValue = event.getHeaders().get(headerName);
  if (headerValue == null || headerValue.trim().length() == 0) {
    return defaultChannels;
  }

  List<Channel> channels = channelMapping.get(headerValue);

  //This header value does not point to anything
  //Return default channel(s) here.
  if (channels == null) {
    channels = defaultChannels;
  }

  return channels;
}

@Override
public List<Channel> getOptionalChannels(Event event) {
  String hdr = event.getHeaders().get(headerName);
  List<Channel> channels = optionalChannels.get(hdr);

  if(channels == null) {
    channels = EMPTY_LIST;
  }
  return channels;
}