您的位置:首页 > 大数据 > Hadoop

hadoop MapReduce 基础数字排序

2020-06-04 07:36 204 查看

hadoop MapReduce 基础数字排序

package mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class Sort {
//本地运行需要的方法
static {
System.setProperty("hadoop.home.dir", "D:\\soft\\hadoop\\hadoop-2.9.2");
}
//自定义类继承Mapper·并重写map方法
public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text > {
/**
*
* @param key 行索引
* @param value 每行数据
* @param context mr中的上下文环境
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//数字:map输出时,Int和Long, shuffle会按数字排序
//字符串:map输出时,Text,shuffle会按字典顺序排
context.write(new LongWritable(Long.parseLong(value.toString())),new Text(""));
}
}

//自定义类继承Reducer·并重写map方法
public static class MyReducer extends Reducer<LongWritable, Text, LongWritable, Text> {

/**
* @param key 每个单词
* @param values 单词个数的集合
* @param context 上下文环境
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
context.write(key,new Text(""));
}
}

//驱动方法
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//0.初始化一个job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Sort");
/*
打jar包集群方式运行
job.setJarByClass(WordCount.class);
*/
//1.输入文件
FileInputFormat.addInputPath(job, new Path(args[0]));
//2.map并行计算
//如果map的输出key value 的类型个reduce key  value的类型相同可以省略
job.setMapperClass(MyMapper.class);
//        job.setMapOutputKeyClass(Text.class);
//        job.setMapOutputValueClass(LongWritable.class);
//3.shuffle流程(内部实现)
//4.reduce计算
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
//5.输出文件
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//判断文件是否存在 如存在 则删除
FileSystem fs=FileSystem.get(conf);
if (fs.exists(new Path(args[1]))) {
fs.delete(new Path(args[1]),true);
}

//6.提交作业(总入口)
boolean result = job.waitForCompletion(true);
System.out.println(result ? 1 : 0);
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: