
flume-ng Startup Process: Source Code Analysis

2015-10-29
The bin/flume-ng script shows that Flume's entry point is the org.apache.flume.node.Application class, so the analysis starts there.

CommandLineParser parser = new GnuParser();
CommandLine commandLine = parser.parse(options, args);

File configurationFile = new File(commandLine.getOptionValue('f'));
String agentName = commandLine.getOptionValue('n');
boolean reload = !commandLine.hasOption("no-reload-conf");

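This code reads the command-line arguments: f specifies the configuration file, n specifies the agent name, and no-reload-conf controls whether the configuration file is reloaded dynamically. If the flag is absent, reload is true and the file is reloaded whenever it changes; otherwise it is loaded only once.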

List<LifecycleAware> components = Lists.newArrayList();
Application application;
if (reload) {
  // Dynamic reload: a polling provider posts new configurations to the event bus.
  EventBus eventBus = new EventBus(agentName + "-event-bus");
  PollingPropertiesFileConfigurationProvider configurationProvider =
      new PollingPropertiesFileConfigurationProvider(agentName,
          configurationFile, eventBus, 30);
  components.add(configurationProvider);
  application = new Application(components);
  eventBus.register(application);
} else {
  // No reload: read the configuration once and apply it directly.
  PropertiesFileConfigurationProvider configurationProvider =
      new PropertiesFileConfigurationProvider(agentName,
          configurationFile);
  application = new Application();
  application.handleConfigurationEvent(configurationProvider.getConfiguration());
}
application.start();

Dynamic reloading is built on the publish-subscribe pattern, using EventBus from Guava.

For background on EventBus, see: http://my.oschina.net/u/2311010/blog/515188

Next, let's see how EventBus drives dynamic reloading.
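
To make the publish-subscribe idea concrete, here is a minimal sketch that uses only the Java standard library. It is not Guava's EventBus and not Flume code: TinyEventBus and ReloadingApp are hypothetical names, and a String stands in for the configuration object. It only illustrates that the publisher calls post() and never needs to know who handles the event.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, minimal stand-in for an event bus; not Guava's implementation. */
class TinyEventBus<E> {
    private final List<Consumer<E>> subscribers = new ArrayList<>();

    /** Registers a subscriber that will receive every event posted afterwards. */
    void register(Consumer<E> subscriber) {
        subscribers.add(subscriber);
    }

    /** Delivers the event synchronously to all subscribers, in registration order. */
    void post(E event) {
        for (Consumer<E> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }
}

/** Hypothetical subscriber playing the role that Application plays in Flume. */
class ReloadingApp {
    final List<String> applied = new ArrayList<>();

    /** Records each configuration it is handed (a String stands in for the real type). */
    synchronized void handleConfigurationEvent(String conf) {
        applied.add(conf);
    }
}

class TinyEventBusDemo {
    public static void main(String[] args) {
        TinyEventBus<String> bus = new TinyEventBus<>();
        ReloadingApp app = new ReloadingApp();
        bus.register(app::handleConfigurationEvent);

        // The publisher only knows the bus, not the subscriber.
        bus.post("config-v1");
        bus.post("config-v2");
        System.out.println(app.applied); // prints [config-v1, config-v2]
    }
}
```

Guava's EventBus differs in that subscribers are discovered through annotated methods rather than passed as lambdas, but the flow of register() followed by post() is the same.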

LifecycleAware is the lifecycle interface for components. It declares three methods: start(), stop(), and getLifecycleState(). Source, Channel, and Sink all implement it.
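
As a rough illustration of that contract, the sketch below defines a simplified interface with the same three method names and a toy component that implements it. SketchLifecycle, SketchState, and SketchChannel are hypothetical names made up for this example; they are not Flume's types.

```java
/** Hypothetical, reduced set of lifecycle states for this sketch. */
enum SketchState { IDLE, START, STOP }

/** Hypothetical interface mirroring the three methods named above. */
interface SketchLifecycle {
    void start();
    void stop();
    SketchState getLifecycleState();
}

/** Toy component: it only tracks which lifecycle call it saw last. */
class SketchChannel implements SketchLifecycle {
    private SketchState state = SketchState.IDLE;

    @Override
    public void start() {
        state = SketchState.START;
    }

    @Override
    public void stop() {
        state = SketchState.STOP;
    }

    @Override
    public SketchState getLifecycleState() {
        return state;
    }
}

class SketchLifecycleDemo {
    public static void main(String[] args) {
        SketchLifecycle channel = new SketchChannel();
        System.out.println(channel.getLifecycleState()); // prints IDLE
        channel.start();
        System.out.println(channel.getLifecycleState()); // prints START
        channel.stop();
        System.out.println(channel.getLifecycleState()); // prints STOP
    }
}
```

Because every component exposes the same three methods, a supervisor can start, stop, and inspect any of them without knowing whether it is a source, a channel, or a sink.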



components.add(configurationProvider); adds the PollingPropertiesFileConfigurationProvider object. At this point components holds only that one object, and the list is passed to the Application constructor. Next, look at Application's start().

public synchronized void start() {
  for (LifecycleAware component : components) {
    // Here the only component is the PollingPropertiesFileConfigurationProvider added above.
    supervisor.supervise(component,
        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
  }
}

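supervise() creates a MonitorRunnable task (a Runnable, not a separate process) for the component and submits it to monitorService, a scheduled thread pool with 10 threads by default.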

Supervisoree process = new Supervisoree();
process.status = new Status();

process.policy = policy;
process.status.desiredState = desiredState;
process.status.error = false;

MonitorRunnable monitorRunnable = new MonitorRunnable();
monitorRunnable.lifecycleAware = lifecycleAware;
monitorRunnable.supervisoree = process;
monitorRunnable.monitorService = monitorService;

supervisedProcesses.put(lifecycleAware, process);

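// Run the MonitorRunnable repeatedly: no initial delay, then 3 seconds after each run finishes.
ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
    monitorRunnable, 0, 3, TimeUnit.SECONDS);
// Keep the ScheduledFuture handle (not a result value) so the monitor can be cancelled later.
monitorFutures.put(lifecycleAware, future);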
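
The scheduling step above can be sketched with the standard library alone. In this hypothetical example, ToyComponent and ToyMonitor are invented names, and a boolean stands in for the desired lifecycle state; the real MonitorRunnable also handles errors and restart policies, which are left out here. The point is only that a periodic task compares the current state with the desired one and calls start() or stop() accordingly.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical component exposing only the state a monitor needs. */
class ToyComponent {
    final AtomicBoolean started = new AtomicBoolean(false);

    void start() {
        started.set(true);
    }

    void stop() {
        started.set(false);
    }
}

/** Hypothetical monitor: each run nudges the component toward the desired state. */
class ToyMonitor implements Runnable {
    private final ToyComponent component;
    private final boolean desiredStarted;

    ToyMonitor(ToyComponent component, boolean desiredStarted) {
        this.component = component;
        this.desiredStarted = desiredStarted;
    }

    @Override
    public void run() {
        if (desiredStarted && !component.started.get()) {
            component.start();
        } else if (!desiredStarted && component.started.get()) {
            component.stop();
        }
    }
}

class ToySupervisorDemo {
    public static void main(String[] args) throws Exception {
        // Pool size of 2 is arbitrary for the demo.
        ScheduledExecutorService monitorService = Executors.newScheduledThreadPool(2);
        ToyComponent component = new ToyComponent();

        // Same shape as the excerpt: no initial delay, 3 seconds between runs.
        ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
            new ToyMonitor(component, true), 0, 3, TimeUnit.SECONDS);

        Thread.sleep(200); // give the first run a moment to happen
        System.out.println("started = " + component.started.get());

        // The saved future is what allows the monitor to be cancelled.
        future.cancel(false);
        monitorService.shutdown();
    }
}
```

Because the monitor runs periodically rather than once, a component that leaves its desired state is pushed back toward it on the next run.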

Next comes MonitorRunnable's run(), which calls the lifecycleAware's start() or stop() method depending on supervisoree.status.desiredState. Here lifecycleAware is the PollingPropertiesFileConfigurationProvider object, so its start() is what gets called:

public void start() {
  LOGGER.info("Configuration provider starting");

  Preconditions.checkState(file != null,
      "The parameter file must not be null");

  executorService = Executors.newSingleThreadScheduledExecutor(
      new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d")
          .build());

  FileWatcherRunnable fileWatcherRunnable =
      new FileWatcherRunnable(file, counterGroup);

  executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval,
      TimeUnit.SECONDS);

  lifecycleState = LifecycleState.START;

  LOGGER.debug("Configuration provider started");
}

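PollingPropertiesFileConfigurationProvider's start() creates a single-threaded scheduled executor that runs FileWatcherRunnable's run() method with a fixed delay of 30 seconds between runs (the interval passed to the constructor earlier).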

public void run() {
  LOGGER.debug("Checking file:{} for changes", file);

  counterGroup.incrementAndGet("file.checks");

  long lastModified = file.lastModified();

  if (lastModified > lastChange) {
    LOGGER.info("Reloading configuration file:{}", file);

    counterGroup.incrementAndGet("file.loads");

    lastChange = lastModified;

    try {
      eventBus.post(getConfiguration());
    } catch (Exception e) {
      LOGGER.error("Failed to load configuration data. Exception follows.",
          e);
    } catch (NoClassDefFoundError e) {
      LOGGER.error("Failed to start agent because dependencies were not " +
          "found in classpath. Error follows.", e);
    } catch (Throwable t) {
      // caught because the caller does not handle or log Throwables
      LOGGER.error("Unhandled error", t);
    }
  }
}

If the configuration file's last-modified time is later than the recorded lastChange value, run() calls eventBus.post(getConfiguration()).
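
The modification-time comparison can be isolated into a small sketch. ChangeDetector is a hypothetical class written for this example; it takes the timestamp from a LongSupplier so the logic can be exercised without touching the file system, and it assumes lastChange starts at 0. Under that assumption the very first check always reports a change, which is how an initial load would be triggered.

```java
import java.io.File;
import java.util.function.LongSupplier;

/** Hypothetical helper mirroring the lastModified > lastChange comparison. */
class ChangeDetector {
    private final LongSupplier lastModifiedSource;
    private long lastChange = 0L; // assumption: nothing has been loaded yet

    ChangeDetector(LongSupplier lastModifiedSource) {
        this.lastModifiedSource = lastModifiedSource;
    }

    /** Returns true once for each increase in the observed modification time. */
    boolean changedSinceLastCheck() {
        long lastModified = lastModifiedSource.getAsLong();
        if (lastModified > lastChange) {
            lastChange = lastModified;
            return true;
        }
        return false;
    }
}

class ChangeDetectorDemo {
    public static void main(String[] args) throws Exception {
        File conf = File.createTempFile("flume-sketch", ".conf");
        conf.deleteOnExit();
        ChangeDetector detector = new ChangeDetector(conf::lastModified);

        System.out.println("first check:  " + detector.changedSinceLastCheck());
        System.out.println("second check: " + detector.changedSinceLastCheck());

        // Simulate an edit by moving the modification time forward by one minute.
        conf.setLastModified(conf.lastModified() + 60_000L);
        System.out.println("after edit:   " + detector.changedSinceLastCheck());
    }
}
```

One consequence of comparing timestamps rather than contents is that rewriting the file with identical contents still counts as a change, while an edit that does not advance the modification time goes unnoticed.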

getConfiguration() reads the configuration file, builds the configured channels, sources, and sinks, stores them in a SimpleMaterializedConfiguration object, and returns it.

public MaterializedConfiguration getConfiguration() {
  MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
  FlumeConfiguration fconfig = getFlumeConfiguration();
  AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
  if (agentConf != null) {
    Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
    Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
    Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
    try {
      loadChannels(agentConf, channelComponentMap);
      loadSources(agentConf, channelComponentMap, sourceRunnerMap);
      loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
      Set<String> channelNames =
          new HashSet<String>(channelComponentMap.keySet());
      for (String channelName : channelNames) {
        ChannelComponent channelComponent = channelComponentMap.
            get(channelName);
        if (channelComponent.components.isEmpty()) {
          LOGGER.warn(String.format("Channel %s has no components connected" +
              " and has been removed.", channelName));
          channelComponentMap.remove(channelName);
          Map<String, Channel> nameChannelMap = channelCache.
              get(channelComponent.channel.getClass());
          if (nameChannelMap != null) {
            nameChannelMap.remove(channelName);
          }
        } else {
          LOGGER.info(String.format("Channel %s connected to %s",
              channelName, channelComponent.components.toString()));
          conf.addChannel(channelName, channelComponent.channel);
        }
      }
      for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
        conf.addSourceRunner(entry.getKey(), entry.getValue());
      }
      for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
        conf.addSinkRunner(entry.getKey(), entry.getValue());
      }
    } catch (InstantiationException ex) {
      LOGGER.error("Failed to instantiate component", ex);
    } finally {
      channelComponentMap.clear();
      sourceRunnerMap.clear();
      sinkRunnerMap.clear();
    }
  } else {
    LOGGER.warn("No configuration found for this host:{}", getAgentName());
  }
  return conf;
}
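
One detail in the method above is easy to miss: channels that no source or sink is connected to are dropped, and the loop iterates over a copy of the key set so that removing entries does not disturb the iteration. The following sketch reproduces just that step with plain collections. ChannelPruner and its map of channel name to connected component names are hypothetical simplifications, not Flume types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

/** Hypothetical helper isolating the "remove unconnected channels" step. */
class ChannelPruner {
    /**
     * Removes every channel whose list of connected components is empty
     * and returns the names that were removed.
     */
    static List<String> pruneUnconnected(Map<String, List<String>> connectedComponents) {
        List<String> removed = new ArrayList<>();
        // Iterate over a snapshot of the keys so the map can be modified safely.
        for (String channelName : new HashSet<>(connectedComponents.keySet())) {
            if (connectedComponents.get(channelName).isEmpty()) {
                connectedComponents.remove(channelName);
                removed.add(channelName);
            }
        }
        return removed;
    }
}

class ChannelPrunerDemo {
    public static void main(String[] args) {
        Map<String, List<String>> channels = new HashMap<>();
        channels.put("c1", Arrays.asList("source-r1", "sink-k1"));
        channels.put("c2", new ArrayList<>()); // declared but never wired up

        List<String> removed = ChannelPruner.pruneUnconnected(channels);
        System.out.println("removed = " + removed);             // prints removed = [c2]
        System.out.println("kept    = " + channels.keySet());   // prints kept    = [c1]
    }
}
```

Iterating the live keySet() while calling remove() on the map would risk a ConcurrentModificationException, which is why a copy is taken first.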

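eventBus.post(getConfiguration()) causes the EventBus to invoke Application's handleConfigurationEvent method, because the Application object was registered on the bus earlier.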

public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
  stopAllComponents();
  startAllComponents(conf);
}

stopAllComponents() stops the running components in the order source, sink, channel, which avoids losing data while components are shut down.

startAllComponents(conf) starts every component described by the newly loaded configuration, in exactly the reverse of the stop order.

private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
  logger.info("Starting new configuration:{}", materializedConfiguration);

  this.materializedConfiguration = materializedConfiguration;

  for (Entry<String, Channel> entry :
      materializedConfiguration.getChannels().entrySet()) {
    try {
      logger.info("Starting Channel " + entry.getKey());
      supervisor.supervise(entry.getValue(),
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
    } catch (Exception e) {
      logger.error("Error while starting {}", entry.getValue(), e);
    }
  }

  /*
   * Wait for all channels to start.
   */
  for (Channel ch : materializedConfiguration.getChannels().values()) {
    while (ch.getLifecycleState() != LifecycleState.START
        && !supervisor.isComponentInErrorState(ch)) {
      try {
        logger.info("Waiting for channel: " + ch.getName() +
            " to start. Sleeping for 500 ms");
        Thread.sleep(500);
      } catch (InterruptedException e) {
        logger.error("Interrupted while waiting for channel to start.", e);
        Throwables.propagate(e);
      }
    }
  }

  for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners()
      .entrySet()) {
    try {
      logger.info("Starting Sink " + entry.getKey());
      supervisor.supervise(entry.getValue(),
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
    } catch (Exception e) {
      logger.error("Error while starting {}", entry.getValue(), e);
    }
  }

  for (Entry<String, SourceRunner> entry : materializedConfiguration
      .getSourceRunners().entrySet()) {
    try {
      logger.info("Starting Source " + entry.getKey());
      supervisor.supervise(entry.getValue(),
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
    } catch (Exception e) {
      logger.error("Error while starting {}", entry.getValue(), e);
    }
  }

  this.loadMonitoring();
}

Each component is started through supervisor.supervise(), which schedules a MonitorRunnable that in turn calls the start() method of the corresponding lifecycleAware.
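
The stop-then-start sequence performed on every reload can be summarized in a small sketch. ReloadSketch is a hypothetical class that only records the order of operations in a log; the component kinds are plain strings and nothing is actually started. It assumes, as described in this article, that components start in the order channel, sink, source and stop in the reverse order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical model of a reload that records the order of operations. */
class ReloadSketch {
    // Start order taken from the article: channels, then sinks, then sources.
    private static final List<String> START_ORDER =
        Arrays.asList("channel", "sink", "source");

    final List<String> log = new ArrayList<>();
    private boolean running = false;

    /** Mirrors the shape of the handler: stop everything, then start everything. */
    synchronized void handleConfigurationEvent() {
        stopAllComponents();
        startAllComponents();
    }

    private void stopAllComponents() {
        if (!running) {
            return; // nothing to stop on the very first configuration
        }
        List<String> stopOrder = new ArrayList<>(START_ORDER);
        Collections.reverse(stopOrder); // source, sink, channel
        for (String kind : stopOrder) {
            log.add("stop " + kind);
        }
        running = false;
    }

    private void startAllComponents() {
        for (String kind : START_ORDER) {
            log.add("start " + kind);
        }
        running = true;
    }
}

class ReloadSketchDemo {
    public static void main(String[] args) {
        ReloadSketch app = new ReloadSketch();
        app.handleConfigurationEvent(); // initial load
        app.handleConfigurationEvent(); // reload after a config change
        for (String line : app.log) {
            System.out.println(line);
        }
    }
}
```

Starting channels first means that sinks and sources always find their channel ready, and stopping sources first means nothing new is written into a channel that is about to go away.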

That completes the analysis of Flume's startup and dynamic configuration reloading.