Hadoop Programming Beginner's Notes 4: ChainMapper, DistributedCache, and Context
2015-11-17 20:30
These are my study notes for Chapter 4 of Hadoop Beginner's Guide, which teaches programming techniques by analyzing a UFO sighting dataset. The ufo.tsv file can no longer be downloaded from the link given in the book, but searching for the file name online turns it up. The book's scripts are written in Ruby; I rewrote them in Python. The book also uses the old API, and since I installed hadoop 2.6.0 I used the new API and adapted the programs accordingly. In hindsight this approach was quite useful: it forced me to understand the programs first, and whatever I didn't know about the API I had to look up. The API documentation is at http://hadoop.apache.org/docs/r2.6.0/api/index.html.
The UFO sighting file

Each record in ufo.tsv has six tab-separated fields:

# | Field | Description |
1 | Sighting date | When the UFO was sighted |
2 | Recorded date | When the sighting was recorded |
3 | Location | Where the UFO was sighted |
4 | Shape | Shape of the object, e.g. diamond |
5 | Duration | How long the sighting lasted |
6 | Description | Free-text description of the sighting |
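To make the record layout concrete, here is a small Python 3 sketch of my own that splits one tab-separated line into these six fields; the sample line is invented for illustration, not taken from the real dataset:

```python
# Split one tab-separated UFO record into its six named fields.
# The sample record below is made up for illustration.
FIELDS = ["sighting_date", "recorded_date", "location",
          "shape", "duration", "description"]

def parse_record(line):
    """Return a dict of the six fields, or None for a malformed line."""
    parts = line.rstrip("\n").split("\t")
    if len(parts) != len(FIELDS):
        return None
    return dict(zip(FIELDS, parts))

sample = "19950915\t19950915\tDenver, CO\tdiamond\t2 min\tBright diamond shape."
record = parse_record(sample)
print(record["shape"])     # diamond
print(record["location"])  # Denver, CO
```

Malformed lines (the wrong number of tabs) come back as None, which is the same validity test the scripts below apply.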
Counting records and fields with streaming
This one is simple — essentially WordCount. wcmapper.py:
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""a python script for hadoop streaming map"""
import sys

def map(input):
    for line in input:
        line = line.strip()
        words = line.split()
        for word in words:
            print '%s\t%s' % (word, 1)

def main():
    map(sys.stdin)

if __name__ == "__main__":
    main()
wcreducer.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""a python script for hadoop streaming reduce"""
import sys

def reduce(input):
    current_word = None
    current_count = 0
    word = None
    for line in input:
        line = line.strip()
        word, count = line.split('\t', 1)
        try:
            count = int(count)
        except ValueError:
            continue
        if current_word == word:
            current_count += count
        else:
            if current_word:
                print '%s\t%s' % (current_word, current_count)
            current_count = count
            current_word = word
    print '%s\t%s' % (current_word, current_count)

def main():
    reduce(sys.stdin)

if __name__ == "__main__":
    main()
summarymapper.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""a python script for hadoop streaming map"""
import sys

def map(input):
    for line in input:
        print "total\t1"
        line = line.strip()
        words = line.split("\t")
        if len(words) != 6:
            print "badline\t1"
        else:
            if words[0] != None:
                print "sighted\t1"
            if words[1] != None:
                print "recorded\t1"
            if words[2] != None:
                print "location\t1"
            if words[3] != None:
                print "shape\t1"
            if words[4] != None:
                print "duration\t1"
            if words[5] != None:
                print "description\t1"

def main():
    map(sys.stdin)

if __name__ == "__main__":
    main()
Commands used:
cat ufo.tsv | ./summarymapper.py | sort | ./wcreducer.py > output.txt
hadoop dfs -copyFromLocal ufo.tsv /user/hadoop/ufo/ufo.tsv
hadoop jar /home/hadoop/cloud/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
    -file summarymapper.py -mapper summarymapper.py \
    -file wcreducer.py -reducer wcreducer.py \
    -input /user/hadoop/ufo/ufo.tsv -output /user/hadoop/ufo/out
Grouping duration statistics by shape
The first script, shpemapper.py, simply counts shapes — again essentially WordCount. The scripts shapetimemapper.py and shapetimereducer.py implement the grouped statistics. Compared with shpemapper.py, shapetimemapper.py does one extra thing: besides extracting the shape (words[3]) as before, it uses a regular expression to pull the duration out of the text (words[4]), converts it to an integer number of seconds and accumulates it, so the output changes from print shape + "\t1" to print shape + "\t" + str(time). shapetimereducer.py then computes min, max, total, and mean.
shpemapper.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""a python script for hadoop streaming map"""
import sys

def map(input):
    for line in input:
        line = line.strip()
        words = line.split("\t")
        if len(words) == 6:
            shape = words[3].strip()
            if len(shape) > 0:
                print shape + "\t1"

def main():
    map(sys.stdin)

if __name__ == "__main__":
    main()
shapetimemapper.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""a python script for hadoop streaming map"""
import sys
import re

def map(input):
    pattern1 = re.compile(r'\d* ?((min)|(sec))')
    pattern2 = re.compile(r'\d*')
    for line in input:
        line = line.strip()
        words = line.split("\t")
        if len(words) == 6:
            shape = words[3].strip()
            duration = words[4].strip()
            if shape != None and duration != None:
                match = pattern1.match(duration)
                if match != None:
                    time = pattern2.match(match.group())
                    unit = match.group(1)
                    try:
                        time = int(time.group())
                    except:
                        #print '??? : ' + duration
                        time = 0
                    if unit == 'min':
                        time = time * 60
                    if len(shape) > 0:
                        print shape + '\t' + str(time)

def main():
    map(sys.stdin)

if __name__ == "__main__":
    main()

shapetimereducer.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""a python script for hadoop streaming reduce"""
import sys

def reduce(input):
    current = None
    minv = 0
    maxv = 0
    total = 0
    count = 0
    for line in input:
        line = line.strip()
        word, time = line.split('\t')
        time = int(time)
        if word == current:
            count += 1
            total += time
            if time < minv:
                minv = time
            if time > maxv:
                maxv = time
        else:
            if current != None:
                # note: total/count is integer division under Python 2
                print current + '\t' + str(minv) + ' ' + str(maxv) + ' ' + str(total/count)
            current = word
            count = 1
            total = time
            minv = time
            maxv = time
    print current + '\t' + str(minv) + ' ' + str(maxv) + ' ' + str(total/count)

def main():
    reduce(sys.stdin)

if __name__ == "__main__":
    main()
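The duration parsing in shapetimemapper.py can be tried in isolation. This Python 3 sketch of mine applies the same two regexes to a few invented duration strings:

```python
import re

# Same patterns as shapetimemapper.py: pattern1 grabs "<digits> min|sec",
# pattern2 then pulls the leading digits back out of that match.
pattern1 = re.compile(r'\d* ?((min)|(sec))')
pattern2 = re.compile(r'\d*')

def duration_seconds(duration):
    """Return the duration in seconds, or 0 if it cannot be parsed."""
    match = pattern1.match(duration)
    if match is None:
        return 0
    digits = pattern2.match(match.group()).group()
    try:
        time = int(digits)
    except ValueError:  # e.g. "min" with no leading digits
        return 0
    return time * 60 if match.group(1) == 'min' else time

for text in ["2 min", "30 sec", "about an hour"]:
    print(text, "->", duration_seconds(text))
# "2 min" -> 120, "30 sec" -> 30, unparseable text -> 0
```

Anything the regex cannot anchor at the start of the string falls back to 0 seconds, matching the mapper's behavior of treating unparseable durations as 0.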
ChainMapper
ChainMapper links Mappers together in a chain, somewhat like Filters in the servlet API. The example uses two Mappers: the first validates each record, the second counts sightings by Location. Mappers are added to the chain with ChainMapper.addMapper. Note the call job.setJarByClass(ufo.UFORecordValidationMapper.class); without it you get a class-not-found error. Also note that the old and new APIs are used differently; I have marked the differences in comments in the code.

UFORecordValidationMapper.java
package ufo;

import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;

/*
 * Old API: extend org.apache.hadoop.mapred.MapReduceBase and implement the
 * interface org.apache.hadoop.mapred.Mapper<K1, V1, K2, V2>.
 * New API: extend org.apache.hadoop.mapreduce.Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.
 * Old API: the third and fourth parameters of map() are an OutputCollector and a Reporter.
 * New API: the third parameter of map() is a Context, which combines the two.
 */
public class UFORecordValidationMapper extends Mapper<LongWritable, Text, LongWritable, Text>
{
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        String line = value.toString();
        if(validate(line))
            context.write(key, value);
    }

    private boolean validate(String str)
    {
        String words[] = str.split("\t");
        if(words.length != 6)
            return false;
        else
            return true;
    }
}

UFOLocation.java
package ufo;

import java.io.*;
import java.util.Iterator;
import java.util.regex.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.chain.*;
import org.apache.hadoop.mapreduce.lib.reduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class UFOLocation
{
    public static class MapClass extends Mapper<Object, Text, Text, LongWritable>
    {
        private final static LongWritable one = new LongWritable(1);
        private static Pattern locationPattern = Pattern.compile("[a-zA-Z]{2}[^a-zA-Z]*$");

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {
            String line = value.toString();
            String[] fields = line.split("\t");
            String location = fields[2].trim();
            if(location.length() >= 2)
            {
                Matcher matcher = locationPattern.matcher(location);
                if(matcher.find())
                {
                    int start = matcher.start();
                    String state = location.substring(start, start + 2);
                    context.write(new Text(state.toUpperCase()), one);
                }
            }
        }
    }

    /*
     * New API: driver code uses org.apache.hadoop.mapreduce.Job to manage the
     * configuration, then calls waitForCompletion(boolean) to submit the job.
     * Old API: driver code uses org.apache.hadoop.mapred.JobConf(Configuration, Class)
     * for configuration, and submits the job via
     * org.apache.hadoop.mapred.JobClient.runJob(JobConf).
     */
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "UFOLocation");
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        Configuration mapAConf = new Configuration(false);
        ChainMapper.addMapper(job, UFORecordValidationMapper.class,
                LongWritable.class, Text.class, LongWritable.class, Text.class, mapAConf);
        Configuration mapBConf = new Configuration(false);
        ChainMapper.addMapper(job, MapClass.class,
                LongWritable.class, Text.class, Text.class, LongWritable.class, mapBConf);

        job.setJarByClass(ufo.UFORecordValidationMapper.class);
        job.setMapperClass(ChainMapper.class);
        job.setCombinerClass(LongSumReducer.class);
        job.setReducerClass(LongSumReducer.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
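The core idea of ChainMapper — each mapper's output feeds the next mapper's input — can be sketched outside Hadoop as plain composition of generator stages. This Python 3 sketch is my own illustration (not the Hadoop API), with the state extraction simplified to the last two characters of the location:

```python
# Sketch of the ChainMapper idea: each stage consumes (key, value) pairs
# and yields pairs for the next stage, like a servlet Filter chain.
def validate_mapper(pairs):
    """Pass through only records with exactly six tab-separated fields."""
    for key, value in pairs:
        if len(value.split("\t")) == 6:
            yield key, value

def location_mapper(pairs):
    """Emit (state, 1) per valid record; simplified from UFOLocation.MapClass."""
    for _, value in pairs:
        location = value.split("\t")[2].strip()
        if len(location) >= 2:
            yield location[-2:].upper(), 1

records = [
    (0, "d1\td2\tDenver, CO\tdiamond\t2 min\tdesc"),
    (1, "bad\tline"),  # dropped by the validation stage
]
for pair in location_mapper(validate_mapper(records)):
    print(pair)  # ('CO', 1)
```

The chained call location_mapper(validate_mapper(records)) plays the role of the two ChainMapper.addMapper calls: the invalid record never reaches the second stage.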
DistributedCache
DistributedCache shares files across the cluster. Sharing takes two steps: first, add the file in main with DistributedCache.addCacheFile; second, read the file in the Mapper — for example, load it into a HashMap in the setup method. The example creates a states.txt file that maps state-name abbreviations to full names.

states.txt
AL	Alabama
AK	Alaska
AZ	Arizona
AR	Arkansas
CA	California

UFOLocation2.java
package ufo;

import java.io.*;
import java.net.*;
import java.util.*;
import java.util.regex.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.chain.*;
import org.apache.hadoop.mapreduce.lib.reduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.filecache.DistributedCache;

public class UFOLocation2
{
    public static class MapClass extends Mapper<Object, Text, Text, LongWritable>
    {
        public final static String LINK_STATES_TXT = "__Link_statestxt__";
        private final static LongWritable one = new LongWritable(1);
        private static Pattern locationPattern = Pattern.compile("[a-zA-Z]{2}[^a-zA-Z]*$");
        private Map<String, String> stateNames;

        @Override
        public void setup(Context context) throws IOException, InterruptedException
        {
            try
            {
                setupStateMap();
            }
            catch (IOException e)
            {
                System.err.println("Error reading state file.");
                System.exit(1);
            }
        }

        private void setupStateMap() throws IOException
        {
            Map<String, String> states = new HashMap<String, String>();
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(new FileInputStream(MapClass.LINK_STATES_TXT)));
            String line = reader.readLine();
            while(line != null)
            {
                String[] split = line.split("\t");
                states.put(split[0], split[1]);
                line = reader.readLine();
            }
            stateNames = states;
        }

        private String lookupState(String state)
        {
            String fullName = stateNames.get(state);
            return fullName == null ? "Other" : fullName;
        }

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {
            String line = value.toString();
            String[] fields = line.split("\t");
            String location = fields[2].trim();
            if(location.length() >= 2)
            {
                Matcher matcher = locationPattern.matcher(location);
                if(matcher.find())
                {
                    int start = matcher.start();
                    String state = location.substring(start, start + 2);
                    String fullName = lookupState(state.toUpperCase());
                    context.write(new Text(fullName), one);
                }
            }
        }
    }

    /*
     * With the new API, DistributedCache files are accessed by creating a symbolic link.
     */
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "UFOLocation");

        String cacheFilePath = "/user/hadoop/ufo/states.txt";
        Path inPath = new Path(cacheFilePath);
        // the part after '#' is the symlink name
        String inPathLink = inPath.toUri().toString() + "#" + MapClass.LINK_STATES_TXT;
        DistributedCache.addCacheFile(new URI(inPathLink), job.getConfiguration());

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        Configuration mapAConf = new Configuration(false);
        ChainMapper.addMapper(job, UFORecordValidationMapper.class,
                LongWritable.class, Text.class, LongWritable.class, Text.class, mapAConf);
        Configuration mapBConf = new Configuration(false);
        ChainMapper.addMapper(job, MapClass.class,
                LongWritable.class, Text.class, Text.class, LongWritable.class, mapBConf);

        job.setJarByClass(ufo.UFORecordValidationMapper.class);
        job.setMapperClass(ChainMapper.class);
        job.setCombinerClass(LongSumReducer.class);
        job.setReducerClass(LongSumReducer.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Context
In the old API, counters and status output go through Reporter; in the new API they go through Context. The mapper defines an enum LineCounters, obtains a counter with context.getCounter, adds to it with the counter's increment method, and sets the task status with context.setStatus.

UFOCountingRecordValidationMapper.java
package ufo;

import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;

public class UFOCountingRecordValidationMapper extends Mapper<LongWritable, Text, LongWritable, Text>
{
    public enum LineCounters
    {
        BAD_LINES,
        TOO_MANY_TABS,
        TOO_FEW_TABS
    };

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        String line = value.toString();
        if(validate(line, context))
            context.write(key, value);
    }

    private boolean validate(String str, Context context)
    {
        String words[] = str.split("\t");
        if(words.length != 6)
        {
            if(words.length < 6)
            {
                Counter ct = context.getCounter(LineCounters.TOO_FEW_TABS);
                ct.increment(1);
            }
            else
            {
                Counter ct = context.getCounter(LineCounters.TOO_MANY_TABS);
                ct.increment(1);
            }
            Counter ct = context.getCounter(LineCounters.BAD_LINES);
            ct.increment(1);
            if(ct.getValue() % 10 == 0)
            {
                context.setStatus("Got 10 bad lines.");
                System.err.println("Read another 10 bad lines.");
            }
            return false;
        }
        else
            return true;
    }
}

UFOLocation3.java
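Streaming jobs have a counterpart to these Context counters: Hadoop streaming parses specially formatted lines written to stderr ("reporter:counter:<group>,<counter>,<amount>" and "reporter:status:<message>"). A Python 3 sketch of the same validation logic using that mechanism:

```python
import sys

# Hadoop streaming parses these stderr line formats as counter/status updates,
# the streaming analogue of context.getCounter(...).increment() and setStatus().
def increment_counter(group, counter, amount=1, err=sys.stderr):
    err.write("reporter:counter:%s,%s,%d\n" % (group, counter, amount))

def set_status(message, err=sys.stderr):
    err.write("reporter:status:%s\n" % message)

def validate(line):
    """Validate one record, updating counters like the Java mapper does."""
    words = line.split("\t")
    if len(words) != 6:
        if len(words) < 6:
            increment_counter("LineCounters", "TOO_FEW_TABS")
        else:
            increment_counter("LineCounters", "TOO_MANY_TABS")
        increment_counter("LineCounters", "BAD_LINES")
        return False
    return True

print(validate("a\tb\tc\td\te\tf"))  # True
print(validate("a\tb"))              # False
```

Unlike the Java version, a streaming script cannot read a counter's current value back, so the "every 10 bad lines" status update has no direct equivalent here.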
Copy UFOLocation2.java under a new name, then change two places:
ChainMapper.addMapper(job, UFOCountingRecordValidationMapper.class, LongWritable.class, Text.class, LongWritable.class, Text.class, mapAConf);
job.setJarByClass(ufo.UFOCountingRecordValidationMapper.class);
Helper scripts for building and running the programs
build.sh

Because the classes are in package ufo, build.sh sits in the current directory and the Java source files go in the ufo subdirectory beneath it.
#!/bin/sh
HADOOP_LIB_DIR=/home/hadoop/cloud/hadoop/share/hadoop
rm -f ./*.class
rm -f ./ufo.jar
javac -classpath $HADOOP_LIB_DIR/common/hadoop-common-2.6.0.jar:$HADOOP_LIB_DIR/common/lib/commons-cli-1.2.jar:$HADOOP_LIB_DIR/common/lib/hadoop-annotations-2.6.0.jar:$HADOOP_LIB_DIR/mapreduce/hadoop-mapreduce-client-core-2.6.0.jar -d . \
    ufo/UFORecordValidationMapper.java ufo/UFOLocation.java ufo/UFOLocation2.java \
    ufo/UFOCountingRecordValidationMapper.java ufo/UFOLocation3.java
# package
jar -cvf ufo.jar ./ufo/*.class
run.sh
To run a different example, just change ufo.UFOLocation to ufo.UFOLocation2 or ufo.UFOLocation3.
#!/bin/sh
hdfs dfs -rm -r -f /user/hadoop/ufo/out001
hadoop jar ufo.jar ufo.UFOLocation /user/hadoop/ufo/ufo.tsv /user/hadoop/ufo/out001
hdfs dfs -cat /user/hadoop/ufo/out001/part-r-00000