Java实现HDFS文本解析写入到Hbase中
2018-02-06 10:33
344 查看
首先先在Hbase 中建表,参考我上一篇Java操作Hbase 的博客。
接着代码:
思路先拼接hdfs路径,然后Spark加载进来,然后分区读取,处理,一条一条写入。
接着代码:
package com.xxx.report.service; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.xxx.report.config.Constants; import com.xxx.report.util.HbaseUtilQA; import com.xxx.report.util.MD5Util; import com.xxx.report.util.TimeUtil; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.VoidFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Serializable; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Iterator; import java.util.List; import java.util.Map; /** * @author yangxin-ryanx */ public class LockAppLog2Hbase implements Serializable{ private static final Logger LOG = LoggerFactory.getLogger(LockAppLog2Hbase.class); private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd"); private static final String CF_NAME = "cf"; private static final String tableName = "rd:app_log"; public void run(String master, String startTime, String endTime) { long start = System.currentTimeMillis(); LOG.info("Start run the log parser..."); startTime = startTime.replace("-", ""); endTime = endTime.replace("-",""); SparkConf conf = new SparkConf().setAppName(Constants.SPARK_APP_NAME_YangXin).setMaster(master); JavaSparkContext javaSparkContext = new JavaSparkContext(conf); List<String> list = Lists.newArrayList(); Calendar calendar = Calendar.getInstance(); calendar.set(Calendar.YEAR, Integer.valueOf(startTime.substring(0,4))); calendar.set(Calendar.MONTH, Integer.valueOf(startTime.substring(4,6)) - 1); calendar.set(Calendar.DATE, Integer.valueOf(startTime.substring(6,8))); String date = startTime; while (!date.equals(endTime)){ list.add(date); calendar.add(Calendar.DATE, 1); date = simpleDateFormat.format(calendar.getTime()); } list.add(endTime); for (String day : list){ StringBuffer path = new StringBuffer(); path.append(Constants.PREFIX_PATH_YangXin).append(day).append("/*/*"); JavaRDD<String> rdd = javaSparkContext.textFile(path.toString()); if (rdd.isEmpty()){ continue; } rdd.foreachPartition(new VoidFunction<Iterator<String>>() { private static final long serialVersionUID = 1L; @Override public void call(Iterator<String> logs) throws Exception { while (logs.hasNext()) { String log = logs.next(); try { String snValue = log.split(" ")[10]; if (snValue.equals("-")){ continue; } handleLog(log); } catch (Exception e) { e.printStackTrace(); LOG.error(e.getMessage()); } } } }); HbaseUtilQA.flush(tableName); } long end = System.currentTimeMillis(); System.out.println("耗时:"+(end-start)/1000+"秒"); } private void handleLog(String line){ MD5Util md5Util = new MD5Util(); String snValue = line.split(" ")[10]; String timePart1 = line.split(" ")[0]; String timePart2 = line.split(" ")[1]; String time = timePart1 + " " + timePart2; String rowKey = md5Util.md5Function(snValue, timePart1); Long timeStamp = TimeUtil.getTimeStamp(time); Map<String, String> lockAppMap = Maps.newHashMap(); String key = md5Util.md5Key(line) + "_" + timePart2.replace(".", "").replace(":",""); lockAppMap.put(key, line); HbaseUtilQA.addRecords(tableName, rowKey, timeStamp, CF_NAME.getBytes(), lockAppMap); } public static void main(String[] args){ LockAppLog2Hbase lockAppLog2Hbase = new LockAppLog2Hbase(); String master = args[0]; String startTime = args[1]; String endTime = args[2]; lockAppLog2Hbase.run(master, startTime, endTime); } }
思路先拼接hdfs路径,然后Spark加载进来,然后分区读取,处理,一条一条写入。
相关文章推荐
- 使用批处理实现解析文本数据文件写入数据库表
- 简单通过java的socket&serversocket以及多线程技术实现多客户端的数据的传输,并将数据写入hbase中
- Java 实现追加写入HDFS
- Java实现txt文本的读取、写入、转移!
- Java实现将文本内容、网址链接url,生成二维码与反解析
- 用JAVA实现文本形式的树状结构显示(转)
- 用Java实现文本形式的树状结构显示
- 不使用组件实现多个图片与文本数据同时写入数据库(4)
- 用JAVA实现文本形式的树状结构显示
- 如何用Java 实现 Excel 表达式的解析(摘自:http://topic.csdn.net/t/20030408/17/1634982.html#)
- 用java实现文本形式的树状结构显示
- Java实现目录查找与文本阅读
- 搜索文本内容——Java代码的简单实现(修改版)
- Java实现文件的读取、写入、复制、修改、移动等操作
- java实现对url解析
- 日志类(常用的都是用log4net,这里简陋地实现一个写入文本日志类)
- java 文本写入的例子
- 数据结构之应用 "栈(Stack)" 实现: 解析算术表达式及计算求值 (C#/Java)
- java.beans包里面的两个类简单地实现XML解析
- 数据结构习作之应用 "栈(Stack)" 实现: 解析算术表达式及计算求值 (C#/Java) (技术含量少许)