数据采集之Web端导入日志文件到Hadoop HDFS
2017-03-16 00:19
639 查看
前言
接上一篇《数据采集之Web端导入DB数据到Hadoop HDFS》,这一篇简单的记录一下如何在Web端控制导入日志文件到HDFS中,主要用到的技术就是Flume了。网上大多数教程都是写的配置文件,但是现在有需求要通过web来动态自动化的进行操作,也就是说要通过Java代码的形式来实现,而不是直接修改服务器上的配置文件。所以只能想想其他其他方式了。环境
OS Debian 8.7Hadoop 2.6.5
SpringBoot 1.5.1.RELEASE
Flume 1.7.0
Flume简介
Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.Flume是一个分布式、高可靠、高可用的日志数据采集、聚合和传输服务。
它有一个简单灵活基于数据流的结构。它也有可靠的和容错的机制,并且可用进行许多故障转移和恢复机制。 它使用一个简单的可扩展数据模型,允许在线分析应用程序。
翻译的不太好,简单的理解就是一个日志数据采集工具吧。
项目依赖
先看下pom.xml文件。<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.infosys.flume</groupId> <artifactId>flume</artifactId> <version>1.0-SNAPSHOT</version> <name>flume</name> <packaging>jar</packaging> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.1.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <hadoop.version>2.6.5</hadoop.version> <flume.version>1.7.0</flume.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>javax.servlet</groupId> <artifactId>javax.servlet-api</artifactId> <version>3.1.0</version> <scope>provided</scope> </dependency> <!-- Flume --> <dependency> <groupId>org.apache.flume.flume-ng-sinks</groupId> <artifactId>flume-hdfs-sink</artifactId> <version>${flume.version}</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <!-- Hadoop --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <!-- Test --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.mrunit</groupId> <artifactId>mrunit</artifactId> <version>1.1.0</version> <classifier>hadoop2</classifier> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-minicluster</artifactId> <version>${hadoop.version}</version> <scope>test</scope> </dependency> </dependencies> <build> <finalName>${project.artifactId}</finalName> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-resources-plugin</artifactId> <configuration> <encoding>UTF-8</encoding> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <outputDirectory>${basedir}</outputDirectory> </configuration> </plugin> </plugins> </build> </project>
其中SpringBoot和Hadoop不多说,主要是org.apache.flume.flume-ng-sinks这个依赖不能少。它提供了很多方便配置flume相关的API。
核心Flume
这里仅作为演示,真实项目肯定不能这样的。因为网上包括官网都很少提供这种API的配置形式的,大都是直接改配置文件。在这里我们只需要将Flume下载好,然后使用默认配置即可。其他的配置在代码中编写。为了偷懒,跟上篇一样,都写在一个java类中了。。
package com.infosys.flume; import org.apache.flume.Channel; import org.apache.flume.ChannelSelector; import org.apache.flume.Context; import org.apache.flume.EventDeliveryException; import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.channel.MemoryChannel; import org.apache.flume.channel.ReplicatingChannelSelector; import org.apache.flume.conf.Configurables; import org.apache.flume.sink.hdfs.HDFSEventSink; import org.apache.flume.source.SpoolDirectorySource; import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RestController; import java.io.File; import java.util.ArrayList; import java.util.List; /** * 描述: * 公司:infosys(印孚瑟斯技术有限公司) * 作者:luhaoyuan <lusyoe@163.com> * 版本:1.0 * 创建日期:2017/3/15. */ @RestController public class FlumeController { private Channel memChannel = new MemoryChannel(); private HDFSEventSink hdfsSink; @PostMapping(value = "/log2Hdfs") public String logToHdfs() throws EventDeliveryException { configSources(); configChannels(); configSinks(); // 存储event到HDFS hdfsSink.process(); return "SUCCESS"; } /** * 配置通道 */ private void configChannels() { Context channelContext = new Context(); memChannel.setName("ch1"); Configurables.configure(memChannel, channelContext); memChannel.start(); } /** * 配置目的地,这里就是导入到HDFS */ private void configSinks() { Context sinkContext = new Context(); sinkContext.put("hdfs.path", HdfsSinkConstants.HDFS_PATH); sinkContext.put("hdfs.fileType", HdfsSinkConstants.HDFS_FILE_TYPE); sinkContext.put("hdfs.filePrefix", "%{fileName}"); // 将原来的文件名(包括路径)作为前缀 sinkContext.put("hdfs.round", "true"); sinkContext.put("hdfs.rollInterval", "30"); // 隔30秒就将事件写入HDFS sinkContext.put("hdfs.useLocalTimeStamp", "true"); // 使用时间戳配置 hdfsSink = new HDFSEventSink(); hdfsSink.setName("myhdfs"); hdfsSink.configure(sinkContext); hdfsSink.setChannel(memChannel); hdfsSink.start(); } /** * 配置日志源,指定一个目录 */ private void configSources() { Context context = new Context(); SpoolDirectorySource spoolDirSource = new SpoolDirectorySource(); spoolDirSource.setName("nginx"); List<Channel> channels = new ArrayList<>(); channels.add(memChannel); ChannelSelector rcs = new ReplicatingChannelSelector(); rcs.setChannels(channels); spoolDirSource.setChannelProcessor(new ChannelProcessor(rcs)); // 日志存放路径 File file = new File("/var/log/nginx"); context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY, file.getAbsolutePath()); // 将文件名(包括路径)添加到头信息,方便写入的时候获取 context.put(SpoolDirectorySourceConfigurationConstants.FILENAME_HEADER, "true"); context.put(SpoolDirectorySourceConfigurationConstants.FILENAME_HEADER_KEY, "fileName"); Configurables.configure(spoolDirSource, context); spoolDirSource.start(); } }
其实基本上也跟改配置文件类似,除了用Context,也可以用JDK的Properties好像。其中配置大都可用通过前端表单的形式传递过来。
常量
就几个常量,懒的抽了。package com.infosys.flume; /** * 描述: * 公司:infosys(印孚瑟斯技术有限公司) * 作者:luhaoyuan <lusyoe@163.com> * 版本:1.0 * 创建日期:2017/3/15. */ public final class HdfsSinkConstants { public static final String HDFS_PATH = "hdfs://e5:9000/flume/nginx/%y-%m-%d"; //目录按照年月日进行存储 public static final String HDFS_FILE_TYPE = "DataStream"; // 表示文件类型,不会被压缩 }
后记
只是作为练手熟悉的一个Demo,必然是有很多问题。而且还有很多其他的配置,如:拦截器、其他的Source、Channel、Sink这里就没有一一的进行演示了,可参考已有的进行相应配置即可。最后还有貌似每次熟悉一个新东西都会踩很多坑啊。下面简单记录几个所遇到的坑:
java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
出现这个异常是因为使用时间戳保持日志时,没有进行设置。添加以下这行就可以了:
// 使用时间戳配置 sinkContext.put("hdfs.useLocalTimeStamp", "true");
权限问题:
如果日志所在的目录因为权限无法读取,这个就需要手动设置了,或者定期的复制到某个有权限的路径下。
相关文章推荐
- hadoop日志分析系统二 第一部分 利用任务调度系统定期的把web系统所产生的日志文件导入到hdfs中
- 数据采集之Web端导入DB数据到Hadoop HDFS
- 数据采集之Web端上传文件到Hadoop HDFS
- hadoop日志分析系统二 第一部分 利用任务调度系统定期的把web系统所产生的日志文件导入到hdfs中
- 使用JAVA将Hadoop HDFS中的日志文件导入HBase中(一)
- 数据採集之Web端上传文件到Hadoop HDFS
- hadoop学习之HDFS(2.7):实例:开发shell脚本定时采集日志数据到hdfs
- Hadoop硬实战之一:使用flume将系统日志文件导入HDFS
- 使用JAVA将Hadoop HDFS中的日志文件导入HBase中(二)
- 大数据学习篇:hadoop深入浅出系列之HDFS(七) ——小文件解决方案
- Hadoop Mapper 阶段将数据直接从 HDFS 导入 Hbase
- Hadoop HDFS 文件访问权限问题导致Java Web 上传文件到Hadoop失败的原因分析及解决方法
- 1.2 使用Hadoop shell命令导入和导出数据到HDFS
- 在hadoop 里安装 sqoop 并把mysql数据导入hdfs
- Hadoop学习笔记——1.java读取Oracle中表的数据,创建新文件写入Hdfs
- 把hive中的数据导入到hdfs或者本地文件的方式
- Hadoop Sqoop;从HDFS导入数据到MYSQL数据库中出现中文字符乱码
- Flume采集数据到HDFS时,生成的文件中,开头信息有乱码
- FileUpload上传Excel文件,Aspose导入数据至数据库(web)
- hive实现txt数据导入,理解hadoop中hdfs、mapreduce