您的位置:首页 > 大数据 > Hadoop

HDFS工作机制——自开发分布式数据采集系统

2019-05-25 18:15 2316 查看

需求描述:

在业务系统的服务器上,业务程序会不断生成业务日志(比如网站的页面访问日志)

业务日志是用log4j生成的,会不断地切出日志文件,需要定期(比如每小时)从业务服务器上的日志目录中,探测需要采集的日志文件(access.log不能采),发往HDFS

注意点:业务服务器可能有多台(hdfs上的文件名不能直接用日志服务器上的文件名)

当天采集到的日志要放在hdfs的当天目录中,采集完成的日志文件,需要移动到到日志服务器的一个备份目录中定期检查(每小时检查一下备份目录),将备份时长超出24小时的日志文件清除

数据采集流程分析

1.流程
启动一个定时任务
定时探测日志源目录
获取需要采集得文件
移动这些文件到一个待上传得临时目录
遍历待上传目录中得文件,逐一传输到HDFS得目标路径
同时将传输得文件移动到备份目录

启动一个定时任务:
探测备份目录中得备份数据,检查是否超出最长备份时长,超出,则删除

2.规划各种路径
日志源路径:d:/logs/accesslog/
待上传临时目录:d:/logs/toupload/
备份目录:d:/logs/backup/日期

HDFS 存储路径:/logs/日期
HDFS中文件的前缀:acceaa_log_
HDFS中文件的后缀:.log

准备工作

[root@hdp-01 ~]# start-dfs.sh
Starting namenodes on [hdp-01]
hdp-01: starting namenode, logging to /root/apps/hadoop-2.7.2/logs/hadoop-root-namenode-hdp-01.out
hdp-01: starting datanode, logging to /root/apps/hadoop-2.7.2/logs/hadoop-root-datanode-hdp-01.out
hdp-03: starting datanode, logging to /root/apps/hadoop-2.7.2/logs/hadoop-root-datanode-hdp-03.out
hdp-02: starting datanode, logging to /root/apps/hadoop-2.7.2/logs/hadoop-root-datanode-hdp-02.out
hdp-04: starting datanode, logging to /root/apps/hadoop-2.7.2/logs/hadoop-root-datanode-hdp-04.out
Starting secondary namenodes [hdp-02]
hdp-02: starting secondarynamenode, logging to /root/apps/hadoop-2.7.2/logs/hadoop-root-secondarynamenode-hdp-02.out

代码如下

DataCollection 

public class DataCollection {
public static void main(String[] args) {
Timer timer = new Timer();
//1.【定时探测日志源目录】,每一小时执行一次
timer.schedule(new CollectTask(),0,60*60*1000L);
//2.【定时删除文件】
timer.schedule(new BackupCleanTask(),0,60*60*1000L);
}
}

collect.properties

LOG_SOURCE_DIR=d:/logs/accesslog/
LOG_TOUPLOAD_DIR=d:/logs/toupload/
LOG_BACKUP_BASE_DIR=d:/logs/backup/
LOG_LEGAL_PREFIX=access.log.
HDFS_URI=hdfs://hdp-01:9000/
HDFS_DEST_BASE_DIR=/logs/
HDFS_FILE_PREFIX=access_log_
HDFS_FILE_SUFFIX=.log

Contants 

/**
* 日志目录参数key
*/
public class Contants {

/**
* 本地要上传文件目录
*/
public static final String LOG_SOURCE_DIR="LOG_SOURCE_DIR";
/**
* 临时上传目录中的文件
*/
public static final String LOG_TOUPLOAD_DIR="LOG_TOUPLOAD_DIR";
/**
* d:/logs/backup/
*/
public static final String LOG_BACKUP_BASE_DIR="LOG_BACKUP_BASE_DIR";
/**
* 需要采集得文件
*/
public static final String LOG_LEGAL_PREFIX="LOG_LEGAL_PREFIX";
/**
* 上传到 HDFS ip+port
*/
public static final String HDFS_URI="HDFS_URI";
/**
* hdfs:logs/目录
*/
public static final String HDFS_DEST_BASE_DIR="HDFS_DEST_BASE_DIR";
/**
* hdfs文件前缀:access_log_
*/
public static final String HDFS_FILE_PREFIX="HDFS_FILE_PREFIX";
/**
* hdfs文件后缀:.log
*/
public static final String HDFS_FILE_SUFFIX="HDFS_FILE_SUFFIX";
}

单例设计方式一:饿汉式单例,程序启动时创建

PropertyHolderHungery

/**
* 单例设计方式一:饿汉式单例,程序启动时创建
*/
public class PropertyHolderHungery {

private static Properties prop=new Properties();

//静态代码块
static {
try {
prop.load(PropertyHolderHungery.class.getClassLoader()
.getResourceAsStream("collect.properties"));
}catch (Exception e){
e.printStackTrace();
}
}
public static Properties getProps()throws Exception{
return prop;
}
}

单例设计模式二:懒汉式,使用的时候创建,还考虑线程安全

PropertyHolderLazy 

/**
* 单例设计模式二:懒汉式,使用的时候创建,还考虑线程安全
*/
public class PropertyHolderLazy {
//默认构造器私有化
private PropertyHolderLazy(){

}
//禁止指令重排序
private volatile static Properties prop=null;

public static Properties getProps()throws Exception{
if(prop==null){
//加锁,保证多线程场景下线程安全问题
synchronized (PropertyHolderLazy.class){
//防止再次new
if(prop==null){
prop=new Properties();
prop.load(PropertyHolderLazy.class.getClassLoader()
.getResourceAsStream("collect.properties"));
}
}
}
return prop;
}
}

