【Flume】【Source Code Analysis】A detailed analysis of ExecSource in Flume: running terminal commands to collect data
2015-02-05 10:17
Let's go straight to this Source's start method:
public void start() {
  logger.info("Exec source starting with command:{}", command);

  executor = Executors.newSingleThreadExecutor();

  runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter,
      restart, restartThrottle, logStderr, bufferCount, batchTimeout, charset);

  // FIXME: Use a callback-like executor / future to signal us upon failure.
  runnerFuture = executor.submit(runner);

  /*
   * NB: This comes at the end rather than the beginning of the method because
   * it sets our state to running. We want to make sure the executor is alive
   * and well first.
   */
  sourceCounter.start();
  super.start();

  logger.debug("Exec source started");
}
start() creates a single-threaded executor and submits an ExecRunnable to it, so the command is processed on its own thread. The details of what happens at run time are in the runner.
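To make the ordering in start() concrete, here is a minimal Java sketch of the same pattern. A dedicated single-thread executor runs the reader task, and the source is marked as running only after the task has been submitted. SingleThreadSourceSketch, awaitRunner and stop are hypothetical names for illustration, not Flume APIs. The stop() method is an assumption, added so the executor thread is shut down cleanly.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the start() pattern; not Flume's ExecSource.
final class SingleThreadSourceSketch {
  private final Runnable runner;              // plays the role of ExecRunnable
  private final AtomicBoolean running = new AtomicBoolean(false);
  private ExecutorService executor;
  private Future<?> runnerFuture;

  SingleThreadSourceSketch(Runnable runner) {
    this.runner = runner;
  }

  void start() {
    // One dedicated thread runs the command-reading task.
    executor = Executors.newSingleThreadExecutor();
    runnerFuture = executor.submit(runner);
    // Flip to "running" only after the task is submitted, as the NB comment describes.
    running.set(true);
  }

  boolean isRunning() {
    return running.get();
  }

  // Waits up to timeoutMillis for the task to finish; true if it completed normally.
  boolean awaitRunner(long timeoutMillis) {
    try {
      runnerFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } catch (ExecutionException | TimeoutException e) {
      return false;
    }
  }

  // Assumed counterpart to start(): cancel the task and release the thread.
  void stop() {
    running.set(false);
    if (runnerFuture != null) {
      runnerFuture.cancel(true); // interrupts the worker if the task is still running
    }
    if (executor != null) {
      executor.shutdown();
    }
  }
}
```

The executor's worker thread is not a daemon thread, so something must shut the executor down. Otherwise the thread keeps the JVM alive after the task has finished.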
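ExecRunnable is not itself a Thread; it implements the Runnable interface. So we can go straight to the logic of its overridden run method. Let's take it one block at a time: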
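if(shell != null) {
  String[] commandArgs = formulateShellCommand(shell, command);
  process = Runtime.getRuntime().exec(commandArgs);
} else {
  String[] commandArgs = command.split("\\s+");
  process = new ProcessBuilder(commandArgs).start();
}
reader = new BufferedReader(
    new InputStreamReader(process.getInputStream(), charset));
This is where the command is executed. If shell is configured, formulateShellCommand builds an argument array that hands the command to that shell, and Runtime.exec runs it. Otherwise the command string is split on whitespace and started directly with ProcessBuilder. The process's standard output is then wrapped in reader.

InputStreamReader is the bridge from byte streams to character streams. It reads bytes and decodes them into characters using the specified charset, and each call to one of its read methods may read one or more bytes from the underlying byte stream.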
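The byte-to-character decoding described above can be tried outside Flume. The following sketch is a hypothetical helper, not Flume code:

- splitCommand reproduces the whitespace split of the no-shell branch.
- readLines wraps any InputStream the way ExecRunnable wraps the process output, so it can be exercised with an in-memory stream as well as with a real process.
- runAndRead is an assumed convenience method that ties the two together.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring the no-shell branch of ExecRunnable.run(); not Flume code.
final class CommandOutputSketch {

  // Same rule as the no-shell branch: split the command string on whitespace.
  static String[] splitCommand(String command) {
    return command.split("\\s+");
  }

  // Decodes the byte stream with the given charset and collects it line by line.
  // readLine() strips the line terminator ("\n" or "\r\n").
  static List<String> readLines(InputStream in, Charset charset) throws IOException {
    List<String> lines = new ArrayList<>();
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, charset))) {
      String line;
      while ((line = reader.readLine()) != null) {
        lines.add(line);
      }
    }
    return lines;
  }

  // Starts the command and returns its standard output as lines.
  static List<String> runAndRead(String command, Charset charset)
      throws IOException, InterruptedException {
    Process process = new ProcessBuilder(splitCommand(command)).start();
    List<String> lines = readLines(process.getInputStream(), charset);
    process.waitFor(); // reap the process after its output is exhausted
    return lines;
  }
}
```

This sketch never reads the process's standard error. If a command writes a lot to stderr and nothing drains that pipe, the process can block once the pipe buffer fills. ProcessBuilder.redirectErrorStream(true) or a separate reader thread are the usual remedies.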
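while ((line = reader.readLine()) != null) {
  synchronized (eventList) {
    sourceCounter.incrementEventReceivedCount();
    eventList.add(EventBuilder.withBody(line.getBytes(charset)));
    if(eventList.size() >= bufferCount || timeout()) {
      flushEventBatch(eventList);
    }
  }
}
readLine() returns null at the end of the stream. For each non-null line, the loop:

- locks eventList;
- increments the received-event counter;
- appends the line to eventList as an event;
- flushes the batch once eventList holds bufferCount events (the batchSize setting), or once batchTimeout milliseconds have passed since the last push to the channel.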
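The size-or-time rule in the read loop can be isolated into a small class. This is a simplified, hypothetical sketch rather than Flume's implementation:

- strings stand in for events;
- a Consumer stands in for the channel processor;
- the clock is injected as a LongSupplier, so the timeout branch can be tested without waiting.

The field names mirror ExecRunnable's only for readability.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical sketch of ExecRunnable's size/time batching; not Flume code.
final class LineBatcher {
  private final List<String> eventList = new ArrayList<>();
  private final int bufferCount;                 // flush when this many lines are buffered
  private final long batchTimeout;               // ...or when this many ms passed since last push
  private final LongSupplier clock;              // injectable clock, in milliseconds
  private final Consumer<List<String>> channel;  // stands in for ChannelProcessor
  private long lastPushToChannel;

  LineBatcher(int bufferCount, long batchTimeout, LongSupplier clock,
              Consumer<List<String>> channel) {
    this.bufferCount = bufferCount;
    this.batchTimeout = batchTimeout;
    this.clock = clock;
    this.channel = channel;
    this.lastPushToChannel = clock.getAsLong();
  }

  // Called once per line read from the command's output.
  void add(String line) {
    synchronized (eventList) {
      eventList.add(line);
      if (eventList.size() >= bufferCount || timeout()) {
        flush();
      }
    }
  }

  // Measured from the last push to the channel, not from the age of the oldest line.
  boolean timeout() {
    return clock.getAsLong() - lastPushToChannel >= batchTimeout;
  }

  int pending() {
    synchronized (eventList) {
      return eventList.size();
    }
  }

  // Caller must hold the eventList lock.
  private void flush() {
    channel.accept(new ArrayList<>(eventList)); // hand over a copy, then clear
    eventList.clear();
    lastPushToChannel = clock.getAsLong();
  }
}
```

One design choice differs from the original. flushEventBatch passes eventList itself to processEventBatch and clears it afterwards, which is safe because the processor has finished with the list by then. The sketch hands the consumer a copy instead, because an arbitrary consumer might keep a reference to the list it received.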
There is one more place where a flush happens:
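future = timedFlushService.scheduleWithFixedDelay(new Runnable() {
    @Override
    public void run() {
      try {
        synchronized (eventList) {
          if(!eventList.isEmpty() && timeout()) {
            flushEventBatch(eventList);
          }
        }
      } catch (Exception e) {
        logger.error("Exception occured when processing event batch", e);
        if(e instanceof InterruptedException) {
          Thread.currentThread().interrupt();
        }
      }
    }
  },
  batchTimeout, batchTimeout, TimeUnit.MILLISECONDS);
This schedules a background task on timedFlushService that checks eventList every batchTimeout milliseconds (3000 ms by default). If eventList is not empty and batchTimeout has elapsed since the last push, the accumulated events are sent to the channel.

Without this task, a command that stops producing output would leave a partial batch stuck in eventList. The check in the read loop only runs when a new line arrives, and readLine() blocks until one does.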
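The timed flush can be sketched with the JDK's ScheduledExecutorService alone. TimedFlushSketch is hypothetical, not Flume code:

- flushTask() returns the same check the anonymous Runnable performs: a non-empty list and an elapsed timeout.
- start() schedules it with scheduleWithFixedDelay, using batchTimeout as both the initial delay and the delay between runs.
- The clock is injected so the task can be run by hand in a test.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical sketch of ExecRunnable's timed flush; not Flume code.
final class TimedFlushSketch {
  private final List<String> eventList = new ArrayList<>();
  private final long batchTimeout;
  private final LongSupplier clock;              // milliseconds
  private final Consumer<List<String>> channel;  // stands in for ChannelProcessor
  private long lastPushToChannel;
  private ScheduledExecutorService timedFlushService;
  private ScheduledFuture<?> future;

  TimedFlushSketch(long batchTimeout, LongSupplier clock, Consumer<List<String>> channel) {
    this.batchTimeout = batchTimeout;
    this.clock = clock;
    this.channel = channel;
    this.lastPushToChannel = clock.getAsLong();
  }

  void add(String line) {
    synchronized (eventList) {
      eventList.add(line);
    }
  }

  // The periodic check: flush only if there is something to send AND the timeout passed.
  Runnable flushTask() {
    return () -> {
      try {
        synchronized (eventList) {
          if (!eventList.isEmpty() && clock.getAsLong() - lastPushToChannel >= batchTimeout) {
            channel.accept(new ArrayList<>(eventList));
            eventList.clear();
            lastPushToChannel = clock.getAsLong();
          }
        }
      } catch (RuntimeException e) {
        // Report and swallow: an exception escaping a periodic task suppresses later runs.
        System.err.println("Exception when processing event batch: " + e);
      }
    };
  }

  void start() {
    timedFlushService = Executors.newSingleThreadScheduledExecutor();
    // First run after batchTimeout ms; each later run starts batchTimeout ms
    // after the previous one finished (fixed delay, not fixed rate).
    future = timedFlushService.scheduleWithFixedDelay(
        flushTask(), batchTimeout, batchTimeout, TimeUnit.MILLISECONDS);
  }

  void stop() {
    if (future != null) {
      future.cancel(false);
    }
    if (timedFlushService != null) {
      timedFlushService.shutdownNow();
    }
  }
}
```

The try/catch inside the task matters. Per the ScheduledExecutorService contract, if one execution of a periodic task throws, all subsequent executions are suppressed, so an uncaught exception would silently disable the timed flush. That is likely why Flume's version wraps its body in try/catch as well.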
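private void flushEventBatch(List<Event> eventList){
  channelProcessor.processEventBatch(eventList);
  sourceCounter.addToEventAcceptedCount(eventList.size());
  eventList.clear();
  lastPushToChannel = systemClock.currentTimeMillis();
}
A flush does four things, in order:

- hands all the events accumulated in eventList to the channel processor;
- adds their number to the accepted-event counter;
- clears the list;
- records the time of this push in lastPushToChannel, which is the reference point for the batchTimeout check.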
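1. Put the events into the configured channels. Inside ChannelProcessor.processEventBatch, the events are first grouped by channel: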
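for (Event event : events) {
  List<Channel> reqChannels = selector.getRequiredChannels(event);

  for (Channel ch : reqChannels) {
    List<Event> eventQueue = reqChannelQueue.get(ch);
    if (eventQueue == null) {
      eventQueue = new ArrayList<Event>();
      reqChannelQueue.put(ch, eventQueue);
    }
    eventQueue.add(event);
  }

  List<Channel> optChannels = selector.getOptionalChannels(event);

  for (Channel ch: optChannels) {
    List<Event> eventQueue = optChannelQueue.get(ch);
    if (eventQueue == null) {
      eventQueue = new ArrayList<Event>();
      optChannelQueue.put(ch, eventQueue);
    }
    eventQueue.add(event);
  }
}
This is where each event is assigned to its channels. Notice that the selector is asked for channels twice: once via getRequiredChannels and once via getOptionalChannels.

The two calls are not one per selector type. Flume ships two selector types, replicating and multiplexing, and both return both lists:

- Required channels are those a write must succeed on. A failure there makes the whole batch fail and is reported back to the source.
- Optional channels are best effort. A failed write to one of them is only logged.

Each per-channel list is then written to its channel inside a transaction.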
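The grouping loop can be written more compactly with Map.computeIfAbsent. In this hypothetical sketch, channels are plain strings and the selector is replaced by two functions: one returns the required channels for an event, the other the optional channels. It shows only the grouping step, not the transactional writes that follow.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of the grouping step in processEventBatch; not Flume code.
final class ChannelGroupingSketch {

  // Per-channel event lists, split into required and optional (insertion-ordered).
  static final class Grouped<E> {
    final Map<String, List<E>> required = new LinkedHashMap<>();
    final Map<String, List<E>> optional = new LinkedHashMap<>();
  }

  static <E> Grouped<E> group(List<E> events,
                              Function<E, List<String>> requiredChannels,
                              Function<E, List<String>> optionalChannels) {
    Grouped<E> g = new Grouped<>();
    for (E event : events) {
      for (String ch : requiredChannels.apply(event)) {
        // computeIfAbsent replaces the get / null-check / put sequence
        g.required.computeIfAbsent(ch, k -> new ArrayList<>()).add(event);
      }
      for (String ch : optionalChannels.apply(event)) {
        g.optional.computeIfAbsent(ch, k -> new ArrayList<>()).add(event);
      }
    }
    return g;
  }
}
```

Consider routing by a message prefix, a stand-in for a multiplexing selector's header mapping, with one shared optional channel. Error events are grouped under one required channel, all other events under another, and every event also lands in the optional channel's list.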
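  if(restart) {
    logger.info("Restarting in {}ms, exit code {}", restartThrottle, exitCode);
    try {
      Thread.sleep(restartThrottle);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  } else {
    logger.info("Command [" + command + "] exited with " + exitCode);
  }
} while(restart);
This is the tail of the do { ... } while(restart) loop that wraps the body of run(). The restart parameter determines whether the command is started again after its process exits; it defaults to false. If it is set to true, the runner logs the exit code, sleeps for restartThrottle milliseconds, and then runs all of the code above again.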
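The restart behaviour is a plain do/while loop around "run the command, then decide whether to run it again". This hypothetical sketch is not Flume code:

- the command is modelled as an IntSupplier that blocks and returns an exit code;
- the restart flag is a BooleanSupplier, so it can change between runs (for example, if something clears it while the command is running);
- System.out stands in for Flume's logger.

```java
import java.util.function.BooleanSupplier;
import java.util.function.IntSupplier;

// Hypothetical sketch of the do { ... } while (restart) shape; not Flume code.
final class RestartLoopSketch {

  // Runs the command at least once, and again after each exit while restart says so.
  // Returns how many times the command was started.
  static int runWithRestart(IntSupplier runCommand, BooleanSupplier restart,
                            long restartThrottleMillis) {
    int starts = 0;
    boolean again;
    do {
      int exitCode = runCommand.getAsInt(); // blocks until the "process" exits
      starts++;
      again = restart.getAsBoolean();
      if (again) {
        System.out.println("Restarting in " + restartThrottleMillis + "ms, exit code " + exitCode);
        try {
          Thread.sleep(restartThrottleMillis);
        } catch (InterruptedException e) {
          // Same handling as the snippet: restore the flag and keep going;
          // the loop only ends once restart reports false.
          Thread.currentThread().interrupt();
        }
      } else {
        System.out.println("Command exited with " + exitCode);
      }
    } while (again);
    return starts;
  }
}
```

The throttle keeps a command that fails immediately from being relaunched in a tight loop.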
Summary:
1. How are events produced?
eventList.add(EventBuilder.withBody(line.getBytes(charset)));
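public static Event withBody(byte[] body, Map<String, String> headers) {
  Event event = new SimpleEvent();

  if(body == null) {
    body = new byte[0];
  }
  event.setBody(body);

  if (headers != null) {
    event.setHeaders(new HashMap<String, String>(headers));
  }

  return event;
}
The read loop calls the one-argument withBody(byte[]), which delegates to this overload with null headers. Each line of command output therefore becomes a SimpleEvent with no headers. Its body is the line's bytes in the configured charset, without the line terminator, which readLine() strips.
2. How are events put into the channel?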
private void flushEventBatch(List<Event> eventList){
  channelProcessor.processEventBatch(eventList);
  sourceCounter.addToEventAcceptedCount(eventList.size());
  eventList.clear();
  lastPushToChannel = systemClock.currentTimeMillis();
}