
Hadoop programming study notes for beginners, part 4: ChainMapper, DistributedCache and Context

These are my study notes for Chapter 4 of "Hadoop Beginner's Guide". The chapter teaches the relevant programming techniques by analyzing a UFO sighting dataset. The ufo.tsv file can no longer be downloaded from the link given in the book, but searching for the file name online turns it up. The book's scripts are written in Ruby; I rewrote them in Python. The book also uses the old API, whereas I have Hadoop 2.6.0 installed and use the new API, so I adapted the programs accordingly. In hindsight this approach was quite useful: it forced me to understand the programs first, and to look things up whenever I was unsure about the API. The API documentation is at http://hadoop.apache.org/docs/r2.6.0/api/index.html.

The UFO sighting file

No.  Field           Description
1    Sighting date   When the UFO was seen
2    Recorded date   When the sighting was recorded
3    Location        Where the UFO was seen
4    Shape           Shape of the object, e.g. diamond
5    Duration        How long the sighting lasted
6    Description     Free-text description of the sighting

Counting records and fields with Hadoop Streaming

This part is straightforward; it is essentially WordCount.

wcmapper.py

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""a python script for hadoop streaming map """

import sys

def map(input):
    for line in input:
        line = line.strip()
        words = line.split()
        for word in words:
            print '%s\t%s' % (word, 1)

def main():
    map(sys.stdin)

if __name__ == "__main__":
    main()


wcreducer.py

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""a python script for hadoop streaming reduce """

import sys

def reduce(input):
    current_word = None
    current_count = 0
    word = None
    for line in input:
        line = line.strip()

        word, count = line.split('\t', 1)
        try:
            count = int(count)
        except ValueError:
            continue

        if current_word == word:
            current_count += count
        else:
            if current_word:
                print '%s\t%s' % (current_word, current_count)
            current_count = count
            current_word = word

    print '%s\t%s' % (current_word, current_count)

def main():
    reduce(sys.stdin)

if __name__ == "__main__":
    main()


summarymapper.py

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""a python script for hadoop streaming map """

import sys

def map(input):
    for line in input:
        print "total\t1"
        line = line.strip()
        words = line.split("\t")
        if len(words) != 6:
            print "badline\t1"
        else:
            if words[0] != None:
                print "sighted\t1"
            if words[1] != None:
                print "recorded\t1"
            if words[2] != None:
                print "location\t1"
            if words[3] != None:
                print "shape\t1"
            if words[4] != None:
                print "duration\t1"
            if words[5] != None:
                print "description\t1"

def main():
    map(sys.stdin)

if __name__ == "__main__":
    main()

Commands used

cat ufo.tsv | ./summarymapper.py | sort | ./wcreducer.py > output.txt
hadoop dfs -copyFromLocal ufo.tsv /user/hadoop/ufo/ufo.tsv
hadoop jar /home/hadoop/cloud/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar -file summarymapper.py -mapper summarymapper.py -file wcreducer.py -reducer wcreducer.py -input /user/hadoop/ufo/ufo.tsv -output /user/hadoop/ufo/out

Summarizing sighting duration by shape

The first script, shpemapper.py, simply counts shapes and is again essentially WordCount. The scripts shapetimemapper.py and shapetimereducer.py implement the grouped summary. Compared with shpemapper.py, shapetimemapper.py does one extra thing: besides extracting the shape (words[3]) as before, it uses regular expressions to pull the duration out of the text in words[4] and convert it to an integer number of seconds (for example, a duration beginning with "20 min" becomes 1200), and the output line changes from print shape + "\t1" to print shape + "\t" + str(time). shapetimereducer.py then computes min, max, total and mean for each shape.

shpemapper.py

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""a python script for hadoop streaming map """

import sys

def map(input):
    for line in input:
        line = line.strip()
        words = line.split("\t")
        if len(words) == 6:
            shape = words[3].strip()
            if len(shape) > 0:
                print shape + "\t1"

def main():
    map(sys.stdin)

if __name__ == "__main__":
    main()


shapetimemapper.py

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""a python script for hadoop streaming map """

import sys
import re

def map(input):
    # pattern1 finds a leading "<digits> min" or "<digits> sec" in the duration field;
    # pattern2 then pulls the digits out of that match.
    pattern1 = re.compile(r'\d* ?((min)|(sec))')
    pattern2 = re.compile(r'\d*')
    for line in input:
        line = line.strip()
        words = line.split("\t")
        if len(words) == 6:
            shape = words[3].strip()
            duration = words[4].strip()
            if shape != None and duration != None:
                match = pattern1.match(duration)
                if match != None:
                    time = pattern2.match(match.group())
                    unit = match.group(1)
                    try:
                        time = int(time.group())
                    except ValueError:
                        #print '??? : ' + duration
                        time = 0
                    if unit == 'min':
                        time = time * 60
                    if len(shape) > 0:
                        print shape + '\t' + str(time)

def main():
    map(sys.stdin)

if __name__ == "__main__":
    main()

shapetimereducer.py

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""a python script for hadoop streaming reduce """

import sys
import re

def reduce(input):
    current = None
    minv = 0
    maxv = 0
    mean = 0
    total = 0
    count = 0

    for line in input:
        line = line.strip()
        word, time = line.split('\t')
        time = int(time)

        if word == current:
            count += 1
            total += time
            if time < minv:
                minv = time
            if time > maxv:
                maxv = time
        else:
            if current != None:
                # total/count is integer division under Python 2, so the mean is truncated
                print current + '\t' + str(minv) + ' ' + str(maxv) + ' ' + str((total/count))
            current = word
            count = 1
            total = time
            minv = time
            maxv = time

    print current + '\t' + str(minv) + ' ' + str(maxv) + ' ' + str((total/count))

def main():
    reduce(sys.stdin)

if __name__ == "__main__":
    main()

ChainMapper

ChainMapper chains several Mappers together, somewhat like Filters in the servlet API. The example uses two Mappers: the first validates each record, the second counts sightings by Location. Mappers are added with the ChainMapper.addMapper method, and the output key/value types of each mapper must match the input types of the next one in the chain. Note the call job.setJarByClass(ufo.UFORecordValidationMapper.class); without it the job fails with a class-not-found error. Also be aware that the old and new APIs differ here; I have noted the differences in comments in the code.

UFORecordValidationMapper.java

package ufo;
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;

/*
With the old API, you extend org.apache.hadoop.mapred.MapReduceBase and implement the interface org.apache.hadoop.mapred.Mapper<K1, V1, K2, V2>.
With the new API, you extend the class org.apache.hadoop.mapreduce.Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.
In the old API, the third and fourth parameters of map() are an OutputCollector and a Reporter.
In the new API, the third parameter of map() is a Context, which merges the functionality of those two classes.
*/

public class UFORecordValidationMapper extends Mapper<LongWritable, Text, LongWritable, Text>
{
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        String line = value.toString();
        if(validate(line))
            context.write(key, value);
    }

    private boolean validate(String str)
    {
        String words[] = str.split("\t");
        if(words.length != 6)
            return false;
        else
            return true;
    }
}
UFOLocation.java

package ufo;

import java.io.*;
import java.util.Iterator;
import java.util.regex.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.chain.*;
import org.apache.hadoop.mapreduce.lib.reduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class UFOLocation
{
    public static class MapClass extends Mapper<Object, Text, Text, LongWritable>
    {
        private final static LongWritable one = new LongWritable(1);
        private static Pattern locationPattern = Pattern.compile("[a-zA-z]{2}[^a-zA-z]*$");

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {
            String line = value.toString();
            String[] fields = line.split("\t");
            String location = fields[2].trim();
            if(location.length() >= 2)
            {
                Matcher matcher = locationPattern.matcher(location);
                if(matcher.find())
                {
                    int start = matcher.start();
                    String state = location.substring(start, start + 2);
                    context.write(new Text(state.toUpperCase()), one);
                }
            }
        }
    }

    /*
    With the new API, the driver code is written against org.apache.hadoop.mapreduce.Job, which holds the configuration; the job is submitted by calling its waitForCompletion(boolean) method.
    With the old API, the driver code is written against org.apache.hadoop.mapred.JobConf(Configuration, Class), which holds the configuration, and the job is submitted with the runJob(JobConf) method of org.apache.hadoop.mapred.JobClient.
    */
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "UFOLocation");

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        Configuration mapAConf = new Configuration(false);
        ChainMapper.addMapper(job, UFORecordValidationMapper.class, LongWritable.class, Text.class,
                LongWritable.class, Text.class, mapAConf);

        Configuration mapBConf = new Configuration(false);
        ChainMapper.addMapper(job, MapClass.class, LongWritable.class, Text.class,
                Text.class, LongWritable.class, mapBConf);

        job.setJarByClass(ufo.UFORecordValidationMapper.class);
        job.setMapperClass(ChainMapper.class);
        job.setCombinerClass(LongSumReducer.class);
        job.setReducerClass(LongSumReducer.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}


DistributedCache

DistributedCache lets tasks share files. Sharing a file takes two steps: first, add the file in main with DistributedCache.addCacheFile; second, read it in the Mapper, for example by loading it into a HashMap in the Mapper's setup method. The example creates a states.txt file that maps state abbreviations to full names.

states.txt

AL	Alabama
AK	Alaska
AZ	Arizona
AR	Arkansas
CA	California
UFOLocation2.java

package ufo;

import java.io.*;
import java.net.*;
import java.util.*;
import java.util.regex.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.chain.*;
import org.apache.hadoop.mapreduce.lib.reduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.filecache.DistributedCache;

public class UFOLocation2
{
    public static class MapClass extends Mapper<Object, Text, Text, LongWritable>
    {
        public final static String LINK_STATES_TXT = "__Link_statestxt__";
        private final static LongWritable one = new LongWritable(1);
        private static Pattern locationPattern = Pattern.compile("[a-zA-z]{2}[^a-zA-z]*$");
        private Map<String, String> stateNames;

        @Override
        public void setup(Context text) throws IOException, InterruptedException
        {
            try
            {
                setupStateMap();
            }catch (IOException e){
                System.err.println("Error reading state file.");
                System.exit(1);
            }
        }

        private void setupStateMap() throws IOException
        {
            Map<String, String> states = new HashMap<String, String>();
            BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(MapClass.LINK_STATES_TXT)));
            String line = reader.readLine();
            while(line != null)
            {
                String[] split = line.split("\t");
                states.put(split[0], split[1]);
                line = reader.readLine();
            }
            stateNames = states;
        }

        private String lookupState(String state)
        {
            String fullName = stateNames.get(state);
            return fullName == null ? "Other" : fullName;
        }

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {
            String line = value.toString();
            String[] fields = line.split("\t");
            String location = fields[2].trim();
            if(location.length() >= 2)
            {
                Matcher matcher = locationPattern.matcher(location);
                if(matcher.find())
                {
                    int start = matcher.start();
                    String state = location.substring(start, start + 2);
                    String fullName = lookupState(state.toUpperCase());
                    context.write(new Text(fullName), one);
                }
            }
        }
    }

    /*
    With the new API, files in the DistributedCache are accessed through a symlink created with the "#" syntax.
    */
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "UFOLocation");
        String cacheFilePath = "/user/hadoop/ufo/states.txt";
        Path inPath = new Path(cacheFilePath);
        // the part after "#" is the symlink name
        String inPathLink = inPath.toUri().toString() + "#" + MapClass.LINK_STATES_TXT;
        DistributedCache.addCacheFile(new URI(inPathLink), job.getConfiguration());

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        Configuration mapAConf = new Configuration(false);
        ChainMapper.addMapper(job, UFORecordValidationMapper.class, LongWritable.class, Text.class,
                LongWritable.class, Text.class, mapAConf);

        Configuration mapBConf = new Configuration(false);
        ChainMapper.addMapper(job, MapClass.class, LongWritable.class, Text.class,
                Text.class, LongWritable.class, mapBConf);

        job.setJarByClass(ufo.UFORecordValidationMapper.class);
        job.setMapperClass(ChainMapper.class);
        job.setCombinerClass(LongSumReducer.class);
        job.setReducerClass(LongSumReducer.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
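
A side note: org.apache.hadoop.filecache.DistributedCache is deprecated in Hadoop 2.x. The sketch below is mine, not from the book; it shows the equivalent setup with the non-deprecated Job.addCacheFile method. The class name CacheFileSketch is made up, the states.txt path and symlink name are reused from the example above, and the rest of the job setup would stay the same as in UFOLocation2.

package ufo;

import java.io.*;
import java.net.URI;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;

public class CacheFileSketch
{
    public static class CacheMapper extends Mapper<LongWritable, Text, LongWritable, Text>
    {
        public final static String LINK_STATES_TXT = "__Link_statestxt__";
        private Map<String, String> stateNames = new HashMap<String, String>();

        @Override
        public void setup(Context context) throws IOException, InterruptedException
        {
            // The cached file appears in the task's working directory under the symlink name.
            BufferedReader reader = new BufferedReader(new FileReader(LINK_STATES_TXT));
            String line;
            while((line = reader.readLine()) != null)
            {
                String[] split = line.split("\t");
                stateNames.put(split[0], split[1]);
            }
            reader.close();
        }

        // map() is inherited (identity) in this sketch; a real job would use stateNames
        // exactly as UFOLocation2.MapClass.lookupState does.
    }

    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "CacheFileSketch");
        job.setJarByClass(CacheFileSketch.class);
        // Equivalent to DistributedCache.addCacheFile(uri, job.getConfiguration());
        // the "#" suffix still names the symlink.
        job.addCacheFile(new URI("/user/hadoop/ufo/states.txt#" + CacheMapper.LINK_STATES_TXT));
        job.setMapperClass(CacheMapper.class);
        // the remaining job setup (output types, reducer, input/output paths) is unchanged from UFOLocation2
    }
}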


Context

In the old API, counters and status updates are reported through the Reporter class; in the new API this is done through Context.

UFOCountingRecordValidationMapper.java

Define an enum type, LineCounters; get a counter with context.getCounter and bump it with the counter's increment method; set the task status with context.setStatus.

package ufo;
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;

public class UFOCountingRecordValidationMapper extends Mapper<LongWritable, Text, LongWritable, Text>
{
    public enum LineCounters
    {
        BAD_LINES,
        TOO_MANY_TABS,
        TOO_FEW_TABS
    };

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        String line = value.toString();
        if(validate(line, context))
            context.write(key, value);
    }

    private boolean validate(String str, Context context)
    {
        String words[] = str.split("\t");
        if(words.length != 6)
        {
            if(words.length < 6)
            {
                Counter ct = context.getCounter(LineCounters.TOO_FEW_TABS);
                ct.increment(1);
            }else{
                Counter ct = context.getCounter(LineCounters.TOO_MANY_TABS);
                ct.increment(1);
            }
            Counter ct = context.getCounter(LineCounters.BAD_LINES);
            ct.increment(1);

            if(ct.getValue() % 10 == 0){
                context.setStatus("Got 10 bad lines.");
                System.err.println("Read another 10 bad lines.");
            }

            return false;
        }else
            return true;
    }
}
UFOLocation3.java

Copy UFOLocation2.java, rename it to UFOLocation3.java, and change two places:

ChainMapper.addMapper(job, UFOCountingRecordValidationMapper.class, LongWritable.class, Text.class,
LongWritable.class, Text.class, mapAConf);
job.setJarByClass(ufo.UFOCountingRecordValidationMapper.class);
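
The counter totals are printed with the job's console output when it finishes and are also visible in the web UI. If you want to read them from the driver as well, a minimal sketch of using Job.getCounters is shown below; it is mine, not from the book, the helper class CounterReport is hypothetical, and it assumes the Job object built in UFOLocation3's main method, called after job.waitForCompletion(true) has returned.

package ufo;

import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;

public class CounterReport
{
    // "job" is assumed to be the Job from UFOLocation3's main method,
    // passed in after waitForCompletion(true) has returned.
    public static void print(Job job) throws Exception
    {
        Counters counters = job.getCounters();
        Counter badLines = counters.findCounter(UFOCountingRecordValidationMapper.LineCounters.BAD_LINES);
        Counter tooFew = counters.findCounter(UFOCountingRecordValidationMapper.LineCounters.TOO_FEW_TABS);
        Counter tooMany = counters.findCounter(UFOCountingRecordValidationMapper.LineCounters.TOO_MANY_TABS);
        System.out.println("Bad lines: " + badLines.getValue());
        System.out.println("Too few tabs: " + tooFew.getValue());
        System.out.println("Too many tabs: " + tooMany.getValue());
    }
}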

Helper scripts for compiling and running the programs

build.sh

Because the classes are in the ufo package, build.sh sits in the current directory and the Java source files sit in the ufo subdirectory beneath it.

#!/bin/sh
HADOOP_LIB_DIR=/home/hadoop/cloud/hadoop/share/hadoop

rm -f ./*.class
rm -f ./ufo.jar

javac -classpath $HADOOP_LIB_DIR/common/hadoop-common-2.6.0.jar:$HADOOP_LIB_DIR/common/lib/commons-cli-1.2.jar:$HADOOP_LIB_DIR/common/lib/hadoop-annotations-2.6.0.jar:$HADOOP_LIB_DIR/mapreduce/hadoop-mapreduce-client-core-2.6.0.jar -d . ufo/UFORecordValidationMapper.java ufo/UFOLocation.java ufo/UFOLocation2.java ufo/UFOCountingRecordValidationMapper.java  ufo/UFOLocation3.java

#package

jar -cvf ufo.jar ./ufo/*.class


run.sh

To run the other examples, just change ufo.UFOLocation to ufo.UFOLocation2 or ufo.UFOLocation3.

#!/bin/sh
hdfs dfs -rm -r -f /user/hadoop/ufo/out001
hadoop jar ufo.jar ufo.UFOLocation /user/hadoop/ufo/ufo.tsv /user/hadoop/ufo/out001
hdfs dfs -cat /user/hadoop/ufo/out001/part-r-00000