CollectTask 

public class CollectTask extends TimerTask {
//构造一个log4j日志对象
public static Log log = LogFactory.getLog(CollectTask.class);
public void run() {
/**
*     1.定时探测日志源目录
*     2.获取需要采集得文件
*     3.移动这些文件到一个待上传得临时目录
*     4.遍历待上传目录中得文件,逐一传输到HDFS得目标路径
*     5.同时将传输得文件移动到备份目录
*/
try{
//获取配置参数
final Properties props = PropertyHolderLazy.getProps();
//获取本次采集时的日期
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH");
String day = sdf.format(new Date());
//获取本地要上传文件目录
File srcDir = new File(props.getProperty(Contants.LOG_SOURCE_DIR));
//2.【获取需要采集得文件】
File[] listFiles = srcDir.listFiles(new FilenameFilter() {
public boolean accept(File dir, String name) {
if(name.startsWith(props.getProperty(Contants.LOG_LEGAL_PREFIX))){
return true;
}
return false;
}
});
//记录日志
log.info("探测到如下文件需要采集:"+Arrays.toString(listFiles));
//获取临时上传目录中的文件
File toUploadDir = new File(props.getProperty(Contants.LOG_TOUPLOAD_DIR));
//3.【移动这些文件到一个待上传得临时目录】
for (File file:listFiles) {
//将采集的文件移到临时上传目录,将源目录中需要上传的文件移动到临时上传目录中
FileUtils.moveFileToDirectory(file,toUploadDir,true);
}
//记录日志
log.info("上述文件移动到待上传目录"+toUploadDir.getAbsolutePath());
//构造一个HDFS的客户端对象
FileSystem fs = FileSystem.get(new URI(props.getProperty(Contants.HDFS_URI)), new Configuration(), "root");
//从临时上传目录中列出所有文件
File[] toUploadFiles = toUploadDir.listFiles();
//1.检查hdfs 中的日期是否存在
Path hdfsDestPath = new Path(props.getProperty(Contants.HDFS_DEST_BASE_DIR) + day);
if(!fs.exists(hdfsDestPath)){
fs.mkdirs(hdfsDestPath);
}
//2.检查本地的备份目录是否存在
File backupDir = new File(props.getProperty(Contants.LOG_BACKUP_BASE_DIR) + day);
if(!backupDir.exists()){
backupDir.mkdirs();
}
//4.【遍历待上传目录中得文件,逐一传输到HDFS得目标路径】
for (File file:toUploadFiles) {
//传输文件到HDFS并改名
Path destPath = new Path(hdfsDestPath +"/"+props.getProperty(Contants.HDFS_FILE_PREFIX)
+ UUID.randomUUID() +props.getProperty(Contants.HDFS_FILE_SUFFIX));
//将临时上传目录中的文件上传到hdfs中
fs.copyFromLocalFile(new Path(file.getAbsolutePath()),destPath);
//记录日志
log.info("文件传输到hdfs完成:"+file.getAbsolutePath() +"-->"+destPath);
//5.【同时将传输得文件移动到备份目录】
FileUtils.moveFileToDirectory(file,backupDir,true);
//记录日志
log.info("文件备份完成:"+file.getAbsolutePath() +"-->"+backupDir);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

BackupCleanTask 

public class BackupCleanTask extends TimerTask {

public void run() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH");
long now = new Date().getTime();
//探测备份目录
try {
//获取配置参数
final Properties props = PropertyHolderLazy.getProps();
File backupBaseDir = new File(props.getProperty(Contants.LOG_BACKUP_BASE_DIR));
File[] dayBackDir = backupBaseDir.listFiles();
//判断备份目录是否已经超过24h
for (File dir : dayBackDir) {
long time = sdf.parse(dir.getName()).getTime();
if(now-time>24*60*60*1000L){
//递归删除目录
FileUtils.deleteDirectory(dir);
}
}
}catch(Exception e){
e.printStackTrace();
}
}
}

日志配置

log4j.rootLogger=CONSOLE,stdout,logfile
#stdout控制器
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
#输出格式
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c]:%L - %m%n
#文件路径输出
log4j.appender.logfile=org.apache.log4j.RollingFileAppender
log4j.appender.logfile.File=d:/logs/collect/collect.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

pom依赖

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.2</version>
</dependency>

</dependencies>

控制台输出

...
47 - 探测到如下文件需要采集:[d:\logs\accesslog\access.log.1, d:\logs\accesslog\access.log.2, d:\logs\accesslog\access.log.3]
57 - 上述文件移动到待上传目录d:\logs\toupload
79 - 文件传输到hdfs完成:d:\logs\toupload\access.log.1-->/logs/2019-05-25-12/access_log_9dc0542e-0153-4bb3-b804-d85115c20153.log
83 - 文件备份完成:d:\logs\toupload\access.log.1-->d:\logs\backup\2019-05-25-12
79 - 文件传输到hdfs完成:d:\logs\toupload\access.log.2-->/logs/2019-05-25-12/access_log_5ce3450d-8874-4dd7-a23e-8d6a7a52c6d9.log
83 - 文件备份完成:d:\logs\toupload\access.log.2-->d:\logs\backup\2019-05-25-12
79 - 文件传输到hdfs完成:d:\logs\toupload\access.log.3-->/logs/2019-05-25-12/access_log_f7b7c741-bb87-4778-98ad-e33f06501441.log
83 - 文件备份完成:d:\logs\toupload\access.log.3-->d:\logs\backup\2019-05-25-12
...

效果图

 

(adsbygoogle = window.adsbygoogle || []).push({});
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  HDFS