MapReduce自定义输入格式
2016-07-01 17:23
309 查看
输入数据如下,是一个Excel表,具体数据是一个月内上网产生的流量记录,我们要做的是统计24小时每一小时的总流量。
统计结果如下:
首先使用apache poi解析Excel表格(测试数据在这里下载),每一行数据以tab隔开组成一个字符串,解析完成后以数组形式
返回,具体实现如下:
ExcelDeal.java
编写自定义输入格式类，每个自定义输入格式类都需要继承FileInputFormat抽象类并实现抽象方法createRecordReader，而
FileInputFormat继承自InputFormat,InputFormat类源码如下:
由于createRecordReader方法返回的是一个RecordReader的实例，所以我们需要编写一个RecordReader类型的类，
RecordReader类源码如下:
因此我们的输入格式类ExcelInputFormat.java如下:
编写完Excel解析类与输入格式类后,我们来完成下MR类,具体实现如下:
Hadoop API:http://hadoop.apache.org/docs/current/api/allclasses-noframe.html
学之,以记之。
统计结果如下:
首先使用apache poi解析Excel表格(测试数据在这里下载),每一行数据以tab隔开组成一个字符串,解析完成后以数组形式
返回,具体实现如下:
ExcelDeal.java
import java.io.IOException; import java.io.InputStream; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; import org.apache.poi.hssf.usermodel.HSSFDateUtil; import org.apache.poi.hssf.usermodel.HSSFRow; import org.apache.poi.hssf.usermodel.HSSFSheet; import org.apache.poi.hssf.usermodel.HSSFWorkbook; import org.apache.poi.ss.usermodel.Cell; public class ExcelDeal { public static String[] readExcel(InputStream is) throws IOException { @SuppressWarnings("resource") HSSFWorkbook hssfWorkbook = new HSSFWorkbook(is); List<String> list = new ArrayList<String>(); for (int numSheet = 0; numSheet < hssfWorkbook.getNumberOfSheets(); numSheet++) { HSSFSheet hssfSheet = hssfWorkbook.getSheetAt(numSheet); if (hssfSheet == null) { continue; } for (int rowNum = 1; rowNum <= hssfSheet.getLastRowNum(); rowNum++) { String templine = new String(); HSSFRow hssfRow = hssfSheet.getRow(rowNum); if (hssfRow == null) { continue; } //遍历每一行数据 for (Cell cell : hssfRow) { templine += getValue(cell)+"\t"; } list.add(templine); } } return list.toArray(new String[0]); } //根据数据类型获取每个表格中的数据 private static String getValue(Cell cell) { if (cell.getCellType() == Cell.CELL_TYPE_BOOLEAN) { return String.valueOf(cell.getBooleanCellValue()); } else if (cell.getCellType() == Cell.CELL_TYPE_NUMERIC) { if (HSSFDateUtil.isCellDateFormatted(cell)) { SimpleDateFormat sdf = null; sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); Date date = cell.getDateCellValue(); return sdf.format(date); } return String.valueOf(cell.getNumericCellValue()); } else { return String.valueOf(cell.getStringCellValue()); } } }
编写自定义输入格式类，每个自定义输入格式类都需要继承FileInputFormat抽象类并实现抽象方法createRecordReader，而
FileInputFormat继承自InputFormat,InputFormat类源码如下:
// Excerpt of Hadoop's InputFormat: the two hooks every input format provides.
public abstract class InputFormat<K, V> {
    // Partitions the job's input into logical splits, one per map task.
    public abstract List<InputSplit> getSplits(JobContext context ) throws IOException, InterruptedException;
    // Creates the reader that turns one split into (key, value) records.
    public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
}
由于createRecordReader方法返回的是一个RecordReader的实例，所以我们需要编写一个RecordReader类型的类，
RecordReader类源码如下:
public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable { public abstract void initialize(InputSplit split, ) throws IOException, InterruptedException; public abstract boolean nextKeyValue() throws IOException, InterruptedException; public abstract KEYIN getCurrentKey() throws IOException, InterruptedException; public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException; public abstract float getProgress() throws IOException, InterruptedException; public abstract void close() throws IOException; }
因此我们的输入格式类ExcelInputFormat.java如下:
import java.io.IOException; import java.io.InputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; public class ExcelInputFormat extends FileInputFormat<LongWritable, Text>{ @Override public RecordReader<LongWritable, Text> createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException { return new ExcelRecordReader(); } public class ExcelRecordReader extends RecordReader<LongWritable, Text>{ private LongWritable key = new LongWritable(-1); private Text value = new Text(); private InputStream inputStream;//文件输入流 private String[] strArray;//解析结果数组 @Override public void close() throws IOException { if(inputStream!=null){ inputStream.close(); } } @Override public LongWritable getCurrentKey() throws IOException, InterruptedException { return key; } @Override public Text getCurrentValue() throws IOException, InterruptedException { return value; } @Override public float getProgress() throws IOException, InterruptedException { return 0; } @Override public void initialize(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException { FileSplit split = (FileSplit) arg0; Configuration job = arg1.getConfiguration(); Path filePath = split.getPath(); FileSystem fileSystem = filePath.getFileSystem(job); inputStream = fileSystem.open(split.getPath()); strArray = ExcelDeal.readExcel(inputStream); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { int next = (int) key.get() + 1; if(next<strArray.length&&strArray[next]!=null){ key.set(next); value.set(strArray[next]); return 
true; } return false; } } }
编写完Excel解析类与输入格式类后,我们来完成下MR类,具体实现如下:
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class FlowCount { public static class FlowCountMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ public void map(LongWritable key,Text value, Context context) throws IOException, InterruptedException{ String line = value.toString(); String[] records = line.split("\t"); String month = records[1].substring(11, 13); long flow = Long.parseLong(records[3]); context.write(new Text(month), new LongWritable(flow)); } } public static class FlowCountReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ public void reduce(Text key,Iterable<LongWritable> value,Context context) throws IOException, InterruptedException{ long sum = 0; for (LongWritable longWritable : value) { sum += longWritable.get(); } context.write(key, new LongWritable(sum)); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); @SuppressWarnings("deprecation") Job job = new Job(conf,"FlowCount"); job.setJarByClass(FlowCount.class); job.setMapperClass(FlowCountMapper.class); job.setReducerClass(FlowCountReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); job.setInputFormatClass(ExcelInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true)?0:1); } }
Hadoop API:http://hadoop.apache.org/docs/current/api/allclasses-noframe.html
学之,以记之。
相关文章推荐
- markdown的图片外链
- C++ string类的实现
- 从apache mod_php到php-fpm[转]
- Google Protocol Buffer 的使用和原理
- git学习笔记(一)
- jmeter 3.0安装配置
- nodejs系列学习:http请求html/css/js-----(二)
- 带有安全认证的webservice
- poj1325
- Kth Largest Element in an Array
- 简单的CSS样板
- vc++ 6.0转vs2010出现的问题
- 常用Oracle问题诊断sql脚本
- 从源码角度彻底分析layout_weight使用
- gcc/g++
- 根据PID号判断所属的容器
- 怎样通过后台控制前台<input>可编辑或不可编辑
- HttpApplication可用的方法
- Doctype作用?严格模式与混杂模式如何区分?它们有何意义?
- 【转自IBM】剖析linux内核中的同步机制。值得一读。