hadoop实例(java模板):数字逆序输出 (自定义mapper,reducer,自定义key2类型,重写compareTo函数,HDFS操作)
2017-04-11 13:51
555 查看
主要是整理了mapreduce常用的操作模板
主函数(请忽略主类的名字。。忘记改了):
package hadoop.wordCount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.*;
public class WordCount {
    /**
     * Driver: stages the local input files into HDFS, runs the reverse-sort job
     * and prints the result file. Exits non-zero when the job fails.
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Path inpath = new Path(HDFS.HDFS_PATH + "/file/input");
        Path outpath = new Path(HDFS.HDFS_PATH + "/file/output");
        Job job = new Job(conf, "WordCount");
        // BUG FIX: without setJarByClass the job jar cannot be located when the
        // job is submitted to a real cluster (works only in local mode).
        job.setJarByClass(WordCount.class);
        // Remove any stale input/output from a previous run, then re-upload the input.
        HDFS.rmInHDFS("/file/output");
        HDFS.rmInHDFS("/file/input");
        HDFS.mkdirsInHDFS("/file/input");
        HDFS.uploadToHDFS("/home/eason/hadoop_work/input/file1", "/file/input/file1");
        HDFS.uploadToHDFS("/home/eason/hadoop_work/input/file2", "/file/input/file2");
        FileInputFormat.setInputPaths(job, inpath);
        FileOutputFormat.setOutputPath(job, outpath);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        // The map output types (MyKey/NullWritable) differ from the reduce output,
        // so they must be declared explicitly; setOutputKeyClass/ValueClass alone
        // would otherwise apply to both phases.
        job.setMapOutputKeyClass(MyKey.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);
        // BUG FIX: the original ignored the job result and printed the output
        // file even after a failed run (which would then also fail to open).
        if (!job.waitForCompletion(true)) {
            System.err.println("Job failed");
            System.exit(1);
        }
        HDFS.printInHDFS("/file/output/part-r-00000");
    }
}
HDFS操作类:
package hadoop.wordCount;
import java.io.*;
import java.net.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
public class HDFS {
    // Namenode endpoint; all paths passed to the helpers are relative to this root.
    static final String HDFS_PATH = "hdfs://192.168.19.138:9000";

    /** Creates {@code folder} in HDFS if it does not already exist. */
    public static void mkdirsInHDFS(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(HDFS_PATH), new Configuration());
        try {
            if (!fs.exists(path)) {
                fs.mkdirs(path);
                System.out.println("Create: " + folder);
            }
        } finally {
            // BUG FIX: close in finally so the handle is released even on error.
            fs.close();
        }
    }

    /** Recursively deletes {@code folder} from HDFS (no-op if absent). */
    public static void rmInHDFS(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(HDFS_PATH), new Configuration());
        try {
            // BUG FIX: deleteOnExit only schedules the removal for fs.close();
            // an explicit recursive delete is immediate and unambiguous.
            fs.delete(path, true);
            System.out.println("Delete: " + folder);
        } finally {
            fs.close();
        }
    }

    /**
     * Copies a local file to HDFS, printing '.' as progress feedback.
     *
     * @param localSrc local filesystem path of the source file
     * @param hdfsSrc  destination path inside HDFS
     */
    public static void uploadToHDFS(String localSrc, String hdfsSrc) throws FileNotFoundException, IOException {
        InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
        FileSystem fs = FileSystem.get(URI.create(HDFS_PATH), new Configuration());
        try {
            OutputStream out = fs.create(new Path(hdfsSrc), new Progressable() {
                public void progress() {
                    System.out.print(".");
                }
            });
            // copyBytes(..., true) closes both streams when done.
            IOUtils.copyBytes(in, out, 4096, true);
            System.out.println("Upload successed! ");
        } finally {
            // BUG FIX: the original never closed this FileSystem handle.
            fs.close();
        }
    }

    /**
     * Streams the contents of an HDFS file to stdout.
     * BUG FIX: the original registered a URLStreamHandlerFactory, which the JVM
     * permits only ONCE per process — a second call to this method threw
     * java.lang.Error. Reading through the FileSystem API has no such limit.
     */
    public static void printInHDFS(String folder) throws FileNotFoundException, IOException {
        FileSystem fs = FileSystem.get(URI.create(HDFS_PATH), new Configuration());
        try {
            InputStream in = fs.open(new Path(folder));
            // copyBytes(..., true) closes the input stream (System.out stays open).
            IOUtils.copyBytes(in, System.out, 1024, true);
        } finally {
            fs.close();
        }
    }
}
MyMapper:
package hadoop.wordCount;
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
// 定义Mapper类,注意参数类型: <key1,value1,key2,value2> key1为源文件行号,value1为该行的文本
// Mapper <key1,value1,key2,value2>: key1 is the byte offset of the line,
// value1 is the line text (expected to hold one number per line).
class MyMapper extends Mapper<LongWritable, Text, MyKey, NullWritable> {
    protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
        // ROBUSTNESS FIX: the original crashed with NumberFormatException on
        // blank lines or surrounding whitespace; trim and skip empty lines.
        String line = v1.toString().trim();
        if (line.isEmpty()) {
            return;
        }
        MyKey t = new MyKey();
        t.value = Long.parseLong(line);
        context.write(t, NullWritable.get());
    }
}
MyReducer:
package hadoop.wordCount;
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
//定义Reducer类,注意参数类型: <key2,value2,key3,value3>
// Reducer <key2,value2,key3,value3>: keys arrive already sorted descending
// (MyKey.compareTo), so writing them in order yields the reversed sequence.
class MyReducer extends Reducer<MyKey, NullWritable, LongWritable, NullWritable> {
    protected void reduce(MyKey k2, Iterable<NullWritable> v2s, Context context) throws IOException, InterruptedException {
        // BUG FIX: the original wrote each distinct key exactly once, silently
        // dropping duplicate numbers. Emit one line per occurrence instead.
        LongWritable out = new LongWritable(k2.value);
        for (NullWritable ignored : v2s) {
            context.write(out, NullWritable.get());
        }
    }
}
MyKey:
package hadoop.wordCount;
import java.io.*;
import org.apache.hadoop.io.*;
public class MyKey implements WritableComparable<MyKey> {
    public String name;
    public Long value;

