您的位置:首页 > 其它

Flume-ng源码解析之Sink组件

2017-03-10 19:06 483 查看
如果你还没看过Flume-ng源码解析系列中的启动流程和Channel组件,可以点击下面链接:

Flume-ng源码解析之启动流程

Flume-ng源码解析之Channel组件

作为启动流程中第二个启动的组件,我们今天来看看Sink的细节

1 Sink


Sink在agent中扮演的角色是消费者,将event输送到特定的位置


首先依然是看代码,由代码我们可以看出Sink是一个接口,里面最主要的方法是process(),用来处理从Channel中获取的数据。Sink的实例是由SinkFactory.create()生成的。

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Sink extends LifecycleAware, NamedComponent {
public void setChannel(Channel channel);
public Channel getChannel();
/* 用来处理channel中取来的event*/
public Status process() throws EventDeliveryException;
public static enum Status {
READY, BACKOFF
}
}

在启动流程中我们了解到Application中启动的不是Sink,而是SinkRunner,由名字我们可以看出这是一个驱动类。我们来看看代码,主要看它的start()

public class SinkRunner implements LifecycleAware {

...

@Override
public void start() {
SinkProcessor policy = getPolicy();

policy.start();

runner = new PollingRunner();

runner.policy = policy;
runner.counterGroup = counterGroup;
runner.shouldStop = new AtomicBoolean();

runnerThread = new Thread(runner);
runnerThread.setName("SinkRunner-PollingRunner-" +
policy.getClass().getSimpleName());
runnerThread.start();

lifecycleState = LifecycleState.START;
}
...

}

我们知道启动SinkRunner实际上就是调用它的start(),而在start()中可以看到主要是启动了一个SinkProcessor,而这个SinkProcessor在创建SinkRunnner的时候已经指定了,如果你想要了解配置文件是如何处理的,可以要去看看conf包里面的类,可以看看org.apache.flume.node.AbstractConfigurationProvider中的getConfiguration()。

我们接着看看SinkProcessor

public interface SinkProcessor extends LifecycleAware, Configurable {
Status process() throws EventDeliveryException;
void setSinks(List<Sink> sinks);
}

SinkProcesor是一个接口,他的实现类由SinkProcessorFactory的getProcessor()生成,在AbstractConfigurationProvider中的loadSinkGroup()调用SinkGroup中的configure()生成。

public class SinkGroup implements Configurable, ConfigurableComponent {
List<Sink> sinks;
SinkProcessor processor;
SinkGroupConfiguration conf;

public SinkGroup(List<Sink> groupSinks) {
sinks = groupSinks;
}

public SinkProcessor getProcessor() {
return processor;
}

@Override
public void configure(ComponentConfiguration conf) {
this.conf = (SinkGroupConfiguration) conf;
processor =
SinkProcessorFactory.getProcessor(this.conf.getProcessorContext(),
sinks);
}
}

那么我们以DefalutSinkProcessor为例子看看

public class DefaultSinkProcessor implements SinkProcessor, ConfigurableComponent {
private Sink sink;
private LifecycleState lifecycleState;

@Override
public void start() {
Preconditions.checkNotNull(sink, "DefaultSinkProcessor sink not set");
sink.start();
lifecycleState = LifecycleState.START;
}

@Override
public void stop() {
Preconditions.checkNotNull(sink, "DefaultSinkProcessor sink not set");
sink.stop();
lifecycleState = LifecycleState.STOP;
}

@Override
public LifecycleState getLifecycleState() {
return lifecycleState;
}

@Override
public void configure(Context context) {
}

@Override
public Status process() throws EventDeliveryException {
return sink.process();
}

@Override
public void setSinks(List<Sink> sinks) {
Preconditions.checkNotNull(sinks);
Preconditions.checkArgument(sinks.size() == 1, "DefaultSinkPolicy can "
+ "only handle one sink, "
+ "try using a policy that supports multiple sinks");
sink = sinks.get(0);
}

@Override
public void configure(ComponentConfiguration conf) {

}

}

从上面的代码中我们可以看到SinkProcessor执行的还是sink的start、stop和process方法,那么SinkProcessor的作用是什么,Flume提供leFailoverSinkProcessor和LoadBalancingSinkProcessor,顾名思义,一个是失效备援,一个是负载均衡,那么SinkProcessor不同子类的存在就是为了实现不同的分配操作和策略。而sink的start()通常是启动线程去执行消费操作。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: