您的位置:首页 > 编程语言

Hadoop MapReduce编程 API入门系列之挖掘气象数据版本3(九)

2016-12-12 15:33 1111 查看
  不多说,直接上干货!

   下面,是版本1。

Hadoop MapReduce编程 API入门系列之挖掘气象数据版本1(一)

    下面是版本2。

Hadoop MapReduce编程 API入门系列之挖掘气象数据版本2(九)

这篇博客带大家体会一种不一样的版本写法。

























代码

package zhouls.bigdata.myMapReduce.weather;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class MyKey implements WritableComparable<MyKey>{
//WritableComparable,实现这个方法,要多很多
//readFields是读入,write是写出
private int year;
private int month;
private double hot;
public int getYear() {
return year;
}

public void setYear(int year) {
this.year = year;
}

public int getMonth() {
return month;
}

public void setMonth(int month) {
this.month = month;
}

public double getHot() {
return hot;
}

public void setHot(double hot) {
this.hot = hot;
}//这一大段的get和set,可以右键,source,产生get和set,自动生成。

public void readFields(DataInput arg0) throws IOException { //反序列化
this.year=arg0.readInt();
this.month=arg0.readInt();
this.hot=arg0.readDouble();
}

public void write(DataOutput arg0) throws IOException { //序列化
arg0.writeInt(year);
arg0.writeInt(month);
arg0.writeDouble(hot);
}

//判断对象是否是同一个对象,当该对象作为输出的key
public int compareTo(MyKey o) {
int r1 =Integer.compare(this.year, o.getYear());//比较当前的年和你传过来的年
if(r1==0){
int r2 =Integer.compare(this.month, o.getMonth());
if(r2==0){
return Double.compare(this.hot, o.getHot());
}else{
return r2;
}
}else{
return r1;
}
}

}


package zhouls.bigdata.myMapReduce.weather;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class MyPartitioner extends HashPartitioner<MyKey, DoubleWritable>{//这里就是洗牌

//执行时间越短越好
public int getPartition(MyKey key, DoubleWritable value, int numReduceTasks) {
return (key.getYear()-1949)%numReduceTasks;//对于一个数据集,找到最小,1949
}
}

//1949-10-01 14:21:02    34c
//1949-10-02 14:01:02    36c
//1950-01-01 11:21:02    32c
//1950-10-01 12:21:02    37c
//1951-12-01 12:21:02    23c
//1950-10-02 12:21:02    41c
//1950-10-03 12:21:02    27c
//1951-07-01 12:21:02    45c
//1951-07-02 12:21:02    46c
//1951-07-03 12:21:03    47c


package zhouls.bigdata.myMapReduce.weather;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MySort extends WritableComparator{

public MySort(){
super(MyKey.class,true);//把MyKey传进了
}

public int compare(WritableComparable a, WritableComparable b) {//这是排序的精髓
MyKey k1 =(MyKey) a;
MyKey k2 =(MyKey) b;
int r1 =Integer.compare(k1.getYear(), k2.getYear());
if(r1==0){//年相同
int r2 =Integer.compare(k1.getMonth(), k2.getMonth());
if(r2==0){//月相同
return -Double.compare(k1.getHot(), k2.getHot());//比较气温
}else{
return r2;
}
}else{
return r1;
}

}
}


package zhouls.bigdata.myMapReduce.weather;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MyGroup extends WritableComparator{

public MyGroup(){
super(MyKey.class,true);//把MyKey传进了
}

public int compare(WritableComparable a, WritableComparable b) {//这是分组的精髓
MyKey k1 =(MyKey) a;
MyKey k2 =(MyKey) b;
int r1 =Integer.compare(k1.getYear(), k2.getYear());
if(r1==0){
return Integer.compare(k1.getMonth(), k2.getMonth());
}else{
return r1;
}

}
}