    /** Writable contract requires a no-arg constructor for deserialization. */
    public MyKey() {
        value = (long) 0;
        name = "";
    }

    public MyKey(Text a, LongWritable b) {
        name = a.toString();
        value = b.get();
    }

    // Serialization — field order here must match readFields exactly.
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(name);
        dataOutput.writeLong(value);
    }

    public void readFields(DataInput dataInput) throws IOException {
        name = dataInput.readUTF();
        value = dataInput.readLong();
    }

    /**
     * Descending order by value (larger numbers sort first), which produces the
     * reversed output sequence.
     * BUG FIX: the original returned (int)(b.value - value); the subtraction and
     * the narrowing cast both overflow for values far apart, yielding a wrong
     * sort order. Long.compare is overflow-safe.
     */
    public int compareTo(MyKey b) {
        return Long.compare(b.value, value);
    }

    /**
     * BUG FIX: MapReduce's default HashPartitioner routes keys by hashCode();
     * Object's identity hash would scatter equal keys across reducers, so
     * equals/hashCode must agree with compareTo (both use only {@code value}).
     */
    @Override
    public int hashCode() {
        return value.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof MyKey && ((MyKey) o).value.longValue() == value.longValue();
    }

    // Output format when this class is used as key3/value3: just the number.
    public String toString() {
        return value.toString();
    }
}
主函数(请忽略主类的名字。。忘记改了):
package hadoop.wordCount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.*;
public class WordCount {
    /**
     * Driver: stages the local input files into HDFS, runs the reverse-sort job
     * and prints the result file. Exits non-zero when the job fails.
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Path inpath = new Path(HDFS.HDFS_PATH + "/file/input");
        Path outpath = new Path(HDFS.HDFS_PATH + "/file/output");
        Job job = new Job(conf, "WordCount");
        // BUG FIX: without setJarByClass the job jar cannot be located when the
        // job is submitted to a real cluster (works only in local mode).
        job.setJarByClass(WordCount.class);
        // Remove any stale input/output from a previous run, then re-upload the input.
        HDFS.rmInHDFS("/file/output");
        HDFS.rmInHDFS("/file/input");
        HDFS.mkdirsInHDFS("/file/input");
        HDFS.uploadToHDFS("/home/eason/hadoop_work/input/file1", "/file/input/file1");
        HDFS.uploadToHDFS("/home/eason/hadoop_work/input/file2", "/file/input/file2");
        FileInputFormat.setInputPaths(job, inpath);
        FileOutputFormat.setOutputPath(job, outpath);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        // The map output types (MyKey/NullWritable) differ from the reduce output,
        // so they must be declared explicitly; setOutputKeyClass/ValueClass alone
        // would otherwise apply to both phases.
        job.setMapOutputKeyClass(MyKey.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);
        // BUG FIX: the original ignored the job result and printed the output
        // file even after a failed run (which would then also fail to open).
        if (!job.waitForCompletion(true)) {
            System.err.println("Job failed");
            System.exit(1);
        }
        HDFS.printInHDFS("/file/output/part-r-00000");
    }
}
HDFS操作类:
package hadoop.wordCount;
import java.io.*;
import java.net.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
public class HDFS {
    // Namenode endpoint; all paths passed to the helpers are relative to this root.
    static final String HDFS_PATH = "hdfs://192.168.19.138:9000";

    /** Creates {@code folder} in HDFS if it does not already exist. */
    public static void mkdirsInHDFS(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(HDFS_PATH), new Configuration());
        try {
            if (!fs.exists(path)) {
                fs.mkdirs(path);
                System.out.println("Create: " + folder);
            }
        } finally {
            // BUG FIX: close in finally so the handle is released even on error.
            fs.close();
        }
    }

    /** Recursively deletes {@code folder} from HDFS (no-op if absent). */
    public static void rmInHDFS(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(HDFS_PATH), new Configuration());
        try {
            // BUG FIX: deleteOnExit only schedules the removal for fs.close();
            // an explicit recursive delete is immediate and unambiguous.
            fs.delete(path, true);
            System.out.println("Delete: " + folder);
        } finally {
            fs.close();
        }
    }

    /**
     * Copies a local file to HDFS, printing '.' as progress feedback.
     *
     * @param localSrc local filesystem path of the source file
     * @param hdfsSrc  destination path inside HDFS
     */
    public static void uploadToHDFS(String localSrc, String hdfsSrc) throws FileNotFoundException, IOException {
        InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
        FileSystem fs = FileSystem.get(URI.create(HDFS_PATH), new Configuration());
        try {
            OutputStream out = fs.create(new Path(hdfsSrc), new Progressable() {
                public void progress() {
                    System.out.print(".");
                }
            });
            // copyBytes(..., true) closes both streams when done.
            IOUtils.copyBytes(in, out, 4096, true);
            System.out.println("Upload successed! ");
        } finally {
            // BUG FIX: the original never closed this FileSystem handle.
            fs.close();
        }
    }

    /**
     * Streams the contents of an HDFS file to stdout.
     * BUG FIX: the original registered a URLStreamHandlerFactory, which the JVM
     * permits only ONCE per process — a second call to this method threw
     * java.lang.Error. Reading through the FileSystem API has no such limit.
     */
    public static void printInHDFS(String folder) throws FileNotFoundException, IOException {
        FileSystem fs = FileSystem.get(URI.create(HDFS_PATH), new Configuration());
        try {
            InputStream in = fs.open(new Path(folder));
            // copyBytes(..., true) closes the input stream (System.out stays open).
            IOUtils.copyBytes(in, System.out, 1024, true);
        } finally {
            fs.close();
        }
    }
}
MyMapper:
package hadoop.wordCount;
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
// 定义Mapper类,注意参数类型: <key1,value1,key2,value2> key1为源文件行号,value1为该行的文本
// Mapper <key1,value1,key2,value2>: key1 is the byte offset of the line,
// value1 is the line text (expected to hold one number per line).
class MyMapper extends Mapper<LongWritable, Text, MyKey, NullWritable> {
    protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
        // ROBUSTNESS FIX: the original crashed with NumberFormatException on
        // blank lines or surrounding whitespace; trim and skip empty lines.
        String line = v1.toString().trim();
        if (line.isEmpty()) {
            return;
        }
        MyKey t = new MyKey();
        t.value = Long.parseLong(line);
        context.write(t, NullWritable.get());
    }
}
MyReducer:
package hadoop.wordCount;
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
//定义Reducer类,注意参数类型: <key2,value2,key3,value3>
// Reducer <key2,value2,key3,value3>: keys arrive already sorted descending
// (MyKey.compareTo), so writing them in order yields the reversed sequence.
class MyReducer extends Reducer<MyKey, NullWritable, LongWritable, NullWritable> {
    protected void reduce(MyKey k2, Iterable<NullWritable> v2s, Context context) throws IOException, InterruptedException {
        // BUG FIX: the original wrote each distinct key exactly once, silently
        // dropping duplicate numbers. Emit one line per occurrence instead.
        LongWritable out = new LongWritable(k2.value);
        for (NullWritable ignored : v2s) {
            context.write(out, NullWritable.get());
        }
    }
}
MyKey:
package hadoop.wordCount;
import java.io.*;
import org.apache.hadoop.io.*;
public class MyKey implements WritableComparable<MyKey> {
    public String name;
    public Long value;

