您的位置:首页 > 运维架构 > 网站架构

flume源码分析一:总体架构

2016-11-02 23:51 281 查看
flume使用maven管理,层级呈树状结构,树根节点是flume-parent项目,该项目是整个flume的父项目,其他项目集成于该项目,打开这个项目的pom文件,可以看到子模块:

<modules>
<module>flume-ng-core</module>
<module>flume-ng-configuration</module>
<module>flume-ng-embedded-agent</module>
<module>flume-ng-sinks</module>
<module>flume-ng-sources</module>
<module>flume-ng-node</module>
<module>flume-ng-dist</module>
<module>flume-ng-channels</module>
<module>flume-ng-legacy-sources</module>
<module>flume-ng-clients</module>
<module>flume-ng-sdk</module>
<module>flume-ng-tests</module>
<module>flume-tools</module>
<module>flume-ng-auth</module>
</modules>


其中一些核心模块:

flume-ng-core:核心包,主要定义该项目的一些核心接口。

flume-ng-configuration:系统配置相关

flume-ng-sinks:sink组件的父工程,里面包含了各种具体sink实现工程。

flume-ng-sources:source组件的父工程,里面包含了各种具体source实现工程。

flume-ng-node:flume的入口类Application中包含了main方法。

flume-ng-channels:channel组件的父工程,里面包含了各种具体channel实现工程。

其他还有一些测试,日志,辅助包等工程。

首先看core这个核心包。这个包基本上都是接口,通过这些接口搭建起了flume的核心骨架,这其中最最重要的莫过于flume的三大组件source,channel,sink了,该三个接口都在org.apache.flume包下,下面分别查看这三个接口,首先看source:

public interface Source extends LifecycleAware, NamedComponent {

/**
* Specifies which channel processor will handle this source's events.
*
* @param channelProcessor
* 设置channel处理器
*/
public void setChannelProcessor(ChannelProcessor channelProcessor);

/**
* Returns the channel processor that will handle this source's events.
* 获取channel处理器
*/
public ChannelProcessor getChannelProcessor();

}
可以看到,该接口只用两个方法:获取channelprocessor和设置channelProcessor,我们知道source收集的数据需要放到channel中,而这个channelprocessor就是提供了put event进入channel的一个接口供source使用。下面看channel接口:

public interface Channel extends LifecycleAware, NamedComponent {

/**
* <p>Puts the given event into the channel.</p>
* <p><strong>Note</strong>: This method must be invoked within an active
* {@link Transaction} boundary. Failure to do so can lead to unpredictable
* results.</p>
* @param event the event to transport.
* @throws ChannelException in case this operation fails.
* @see org.apache.flume.Transaction#begin()
*/
public void put(Event event) throws ChannelException;

/**
* <p>Returns the next event from the channel if available. If the channel
* does not have any events available, this method must return {@code null}.
* </p>
* <p><strong>Note</strong>: This method must be invoked within an active
* {@link Transaction} boundary. Failure to do so can lead to unpredictable
* results.</p>
* @return the next available event or {@code null} if no events are
* available.
* @throws ChannelException in case this operation fails.
* @see org.apache.flume.Transaction#begin()
*/
public Event take() throws ChannelException;

/**
* @return the transaction instance associated with this channel.
*/
public Transaction getTransaction();
}


可以看到,channel接口只定义了三个方法:put event进channel,take event出channel,还有一个记录状态的方法。这也是与channel做为source和sink的中间通道的功能相符。最后看sink组件接口:

public interface Sink extends LifecycleAware, NamedComponent {
/**
* <p>Sets the channel the sink will consume from</p>
* @param channel The channel to be polled
*/
public void setChannel(Channel channel);

/**
* @return the channel associated with this sink
*/
public Channel getChannel();

/**
* <p>Requests the sink to attempt to consume data from attached channel</p>
* <p><strong>Note</strong>: This method should be consuming from the channel
* within the bounds of a Transaction. On successful delivery, the transaction
* should be committed, and on failure it should be rolled back.
* @return READY if 1 or more Events were successfully delivered, BACKOFF if
* no data could be retrieved from the channel feeding this sink
* @throws EventDeliveryException In case of any kind of failure to
* deliver data to the next hop destination.
*/
public Status process() throws EventDeliveryException;

public static enum Status {
READY, BACKOFF
}
}


sink接口也是非常简单,由于sink需要与channel交互,从channel中取出数据,所以该接口定义了存取channel的两个方法,另外一个process方法负责具体的sink实现,可以sink到不同的目的地,由其子类实现。最后一个状态信息的enum。另外顺便分析一下org.apache.flume这个包的其他接口(类):

ChannelException:自定义了channel的一个异常,其实就是继承了exception,不断是super父类方法。

ChannelFactory:顾名思义,channel的工厂类,当然是生产channel的了,主要是create(String name, String type)这个方法用于生产channel。

ChannelFullException:channel满了的时候抛出此异常,满了要么就是sink消费太慢,而source生产太快,另外也可能是由于channel太小。

ChannelSelector:channel选择器,这个选择器是给source使用的,source在发数据(event)的时候,有两种选择:复制和多路复用,复制就是把source中的event复制多份,该source连接了多少个channel就复制给几个channel,多路复用则是按照不同的策略不同event可能发给不同的channel。

clock接口:当前时间,好像没啥用,打酱油的吧。

CounterGroup:收集一些信息,做一些计数功能。

EventDrivenSource:继承自source,source的一种,没有单独的线程来驱动的没有process方法。AbstractEventDrivenSource,avrosource,HTTPSource等都是基于此。应该就是接收别人推过来的数据吧。

NamedComponent:给组件取个名字吧,还能改他的名,获取他的名。

PollableSource:与上面的EventDrivenSource正好相反,有自己线程来驱动的需要实现process方法,比如kafkasource就是这种。

SinkFactory:不用多说,产sink的。

SinkProcessor:该接口与SinkGroup共创大业,sinkgroup内,sinkprocessor提供了默认的processor,负载均衡processor,failover processor。

SinkRunner:该包中难得一见的实体类,SinkRunner可能对应一个sink也可能对应一个sinkgroup。因为如果配置文件中有sinkgroup则这个sinkgroup对应的sink会组成一个group然后封装为一个sinkRunner,然后不在sinkgroup中的sink会自己成为一个sinkRunner。每个SinkRunner的构造方法的参数是一个SinkProcessor是用来处理多个sink的。

sourcefactory:略

SourceRunner:目前有两大类PollableSourceRunner和EventDrivenSourceRunner,分别对应PollableSource和EventDrivenSource,PollableSource相关类需要外部驱动来确定source中是否有消息可以使用,而EventDrivenSource相关类不需要外部驱动,自己实现了事件驱动机制,目前常见的Source类都属于EventDrivenSource类型。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: