
Hadoop example (Java template): outputting numbers in reverse order (custom Mapper and Reducer, custom key2 type, overridden compareTo, HDFS operations)

2017-04-11 13:51
This post mainly collects common MapReduce operation templates. The job reads numbers (one per line) from the input files and writes them back in descending order; for example, input lines 3, 1, 5, 2 come out as 5, 3, 2, 1.

Main class (please ignore the class name; I forgot to change it):

package hadoop.wordCount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.*;

public class WordCount {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Path inpath = new Path(HDFS.HDFS_PATH + "/file/input");
        Path outpath = new Path(HDFS.HDFS_PATH + "/file/output");
        Job job = new Job(conf, "WordCount");

        // Delete the previous input/output directories, then re-upload the input files
        HDFS.rmInHDFS("/file/output");
        HDFS.rmInHDFS("/file/input");
        HDFS.mkdirsInHDFS("/file/input");
        HDFS.uploadToHDFS("/home/eason/hadoop_work/input/file1", "/file/input/file1");
        HDFS.uploadToHDFS("/home/eason/hadoop_work/input/file2", "/file/input/file2");

        // Set the input and output paths
        FileInputFormat.setInputPaths(job, inpath);
        FileOutputFormat.setOutputPath(job, outpath);

        // Bind the mapper and reducer classes
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        /*
         * By default, job.setOutputKeyClass and job.setOutputValueClass set the output types
         * for both the map and reduce phases, so they are only sufficient when the two phases
         * emit the same types. When the map output differs from the reduce output, the map-side
         * types must be set separately via job.setMapOutputKeyClass and job.setMapOutputValueClass.
         */

        // Map-phase output types
        job.setMapOutputKeyClass(MyKey.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Reduce-phase output types
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);

        // Run the job with verbose progress output, then print the result to the console
        job.waitForCompletion(true);
        HDFS.printInHDFS("/file/output/part-r-00000");
    }

}
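As a side note (my own sketch, not part of the original post), the same driver could also be written with ToolRunner, so that generic -D configuration options can be passed on the command line and the input/output paths come from the arguments. The class name SortDriver is hypothetical:

package hadoop.wordCount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SortDriver extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        // same job wiring as in WordCount, but paths come from the command line
        Job job = new Job(getConf(), "NumberDescSort");
        job.setJarByClass(SortDriver.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setMapOutputKeyClass(MyKey.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // propagate the job's success flag as the process exit code
        System.exit(ToolRunner.run(new Configuration(), new SortDriver(), args));
    }
}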

HDFS utility class:
package hadoop.wordCount;

import java.io.*;
import java.net.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

public class HDFS {
    static final String HDFS_PATH = "hdfs://192.168.19.138:9000";

    // Create a directory in HDFS
    public static void mkdirsInHDFS(String folder) throws IOException {
        Path path = new Path(folder);
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(HDFS_PATH), conf);
        if (!fs.exists(path)) {
            fs.mkdirs(path);
            System.out.println("Create: " + folder);
        }
        fs.close();
    }

    // Delete a file or directory in HDFS
    public static void rmInHDFS(String folder) throws IOException {
        Path path = new Path(folder);
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(HDFS_PATH), conf);
        // deleteOnExit only marks the path; the actual deletion happens when fs.close() runs below
        fs.deleteOnExit(path);
        System.out.println("Delete: " + folder);
        fs.close();
    }

    // Upload a local file to HDFS
    public static void uploadToHDFS(String localSrc, String hdfsSrc) throws FileNotFoundException, IOException {
        InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(HDFS_PATH), conf);
        OutputStream out = fs.create(new Path(hdfsSrc), new Progressable() {
            public void progress() {
                System.out.print(".");
            }
        });
        IOUtils.copyBytes(in, out, 4096, true);
        System.out.println("Upload succeeded!");
    }

    // Read a file from HDFS and print its contents to the console
    public static void printInHDFS(String folder) throws FileNotFoundException, IOException {
        // Note: URL.setURLStreamHandlerFactory may only be called once per JVM
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
        final URL url = new URL(HDFS_PATH + folder);
        final InputStream in = url.openStream();
        IOUtils.copyBytes(in, System.out, 1024, true);
    }
}
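One caveat with printInHDFS is that URL.setURLStreamHandlerFactory may only be called once per JVM, so calling the method a second time in the same process would fail. A sketch of an alternative that reads through the FileSystem API instead (the method name printWithFileSystem is my own; it would live in the same HDFS class and reuse its imports):

    // Alternative reader that avoids URL.setURLStreamHandlerFactory
    public static void printWithFileSystem(String folder) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(HDFS_PATH), conf);
        InputStream in = fs.open(new Path(folder));
        // copyBytes with close=true closes the stream when done
        IOUtils.copyBytes(in, System.out, 1024, true);
        fs.close();
    }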


MyMapper:
package hadoop.wordCount;

import java.io.IOException;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;

// Mapper class; note the type parameters <key1, value1, key2, value2>:
// key1 is the byte offset of the line in the source file, value1 is the text of that line
class MyMapper extends Mapper<LongWritable, Text, MyKey, NullWritable> {
    protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
        // Parse the line as a number and emit it as the key; no value is needed
        MyKey t = new MyKey();
        t.value = Long.parseLong(v1.toString());
        context.write(t, NullWritable.get());
    }
}
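If the input might contain blank lines, Long.parseLong would throw a NumberFormatException and fail the task. A defensive variant of map() (my own sketch, assuming blank lines should simply be skipped):

    protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
        String line = v1.toString().trim();
        if (line.isEmpty()) {
            return; // skip blank lines instead of throwing NumberFormatException
        }
        MyKey t = new MyKey();
        t.value = Long.parseLong(line);
        context.write(t, NullWritable.get());
    }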

MyReducer:
package hadoop.wordCount;

import java.io.IOException;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;

// Reducer class; note the type parameters <key2, value2, key3, value3>
class MyReducer extends Reducer<MyKey, NullWritable, LongWritable, NullWritable> {
    protected void reduce(MyKey k2, Iterable<NullWritable> v2s, Context context) throws IOException, InterruptedException {
        // Keys arrive sorted by MyKey.compareTo (descending); emit one line per distinct value
        context.write(new LongWritable(k2.value), NullWritable.get());
    }
}
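Because equal keys are grouped into a single reduce call, duplicate numbers in the input collapse into one output line. If every occurrence should be kept, the reducer can iterate over the grouped values instead (a sketch under that assumption):

    protected void reduce(MyKey k2, Iterable<NullWritable> v2s, Context context) throws IOException, InterruptedException {
        for (NullWritable v : v2s) {
            // one output line per occurrence of the number
            context.write(new LongWritable(k2.value), NullWritable.get());
        }
    }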

MyKey:
package hadoop.wordCount;

import java.io.*;

import org.apache.hadoop.io.*;

public class MyKey implements WritableComparable<MyKey> {
    public String name;
    public Long value;

    // Constructors
    public MyKey() {
        value = (long) 0;
        name = "";
    }
    public MyKey(Text a, LongWritable b) {
        name = a.toString();
        value = b.get();
    }

    // These two methods must be implemented: they define how the key is serialized and deserialized
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(name);
        dataOutput.writeLong(value);
    }
    public void readFields(DataInput dataInput) throws IOException {
        name = dataInput.readUTF();
        value = dataInput.readLong();
    }

    // A class used as a key must define its ordering. Long.compare avoids the overflow that
    // (int)(b.value - value) could suffer for large values, and sorts larger values first (descending).
    public int compareTo(MyKey b) {
        return Long.compare(b.value, value);
    }

    // How the key is written when it appears as key3 or value3 in the output
    public String toString() {
        return value.toString();
    }
}
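A quick local sanity check of MyKey's serialization round trip and ordering (my own snippet, run outside of Hadoop; the class name MyKeyCheck is hypothetical):

package hadoop.wordCount;

import java.io.*;

public class MyKeyCheck {
    public static void main(String[] args) throws IOException {
        // serialize one key and read it back
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        MyKey a = new MyKey();
        a.value = 42L;
        a.write(new DataOutputStream(bytes));

        MyKey b = new MyKey();
        b.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(b);                  // prints 42

        // larger values sort first, which is why the job's output is in descending order
        MyKey c = new MyKey();
        c.value = 100L;
        System.out.println(a.compareTo(c) > 0); // prints true (42 sorts after 100)
    }
}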