    /** Writable contract requires a no-arg constructor for deserialization. */
    public MyKey() {
        value = (long) 0;
        name = "";
    }

    public MyKey(Text a, LongWritable b) {
        name = a.toString();
        value = b.get();
    }

    // Serialization — field order here must match readFields exactly.
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(name);
        dataOutput.writeLong(value);
    }

    public void readFields(DataInput dataInput) throws IOException {
        name = dataInput.readUTF();
        value = dataInput.readLong();
    }

    /**
     * Descending order by value (larger numbers sort first), which produces the
     * reversed output sequence.
     * BUG FIX: the original returned (int)(b.value - value); the subtraction and
     * the narrowing cast both overflow for values far apart, yielding a wrong
     * sort order. Long.compare is overflow-safe.
     */
    public int compareTo(MyKey b) {
        return Long.compare(b.value, value);
    }

    /**
     * BUG FIX: MapReduce's default HashPartitioner routes keys by hashCode();
     * Object's identity hash would scatter equal keys across reducers, so
     * equals/hashCode must agree with compareTo (both use only {@code value}).
     */
    @Override
    public int hashCode() {
        return value.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof MyKey && ((MyKey) o).value.longValue() == value.longValue();
    }

    // Output format when this class is used as key3/value3: just the number.
    public String toString() {
        return value.toString();
    }
}
相关文章推荐
- MapReuce笔记四之hadoop类型和MR操作hdfs数据实例
- hadoop学习-Mapper和Reducer的输出类型
- Hadoop基于WordCount的Mapper、Reducer、Combiner、Partitioner和自定义多文件输出
- hadoop自定义InputFormat,OutputFormat输入输出类型
- hadoop入门(三)之 javaAPI操作Hdfs,进行文件操作
- Hadoop之——HDFS操作实例
- Hadoop(十):简单了解Hadoop数据类型,输入输出格式及用户如何自定义。
- 利用java操作Hadoop文件 /hdfs
- JAVA操作HDFS API(hadoop)
- hadoop1.1.2java操作hdfs
- NoSQL之Redis(二)---Java操作Redis存储自定义类型数据
- hadoop hdfs 常用操作java代码
- 将数字序列逆序输出(java)
- Hadoop HDFS文件操作的Java代码
- Hadoop初学指南(4)--使用java操作HDFS
- Hadoop之Java通过URL操作HDFS-yellowcong
- Java实现操作JSON的便捷工具类完整实例【重写Google的Gson】
- Hadoop学习二(java api调用操作HDFS)
- Java操作含Clob或者NClob数据类型的存储过程实例
- 了解Hadoop数据类型,输入输出格式及用户如何自定义。