package zhouls.bigdata.myMapReduce.weather;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RunJob {

//    1949-10-01 14:21:02    34c WeatherMapper
//    1949-10-02 14:01:02    36c
//    1950-01-01 11:21:02    32c    分区在MyPartitioner.java
//    1950-10-01 12:21:02    37c
//    1951-12-01 12:21:02    23c    排序在MySort.java
//    1950-10-02 12:21:02    41c
//    1950-10-03 12:21:02    27c    分组在MyGroup.java
//    1951-07-01 12:21:02    45c
//    1951-07-02 12:21:02    46c    再,WeatherReducer
//    1951-07-03 12:21:03    47c

//key:每行第一个隔开符(制表符)左边为key,右边为value    自定义类型MyKey,洗牌,
static class WeatherMapper extends Mapper<Text, Text, MyKey, DoubleWritable>{
SimpleDateFormat sdf =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
NullWritable v =NullWritable.get();
//    1949-10-01 14:21:02是自定义类型MyKey,即key
//    34c是DoubleWritable,即value

protected void map(Text key, Text value,Context context) throws IOException, InterruptedException {
try {
Date date =sdf.parse(key.toString());
Calendar c =Calendar.getInstance();
//Calendar 类是一个抽象类,可以通过调用 getInstance() 静态方法获取一个 Calendar 对象,
//此对象已由当前日期时间初始化,即默认代表当前时间,如 Calendar c = Calendar.getInstance();
c.setTime(date);
int year =c.get(Calendar.YEAR);
int month =c.get(Calendar.MONTH);

double hot =Double.parseDouble(value.toString().substring(0, value.toString().lastIndexOf("c")));
MyKey k =new MyKey();
k.setYear(year);
k.setMonth(month);
k.setHot(hot);
context.write(k, new DoubleWritable(hot));
} catch (Exception e) {
e.printStackTrace();
}
}
}

static class WeatherReducer extends Reducer<MyKey, DoubleWritable, Text, NullWritable>{
protected void reduce(MyKey arg0, Iterable<DoubleWritable> arg1,Context arg2)throws IOException, InterruptedException {
int i=0;
for(DoubleWritable v :arg1){
i++;
String msg =arg0.getYear()+"\t"+arg0.getMonth()+"\t"+v.get();//"\t"是制表符
arg2.write(new Text(msg), NullWritable.get());
if(i==3){
break;
}
}
}
}

public static void main(String[] args) {
Configuration config =new Configuration();
//    config.set("fs.defaultFS", "hdfs://HadoopMaster:9000");
//    config.set("yarn.resourcemanager.hostname", "HadoopMaster");
//    config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");
//    config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");//默认分隔符是制表符"\t",这里自定义,如","
try {
FileSystem fs =FileSystem.get(config);

Job job =Job.getInstance(config);
job.setJarByClass(RunJob.class);

job.setJobName("weather");

job.setMapperClass(WeatherMapper.class);
job.setReducerClass(WeatherReducer.class);
job.setMapOutputKeyClass(MyKey.class);
job.setMapOutputValueClass(DoubleWritable.class);

job.setPartitionerClass(MyPartitioner.class);
job.setSortComparatorClass(MySort.class);
job.setGroupingComparatorClass(MyGroup.class);

job.setNumReduceTasks(3);

job.setInputFormatClass(KeyValueTextInputFormat.class);

//    FileInputFormat.addInputPath(job, new Path("hdfs://HadoopMaster:9000/weather.txt"));//输入路径,下有weather.txt
//
//    Path outpath =new Path("hdfs://HadoopMaster:9000/out/weather");

FileInputFormat.addInputPath(job, new Path("./data/weather.txt"));//输入路径,下有weather.txt

Path outpath =new Path("./out/weather");

if(fs.exists(outpath)){
fs.delete(outpath, true);
}
FileOutputFormat.setOutputPath(job, outpath);

boolean f= job.waitForCompletion(true);
if(f){
}
} catch (Exception e) {
e.printStackTrace();
}
}

}


欢迎大家关注我的微信公众号:大数据躺过的坑

同时,大家可以关注我的个人博客

http://www.cnblogs.com/zlslch/ http://www.cnblogs.com/lchzls/

以及对应本平台的QQ群:161156071(大数据躺过的坑)



内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