您的位置:首页 > 运维架构

Hadoop链式MapReduce、多维排序、倒排索引、自连接算法、二次排序、Join性能优化、处理员工信息Join实战、URL流量分析、TopN及其排序、求平均值和最大最小值、数据清洗ETL、分析气

2016-02-28 06:19 1161 查看
Hadoop Mapreduce
算法汇总


第52课:Hadoop链式MapReduce编程实战...1

第51课:Hadoop MapReduce多维排序解析与实战...2

第50课:HadoopMapReduce倒排索引解析与实战...3

第49课:Hadoop MapReduce自连接算法及编程实战...4

第48课:Hadoop MapReduce二次排序编程实战...6

第47课:Hadoop MapReduce二次排序算法和实现解析(原理课)...7

第46课:Hadoop Join性能优化编程实战...7

第45课:Hadoop Join性能优化之原理和运行机制详解(原理课)...9

第44课:Hadoop处理员工信息Join实战...9

第43课:Hadoop实战URL流量分析...11

第42课:Hadoop中的TopN及其排序原理剖析及代码实战...12

第41课:Hadoop求平均值和最大最小值案例实战以及测试调试...13

第40课:Hadoop数据去重和数据排序案例实战及数据清洗ETL.13

第39课:MapReduce分析气象数据动手编程实战...14

Hadoop算法水平的体现:

l
Key值的定义

l
Hadoop链式MapReduce的开发

第52课:Hadoop链式MapReduce编程实战

【数据文件Input】

Computer,5000

SmartPhone,3000

Tablet,1500

Tv,50000

Book,18

Clothes,150

Gloves,9

Computer,5000

SmartPhone,3000

Tablet,1500

Tv,50000

Book,18

Clothes,150

Gloves,9

SmartPhone,3000

Tablet,1500

【运行结果Output】商品价格大于100小于10000,然后将相同的商品价格累加,累加总价大于5000的商品输出结果

Computer 10000

SmartPhone 9000

【源代码文件】ChainMapperReducer.java

【Map阶段】K,V值定义

context.write(new Text(splited[0].trim()),newIntWritable(price));//map阶段key值为商品

context.write(key,value);// ChaintDataMapper1、ChaintDataMapper2

【Reduce阶段】K,V值输出

context.write(key,new IntWritable(summary));//ChainDataReducer

context.write(key,value); // ChaintDataMapper3

【链式Mapreduce定义】

ChainMapper.addMapper(job,
ChaintDataMapper1.class,LongWritable.class, Text.class, Text.class, IntWritable.class, newConfiguration());

ChainMapper.addMapper(job,
ChaintDataMapper2.class,Text.class, IntWritable.class, Text.class, IntWritable.class, newConfiguration());

ChainReducer.setReducer(job,ChainDataReducer.class, Text.class, IntWritable.class, Text.class, IntWritable.class, newConfiguration());

ChainReducer.addMapper(job,ChaintDataMapper3.class, Text.class, IntWritable.class, Text.class, IntWritable.class, newConfiguration());

第51课:HadoopMapReduce多维排序解析与实战

【数据文件Input】

Spark,100

Hadoop,60

Kafka,95

Spark,99

Hadoop,65

Kafka,98

Spark,99

Hadoop,63

Kafka,97

【运行结果Output】按名称分组输出排序结果

Hadoop 60,63,65

Kafka 95,97,98

Spark 99,99,100

【源代码文件】MutipleSorting.java

【Map阶段】K,V值定义

map阶段key值为自定义的intMultiplePair,读入每行数据,如Spark、100放入intMultiplePair的first、second;100作为value放入intWritable

context.write(intMultiplePair,intWritable);

IntMultipleSortingComparator按IntMultiplePair分别按first、second值排序比较大小

【Reduce阶段】K,V值输出

按GroupingMultipleComparator分组排序,仅按first为KEY值分组,将同名称的分数iterator()合并放到result里面,输出排序结果

