
My First MapReduce Program: wordcount (Writing and Running It)

Thanks to my teacher, Duan Haitao.

First, add the common and mapreduce jars, along with the jars they depend on, to the project.

WordCountMapper

package club.drguo.hadoop.mapreduce;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
            throws IOException, InterruptedException {
        //get the contents of one line of the file
        String line = value.toString();
        //split the line into an array of words, separated by spaces
        String[] words = StringUtils.split(line, " ");
        //emit a <word, 1> pair for each word
        for(String word : words){
            context.write(new Text(word), new LongWritable(1));
        }
    }
}
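
A quick aside on the split call (my note, not from the original tutorial): Commons Lang's StringUtils.split treats adjacent separators as one, so runs of spaces never produce empty words, which plain String.split(" ") would. A minimal illustration:

// StringUtils.split collapses adjacent separators: no empty tokens
StringUtils.split("hello  world", " ");  // -> ["hello", "world"]
// String.split keeps them, yielding an empty string in the middle
"hello  world".split(" ");               // -> ["hello", "", "world"]
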
WordCountReducer

package club.drguo.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,
            Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        //define an accumulating counter
        long count = 0;
        for(LongWritable value : values){
            count += value.get();
        }
        //emit the <word, count> key-value pair
        context.write(key, new LongWritable(count));
    }
}
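
Because this reduce function just sums values, it is associative and commutative, so the same class could also serve as a combiner to pre-aggregate map output locally. This is a suggestion of mine, not part of the original code; the job counters further down show Combine input records=0, i.e. no combiner is configured here.

// optional: reuse the reducer as a combiner; this works because the map
// output types <Text, LongWritable> match the reduce output types
job.setCombinerClass(WordCountReducer.class);
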
WordCountRunner

package club.drguo.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Describes a job (which mapper class and reducer class to use,
 * where the input files are, and where the results go),
 * then submits the job to the hadoop cluster.
 * @author guo
 *
 */
//club.drguo.hadoop.mapreduce.WordCountRunner
public class WordCountRunner {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        //set the jar containing the job's resources (identified via the class that holds the main method)
        job.setJarByClass(WordCountRunner.class);
        //which mapper class the job uses
        job.setMapperClass(WordCountMapper.class);
        //which reducer class the job uses
        job.setReducerClass(WordCountReducer.class);
        //the k,v output types of the job's mapper
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        //the k,v output types of the job's reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        //the path where the input data lives (not a single file: all files under this path are processed)
        FileInputFormat.setInputPaths(job, "hdfs://localhost:9000/data/input");
        //check first: if the output already exists, delete it
        FileSystem fileSystem = FileSystem.get(configuration);
        Path path = new Path("/data/output/firstmr");
        if(fileSystem.exists(path)){
            fileSystem.delete(path, true);//true = delete recursively
        }
        //the path where the results will be stored
        FileOutputFormat.setOutputPath(job, path);
        //show progress
        boolean b = job.waitForCompletion(true);
        System.out.println(b ? "done" : "not done");
    }
}



Once the code is written, export it as a jar.

First generate some test data and upload it to HDFS, then run the job and finally check the result. (You can also run it directly in Eclipse; see below.)

guo@guo:~$ touch test.log
guo@guo:~$ vi test.log
guo@guo:~$ vi test.log
guo@guo:~$ hdfs dfs -put ./test.log /data/input
guo@guo:~$ hdfs dfs ls /data/input
ls: Unknown command
Did you mean -ls?  This command begins with a dash.
guo@guo:~$ hdfs dfs -ls /data/input
Found 1 items
-rw-r--r--   1 guo supergroup         37 2016-03-17 22:22 /data/input/test.log
guo@guo:~$ hadoop jar /home/guo/firstwordcount.jar club.drguo.hadoop.mapreduce.WordCountRunner
16/03/17 22:24:47 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/03/17 22:24:47 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/03/17 22:24:48 INFO input.FileInputFormat: Total input paths to process : 1
16/03/17 22:24:48 INFO mapreduce.JobSubmitter: number of splits:1
16/03/17 22:24:48 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1458222408729_0002
16/03/17 22:24:48 INFO impl.YarnClientImpl: Submitted application application_1458222408729_0002
16/03/17 22:24:48 INFO mapreduce.Job: The url to track the job: http://guo:8088/proxy/application_1458222408729_0002/
16/03/17 22:24:48 INFO mapreduce.Job: Running job: job_1458222408729_0002
16/03/17 22:24:52 INFO mapreduce.Job: Job job_1458222408729_0002 running in uber mode : false
16/03/17 22:24:52 INFO mapreduce.Job:  map 0% reduce 0%
16/03/17 22:24:55 INFO mapreduce.Job:  map 100% reduce 0%
16/03/17 22:24:59 INFO mapreduce.Job:  map 100% reduce 100%
16/03/17 22:25:00 INFO mapreduce.Job: Job job_1458222408729_0002 completed successfully
16/03/17 22:25:00 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=71
        FILE: Number of bytes written=234977
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=143
        HDFS: Number of bytes written=41
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=1306
        Total time spent by all reduces in occupied slots (ms)=1475
        Total time spent by all map tasks (ms)=1306
        Total time spent by all reduce tasks (ms)=1475
        Total vcore-milliseconds taken by all map tasks=1306
        Total vcore-milliseconds taken by all reduce tasks=1475
        Total megabyte-milliseconds taken by all map tasks=1337344
        Total megabyte-milliseconds taken by all reduce tasks=1510400
    Map-Reduce Framework
        Map input records=5
        Map output records=3
        Map output bytes=59
        Map output materialized bytes=71
        Input split bytes=106
        Combine input records=0
        Combine output records=0
        Reduce input groups=3
        Reduce shuffle bytes=71
        Reduce input records=3
        Reduce output records=3
        Spilled Records=6
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=66
        CPU time spent (ms)=790
        Physical memory (bytes) snapshot=439631872
        Virtual memory (bytes) snapshot=3839033344
        Total committed heap usage (bytes)=324009984
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=37
    File Output Format Counters
        Bytes Written=41
done
guo@guo:~$ hdfs dfs -cat /data/output/firstmr/*
hello    2
kitty    1
world    1
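
The WARN line in the log above suggests implementing the Tool interface and launching through ToolRunner, so that generic Hadoop options (-D key=value, -files, and so on) are parsed off the command line for you. Here is a minimal sketch of that pattern, with the job setup elided; WordCountTool is a name I made up, not from the original:

package club.drguo.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountTool extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        //getConf() already carries any -D options parsed by ToolRunner
        Job job = Job.getInstance(getConf());
        //...same mapper/reducer/type/path setup as in WordCountRunner...
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new WordCountTool(), args));
    }
}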


You can also run the job directly in Eclipse, but you must first export the jar and tell WordCountRunner where that jar is located (see the extra configuration line below). You also need to add the hdfs jar and its dependencies; otherwise you get Exception in thread "main" java.io.IOException: No FileSystem for scheme: hdfs

package club.drguo.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Describes a job (which mapper class and reducer class to use,
 * where the input files are, and where the results go),
 * then submits the job to the hadoop cluster.
 * @author guo
 *
 */
//club.drguo.hadoop.mapreduce.WordCountRunner
public class WordCountRunner {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        configuration.set("mapreduce.job.jar", "wordcount.jar");//add this line; I exported the jar directly into the project
        Job job = Job.getInstance(configuration);
        //set the jar containing the job's resources (identified via the class that holds the main method)
        job.setJarByClass(WordCountRunner.class);
        //which mapper class the job uses
        job.setMapperClass(WordCountMapper.class);
        //which reducer class the job uses
        job.setReducerClass(WordCountReducer.class);
        //the k,v output types of the job's mapper
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        //the k,v output types of the job's reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        //the path where the input data lives (not a single file: all files under this path are processed)
        FileInputFormat.setInputPaths(job, "hdfs://localhost:9000/data/input");
        //check first: if the output already exists, delete it
        FileSystem fileSystem = FileSystem.get(configuration);
        Path path = new Path("/data/output/firstmr");
        if(fileSystem.exists(path)){
            fileSystem.delete(path, true);//true = delete recursively
        }
        //the path where the results will be stored
        FileOutputFormat.setOutputPath(job, path);
        //show progress
        boolean b = job.waitForCompletion(true);
        System.out.println(b ? "done" : "not done");
    }
}
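
If the No FileSystem for scheme: hdfs error still appears even with the hdfs jars on the classpath (this can happen when merged jars overwrite each other's META-INF/services/org.apache.hadoop.fs.FileSystem entries), one workaround, my addition rather than something from the original, is to register the HDFS implementation explicitly:

//explicitly map the hdfs:// scheme to its FileSystem implementation
configuration.set("fs.hdfs.impl",
        org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());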


You can also run locally: after adding the jars under the yarn directory, change the paths in WordCountRunner to local ones and just run.

package club.drguo.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Local run.
 * @author guo
 *
 */
//club.drguo.hadoop.mapreduce.WordCountRunner
public class WordCountRunner2 {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        //set the jar containing the job's resources (identified via the class that holds the main method)
        job.setJarByClass(WordCountRunner2.class);
        //which mapper class the job uses
        job.setMapperClass(WordCountMapper.class);
        //which reducer class the job uses
        job.setReducerClass(WordCountReducer.class);
        //the k,v output types of the job's mapper
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        //the k,v output types of the job's reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        //the path where the input data lives (not a single file: all files under this path are processed)
        FileInputFormat.setInputPaths(job, "/home/guo/data/input");
        //the path where the results will be stored
        FileOutputFormat.setOutputPath(job, new Path("/home/guo/data/output"));
        //show progress
        boolean b = job.waitForCompletion(true);
        System.out.println(b ? "done" : "not done");
    }
}
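
One caveat, my note rather than the original author's: whether this really runs locally depends on what the empty Configuration() picks up from the classpath. If any *-site.xml files point at a cluster, local mode can be forced explicitly with the standard Hadoop properties:

Configuration configuration = new Configuration();
//use the in-process LocalJobRunner instead of submitting to YARN
configuration.set("mapreduce.framework.name", "local");
//read and write the local filesystem instead of HDFS
configuration.set("fs.defaultFS", "file:///");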