
Distributed Computing: The MapReduce Batch Processing Engine (Part 2)

2017-03-12 00:08

Part 1: Advanced MapReduce Programming in Java

1. The Grep Problem (running multiple MapReduce jobs in sequence)

Background

Given a batch of documents on the order of terabytes or petabytes, the following is required:

Find the words or sentences that match a given rule (a regular expression; here, words that contain the character 'a');

Count the number of occurrences of those words or sentences;

Sort them by count and output the final result.

Approach

The work is split into two jobs:

Job 1: wordCount

Counts the words that match the rule.

Job 2: sort

Performs a total sort by word count; it depends on the output of the previous job.

Code

package com.dev4free.hadoop;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Two MapReduce jobs chained together:
 *  - find the words that match the rule (regular expression: words containing the character 'a');
 *  - count the matching words or sentences;
 *  - sort them by count and output the final result.
 *
 * @author syd
 */
public class MyWordCountSort {

    // Intermediate directory that connects the two jobs.
    static Path tempDir = new Path("/mywordcountsort_temp");

    /**
     * Mapper of the wordCount job: tokenizes each line and emits (word, 1).
     * @author syd
     */
    public static class MyWordCountMapper extends Mapper<Object, Text, Text, LongWritable> {
        private static final LongWritable one = new LongWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer stringTokenizer = new StringTokenizer(value.toString());
            while (stringTokenizer.hasMoreTokens()) {
                word.set(stringTokenizer.nextToken());
                context.write(word, one);
            }
        }
    }

    /**
     * Reducer of the wordCount job: sums the counts of each word.
     * @author syd
     */
    public static class MyWordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        private LongWritable result = new LongWritable();

        public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable value : values) {
                sum = sum + value.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    /**
     * Mapper of the sort job: swaps (word, count) into (count, word) so that
     * the shuffle sorts the records by count.
     * @author syd
     */
    public static class MySortMapper extends Mapper<Text, LongWritable, LongWritable, Text> {
        public void map(Text key, LongWritable value, Context context) throws IOException, InterruptedException {
            context.write(value, key);
        }
    }

    public static void main(String[] args) throws Exception {

        /**
         * wordCount job
         */
        Configuration configuration = new Configuration();
        String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.out.println("error input");
            System.exit(2);
        }
        Job jobWordCounter = Job.getInstance(configuration, "MyWordCountSort-WordCount");
        jobWordCounter.setJarByClass(MyWordCountSort.class);
        jobWordCounter.setMapperClass(MyWordCountMapper.class);
        jobWordCounter.setReducerClass(MyWordCountReducer.class);
        jobWordCounter.setOutputFormatClass(SequenceFileOutputFormat.class);
        jobWordCounter.setOutputKeyClass(Text.class);
        jobWordCounter.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(jobWordCounter, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(jobWordCounter, tempDir);
        jobWordCounter.waitForCompletion(true);

        /**
         * sort job
         */
        Job jobSort = Job.getInstance(configuration, "MyWordCountSort-Sort");
        jobSort.setJarByClass(MyWordCountSort.class);
        jobSort.setMapperClass(MySortMapper.class);
        jobSort.setInputFormatClass(SequenceFileInputFormat.class);
        jobSort.setNumReduceTasks(1);
        FileInputFormat.addInputPath(jobSort, tempDir);
        FileOutputFormat.setOutputPath(jobSort, new Path(otherArgs[1]));
        jobSort.setSortComparatorClass(LongWritable.DecreasingComparator.class);
        System.exit(jobSort.waitForCompletion(true) ? 0 : 1);
    }

}


Command

[hadoop@hadoopa conf]$ hadoop jar /home/hadoop/hadoop-0.0.1-SNAPSHOT.jar com.dev4free.hadoop.MyWordCountSort /text1.txt /syd


Results

[hadoop@hadoopa test]$ hdfs dfs -cat /text1.txt
i love my country
my country is china
[hadoop@hadoopa test]$ hdfs dfs -cat /mywordcountsort_temp/part-r-00000
SEQ org.apache.hadoop.io.Text org.apache.hadoop.io.LongWritable ... (binary SequenceFile contents; the intermediate output is not human-readable)
[hadoop@hadoopa test]$ hdfs dfs -cat /syd/part-r-00000
2       my
2       country
1       love
1       is
1       i
1       china
[hadoop@hadoopa test]$
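Note that the code above counts every token; the regular-expression filter described in the Background section (keep only words containing the character 'a') is not applied, which is why words such as "i" and "love" still appear in the output. A minimal sketch of a filtering variant of the wordCount mapper is shown below; the Pattern-based check is my own addition, not part of the original code:

public static class MyGrepMapper extends Mapper<Object, Text, Text, LongWritable> {
    private static final LongWritable one = new LongWritable(1);
    // Keep only words that contain the character 'a'; any regular expression could be used here.
    private static final java.util.regex.Pattern PATTERN = java.util.regex.Pattern.compile(".*a.*");
    private Text word = new Text();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        StringTokenizer stringTokenizer = new StringTokenizer(value.toString());
        while (stringTokenizer.hasMoreTokens()) {
            String token = stringTokenizer.nextToken();
            // Emit only the tokens that match the pattern.
            if (PATTERN.matcher(token).matches()) {
                word.set(token);
                context.write(word, one);
            }
        }
    }
}

The driver would then call jobWordCounter.setMapperClass(MyGrepMapper.class) instead of using MyWordCountMapper.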


2. Writing a Partitioner (omitted for now)

1. Background

2. Approach

3. Code

4. Command

5. Results
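This subsection is left as a placeholder in the original post. For reference, a custom Partitioner in the Java API only needs to extend org.apache.hadoop.mapreduce.Partitioner and implement getPartition, which decides which reduce task receives each map output key. The class name and the partitioning rule below are assumptions for illustration, not the post's own code:

// Hypothetical partitioner that spreads keys over the reducers by their first character.
public static class FirstCharPartitioner extends org.apache.hadoop.mapreduce.Partitioner<Text, LongWritable> {
    @Override
    public int getPartition(Text key, LongWritable value, int numPartitions) {
        // Route empty keys to the first reducer.
        if (key.getLength() == 0) {
            return 0;
        }
        // Map the first character onto the available reduce tasks.
        return (key.charAt(0) & Integer.MAX_VALUE) % numPartitions;
    }
}

It would be registered on a job with job.setPartitionerClass(FirstCharPartitioner.class).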

Part 2: MapReduce Programming in Other Languages

Introduction to Hadoop Streaming

Works the same way as the Linux pipe mechanism

Inter-process communication is done through standard input and standard output

Standard input and output are available in every programming language
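Conceptually, running a Streaming job is like running a shell pipeline; a map-then-reduce pass over a local file could be pictured as follows (mapper.py and reducer.py are hypothetical script names, and the local sort stands in for the shuffle phase):

cat input.txt | ./mapper.py | sort | ./reducer.py > output.txt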

Hadoop Streaming compared with Java programming

Java programming

Java is Hadoop's original development language;

It supports every feature and is the foundation the other languages build on.

Streaming programming

Only the Mapper and Reducer can be written this way; all other components must still be implemented in Java;

Text formats are supported natively, but support for binary formats is weak;

It is typically used for simple text processing, where it speeds up development.

Part 3: MapReduce Tuning Tips

Memory parameters

-Dmapreduce.map.memory.mb=5120

-Dmapreduce.reduce.memory.mb=5120

-Dmapreduce.map.java.opts="-Xms4096m -Xmx4096m"

-Dmapreduce.reduce.java.opts="-Xms4096m -Xmx4096m"

Fault-tolerance parameters

mapreduce.map.maxattempts: maximum number of attempts per map task, default 4

mapreduce.reduce.maxattempts: maximum number of attempts per reduce task, default 4

mapreduce.map.failures.maxpercent: maximum percentage of map tasks allowed to fail before the job is aborted, default 0

mapreduce.reduce.failures.maxpercent: maximum percentage of reduce tasks allowed to fail before the job is aborted, default 0

Speculative execution

mapreduce.map.speculative: whether map tasks may be executed speculatively (default true)

mapreduce.reduce.speculative: whether reduce tasks may be executed speculatively (default true)

Adjusting the number of tasks in a job

mapreduce.input.fileinputformat.split.minsize: the minimum amount of data handled by each map task (set it larger than one HDFS block to reduce the number of map tasks)
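As a sketch of how these options are used, they can be attached directly to a job submission through -D, since the driver from Part 1 parses them with GenericOptionsParser. The jar, class, and paths below are the ones from the earlier example, and the values are purely illustrative:

hadoop jar /home/hadoop/hadoop-0.0.1-SNAPSHOT.jar com.dev4free.hadoop.MyWordCountSort -Dmapreduce.map.memory.mb=5120 -Dmapreduce.reduce.memory.mb=5120 -Dmapreduce.map.maxattempts=8 -Dmapreduce.input.fileinputformat.split.minsize=268435456 /text1.txt /syd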

Part 4: Hands-on Project

Project description

Raw data: /flume/record/2017-03-10/2220/transaction_log.1489155600805, which has 12 columns per record.

The task is to convert column 4 (a Unix timestamp such as 1489155672) into the standard date-time format (2017-03-10 22:20:24) and to drop columns 11 and 12.

The cleaning is done by a Python script, etl.py, run through Hadoop Streaming.

Project code

[hadoop@hadoopa test]$ cat /home/hadoop/etl.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys,time

def timestamp_datetime(value):
    # convert a Unix timestamp into the 'YYYY-MM-DD HH:MM:SS' format
    format = '%Y-%m-%d %H:%M:%S'
    value = time.localtime(value)
    dt = time.strftime(format, value)
    return dt

# input comes from STDIN (standard input)
for line in sys.stdin:
    # split the comma-separated record into its columns
    columns = line.split(",")

    # skip malformed records that do not have exactly 12 columns
    if len(columns) != 12:
        continue

    # remove leading and trailing whitespace from every column
    for i in range(len(columns)):
        columns[i] = columns[i].strip()

    # convert the timestamp in column 4 into the standard date-time format
    columns[3] = timestamp_datetime(int(columns[3]))

    # emit only the first 10 columns, dropping columns 11 and 12
    print columns[0]+","+columns[1]+","+columns[2]+","+columns[3]+","+columns[4]+","+columns[5]+","+columns[6]+","+columns[7]+","+columns[8]+","+columns[9]
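Before submitting the job, the script can be sanity-checked locally by piping a few records through it (sample.csv is a hypothetical file holding lines in the 12-column format shown above):

[hadoop@hadoopa test]$ cat sample.csv | python /home/hadoop/etl.py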


Run command

[hadoop@hadoopa ~]$ hadoop jar /home/hadoop/hadoop-2.7.3/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar -D mapred.reduce.tasks=0 -input /flume/record/2017-03-10/2220/transaction_log.1489155600805 -output /mapred/etl -mapper /home/hadoop/etl.py -file /home/hadoop/etl.py
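In this invocation, -D mapred.reduce.tasks=0 turns the job into a map-only job (the mapper output is written straight to HDFS), -mapper names the command executed for each map task, and -file ships /home/hadoop/etl.py to the nodes that run the tasks.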


Results

[hadoop@hadoopa ~]$ hdfs dfs -tail /flume/record/2017-03-10/2220/transaction_log.1489155600805
,5503894278515942,ZHONGTONG,217.90.43.20,xh
0000099993,00000080,00000533,1489155672,456,QingHai,GuiZhou,TAOBAO,4011118331878,SHENTONG,80.173.222.62,mhr
0000099994,00000451,00000349,1489155672,858,HuNan,YunNan,JUHUASUAN,869939198450453,EMS,230.67.36.151,pl
0000099995,00000234,00000122,1489155672,516,JiangSu,ZheJiang,JUHUASUAN,3528806766166620,SHENTONG,8.75.11.240,hu
0000099996,00000407,00000433,1489155672,579,FuJian,JiangXi,TIANMAOCHAOSHI,5259670539139763,SHUNFENG,185.39.72.37,pl
0000099997,00000703,00000626,1489155673,911,GuiZhou,ChongQing,TIANMAOCHAOSHI,372175703839365,EMS,133.87.61.178,fo
0000099998,00000160,00000790,1489155673,113,BeiJing,GuiZhou,TIANMAO,6011159707661458,EMS,119.92.119.128,nl
0000099999,00000049,00000675,1489155673,831,JiLin,JiLin,JUHUASUAN,210016971056835,EMS,61.107.157.41,bhb
0000100000,00000785,00000546,1489155673,175,JiLin,SiChuan,TIANMAO,4173227824100,YUNDA,140.128.83.70,ik
0000100001,00000078,00000609,1489155673,921,HeiLongJiang,ChongQing,TIANMAO,30324255590133,YUNDA,60.193.159.6,ko

[hadoop@hadoopa ~]$ hdfs dfs -tail /mapred/etl/part-00000
9771,00000603,00000434,2017-03-10 22:20:23,839,Aomen,QingHai,TIANMAOCHAOSHI,869996644225012,SHUNFENG
0000099772,00000497,00000718,2017-03-10 22:20:24,475,JiangXi,TianJin,TAOBAO,676262630509,YUNDA
0000099773,00000668,00000997,2017-03-10 22:20:24,717,ShanXi3,XiangGang,TIANMAO,639034920186,SHENTONG
0000099774,00000597,00000571,2017-03-10 22:20:24,965,XiangGang,GuiZhou,TIANMAO,630456160567,YUANTONG
0000099775,00000106,00000630,2017-03-10 22:20:24,262,XiangGang,HeiLongJiang,JUHUASUAN,30122789425358,YUNDA
0000099776,00000473,00000433,2017-03-10 22:20:24,186,XinJiang,AnHui,TIANMAOCHAOSHI,4638479668871500,EMS
0000099777,00000135,00000089,2017-03-10 22:20:24,249,TianJin,ChongQing,TAOBAO,30084958874818,SHENTONG
0000099778,00000737,00000471,2017-03-10 22:20:24,846,HuBei,ZheJiang,TIANMAO,060445261658,YUNDA
0000099779,00000343,00000563,2017-03-10 22:20:25,219,XiZang,Aomen,TIANMAOCHAOSHI,340317530427282,EMS
0000099780,00000963,00000319,2017-03-10 22:20:25,881,AnHui,FuJian,TIANMAOCHAOSHI,6011023388923241,SHENTONG