您的位置:首页 > 其它

flume高并发优化——(6)开发多文件检索source插件

2017-07-26 00:00 387 查看
最近遇到一个问题:我们在 exec source 的命令中配置 “tail -f /export/home/tomcat/logs/*/*.log” 后,没有任何数据录入。原因是 exec source 在未配置 shell 参数(如 /bin/sh -c)时不经过 shell、直接执行命令,因此通配符 * 不会被展开。为了解决这个问题,我们自己开发了一个 flume 数据接收端(source),主要实现的功能是:检测一个文件夹下的所有文件,通过多线程对每个文件执行 tail 命令,把读到的内容写入 channel。

1,实现思路



2,flume规则

自定义flume source结构

public class SourceName extends AbstractSource implements EventDrivenSource, Configurable { // 获取配置 @Override public void configure(Context context) { } //开始收集数据 @Override public synchronized void start() { } //结束收集数据 @Override public synchronized void stop() { } }


或者

public class SourceName extends AbstractSource implements Configurable, PollableSource { // 获取配置 @Override public void configure(Context context)throws EventDeliveryException { return null; } //开始收集数据 @Override public synchronized void start() { } //结束收集数据 @Override public synchronized void stop() { } }


自定义flume sink结构

public class SinkName extends AbstractSink implements Configurable { private static final Logger log = LoggerFactory.getLogger(AbstractSink.class); Context c; @Override public void configure(Context arg0) { this.c = arg0; } //循环调用输出数据 @Override public Status process() throws EventDeliveryException { return Status.READY; } //输出流开始(调用一次) @Override public synchronized void start() { super.start(); } //输出流结束(调用一次) @Override public synchronized void stop() { super.stop(); } }


3,实现

已经打好的jar包下载

/* * 作者:许恕 * 时间:2016年5月3日 * 功能:实现tail 某目录下的所有符合正则条件的文件 * Email:xvshu1@163.com * To detect all files in a folder */ package org.apache.flume.source; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDrivenSource; import org.apache.flume.SystemClock; import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.conf.Configurable; import org.apache.flume.event.EventBuilder; import org.apache.flume.instrumentation
7fe0
.SourceCounter; import org.apache.flume.tools.HostUtils; import org.mortbay.util.ajax.JSON; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.File; import java.io.IOException; import java.io.InputStreamReader; import java.nio.charset.Charset; import java.util.*; import java.util.concurrent.*; import java.util.regex.Matcher; import java.util.regex.Pattern; /** * step: * 1,config one path * 2,find all file with RegExp * 3,tail one children file * 4,batch to channal * * demo: * demo.sources.s1.type = org.apache.flume.source.ExecTailSource * demo.sources.s1.filepath=/export/home/tomcat/logs/ * demo.sources.s1.filenameRegExp=(.log{1})$ */ public class ExecTailSource extends AbstractSource implements EventDrivenSource, Configurable { private static final Logger logger = LoggerFactory .getLogger(ExecTailSource.class); private SourceCounter sourceCounter; private ExecutorService executor; private List<ExecRunnable> listRuners; private List<Future<?>> listFuture; private long restartThrottle; private boolean restart; private boolean logStderr; private Integer bufferCount; private long batchTimeout; private Charset charset; private String filepath; private String filenameRegExp; @Override public void start() { logger.info("ExecTail source starting with filepath:{}", filepath); List<String> listFiles = getFileList(filepath); if(listFiles==null || listFiles.isEmpty()){ Preconditions.checkState(listFiles != null && !listFiles.isEmpty(), "The filepath's file not have fiels with filenameRegExp"); } executor = Executors.newFixedThreadPool(listFiles.size()); listRuners = new ArrayList<ExecRunnable>(); listFuture = new ArrayList<Future<?>>(); logger.info("files size is {} ", listFiles.size()); // FIXME: Use a callback-like executor / future to signal us upon failure. 
for(String oneFilePath : listFiles){ ExecRunnable runner = new ExecRunnable(getChannelProcessor(), sourceCounter, restart, restartThrottle, logStderr, bufferCount, batchTimeout, charset,oneFilePath); listRuners.add(runner); Future<?> runnerFuture = executor.submit(runner); listFuture.add(runnerFuture); logger.info("{} is begin running",oneFilePath); } /* * NB: This comes at the end rather than the beginning of the method because * it sets our state to running. We want to make sure the executor is alive * and well first. */ sourceCounter.start(); super.start(); logger.debug("ExecTail source started"); } @Override public void stop() { if(listRuners !=null && !listRuners.isEmpty()){ for(ExecRunnable oneRunner : listRuners){ if(oneRunner != null) { oneRunner.setRestart(false); oneRunner.kill(); } } } if(listFuture !=null && !listFuture.isEmpty()){ for(Future<?> oneFuture : listFuture){ if (oneFuture != null) { logger.debug("Stopping ExecTail runner"); oneFuture.cancel(true); logger.debug("ExecTail runner stopped"); } } } executor.shutdown(); while (!executor.isTerminated()) { logger.debug("Waiting for ExecTail executor service to stop"); try { executor.awaitTermination(500, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { logger.debug("Interrupted while waiting for ExecTail executor service " + "to stop. 
Just exiting."); Thread.currentThread().interrupt(); } } sourceCounter.stop(); super.stop(); } @Override public void configure(Context context) { filepath = context.getString("filepath"); Preconditions.checkState(filepath != null, "The parameter filepath must be specified"); logger.info("The parameter filepath is {}" ,filepath); filenameRegExp = context.getString("filenameRegExp"); Preconditions.checkState(filenameRegExp != null, "The parameter filenameRegExp must be specified"); logger.info("The parameter filenameRegExp is {}" ,filenameRegExp); restartThrottle = context.getLong(ExecSourceConfigurationConstants.CONFIG_RESTART_THROTTLE, ExecSourceConfigurationConstants.DEFAULT_RESTART_THROTTLE); restart = context.getBoolean(ExecSourceConfigurationConstants.CONFIG_RESTART, ExecSourceConfigurationConstants.DEFAULT_RESTART); logStderr = context.getBoolean(ExecSourceConfigurationConstants.CONFIG_LOG_STDERR, ExecSourceConfigurationConstants.DEFAULT_LOG_STDERR); bufferCount = context.getInteger(ExecSourceConfigurationConstants.CONFIG_BATCH_SIZE, ExecSourceConfigurationConstants.DEFAULT_BATCH_SIZE); batchTimeout = context.getLong(ExecSourceConfigurationConstants.CONFIG_BATCH_TIME_OUT, ExecSourceConfigurationConstants.DEFAULT_BATCH_TIME_OUT); charset = Charset.forName(context.getString(ExecSourceConfigurationConstants.CHARSET, ExecSourceConfigurationConstants.DEFAULT_CHARSET)); if (sourceCounter == null) { sourceCounter = new SourceCounter(getName()); } } /** * 获取指定路径下的所有文件列表 * * @param dir 要查找的目录 * @return */ public List<String> getFileList(String dir) { List<String> listFile = new ArrayList<String>(); File dirFile = new File(dir); //如果不是目录文件,则直接返回 if (dirFile.isDirectory()) { //获得文件夹下的文件列表,然后根据文件类型分别处理 File[] files = dirFile.listFiles(); if (null != files && files.length > 0) { //根据时间排序 Arrays.sort(files, new Comparator<File>() { public int compare(File f1, File f2) { return (int) (f1.lastModified() - f2.lastModified()); } public boolean equals(Object obj) { return true; 
} }); for (File file : files) { //如果不是目录,直接添加 if (!file.isDirectory()) { String oneFileName = file.getName(); if(match(filenameRegExp,oneFileName)){ listFile.add(file.getAbsolutePath()); logger.info("filename:{} is pass",oneFileName); } } else { //对于目录文件,递归调用 listFile.addAll(getFileList(file.getAbsolutePath())); } } } }else{ logger.info("FilePath:{} is not Directory",dir); } return listFile; } /** * @param regex * 正则表达式字符串 * @param str * 要匹配的字符串 * @return 如果str 符合 regex的正则表达式格式,返回true, 否则返回 false; */ private boolean match(String regex, String str) { Pattern pattern = Pattern.compile(regex); Matcher matcher = pattern.matcher(str); return matcher.find(); } private static class ExecRunnable implements Runnable { public ExecRunnable( ChannelProcessor channelProcessor, SourceCounter sourceCounter, boolean restart, long restartThrottle, boolean logStderr, int bufferCount, long batchTimeout, Charset charset,String filepath) { this.channelProcessor = channelProcessor; this.sourceCounter = sourceCounter; this.restartThrottle = restartThrottle; this.bufferCount = bufferCount; this.batchTimeout = batchTimeout; this.restart = restart; this.logStderr = logStderr; this.charset = charset; this.filepath=filepath; this.command = command+filepath; } private String command="tail -f "; private final ChannelProcessor channelProcessor; private final SourceCounter sourceCounter; private volatile boolean restart; private final long restartThrottle; private final int bufferCount; private long batchTimeout; private final boolean logStderr; private final Charset charset; private Process process = null; private SystemClock systemClock = new SystemClock(); private Long lastPushToChannel = systemClock.currentTimeMillis(); ScheduledExecutorService timedFlushService; ScheduledFuture<?> future; private String filepath; private static String getDomain(String filePath){ String[] strs = filePath.split("/"); String domain ; domain=strs[strs.length-2]; if(domain==null || domain.isEmpty()){ 
domain=filePath; } return domain; } @Override public void run() { do { String exitCode = "unknown"; BufferedReader reader = null; String line = null; final List<Event> eventList = new ArrayList<Event>(); timedFlushService = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat( "timedFlushExecService" + Thread.currentThread().getId() + "-%d").build()); try { String[] commandArgs = command.split("\\s+"); process = new ProcessBuilder(commandArgs).start(); reader = new BufferedReader( new InputStreamReader(process.getInputStream(), charset)); // StderrLogger dies as soon as the input stream is invalid StderrReader stderrReader = new StderrReader(new BufferedReader( new InputStreamReader(process.getErrorStream(), charset)), logStderr); stderrReader.setName("StderrReader-[" + command + "]"); stderrReader.setDaemon(true); stderrReader.start(); future = timedFlushService.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { synchronized (eventList) { if(!eventList.isEmpty() && timeout()) { flushEventBatch(eventList); } } } catch (Exception e) { logger.error("Exception occured when processing event batch", e); if(e instanceof InterruptedException) { Thread.currentThread().interrupt(); } } } }, batchTimeout, batchTimeout, TimeUnit.MILLISECONDS); while ((line = reader.readLine()) != null) { synchronized (eventList) { sourceCounter.incrementEventReceivedCount(); HashMap body = new HashMap(); body.put("context",line.toString()); body.put("filepath", filepath); body.put("created", System.currentTimeMillis()); body.put("localHostIp", HostUtils.getLocalHostIp()); body.put("localHostName", HostUtils.getLocalHostName()); body.put("domain", getDomain(filepath)); body.put("command", command); String context = line.toString(); String bodyjson = JSON.toString(body); Event oneEvent = EventBuilder.withBody(bodyjson.getBytes(charset)); eventList.add(oneEvent); if(eventList.size() >= bufferCount || timeout()) { 
flushEventBatch(eventList); } } } synchronized (eventList) { if(!eventList.isEmpty()) { flushEventBatch(eventList); } } } catch (Exception e) { logger.error("Failed while running command: " + command, e); if(e instanceof InterruptedException) { Thread.currentThread().interrupt(); } } finally { if (reader != null) { try { reader.close(); } catch (IOException ex) { logger.error("Failed to close reader for ExecTail source", ex); } } exitCode = String.valueOf(kill()); } if(restart) { logger.info("Restarting in {}ms, exit code {}", restartThrottle, exitCode); try { Thread.sleep(restartThrottle); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } else { logger.info("Command [" + command + "] exited with " + exitCode); } } while(restart); } private void flushEventBatch(List<Event> eventList){ channelProcessor.processEventBatch(eventList); sourceCounter.addToEventAcceptedCount(eventList.size()); eventList.clear(); lastPushToChannel = systemClock.currentTimeMillis(); } private boolean timeout(){ return (systemClock.currentTimeMillis() - lastPushToChannel) >= batchTimeout; } private static String[] formulateShellCommand(String shell, String command) { String[] shellArgs = shell.split("\\s+"); String[] result = new String[shellArgs.length + 1]; System.arraycopy(shellArgs, 0, result, 0, shellArgs.length); result[shellArgs.length] = command; return result; } public int kill() { if(process != null) { synchronized (process) { process.destroy(); try { int exitValue = process.waitFor(); // Stop the Thread that flushes periodically if (future != null) { future.cancel(true); } if (timedFlushService != null) { timedFlushService.shutdown(); while (!timedFlushService.isTerminated()) { try { timedFlushService.awaitTermination(500, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { logger.debug("Interrupted while waiting for ExecTail executor service " + "to stop. 
Just exiting."); Thread.currentThread().interrupt(); } } } return exitValue; } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } } return Integer.MIN_VALUE; } return Integer.MIN_VALUE / 2; } public void setRestart(boolean restart) { this.restart = restart; } } private static class StderrReader extends Thread { private BufferedReader input; private boolean logStderr; protected StderrReader(BufferedReader input, boolean logStderr) { this.input = input; this.logStderr = logStderr; } @Override public void run() { try { int i = 0; String line = null; while((line = input.readLine()) != null) { if(logStderr) { // There is no need to read 'line' with a charset // as we do not to propagate it. // It is in UTF-16 and would be printed in UTF-8 format. logger.info("StderrLogger[{}] = '{}'", ++i, line); } } } catch (IOException e) { logger.info("StderrLogger exiting", e); } finally { try { if(input != null) { input.close(); } } catch (IOException ex) { logger.error("Failed to close stderr reader for ExecTail source", ex); } } } } }


总结:

在开源的路上,我们越来越多喜悦,因为我们发现我们的自我定制功能可以最大程度达到我们的要求,而且对于企业而言,依赖性变小,可控性增强,风险降低,现在我们就可以窥见,不远的将来,我们的生活会异常幸福!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