Flume NG源码分析(四)使用ExecSource从本地日志文件中收集日志
2015-06-16 16:08
603 查看
常见的日志收集方式有两种,一种是经由本地日志文件做媒介,异步地发送到远程日志仓库,一种是基于RPC方式的同步日志收集,直接发送到远程日志仓库。这篇讲讲Flume NG如何从本地日志文件中收集日志。
ExecSource是用来执行本地shell命令,并把本地日志文件中的数据封装成Event事件流在Flume NG中流动。它的典型配置如下,指定source类型是exec,指定Source下游的Channel是哪个,指定要执行的shell命令。最常用的命令就是tail -F命令,可以从本地日志文件中获取新追加的日志。
看一下ExecSource的实现流程
1. ExecSource维护了一个单线程的线程池executor,以及配置的shell命令,计数器等属性
2. ExecRunnable对象实现了Runnable接口,被executor线程池执行。 ExecRunnable实现了获取本地日志的主要流程
3. ExecRunnable维护了一个定时执行的线程池timedFlushService,定时去检查Event列表,如果符合批量输出的要求,就批量flush event
4. ExecRunnable使用Runtime.getRuntime().exec以及java.lang.ProcessBuilder来使用Java平台执行操作系统的Shell命令,并把这个Shell命令创建的进程的输出流重定向到Java平台的流,从而在Java平台可以获取到本地日志文件的数据。这里的Shell命令是tail -F
这里最主要的是步骤是在Java平台中使用Shell命令来获取本地日志文件的数据,主要的代码如下
将java.lang.Process代表的本地进程的输出流重定向到Java的输入流中,当tail -F没有数据时,Java输入流的reader.readLine会阻塞,直到有新数据到达。获取到新数据后,首先是将数据封装成Event,如果超过了批量限制,就flushEventBatch
flushEventBatch会将Event列表交给ChannelProcessor批量处理。
ExecSource是异步收集本地日志的实现,它不保证可靠性,比如Java平台创建的tail -F进程出问题了,那么目标日志文件的收集会收到影响。ExecSource的好处是性能比RPC方式要好,减少了网络的流量,同时避免了对应用程序的倾入性,可以无缝地接入。
ExecSource是用来执行本地shell命令,并把本地日志文件中的数据封装成Event事件流在Flume NG中流动。它的典型配置如下,指定source类型是exec,指定Source下游的Channel是哪个,指定要执行的shell命令。最常用的命令就是tail -F命令,可以从本地日志文件中获取新追加的日志。
producer.sources.s1.type = exec producer.sources.s1.channels = channel producer.sources.s1.command = tail -F /data/logs/test.log
看一下ExecSource的实现流程
1. ExecSource维护了一个单线程的线程池executor,以及配置的shell命令,计数器等属性
2. ExecRunnable对象实现了Runnable接口,被executor线程池执行。 ExecRunnable实现了获取本地日志的主要流程
3. ExecRunnable维护了一个定时执行的线程池timedFlushService,定时去检查Event列表,如果符合批量输出的要求,就批量flush event
4. ExecRunnable使用Runtime.getRuntime().exec以及java.lang.ProcessBuilder来使用Java平台执行操作系统的Shell命令,并把这个Shell命令创建的进程的输出流重定向到Java平台的流,从而在Java平台可以获取到本地日志文件的数据。这里的Shell命令是tail -F
这里最主要的是步骤是在Java平台中使用Shell命令来获取本地日志文件的数据,主要的代码如下
// ExecRuannable.run() try { if(shell != null) { String[] commandArgs = formulateShellCommand(shell, command); process = Runtime.getRuntime().exec(commandArgs); } else { String[] commandArgs = command.split("\\s+"); process = new ProcessBuilder(commandArgs).start(); } reader = new BufferedReader( new InputStreamReader(process.getInputStream(), charset)); // 当tail -F没有数据时,reader.readLine会阻塞,直到有数据到达 while ((line = reader.readLine()) != null) { synchronized (eventList) { sourceCounter.incrementEventReceivedCount(); eventList.add(EventBuilder.withBody(line.getBytes(charset))); if(eventList.size() >= bufferCount || timeout()) { flushEventBatch(eventList); } } }
将java.lang.Process代表的本地进程的输出流重定向到Java的输入流中,当tail -F没有数据时,Java输入流的reader.readLine会阻塞,直到有新数据到达。获取到新数据后,首先是将数据封装成Event,如果超过了批量限制,就flushEventBatch
flushEventBatch会将Event列表交给ChannelProcessor批量处理。
// EventBuilder.withBdoy public static Event withBody(byte[] body, Map<String, String> headers) { Event event = new SimpleEvent(); if(body == null) { body = new byte[0]; } event.setBody(body); if (headers != null) { event.setHeaders(new HashMap<String, String>(headers)); } return event; } // ExecSource.flushEventBatch private void flushEventBatch(List<Event> eventList){ channelProcessor.processEventBatch(eventList); sourceCounter.addToEventAcceptedCount(eventList.size()); eventList.clear(); lastPushToChannel = systemClock.currentTimeMillis(); }
ExecSource是异步收集本地日志的实现,它不保证可靠性,比如Java平台创建的tail -F进程出问题了,那么目标日志文件的收集会收到影响。ExecSource的好处是性能比RPC方式要好,减少了网络的流量,同时避免了对应用程序的倾入性,可以无缝地接入。
相关文章推荐
- 接口测试-自动化-Java实现-HttpUtil
- 创建一个div标签
- phpcms v9最常用的22个调用代码
- appcompat_v7\res\values-v21\themes_base.xml:191: error: Error: No resource found that matches the gi
- 【HTTP】HTTP状态码
- jeesite 无“已删除”数据列表
- Disabling contextual LOB creation as createClob() method threw error : java.lang.reflect.InvocationT
- ABAP--SAP是如何回写CL_GUI_ALV_GRID_BASE的MT_MODIFIED_CELLS的
- Libcurl最初的实现tfp上传和下载功能
- GRE作文备考——教育的三大目的
- STL学习笔记之前篇
- gcc -rdynamic参数解释
- recommenderjob原码分析
- Html5 Viewport学习
- linux 信号signal和sigaction理解
- 魅蓝Note有几种颜色 魅蓝Note哪个颜色好看
- 使用PyInstaller打包Python程序
- selenium
- Android学习记录之------R文件的优化
- 最好用的SAT数学解题方法