Flume NG Source Code Analysis (7): ChannelSelector

2015-06-18 17:07
The previous posts covered the basics of the Flume NG Source components. Next we turn to the Channel-related components, which are:

1. Channel

2. ChannelSelector

3. Interceptor / InterceptorChain

4. ChannelProcessor

5. Transaction



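This post covers ChannelSelector. A ChannelSelector chooses the downstream Channels for a Source, and it supports two modes: replicating and multiplexing. Replicating copies every Event passed from the Source to all of the Source's downstream Channels. Multiplexing routes each Event from the Source to different downstream Channels according to its attributes (the values in its headers).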
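To make the two modes concrete, here is a minimal Java sketch that reduces channels to plain names and an Event to its header map. The class and method names (SelectionModes, replicate, multiplex) and the State header are illustrative assumptions, not Flume APIs.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the two selection modes. Channels are plain names
// and "State" is an example header key; neither reflects a Flume API.
class SelectionModes {

    // Replicating: every Event goes to every channel bound to the source.
    static List<String> replicate(List<String> allChannels) {
        return allChannels;
    }

    // Multiplexing: choose channels by the value of one header, falling
    // back to a default list when the header is absent or unmapped.
    static List<String> multiplex(Map<String, String> headers, String headerName,
                                  Map<String, List<String>> mapping, List<String> defaults) {
        String value = headers.get(headerName);
        if (value == null) {
            return defaults;
        }
        return mapping.getOrDefault(value, defaults);
    }

    public static void main(String[] args) {
        List<String> all = List.of("c1", "c2", "c3");
        Map<String, List<String>> mapping = Map.of(
                "CA", List.of("c1"),
                "NY", List.of("c1", "c2"));
        List<String> defaults = List.of("c3");

        System.out.println(replicate(all));                                              // [c1, c2, c3]
        System.out.println(multiplex(Map.of("State", "NY"), "State", mapping, defaults)); // [c1, c2]
        System.out.println(multiplex(Map.of("State", "TX"), "State", mapping, defaults)); // [c3]
    }
}
```

Replication needs no information from the Event at all, while multiplexing is a pure lookup keyed on one header value.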

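The figure below shows how ReplicatingChannelSelector works: Events passed from the Source are copied to Channel1, Channel2 and Channel3, and each Channel then delivers them to its own downstream Sink. ChannelSelector thus provides a flexible mechanism for distributing a Source's events. The default ChannelSelector is ReplicatingChannelSelector.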



A typical configuration follows. Events from <Source1> are replicated to both downstream channels, <Channel1> and <Channel2>:

# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>

# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>

<Agent>.sources.<Source1>.selector.type = replicating
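As a rough illustration of how such a file is read, the sketch below loads a concrete version of the example with java.util.Properties. It substitutes hypothetical names a1, r1, k1, k2, c1, c2 for the placeholders, omits selector.type, and treats a missing selector.type as replicating, matching the default described above. It is not Flume's own configuration loader.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical sketch: reading the replicating example with java.util.Properties.
// a1, r1, k1, k2, c1, c2 stand in for <Agent>, <Source1>, <Sink1>, ... .
class ReplicatingConfigDemo {

    // selector.type is deliberately left out to exercise the default.
    static final String CONFIG = String.join("\n",
            "a1.sources = r1",
            "a1.sinks = k1 k2",
            "a1.channels = c1 c2",
            "a1.sources.r1.channels = c1 c2",
            "a1.sinks.k1.channel = c1",
            "a1.sinks.k2.channel = c2");

    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // No selector.type means "replicating", the documented default.
    static String selectorType(Properties props, String agent, String source) {
        return props.getProperty(agent + ".sources." + source + ".selector.type", "replicating");
    }

    // A source's channel list is space separated.
    static List<String> sourceChannels(Properties props, String agent, String source) {
        String value = props.getProperty(agent + ".sources." + source + ".channels", "").trim();
        if (value.isEmpty()) {
            return List.of();
        }
        return Arrays.asList(value.split("\\s+"));
    }

    public static void main(String[] args) {
        Properties props = load(CONFIG);
        System.out.println(selectorType(props, "a1", "r1"));   // replicating
        System.out.println(sourceChannels(props, "a1", "r1")); // [c1, c2]
    }
}
```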


Now let's look at the ChannelSelector interface definition:

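1. setChannels adds all of the Channels associated with the Source.

2. getAllChannels returns all of the Channels associated with the Source.

3. getRequiredChannels returns the Channels the Event must be delivered to; if delivery fails, the Source must be notified.

4. getOptionalChannels returns the Channels for which delivery is not mandatory; a failed delivery can be ignored.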

public interface ChannelSelector extends NamedComponent, Configurable {

  public void setChannels(List<Channel> channels);

  public List<Channel> getRequiredChannels(Event event);

  public List<Channel> getOptionalChannels(Event event);

  public List<Channel> getAllChannels();

}
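The split between required and optional channels matters to whatever writes the Event into the channels. Below is a simplified sketch of that contract, assuming a hypothetical one-method SimpleChannel interface and leaving out transactions entirely; in Flume this role falls to ChannelProcessor, which this sketch does not reproduce.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of honouring the required/optional contract.
// SimpleChannel and dispatch() are illustrative, not Flume APIs.
class RequiredOptionalDemo {

    interface SimpleChannel {
        void put(String event); // throws to signal a failed write
    }

    // Required channels first: any failure propagates to the caller (the Source).
    // Failures on optional channels are swallowed.
    static void dispatch(String event, List<SimpleChannel> required, List<SimpleChannel> optional) {
        for (SimpleChannel ch : required) {
            ch.put(event);
        }
        for (SimpleChannel ch : optional) {
            try {
                ch.put(event);
            } catch (RuntimeException ignored) {
                // optional channel: the failure is ignored
            }
        }
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        SimpleChannel ok = received::add;
        SimpleChannel broken = ev -> { throw new IllegalStateException("channel full"); };

        dispatch("e1", List.of(ok), List.of(broken));
        System.out.println(received); // [e1]

        try {
            dispatch("e2", List.of(broken), List.of(ok));
        } catch (IllegalStateException e) {
            System.out.println("source notified: " + e.getMessage());
        }
    }
}
```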


The ChannelSelector class hierarchy is as follows:



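AbstractChannelSelector is an abstract class that implements the ChannelSelector interface. Its job is to add the shared state (the channel list and the component name) to the ChannelSelector class hierarchy.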

public abstract class AbstractChannelSelector implements ChannelSelector {

  private List<Channel> channels;
  private String name;

  @Override
  public List<Channel> getAllChannels() {
    return channels;
  }

  @Override
  public void setChannels(List<Channel> channels) {
    this.channels = channels;
  }

  @Override
  public synchronized void setName(String name) {
    this.name = name;
  }

  @Override
  public synchronized String getName() {
    return name;
  }
  // ...
}
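The subclasses below call getChannelNameMap() and getChannelListFromNames(), whose bodies fall in the elided part of this excerpt. The following is only an assumed reconstruction of what such helpers need to do, with a hypothetical NamedChannel standing in for Channel; it is not the Flume source.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the two lookup helpers; behaviour is assumed.
class ChannelLookupSketch {

    // Stand-in for org.apache.flume.Channel: only the name matters here.
    static final class NamedChannel {
        final String name;
        NamedChannel(String name) { this.name = name; }
        @Override public String toString() { return name; }
    }

    // Index channels by name so configuration strings can be resolved.
    static Map<String, NamedChannel> channelNameMap(List<NamedChannel> channels) {
        Map<String, NamedChannel> byName = new HashMap<>();
        for (NamedChannel ch : channels) {
            byName.put(ch.name, ch);
        }
        return byName;
    }

    // Resolve a space-separated list of names into a fresh, mutable list.
    // An unknown name is treated as a configuration error (assumed behaviour).
    static List<NamedChannel> channelListFromNames(String names, Map<String, NamedChannel> byName) {
        List<NamedChannel> result = new ArrayList<>();
        if (names == null || names.trim().isEmpty()) {
            return result;
        }
        for (String name : names.trim().split("\\s+")) {
            NamedChannel ch = byName.get(name);
            if (ch == null) {
                throw new IllegalArgumentException("Selector channel not found: " + name);
            }
            result.add(ch);
        }
        return result;
    }

    public static void main(String[] args) {
        List<NamedChannel> all = List.of(new NamedChannel("c1"), new NamedChannel("c2"));
        Map<String, NamedChannel> byName = channelNameMap(all);
        System.out.println(channelListFromNames("c2 c1", byName)); // [c2, c1]
    }
}
```

Returning a fresh, mutable list is the important design choice: the multiplexing configure() shown later calls remove() on lists obtained this way.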


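ReplicatingChannelSelector copies the Source's event stream to every downstream Channel. It maintains two lists, requiredChannels and optionalChannels: Channels named in the optional setting go into optionalChannels, and all the others go into requiredChannels. If configure() was never called, requiredChannels stays null and getRequiredChannels() falls back to all Channels.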

public class ReplicatingChannelSelector extends AbstractChannelSelector {

  /**
   * Configuration to set a subset of the channels as optional.
   */
  public static final String CONFIG_OPTIONAL = "optional";
  List<Channel> requiredChannels = null;
  List<Channel> optionalChannels = new ArrayList<Channel>();

  @Override
  public List<Channel> getRequiredChannels(Event event) {
    /*
     * Seems like there are a lot of components within flume that do not call
     * configure method. It is conceivable that custom component tests too
     * do that. So in that case, revert to old behavior.
     */
    if(requiredChannels == null) {
      return getAllChannels();
    }
    return requiredChannels;
  }

  @Override
  public List<Channel> getOptionalChannels(Event event) {
    return optionalChannels;
  }

  @Override
  public void configure(Context context) {
    String optionalList = context.getString(CONFIG_OPTIONAL);
    requiredChannels = new ArrayList<Channel>(getAllChannels());
    Map<String, Channel> channelNameMap = getChannelNameMap();
    if(optionalList != null && !optionalList.isEmpty()) {
      for(String optional : optionalList.split("\\s+")) {
        Channel optionalChannel = channelNameMap.get(optional);
        requiredChannels.remove(optionalChannel);
        if (!optionalChannels.contains(optionalChannel)) {
          optionalChannels.add(optionalChannel);
        }
      }
    }
  }
}
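The partitioning in configure() can be tried out on channel names alone. In this sketch (class name hypothetical), optionalList plays the role of the optional property, for example selector.optional = c2 c3 with made-up channel names. Unlike the excerpt, names that are not bound to the source are skipped rather than producing a null entry.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of ReplicatingChannelSelector.configure()'s partitioning,
// using channel names instead of Channel objects.
class ReplicatingPartitionDemo {

    final List<String> required;
    final List<String> optional = new ArrayList<>();

    ReplicatingPartitionDemo(List<String> allChannels, String optionalList) {
        // Start with everything required, then move optional names across.
        required = new ArrayList<>(allChannels);
        String trimmed = optionalList == null ? "" : optionalList.trim();
        if (!trimmed.isEmpty()) {
            for (String name : trimmed.split("\\s+")) {
                if (!allChannels.contains(name)) {
                    continue; // simplification: ignore names not bound to the source
                }
                required.remove(name);
                if (!optional.contains(name)) {
                    optional.add(name); // duplicates in the setting are collapsed
                }
            }
        }
    }

    public static void main(String[] args) {
        ReplicatingPartitionDemo d =
                new ReplicatingPartitionDemo(List.of("c1", "c2", "c3"), "c2 c3 c2");
        System.out.println(d.required); // [c1]
        System.out.println(d.optional); // [c2, c3]
    }
}
```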


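MultiplexingChannelSelector chooses the downstream Channels according to attributes of the events coming from the Source.

A MultiplexingChannelSelector configuration looks like the following example:

1. Routing is based on the State attribute in the Event's headers (a <State, Value> pair).

2. If the State value is CA, the Event goes to mem-channel-1; if it is AZ, to file-channel-2; if it is NY, to both mem-channel-1 and file-channel-2. Any other value (including null) goes to mem-channel-1.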

agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
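The example configuration amounts to a small lookup table. This sketch hard-codes that table (class name hypothetical, Event reduced to its header map) so the cases listed above can be checked directly, including an unmapped value such as TX and a missing header.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the routing table described by the example config.
class MultiplexRoutingDemo {

    static final String HEADER = "State";
    static final List<String> DEFAULT = List.of("mem-channel-1");
    static final Map<String, List<String>> MAPPING = new HashMap<>();

    static {
        MAPPING.put("CA", List.of("mem-channel-1"));
        MAPPING.put("AZ", List.of("file-channel-2"));
        MAPPING.put("NY", List.of("mem-channel-1", "file-channel-2"));
    }

    // Missing, blank or unmapped header values all fall back to the default.
    static List<String> route(Map<String, String> headers) {
        String value = headers.get(HEADER);
        if (value == null || value.trim().isEmpty()) {
            return DEFAULT;
        }
        return MAPPING.getOrDefault(value, DEFAULT);
    }

    public static void main(String[] args) {
        for (String state : new String[] {"CA", "AZ", "NY", "TX"}) {
            System.out.println(state + " -> " + route(Map.of(HEADER, state)));
        }
        System.out.println("(no header) -> " + route(Map.of()));
    }
}
```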


MultiplexingChannelSelector keeps several fields that correspond to the configuration above:

  private String headerName;

  private Map<String, List<Channel>> channelMapping;
  private Map<String, List<Channel>> optionalChannels;
  private List<Channel> defaultChannels;


The configure() method fills in the channelMapping, optionalChannels and defaultChannels structures from the configuration:

  @Override
  public void configure(Context context) {
    this.headerName = context.getString(CONFIG_MULTIPLEX_HEADER_NAME,
        DEFAULT_MULTIPLEX_HEADER);

    Map<String, Channel> channelNameMap = getChannelNameMap();

    defaultChannels = getChannelListFromNames(
        context.getString(CONFIG_DEFAULT_CHANNEL), channelNameMap);

    Map<String, String> mapConfig =
        context.getSubProperties(CONFIG_PREFIX_MAPPING);

    channelMapping = new HashMap<String, List<Channel>>();

    for (String headerValue : mapConfig.keySet()) {
      List<Channel> configuredChannels = getChannelListFromNames(
          mapConfig.get(headerValue),
          channelNameMap);

      //This should not go to default channel(s)
      //because this seems to be a bad way to configure.
      if (configuredChannels.size() == 0) {
        throw new FlumeException("No channel configured for when "
            + "header value is: " + headerValue);
      }

      if (channelMapping.put(headerValue, configuredChannels) != null) {
        throw new FlumeException("Selector channel configured twice");
      }
    }
    //If no mapping is configured, it is ok.
    //All events will go to the default channel(s).
    Map<String, String> optionalChannelsMapping =
        context.getSubProperties(CONFIG_PREFIX_OPTIONAL + ".");

    optionalChannels = new HashMap<String, List<Channel>>();
    for (String hdr : optionalChannelsMapping.keySet()) {
      List<Channel> confChannels = getChannelListFromNames(
              optionalChannelsMapping.get(hdr), channelNameMap);
      if (confChannels.isEmpty()) {
        confChannels = EMPTY_LIST;
      }
      //Remove channels from optional channels, which are already
      //configured to be required channels.

      List<Channel> reqdChannels = channelMapping.get(hdr);
      //Check if there are required channels, else defaults to default channels
      if(reqdChannels == null || reqdChannels.isEmpty()) {
        reqdChannels = defaultChannels;
      }
      for (Channel c : reqdChannels) {
        if (confChannels.contains(c)) {
          confChannels.remove(c);
        }
      }

      if (optionalChannels.put(hdr, confChannels) != null) {
        throw new FlumeException("Selector channel configured twice");
      }
    }

  }
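Two parts of configure() are easy to miss: collecting the mapping. and optional. sub-properties, and dropping from each optional list the channels that are already required for the same header value. Below is a string-based sketch of both, where subProperties() is a hypothetical stand-in for what Context.getSubProperties(prefix) is used for here (keys with the prefix stripped), not the Flume API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of two steps of MultiplexingChannelSelector.configure().
class MultiplexConfigDemo {

    // Keep keys that start with prefix, with the prefix stripped.
    static Map<String, String> subProperties(Map<String, String> props, String prefix) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> e : props.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                result.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }

    // Space-separated names into a mutable list.
    static List<String> names(String value) {
        List<String> result = new ArrayList<>();
        if (value != null && !value.trim().isEmpty()) {
            result.addAll(Arrays.asList(value.trim().split("\\s+")));
        }
        return result;
    }

    // For each header value with optional channels, drop those that are already
    // required for it: its mapping, or the default channels when it is unmapped.
    static Map<String, List<String>> optionalChannels(Map<String, String> props) {
        List<String> defaults = names(props.get("default"));
        Map<String, String> mapping = subProperties(props, "mapping.");
        Map<String, List<String>> result = new HashMap<>();
        for (Map.Entry<String, String> e : subProperties(props, "optional.").entrySet()) {
            List<String> optional = names(e.getValue());
            List<String> required = names(mapping.get(e.getKey()));
            if (required.isEmpty()) {
                required = defaults;
            }
            optional.removeAll(required);
            result.put(e.getKey(), optional);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("default", "c1");
        props.put("mapping.CA", "c2");
        props.put("optional.CA", "c2 c3"); // c2 is already required for CA
        props.put("optional.TX", "c1 c3"); // TX is unmapped, so default c1 is required
        System.out.println(optionalChannels(props));
    }
}
```

With mapping.CA = c2 and optional.CA = c2 c3, the optional list for CA shrinks to [c3]; for the unmapped value TX, the default channel c1 counts as required and is removed from optional.TX.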


Once these structures are populated, getRequiredChannels() and getOptionalChannels() are straightforward:

  @Override
  public List<Channel> getRequiredChannels(Event event) {
    String headerValue = event.getHeaders().get(headerName);
    if (headerValue == null || headerValue.trim().length() == 0) {
      return defaultChannels;
    }

    List<Channel> channels = channelMapping.get(headerValue);

    //This header value does not point to anything
    //Return default channel(s) here.
    if (channels == null) {
      channels = defaultChannels;
    }

    return channels;
  }

  @Override
  public List<Channel> getOptionalChannels(Event event) {
    String hdr = event.getHeaders().get(headerName);
    List<Channel> channels = optionalChannels.get(hdr);

    if(channels == null) {
      channels = EMPTY_LIST;
    }
    return channels;
  }
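One detail of these two lookups: getRequiredChannels() treats a blank header as missing and falls back to defaultChannels, whereas getOptionalChannels() looks up the raw value and has no default, so the same Event can get the default required channels and no optional ones. A small sketch with string channel names (class and channel names hypothetical) that mirrors the two methods:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch mirroring getRequiredChannels()/getOptionalChannels().
class MultiplexLookupDemo {

    final String headerName = "State";
    final List<String> defaultChannels = List.of("c1");
    final Map<String, List<String>> channelMapping = new HashMap<>();
    final Map<String, List<String>> optionalChannels = new HashMap<>();

    MultiplexLookupDemo() {
        channelMapping.put("CA", List.of("c2"));
        optionalChannels.put("CA", List.of("c3"));
    }

    // Blank, missing or unmapped values fall back to the defaults.
    List<String> required(Map<String, String> headers) {
        String value = headers.get(headerName);
        if (value == null || value.trim().isEmpty()) {
            return defaultChannels;
        }
        List<String> channels = channelMapping.get(value);
        return channels == null ? defaultChannels : channels;
    }

    // Raw lookup with no default; HashMap accepts a null key, so a missing
    // header simply finds nothing.
    List<String> optional(Map<String, String> headers) {
        List<String> channels = optionalChannels.get(headers.get(headerName));
        if (channels == null) {
            return Collections.emptyList();
        }
        return channels;
    }

    public static void main(String[] args) {
        MultiplexLookupDemo s = new MultiplexLookupDemo();
        System.out.println(s.required(Map.of("State", "CA")) + " / " + s.optional(Map.of("State", "CA"))); // [c2] / [c3]
        System.out.println(s.required(Map.of("State", "  ")) + " / " + s.optional(Map.of("State", "  "))); // [c1] / []
    }
}
```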