context.write(newText(key.getFirst()), new Text(result));// reduce阶段输出K,V

【自定义KEY及相关类】

class IntMultiplePair定义2个属性 firstsecond;

classIntMultipleSortingComparator重写compare方法

if(!x.getFirst().equals(y.getFirst())){

returnx.getFirst().compareTo(y.getFirst());

} else {

return x.getSecond() - y.getSecond();

}

classGroupingMultipleComparator重写compare方法

return x.getFirst().compareTo(y.getFirst());

classMyMultipleSortingPartitioner重写getPartition方法

return (arg0.getFirst().hashCode() &Integer.MAX_VALUE)%arg2;

job配置

job.setPartitionerClass(MyMultipleSortingPartitioner.class);

job.setSortComparatorClass(IntMultipleSortingComparator.class);

job.setGroupingComparatorClass(GroupingMultipleComparator.class);

第50课:HadoopMapReduce倒排索引解析与实战

【数据文件Input】

#cat file1.txt

Spark is so powerful

#cat file2.txt

Spark is the most excitingthing happening in big data today

#cat file3.txt

Hello Spark Hello again Spark

【运行结果Output】

Hello file3.txt:2

Spark file3.txt:2;file1.txt:1;file2.txt:1

again file3.txt:1

big file2.txt:1

data file2.txt:1

exciting file2.txt:1

happening file2.txt:1

in file2.txt:1

is file2.txt:1;file1.txt:1

most file2.txt:1

powerful file1.txt:1

so file1.txt:1

the file2.txt:1

thing file2.txt:1

today file2.txt:1

【源代码文件】 InvertedIndex.java

【Map阶段】K,V值定义

setup方法获取文件名 fileName =inputSplit.getPath().getName();

读入每行数据,切割成单词以后,“单词名:文件名”作为KEY,number作为value计数为1次
context.write(new Text(keyForCombiner), number);
while(stringTokenizer.hasMoreTokens()){

String keyForCombiner = stringTokenizer.nextToken() + ":" +fileName;

context.write(new Text(keyForCombiner), number);

【本地Combiner阶段】class DataCombiner

本地遍历相同的“单词名:文件名”的values,进行累加计数;然后拆开key,将单词拿出来放入key值中,而将“文件名:累计数”放入value中

context.write(new Text(keyArray[0]),new Text(keyArray[1]+":"+sum));

for(Text item : values){

sum += Integer.valueOf(item.toString());

}

String[]keyArray = key.toString().split(":");

context.write(new Text(keyArray[0]), newText(keyArray[1]+":"+sum));

【Reduce阶段】K,V值输出

输出K,V,K值是单词,values以“文件名:累计数;文件名:累计数”的方式合并输出。context.write(key,new Text(result.toString().substring(0, result.toString().length() -1)));
for(Text item : values){

result.append(item + ";");

}

context.write(key, newText(result.toString().substring(0, result.toString().length() -1)));

}

job配置

job.setCombinerClass(DataCombiner.class);

第49课:HadoopMapReduce自连接算法及编程实战

【数据文件Input】0列是孩子 1列是父亲

Tom Lucy

Tom Jack

Jone Lucy

Jone Jack

Lucy Mary

Lucy Ben

Jack Alice

Jack Jesse

Terry Alice

Terry Jesse

Philip Terry

Philip Alma

Mark Terry

Mark Alma

【运行结果Output】 找祖父亲

Tom Alice

Tom Jesse

Jone Alice

Jone Jesse

Tom Ben

Tom Mary

Jone Ben

Jone Mary

Philip Alice

Philip Jesse

Mark Alice

Mark Jesse

【源代码文件】 SelfJoin.java

【Map阶段】K,V值定义

读入每行数据,及切割单词,在map阶段输出2次K,V值,分别对应左表、右表;

l 左表Key为1列的父亲,value为“1_孩子”;

