您的位置:首页 > 其它

Flume-1.6.0 源码分析1:ExecSource分析

2015-10-29 00:00 429 查看
http://www.aboutyun.com/thread-6844-1-1.html

http://blog.csdn.net/wsscy2004/article/details/39696095

http://www.cnblogs.com/dyllove98/p/3233758.html

http://www.blogjava.net/paulwong/archive/2013/10/31/405860.html

http://flume.apache.org/FlumeUserGuide.html

http://www.cnblogs.com/cswuyg/p/4498804.html

首先,这个类的位置:



它的测试类:



下面,我们就开始分析这个类。

首先在源代码中加入

public static void main(String[] args) {
ExecSource eSource = new ExecSource();
eSource.start();
}

编译,没问题,抛到Linux中,然后加入以下依赖包

依赖包: flume-ng-configuration类包 flume-ng-sdk类包

第三方jar: slf4j guava-r07.jar

然后就可以愉快的开始 jdb了。

[root@machine1 flume-ng-exec-source]# jdb org.apache.flume.source.ExecSource
Initializing jdb ...
> stop in org.apache.flume.source.ExecSource.main
Deferring breakpoint org.apache.flume.source.ExecSource.main.
It will be set after the class is loaded.
> run
run org.apache.flume.source.ExecSource
Set uncaught java.lang.Throwable
Set deferred uncaught java.lang.Throwable
>
VM Started: Set deferred breakpoint org.apache.flume.source.ExecSource.main
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Breakpoint hit: "thread=main", org.apache.flume.source.ExecSource.main(), line=168 bci=0
168      ExecSource eSource = new ExecSource();
main[1]

下面开始进入真正的源码分析阶段:

具体细节不一一讲述。

首先,

if (shell != null) {
//shell="";
String[] commandArgs = formulateShellCommand(shell, command);
process = Runtime.getRuntime().exec(commandArgs);
//process=Runtime.getRuntime().exec(command);
} else {
String[] commandArgs = command.split("\\s+");
process = new ProcessBuilder(commandArgs).start();
}

通过这样的方式,打开一个子进程,

reader = new BufferedReader(new InputStreamReader(process.getInputStream(), charset));

// StderrLogger dies as soon as the input stream is invalid
StderrReader stderrReader = new StderrReader(
new BufferedReader(new InputStreamReader(process.getErrorStream(), charset)), logStderr);
这样拿到两个流,第一个流是内容,第2个流是进程的错误输出。

重点是内容怎么处理。

while ((line = reader.readLine()) != null) {
synchronized (eventList) {
sourceCounter.incrementEventReceivedCount();
eventList.add(EventBuilder.withBody(line.getBytes(charset)));
if (eventList.size() >= bufferCount || timeout()) {
flushEventBatch(eventList);
}
}
}

synchronized (eventList) {
if (!eventList.isEmpty()) {
flushEventBatch(eventList);
}
}

一行一行的读,然后放入eventList,必要时做刷新---大小足够多,或者超时,或者结束了。

private void flushEventBatch(List<Event> eventList) {
channelProcessor.processEventBatch(eventList);
sourceCounter.addToEventAcceptedCount(eventList.size());
eventList.clear();
lastPushToChannel = systemClock.currentTimeMillis();
}

这个就是刷新的过程,实际上就是交给 channelProcessor处理。

==========================================

同时还启动了一个线程做定时刷新

future = timedFlushService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
synchronized (eventList) {
if (!eventList.isEmpty() && timeout()) {
flushEventBatch(eventList);
}
}
} catch (Exception e) {
logger.error("Exception occured when processing event batch", e);
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
}
}
}, batchTimeout, batchTimeout, TimeUnit.MILLISECONDS);

没啥好说的,太简单。

================

错误处理

// StderrLogger dies as soon as the input stream is invalid
StderrReader stderrReader = new StderrReader(
new BufferedReader(new InputStreamReader(process.getErrorStream(), charset)), logStderr);
stderrReader.setName("StderrReader-[" + command + "]");
stderrReader.setDaemon(true);
stderrReader.start();

实际上内容为

@Override
public void run() {
try {
int i = 0;
String line = null;
while ((line = input.readLine()) != null) {
if (logStderr) {
// There is no need to read 'line' with a charset
// as we do not to propagate it.
// It is in UTF-16 and would be printed in UTF-8 format.
logger.info("StderrLogger[{}] = '{}'", ++i, line);
}
}
} catch (IOException e) {
logger.info("StderrLogger exiting", e);
} finally {
try {
if (input != null) {
input.close();
}
} catch (IOException ex) {
logger.error("Failed to close stderr reader for exec source", ex);
}
}

就是有错误输出时,打印出内容。

好久不玩C了,对文件描述符的切换都有点不太利落了,随便看吧。

==========================================

最后,如果

} while (restart);

设置了重启的话,就再从头执行一遍,总体来说,很简单!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Flume 1.6.0