您的位置:首页 > 其它

【Flume】【源码分析】flume中ExecSource源码的详细分析——执行终端命令获取数据

2015-02-05 10:17 567 查看
我们直接看该Source的start方法吧

public void start() {
logger.info("Exec source starting with command:{}", command);

executor = Executors.newSingleThreadExecutor();

runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter,
restart, restartThrottle, logStderr, bufferCount, batchTimeout, charset);

// FIXME: Use a callback-like executor / future to signal us upon failure.
runnerFuture = executor.submit(runner);

/*
* NB: This comes at the end rather than the beginning of the method because
* it sets our state to running. We want to make sure the executor is alive
* and well first.
*/
sourceCounter.start();
super.start();

logger.debug("Exec source started");
}
启动了一个线程来运行,运行的详细过程看runner

它是一个线程,实现了Runnable接口,所以直接看它重写的run方法的逻辑,我们一块一块来看:

if(shell != null) {
String[] commandArgs = formulateShellCommand(shell, command);
process = Runtime.getRuntime().exec(commandArgs);
}  else {
String[] commandArgs = command.split("\\s+");
process = new ProcessBuilder(commandArgs).start();
}
reader = new BufferedReader(
new InputStreamReader(process.getInputStream(), charset));
这里就是执行shell命令,并且将shell命令的输出结果作为输入流读到reader中,InputStreamReader是字节流通向字符流的桥梁,它使用指定的charset读取字节并将其解码为字符,每次调用read方法都会从底层输入流读取一个或多个字节。

while ((line = reader.readLine()) != null) {
synchronized (eventList) {
sourceCounter.incrementEventReceivedCount();
eventList.add(EventBuilder.withBody(line.getBytes(charset)));
if(eventList.size() >= bufferCount || timeout()) {
flushEventBatch(eventList);
}
}
}

如果读入的内容非空,先同步eventList,如果eventList超出一定范围未做处理就会flush

还有一个flush的地方

future = timedFlushService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
synchronized (eventList) {
if(!eventList.isEmpty() && timeout()) {
flushEventBatch(eventList);
}
}
} catch (Exception e) {
logger.error("Exception occured when processing event batch", e);
if(e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
}
}
},
batchTimeout, batchTimeout, TimeUnit.MILLISECONDS);这是启动了一个线程,每个3000毫秒去检查eventList,如果有event或者超过3秒存留的event,将会送到通道中

private void flushEventBatch(List<Event> eventList){
channelProcessor.processEventBatch(eventList);
sourceCounter.addToEventAcceptedCount(eventList.size());
eventList.clear();
lastPushToChannel = systemClock.currentTimeMillis();
}
flush就是现将积攒下来的eventList中的event都处理掉,然后清空

1、将event都放入配置的通道中

for (Event event : events) {
List<Channel> reqChannels = selector.getRequiredChannels(event);

for (Channel ch : reqChannels) {
List<Event> eventQueue = reqChannelQueue.get(ch);
if (eventQueue == null) {
eventQueue = new ArrayList<Event>();
reqChannelQueue.put(ch, eventQueue);
}
eventQueue.add(event);
}

List<Channel> optChannels = selector.getOptionalChannels(event);

for (Channel ch: optChannels) {
List<Event> eventQueue = optChannelQueue.get(ch);
if (eventQueue == null) {
eventQueue = new ArrayList<Event>();
optChannelQueue.put(ch, eventQueue);
}

eventQueue.add(event);
}
}
这里就是将event放到通道中的详细过程了,但是这里大家注意到有两次selector的getchannel的方法,这是因为通道的选择器模式有两种:复用和复制

if(restart) {
logger.info("Restarting in {}ms, exit code {}", restartThrottle,
exitCode);
try {
Thread.sleep(restartThrottle);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} else {
logger.info("Command [" + command + "] exited with " + exitCode);
}
} while(restart);
restart参数的含义是,当shell命令执行的时候进程死了,是否重启该命令的进程,默认是false

配置为true的话,就会将刚才的所有代码循环一遍

总结:

1、event如何产出的?

eventList.add(EventBuilder.withBody(line.getBytes(charset)));
public static Event withBody(byte[] body, Map<String, String> headers) {
Event event = new SimpleEvent();

if(body == null) {
body = new byte[0];
}
event.setBody(body);

if (headers != null) {
event.setHeaders(new HashMap<String, String>(headers));
}

return event;
}
2、event如何放入通道?

private void flushEventBatch(List<Event> eventList){
channelProcessor.processEventBatch(eventList);
sourceCounter.addToEventAcceptedCount(eventList.size());
eventList.clear();
lastPushToChannel = systemClock.currentTimeMillis();
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  execsource flume