l 右表Key为0列的孩子,value为“0_父亲”;

context.write(newText(array[1].trim()), new Text("1_"+array[0].trim())); //left

context.write(new Text(array[0].trim()), newText("0_"+array[1].trim())); //right

【Reduce阶段】K,V值输出

遍历相同key值的values,根据map阶段定义的分隔符“_”切分values,1是孩子,将splited[1]放入grandChildList列表;0是父亲,将splited[1]放入grandParentList列表;然后遍历输出祖孙和祖父的笛卡儿积。

while(iterator.hasNext()){

String item = iterator.next().toString();

String[] splited = item.split("_");

if(splited[0].equals("1")){

grandChildList.add(splited[1]);

} else {

grandParentList.add(splited[1]);

} }

if(grandChildList.size() > 0 && grandParentList.size() > 0){

for (String grandChild:grandChildList ) {

for (String grandParent: grandParentList) {

context.write(new Text(grandChild), new Text(grandParent));

….

第48课:HadoopMapReduce二次排序编程实战

【数据文件Input】

12 8

32 21

54 32

65 21

501 12

81 2

81 6

81 9

81 7

81 1

100 100

【运行结果Output】 按字符串方式排序

100 100

12 8

32 21

501 12

54 32

65 21

81 1

81 2

81 6

81 7

81 9

【源代码文件】 SecondarySort.java

【Map阶段】K,V值定义

将读入每行记录按“\t”切分,第一个及第二个值放入IntPair实例对象,将IntPair实例对象作为KEY值,value是每行的数据。

IntPairitem =new IntPair(splited[0],splited[1]);

context.write(item, value);

【Reduce阶段】K,V值输出

Reduce按key值的compareTo方法经过排序汇总输出,reduce输出的K,V:key值为空,直接将从map读入的value值(每行的数据)输出。

context.write(NullWritable.get(), item);

【自定义KEY值及相关类】

class IntPair定义属性first、second

重写compareTo方法

if (!this.first.equals(o.getFirst())){

return this.first.compareTo(o.first);

} else {

if(!this.Second.equals(o.Second)) {

returnthis.Second.compareTo(o.Second);

} else {

return 0;



class MyPartitioner定义分区数

return (key.hashCode() &Integer.MAX_VALUE) % numPartitioneS;

classSecondarySortGroupComparator分组排序 super(IntPair.class,true);

job配置

job.setPartitionerClass(MyPartitioner.class);

job.setGroupingComparatorClass(SecondarySortGroupComparator.class);

第47课:HadoopMapReduce二次排序算法和实现解析(原理课)

第46课:HadoopJoin性能优化编程实战

【数据文件Input】

#cat members.txt

1 Spark 1

2 Hadoop 1

3 flink 3

4 Kafka 1

5 Tachyon 2

#cat address.txt

1 America

2 China

3 Germa

【运行结果Output】

4 Kafka America

2 Hadoop America

1 Spark America

5 Tachyon China

3 flink Germa

【源代码文件】 JoinImproved.java

【Map阶段】K,V值定义

读入每行数据,按“\t”进行单词切分,

l 如果切分以后的数组长度为2,则读取的是地址文件,将地址ID、地址名称数据存入member实例,同时将地址ID存入memberKey,打标记为true;map输出的K,V值:key为自定义的memberKey(地址),value为自定义的member(地址)

l 如长度不为2,读取的是成员文件,将成员编号、成员名、地址ID数据存入member实例,同时将地址ID存入memberKey,打标记为false;map输出的K,V值:key为自定义的memberKey(成员),value为自定义的member(成员)

if(dataSplited.length == 2){

Member_Information member = newMember_Information();

member.setAddresNo(dataSplited[0]);

member.setAddressName(dataSplited[1]);

MemberKey memberKey = new MemberKey();

memberKey.setKeyID(Integer.valueOf(dataSplited[0]));

memberKey.setFlag(true);

context.write(memberKey, member);

} else {

Member_Information member = newMember_Information();

member.setMemberNo(dataSplited[0]);

member.setMemberName(dataSplited[1]);

member.setAddresNo(dataSplited[2]);

MemberKey memberKey = new MemberKey();

memberKey.setKeyID(Integer.valueOf(dataSplited[2]));

memberKey.setFlag(false);

context.write(memberKey, member);

}

【Reduce阶段】K,V值输出

Reduce按memberKey值compareTo的方法进行排序汇总,MemberKey排序比较:如果MemberKey的地址ID相同,则比较打的标记,如标记为true,则返回-1,这样地址信息就排到第一条;如果标记为false,返回1;我们只需对地址信息之后的成员数据一条条进行处理。这样就进行了Join性能优化,避免了将成员及地址信息放入List列表中,再在list列表中筛选出地址信息,在List数据量大的情况下导致OOM。

Reduce输出K,V:Key值为空值,values为Member_Information的toString值

for(Member_Informationitem : values){

if(0 == counter){

member = new Member_Information(item);

} else {

Member_Information mem = newMember_Information(item);

mem.setAddressName(member.getAddressName());

context.write(NullWritable.get(), newText(mem.toString()));

}

counter++;

}

【自定义KEY值及相关类】

class MemberKey定义属性keyID,flag

重写compareTo方法

publicint compareTo(MemberKey member) {

if(this.keyID == member.keyID){

if(this.flag== member.flag){

return0;

}else {

returnthis.flag? -1:1;

}

}else {

returnthis.keyID - member.keyID > 0? 1 :-1;

}

}

class Member_Information

定义属性存放成员及地址信息

privateString memberNo = "";

privateString memberName = "";

privateString addresNo = "";

privateString addressName = "";

重写toString方法

return this.memberNo + " " + this.memberName + "" + this.addressName;

class GroupComparator重写compare方法

MemberKeyx = (MemberKey)a;

MemberKeyy = (MemberKey)b;

if(x.getKeyID()== y.getKeyID()){

return0;

}else {

returnx.getKeyID() > y.getKeyID()? 1 : -1;

}

Job配置

job.setGroupingComparatorClass(GroupComparator.class);

第45课:HadoopJoin性能优化之原理和运行机制详解(原理课)

第44课:Hadoop处理员工信息Join实战

【数据文件Input】

workers.txt

7499 allen salesman 76981981-02-20 1600 300 30

7782 clark managers 76391981-06-09 2450 10

7654 martin salesman 76981981-03-20 1250 1400 30 boston

7900 james clerk 76981981-01-09 950 30

7788 scott analyst 75661981-09-01 3000 100 20

department.txt

30 sales chicago

20 research dallas

10 accounting newyork

【运行结果Output】

10 10 accounting

10 7782 clark 10

20 7788 scott 20

20 20 research

30 30 sales

30 7900 james 30

30 7654 martin 30

30 7499 allen 30

【源代码文件】 JoinWorkersInformation.java

【Map阶段】K,V值定义

读入每行数据,按“\t”进行单词切分,

l 如果切分以后的数组长度小于3,则读取的是部门文件,将部门ID、部门名称、部门标记0数据存入MemberInformation实例;map输出的K,V值:key为部门ID,value为自定义的MemberInformation(部门)

l 如长度大于3,读取的是职员文件,将职员编号、职员名、部门ID、职员标记1数据存入MemberInformation实例;map输出的K,V值:key为部门ID,value为自定义的MemberInformation(职员)

if (data.length <=3){ //department

MemberInformation department= newMemberInformation ();

department.setDepartmentNo(data[0]);

department.setDepartmentName(data[1]);

department.setFlag(0);

context.write(newLongWritable(Long.valueOf(department.getDepartmentNo())), department);

}else { // worker

MemberInformation worker= new MemberInformation ();

worker.setWorkerNo(data[0]);

worker.setWorkerName(data[1]);

worker.setDepartmentNo(data[7]);

worker.setFlag(1);

context.write(newLongWritable(Long.valueOf(worker.getDepartmentNo())), worker);

… }

【Reduce阶段】K,V值输出

reduce根据key部门ID的值进行汇总,如标记为0,标识是部门信息,取到部门的数据,如果不为0,表示是职员数据,在workerList列表中增加职员记录。遍历职员列表,输出K,V:key值为0值,value为职员的toString信息

List<MemberInformation> workerList=newArrayList<MemberInformation>();

for (MemberInformation item : values) {

if (0 ==item.getFlag()) {

department=new MemberInformation(item);

}else {

workerList.add(new MemberInformation(item) );

}

}

for (MemberInformation worker:workerList){ worker.setDepartmentNo(department.getDepartmentNo()); worker.setDepartmentName(department.getDepartmentName());

resultValue.set(worker.toString());

context.write(resultKey,resultValue );

}

classMemberInformation

定义属性

private String workerNo ="";

private String workerName ="";

private String departmentNo ="";

private String departmentName ="";

private int flag = 0; // 0 department,1 worker

重写toString方法

return this.workerNo + " "+this.workerName+" "+this.departmentNo + " "+this.departmentName;

第43课:Hadoop实战URL流量分析

【数据文件Input】URLLog.txt

127.0.0.1 - - [03/Jul/2015:23:36:38 +0800] "GET /course/detail/3.htmHTTP/1.0" 200 38435 0.038

182.131.89.195 - - [03/Jul/2015:23:37:43 +0800] "GET / HTTP/1.0" 301- 0.000

127.0.0.1 - - [03/Jul/2015:23:38:27 +0800] "POST/service/notes/addViewTimes_23.htm HTTP/1.0" 200 2 0.003

【运行结果Output】

GET / 3

GET /course/detail/3.htm 1

GET /course/list/73.htm 1

GET/html/notes/20140318/24.html 1

GET/html/notes/20140609/542.html 1

GET/html/notes/20140609/544.html 1

【源代码文件】 URLLog.java

【Map阶段】K,V值定义

Map输出的K,V:将每行数据使用handleLine函数处理,区分GET/POST,截取相应URL字符串赋值给key值,resultValue值计数为1

context.write(text,resultValue);

privateString handleLine(String line) {

StringBufferbuffer = new StringBuffer();

if(line.length()>0){

if(line.contains("GET")){ buffer.append(line.substring(line.indexOf("GET"),line.indexOf("HTTP/1.0")).trim());

}elseif ( line.contains("POST")) { buffer.append(line.substring(line.indexOf("POST"),line.indexOf("HTTP/1.0")).trim());



returnbuffer.toString();

}

【Reduce阶段】K,V值输出

Reduce汇总输出K,V;key为提取的URL字符串,totalresultValue为累加计数。

context.write(key, totalresultValue);

第42课:Hadoop中的TopN及其排序原理剖析及代码实战

【数据文件Input】TopN1.txt

1,9819,100,121

2,8918,2000,111

3,2813,1234,22

4,9100,10,1101

5,3210,490,111

6,1298,28,1211

7,1010,281,90

8,1818,9000,20

TopN2.txt

10,3333,10,100

11,9321,1000,293

12,3881,701,20

13,6791,910,30

14,8888,11,39

【运行结果Output】

1 9000

2 2000

3 1234

【源代码文件】 TopNSorted.java

【Map阶段】K,V值定义

l
setup方法:配置TopN取值:length =context.getConfiguration().getInt("topn", 5);

l cleanup方法:context.write(newText(String.valueOf(topN[i])), new Text(String.valueOf(topN[i])));
输出K,V,key值为map中Arrays.sort(topN)排序以后的TopN值,value为数值
l conf设置TopN配置conf.setInt("topn", 3);

【Reduce阶段】K,V值输出

l cleanup方法: context.write(new Text(String.valueOf(length- i + 1)), new Text(String.valueOf(topN[i])));
输出K,V,key值为reduce汇总时
Arrays.sort(topN)排序以后的TopN值,value为数值

第41课:Hadoop求平均值和最大最小值案例实战以及测试调试

【数据文件Input】

#cat dataBiggestsmallest.txt

1

33

55

66

77

45

34567

50

88776

345

5555555

23

32

【运行结果Output】

maxValue 5555555

minValue 1

【源代码文件】 BiggestSmallest.java

【Map阶段】K,V值定义

Map的K,V:key值设置为keyFoReducer,value为每行的数据

context.write(keyFoReducer,data);

【Reduce阶段】K,V值输出

Reduce K,V输出最大值及最小值

for(LongWritable item : values){

if (item.get()>maxValue){

maxValue=item.get();

}

if (item.get()<minValue){

minValue=item.get();

}

}

context.write(new Text("maxValue"),new LongWritable(maxValue));

context.write(new Text("minValue"),new LongWritable(minValue));

}

第40课:Hadoop数据去重和数据排序案例实战及数据清洗ETL

【数据文件Input】SortedData.txt

423535

45

666

77

888

22

3

4

5

7777888

99999

【运行结果Output】

1 3

2 4

3 5

4 22

5 45

6 77

7 666

8 888

9 99999

10 423535

11 7777888

【源代码文件】 SortData .java

【Map阶段】K,V值定义

读入每行数据,map输出K,V:key值为每行的数据,value设置为1

context.write(data, eValue);

【Reduce阶段】K,V值输出

Reduce输出K,V值:key值为position计数数值,value:map的key排序以后作为reduce的Value值输出,

for(LongWritableitem : values){

context.write(position,key);

position.set(position.get()+ 1);

}

第39课:MapReduce分析气象数据动手编程实战

【数据文件Input】

0067011990999991950051507004888888889999999N9+00001+9999999999999999999999

0067011990999991950051507004888888889999999N9+00001+9999999999999999999999

0067011990999991950051512004888888889999999N9+00221+9999999999999999999999

0067011990999991950051518004888888889999999N9-00111+9999999999999999999999

0067011990999991949032412004888888889999999N9+01111+9999999999999999999999

0067011990999991950032418004888888880500001N9+00001+9999999999999999999999

0067011990999991950051507004888888880500001N9+00781+9999999999999999999999

第15-19个字符表示year,例如1950年、1949年等;

第45-50个字符表示的是温度,例如-00111、+00001

第50位只能是0、1、4、5、9等几个数字;

【运行结果Output】

1949111

[b]1950 -11
[/b]

【源代码文件】 TemperatureComputation.java

【Map阶段】K,V值定义

每行记录0067011990999991950051507004888888889999999N9+00001+9999999999999999999999
l
MAP阶段, hadoop Mapreduce框架自动将上述的每行记录赋值给map的value值,(key值为每行记录偏移量)

l 将value值赋值给变量data,通过字符串截取出第15-19个字符的year,45-50个字符的是温度,以及清洗掉不符合规则的记录。

if (temperature !=MiSSING &&validDataFlag.matches("[01459]")){ //正则表达式,清洗数据

context.write(new Text(year), newIntWritable(temperature)); //返回集合,key年 value
同年的温度集合

}

l
MAP阶段输出的KEY值为year;输出的value值为温度。

【Reduce阶段】K,V值输出

l Reduce阶段,上述Map汇总过来的每年的温度值value是一个集合,例如一年每天合计有365个温度,因此Reduce读入的value类型Iterable<IntWritable>;key是年份

l 对于value的温度集合遍历,找出最小值的温度。

for(IntWritableitem:data){

coldestTemperature=Math.min(coldestTemperature, item.get());

} //遍历温度集合,比较温度大小

Reduce 阶段汇总输出的KEY值是为year;输出的value值为最小值的温度(即最低温度)。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: