Hadoop MapReduce Study Notes: SQL Queries
2016-09-11 16:12
Preface
IT software architecture has evolved from single machines through small-scale distributed systems to today's virtualization-based cloud deployments, and the technology changes at a dizzying pace. Our product is now moving to a cloud architecture as well, i.e. using farms of cheap x86 servers to get massive computing power at acceptable reliability, and there is no way around the Apache Hadoop big-data stack, hence these notes. I had touched big data only sporadically over the past two years; last year I spent about a month on a Spark prototype with unsatisfying results, which made me realize my fundamentals were shaky and that I needed to study the subject systematically. So over the past month, on evenings without much overtime, I started at home from the ancestors: HDFS and MapReduce. As expected I stepped on a few landmines, some of which are rarely written about, so I am sharing them here both to reinforce my own memory and to help fellow engineers.
Landmine: "I thought it should work this way, but it only works that way." I flag every such case with the word "landmine"; the last one cost me no fewer than three precious evenings to track down.
Basic operations
These operations come up constantly, so I list them up front to avoid repeating them in later sections.
1. How to install an HDFS and YARN (MapReduce v2) environment: just follow the manual. I used the single-node pseudo-distributed setup. Note that the ssh key step is mandatory.
Official guide:
http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html
Starting and stopping; each way you will be asked for your password three times, once each for the namenode, datanode and secondarynamenode:
sbin/start-dfs.sh
sbin/stop-dfs.sh
Example:
zhangshehui@DELL:~/workspace/hadoop/hadoop-2.6.4$ sbin/start-dfs.sh
Starting namenodes on [localhost]
zhangshehui@localhost's password:
localhost: starting namenode, logging to /home/zhangshehui/workspace/hadoop/hadoop-2.6.4/logs/hadoop-zhangshehui-namenode-DELL.out
zhangshehui@localhost's password:
localhost: starting datanode, logging to /home/zhangshehui/workspace/hadoop/hadoop-2.6.4/logs/hadoop-zhangshehui-datanode-DELL.out
Starting secondary namenodes [0.0.0.0]
zhangshehui@0.0.0.0's password:
0.0.0.0: starting secondarynamenode, logging to /home/zhangshehui/workspace/hadoop/hadoop-2.6.4/logs/hadoop-zhangshehui-secondarynamenode-DELL.out
Landmine: by default HDFS puts all of its files under the operating system's tmp directory, so a reboot wipes them and you have to re-format to initialize again. The fix is to point these directories somewhere sane; the config items are:
hadoop.tmp.dir in etc/hadoop/core-site.xml, for temporary files
dfs.namenode.name.dir, dfs.namenode.checkpoint.dir and dfs.datanode.data.dir in etc/hadoop/hdfs-site.xml, holding the namespace database, the checkpoint/edit data, and the file blocks respectively
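For reference, a sketch of what such overrides could look like; the directory paths below are made-up placeholders, adjust them to your own disk layout:

```xml
<!-- etc/hadoop/core-site.xml -->
<property>
  <name>hadoop.tmp.dir</name>
  <value>/home/hadoop/data/tmp</value>
</property>

<!-- etc/hadoop/hdfs-site.xml -->
<property>
  <name>dfs.namenode.name.dir</name>
  <value>/home/hadoop/data/namenode</value>
</property>
<property>
  <name>dfs.datanode.data.dir</name>
  <value>/home/hadoop/data/datanode</value>
</property>
```

After changing these, run bin/hdfs namenode -format once and the files survive reboots.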
2. How to list and delete folders on HDFS. In the commands below, xxx stands for the actual path; paths are Unix-style, starting from the root /.
bin/hdfs dfs -ls /xxx
bin/hdfs dfs -rm /xxx
bin/hdfs dfs -rm -R -f /xxx
3. How to download a file from HDFS to a local directory:
bin/hdfs dfs -get /xxx
4. MapReduce does not write its HDFS output as the single big file we might expect, but as one small part-prefixed file per reducer, which we have to merge ourselves. If you only want to download the result locally for inspection, the command below merges while downloading; otherwise you would have to write another single-reducer MapReduce job to merge them into yet another HDFS file (still part-prefixed) and then rename it. On reflection this is reasonable: if a crowd of reducers had to queue up to write one file, throughput would suffer badly.
bin/hdfs dfs -getmerge /xxx/part* localfilename
5. How to submit a MapReduce job:
bin/hadoop jar X.jar MainClassFullName MainMethodParm1 MainMethodParm2 ...
After each run it also prints a report of the input/output volumes of every stage, which is very helpful for tracking down problems:
File System Counters
    FILE: Number of bytes read=3998984
    FILE: Number of bytes written=6549132
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
    HDFS: Number of bytes read=12075444
    HDFS: Number of bytes written=15798
    HDFS: Number of read operations=13
    HDFS: Number of large read operations=0
    HDFS: Number of write operations=4
Map-Reduce Framework
    Map input records=100000
    Map output records=100000
    Map output bytes=1789000
    Map output materialized bytes=1989006
    Input split bytes=108
    Combine input records=0
    Combine output records=0
    Reduce input groups=1000
    Reduce shuffle bytes=1989006
    Reduce input records=100000
    Reduce output records=1000
    Spilled Records=200000
    Shuffled Maps =1
    Failed Shuffles=0
    Merged Map outputs=1
    GC time elapsed (ms)=15
    CPU time spent (ms)=0
    Physical memory (bytes) snapshot=0
    Virtual memory (bytes) snapshot=0
    Total committed heap usage (bytes)=601882624
Shuffle Errors
    BAD_ID=0
    CONNECTION=0
    IO_ERROR=0
    WRONG_LENGTH=0
    WRONG_MAP=0
    WRONG_REDUCE=0
File Input Format Counters
    Bytes Read=6037722
File Output Format Counters
    Bytes Written=15798
Version 0: preparing stub data
WordCount is the official example, but its logic is far too simple. I decided to build a database-query-like capability instead: store two-dimensional data in csv format, then run quasi-SQL queries such as select a,b,max(c),min(d),avg(e),sum(f) where g=8 and h=10.
This program generates the csv file to support the tests. It takes the file path and row count as main() arguments and writes randomly generated rows to HDFS in a loop.
package org.zsh.hdfs.test;

import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class StubDataGenerator {

    public static void generateData(String strPath, final int size) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path(strPath);
        OutputStream out = null;
        if (fs.exists(path)) {
            out = fs.append(path, 4);
        } else {
            out = fs.create(path);
        }
        PrintWriter writer = new PrintWriter(new OutputStreamWriter(out));
        for (int i = 0, progress = -1; i < size; i++) {
            writer.println(getLine(i));
            final int newProgress = i * 100 / size;
            if (newProgress > progress) {
                progress = newProgress;
                System.out.print(progress + "% ");
            }
        }
        System.out.println();
        writer.close();
        out.close();
    }

    public static String getLine(int i) {
        // city, streat, company, service, time, indicatorDelay, indicatorJitter, indicatorLostRatio
        int city = i / 10000;
        int streat = (i % 10000) / 1000;
        int company = (i % 1000) / 100;
        int service = (i % 100) / 10;
        int indicatorDelay = (int) (100 * Math.random());
        int indicatorJitter = (int) (100 * Math.random());
        int indicatorLostRatio = (int) (100 * Math.random());
        final StringBuilder buffer = new StringBuilder();
        buffer.append("city");
        buffer.append(city);
        buffer.append(",");
        buffer.append("streat");
        buffer.append(streat);
        buffer.append(",");
        buffer.append("company");
        buffer.append(company);
        buffer.append(",");
        buffer.append("service");
        buffer.append(service);
        buffer.append(",");
        buffer.append(System.currentTimeMillis());
        buffer.append(",");
        buffer.append(indicatorDelay);
        buffer.append(",");
        buffer.append(indicatorJitter);
        buffer.append(",");
        buffer.append(indicatorLostRatio);
        return buffer.toString();
    }

    /**
     * @param args
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        generateData(args[0], Integer.valueOf(args[1]));
    }
}
Compile it into StubDataGenerator.jar, then generate the file with:
bin/hadoop jar StubDataGenerator.jar org.zsh.hdfs.test.StubDataGenerator /test/data/rawdata_10000.txt 10000
Then confirm the file was actually created:
bin/hdfs dfs -ls /test/data
You can also download it to double-check the contents:
bin/hdfs dfs -get /test/data/rawdata_10000.txt
less rawdata_10000.txt
city0,streat0,company0,service0,1473085113700,74,98,0
city0,streat0,company0,service0,1473085113701,65,61,44
city0,streat0,company0,service0,1473085113701,43,45,49
city0,streat0,company0,service0,1473085113701,73,59,33
city0,streat0,company0,service0,1473085113701,79,4,27
city0,streat0,company0,service0,1473085113701,15,10,69
city0,streat0,company0,service0,1473085113701,4,3,42
city0,streat0,company0,service0,1473085113701,97,3,52
city0,streat0,company0,service0,1473085113701,15,23,43
city0,streat0,company0,service0,1473085113701,92,11,43
city0,streat0,company0,service1,1473085113701,47,22,70
city0,streat0,company0,service1,1473085113701,73,99,22
city0,streat0,company0,service1,1473085113701,74,29,42
city0,streat0,company0,service1,1473085113702,40,86,92
city0,streat0,company0,service1,1473085113702,9,79,79
...
Version 1: group by column 1, average column 6
1. First, a main function that kicks everything off; in Hadoop terminology this is the Driver.

package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SQLDriver {

    /**
     * @param args
     * @throws IOException
     * @throws InterruptedException
     * @throws ClassNotFoundException
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "sql");
        job.setJarByClass(SQLDriver.class);
        job.setMapperClass(SQLMapper.class);
        job.setReducerClass(SQLReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
This code is mostly copied from the official Apache documentation; only later did I gradually work out that:
a. Job is the heart of everything: set a pile of parameters, then launch it with waitForCompletion;
b. the MapperClass converts the input, line by line, into a data structure convenient for later computation;
c. the ReducerClass produces the final result from that structure;
d. FileInputFormat is a bit of a grab bag. Both static methods above take the job as a parameter and end up stuffing their settings into the job's conf. Its main duties are reading the input from HDFS, splitting it, and writing the final output back to HDFS. A quick read of the code shows that MapReduce input and output need not be HDFS, or even files at all; files are merely the most common default implementation. The "move the code to the data, not the data to the code" logic is also returned along with the input split structures.
2. The Mapper. Split each input value string on commas, take column GROUBY_COLUMN_INDEX as the key and column AGG_COLUMN_INDEX as the value, then emit the key-value pair.
package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SQLMapper extends Mapper<Object, Text, Text, DoubleWritable> {

    private Text outputKey = new Text();
    private DoubleWritable outputValue = new DoubleWritable();

    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        final String[] columns = value.toString().split(SQLConst.COLUMN_SEPERATOR);
        outputKey.set(columns[SQLConst.GROUBY_COLUMN_INDEX]);
        outputValue.set(Double.valueOf(columns[SQLConst.AGG_COLUMN_INDEX]));
        context.write(outputKey, outputValue);
    }
}
A few things worth noting here:
a. our custom class subclasses Mapper, and Mapper is surprisingly a Class rather than an Interface, which I find questionable;
b. Object, Text, Text, DoubleWritable are the input key, input value, output key and output value types respectively. So MapReduce works with key-value pairs as its basic data structure from start to finish. This is a hard constraint; if your business logic doesn't fit, either transform it into this model or pick another framework;
c. for one input line you may emit nothing at all, or several lines, which gives you filtering and row splitting;
d. keys and values are not primitive types like int or String; it "seems" you must use Hadoop's own data types everywhere, and custom types have to implement the corresponding interfaces.
3. The Reducer. This one computes the average: the framework gathers all values sharing a key into an iterator for me; I walk it to get the row count and the sum, divide sum by count, and emit the input key with the average.
package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SQLReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

    private DoubleWritable result = new DoubleWritable();

    protected void reduce(Text inputKey, Iterable<DoubleWritable> inputValueItr, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        double sum = 0;
        for (DoubleWritable inputIntWritable : inputValueItr) {
            sum += inputIntWritable.get();
            count++;
        }
        if (count > 0) {
            result.set(sum / count);
            context.write(inputKey, result);
        }
    }
}
4. The constants file, nothing important:

package org.zsh.mr.test.sql.groupby;

public class SQLConst {
    public static final String COLUMN_SEPERATOR = ",";
    public static final int GROUBY_COLUMN_INDEX = 1;
    public static final int AGG_COLUMN_INDEX = 6;
}
5. Compile and package as groupbysql_v1_simpleAllDouble.jar, then run:
bin/hdfs dfs -rm -R -f /test/data/output1
bin/hadoop jar ../testlib/groupbysql_v1_simpleAllDouble.jar org.zsh.mr.test.sql.groupby.SQLDriver /test/data/rawdata_10000.txt /test/data/output1
zhangshehui@DELL:~/workspace/hadoop/hadoop-2.6.4$ bin/hdfs dfs -ls /test/data/output1
Found 2 items
-rw-r--r-- 1 zhangshehui supergroup 0 2016-09-10 00:35 /test/data/output1/_SUCCESS
-rw-r--r-- 1 zhangshehui supergroup 148 2016-09-10 00:35 /test/data/output1/part-r-00000
zhangshehui@DELL:~/workspace/hadoop/hadoop-2.6.4$ bin/hdfs dfs -cat /test/data/output1/part-r-00000
streat0 49.393
streat1 49.485
streat2 50.448
streat3 48.694
streat4 48.87
streat5 50.446
streat6 49.081
streat7 49.144
streat8 48.76
streat9 50.979
The input is random numbers in the 0 to 100 range, so averages hovering around 50 are exactly what we expect; the logic is correct.
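The group-by-average semantics of this first version can be simulated entirely without Hadoop; the following standalone sketch (my own simplification, not Hadoop API) mimics the map → group-by-key → reduce flow in plain Java:

```java
import java.util.*;

public class LocalGroupAvg {
    // "map": split each csv line and pick out (key column, value column);
    // "reduce": average all values that share a key
    public static Map<String, Double> groupAvg(List<String> lines, int keyCol, int valCol) {
        Map<String, double[]> acc = new HashMap<>(); // key -> {sum, count}
        for (String line : lines) {
            String[] cols = line.split(",");
            double[] a = acc.computeIfAbsent(cols[keyCol], k -> new double[2]);
            a[0] += Double.parseDouble(cols[valCol]);
            a[1]++;
        }
        Map<String, Double> result = new TreeMap<>();
        for (Map.Entry<String, double[]> e : acc.entrySet()) {
            result.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "city0,streat0,company0,service0,1473085113700,74,98,0",
                "city0,streat0,company0,service0,1473085113701,65,61,44",
                "city0,streat1,company0,service0,1473085113701,43,45,49");
        System.out.println(groupAvg(lines, 1, 6)); // {streat0=79.5, streat1=45.0}
    }
}
```

Of course this single-process version hides exactly the parts MapReduce exists for (splitting, shuffling, fault tolerance), but it pins down what result the cluster job must reproduce.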
Version 2: different input and output data types
Version 1 works, but one small thing stuck in my throat: my input is really int while the output is double. If map's output and reduce's input both use IntWritable, the framework complains that it expected DoubleWritable. Does that mean that once the driver sets job.setOutputValueClass(DoubleWritable.class);
map and reduce are stuck emitting that one type forever? That would be far too restrictive.
After some searching, I found that one more parameter must be set in the driver:
job.setMapOutputValueClass(IntWritable.class);
With that, the three main classes become:
1. The driver additionally declares the mapper's output type:
package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SQLDriver {

    /**
     * @param args
     * @throws IOException
     * @throws InterruptedException
     * @throws ClassNotFoundException
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "sql");
        job.setJarByClass(SQLDriver.class);
        job.setMapperClass(SQLMapper.class);
        // job.setMapOutputKeyClass(Object.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(SQLReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
2. The mapper's output value becomes IntWritable:
package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SQLMapper extends Mapper<Object, Text, Text, IntWritable> {

    private Text outputKey = new Text();
    private IntWritable outputValue = new IntWritable();

    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        final String[] columns = value.toString().split(SQLConst.COLUMN_SEPERATOR);
        outputKey.set(columns[SQLConst.GROUBY_COLUMN_INDEX]);
        outputValue.set(Integer.valueOf(columns[SQLConst.AGG_COLUMN_INDEX]));
        context.write(outputKey, outputValue);
    }
}
3. The reducer's input value becomes IntWritable:
package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SQLReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {

    private DoubleWritable result = new DoubleWritable();

    protected void reduce(Text inputKey, Iterable<IntWritable> inputValueItr, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        double sum = 0;
        for (IntWritable inputIntWritable : inputValueItr) {
            sum += inputIntWritable.get();
            count++;
        }
        if (count > 0) {
            result.set(sum / count);
            context.write(inputKey, result);
        }
    }
}
4. The constants are unchanged:

package org.zsh.mr.test.sql.groupby;

public class SQLConst {
    public static final String COLUMN_SEPERATOR = ",";
    public static final int GROUBY_COLUMN_INDEX = 1;
    public static final int AGG_COLUMN_INDEX = 6;
}
Compile and package as groupbysql_v2_IntToDouble.jar and run:
bin/hdfs dfs -rm -R -f /test/data/output2
bin/hadoop jar ../testlib/groupbysql_v2_IntToDouble.jar org.zsh.mr.test.sql.groupby.SQLDriver /test/data/rawdata_10000.txt /test/data/output2
zhangshehui@DELL:~/workspace/hadoop/hadoop-2.6.4$ bin/hdfs dfs -ls /test/data/output2
Found 2 items
-rw-r--r-- 1 zhangshehui supergroup 0 2016-09-10 00:53 /test/data/output2/_SUCCESS
-rw-r--r-- 1 zhangshehui supergroup 148 2016-09-10 00:53 /test/data/output2/part-r-00000
zhangshehui@DELL:~/workspace/hadoop/hadoop-2.6.4$ bin/hdfs dfs -cat /test/data/output2/part-r-00000
streat0 49.393
streat1 49.485
streat2 50.448
streat3 48.694
streat4 48.87
streat5 50.446
streat6 49.081
streat7 49.144
streat8 48.76
streat9 50.979
The results are identical to version 1; the logic is correct.
Landmine: you are forced to declare the Mapper's output type or you get an error, yet in principle this setting is completely unnecessary. A job has exactly one map and one reduce, and their input and output types could all be discovered by reflection, so why must we set them by hand? As absurd as having to show a senior citizen card to prove you ride the bus for free!
Version 3: passing a custom object from Mapper to Reducer
Version 3 prepares the ground for version 4, and that story starts with how MapReduce distributes data:
a. the typical flow is input-split-map-shuffle-reduce-output;
b. the shuffle groups rows by key, moving all rows for a given key across the network onto one server;
c. if server A has 1 GB of raw data destined for server M, YARN lets us pre-aggregate on A first, shrinking the volume (say to 100 MB or 10 MB) before shipping it to M, which drastically reduces network load. That mechanism is the Combiner.
What does this have to do with custom value types? Because not every algorithm in this world can be aggregated in two or more stages. Take the maximum age across a province: each city can report its own maximum and the province takes the maximum of the city maximums; that works fine. But for the average age, throwing the average of a city of thirty million people in with the average of a city of one million and averaging again gives a nonsense answer.
There is a way out: each city reports its population count and its age sum; the province adds up all the counts and all the sums, then divides the total sum by the total count to get the exact average. But then what a city reports upward is no longer one simple number but two, and once you consider all the other formulas that cannot be enumerated, the reported payload format must be user-definable.
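The two-stage averaging pitfall is easy to demonstrate in a few lines (the populations and ages below are made up purely for illustration):

```java
public class AvgMergeDemo {
    // wrong: take the average of the per-city averages, ignoring their weights
    public static double naiveMerge(double[] avgs) {
        double s = 0;
        for (double a : avgs) s += a;
        return s / avgs.length;
    }

    // right: carry (count, sum) pairs upward and divide only once at the top
    public static double weightedMerge(long[] counts, double[] avgs) {
        double sum = 0;
        long n = 0;
        for (int i = 0; i < counts.length; i++) {
            sum += avgs[i] * counts[i]; // recover each city's age sum
            n += counts[i];
        }
        return sum / n;
    }

    public static void main(String[] args) {
        long[] pop = {30_000_000L, 1_000_000L}; // city populations
        double[] avgAge = {40.0, 30.0};         // per-city average ages
        System.out.println(naiveMerge(avgAge));           // 35.0 -- absurd
        System.out.println(weightedMerge(pop, avgAge));   // about 39.68 -- correct
    }
}
```

weightedMerge is exactly what the (count, sum) pair in the upcoming AvgStatWritable makes possible.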
Look again at the Mapper and Reducer definitions; they are pure generics that tell us nothing:
org.apache.hadoop.mapreduce.Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
Now look at the stock examples, IntWritable and Text: without exception they implement the Writable interface, and some also implement Comparable. Combined with the posts I had read earlier: the key is used for sorting, and both key and value travel between nodes, so Writable handles serialization while Comparable lets rows with the same key be sorted and grouped together.
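What the Writable contract boils down to can be shown with plain java.io streams; this is a standalone sketch that mirrors the interface without using any Hadoop classes: whatever write() puts on the stream, readFields() must pull back in exactly the same order.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

public class RoundTrip {
    static class AvgStat {
        int count;
        double sum;

        // mirrors Writable.write(DataOutput)
        void write(DataOutput out) throws IOException {
            out.writeInt(count);
            out.writeDouble(sum);
        }

        // mirrors Writable.readFields(DataInput): same fields, same order
        void readFields(DataInput in) throws IOException {
            count = in.readInt();
            sum = in.readDouble();
        }
    }

    // serialize to bytes, then rebuild a fresh object from those bytes
    public static AvgStat roundTrip(AvgStat src) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            src.write(new DataOutputStream(bytes));
            AvgStat dst = new AvgStat();
            dst.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return dst;
        } catch (IOException e) {
            throw new RuntimeException(e); // cannot happen with in-memory streams
        }
    }

    public static void main(String[] args) {
        AvgStat a = new AvgStat();
        a.count = 3;
        a.sum = 123.5;
        AvgStat b = roundTrip(a);
        System.out.println(b.count + " " + b.sum); // 3 123.5
    }
}
```

If the read order ever drifts from the write order, the fields silently come back scrambled, which is why keeping the two methods side by side is a good habit.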
Let's get to it.
1. First define the intermediate avg structure that holds the row count and the sum. All it has to support is reading itself from and writing itself to a stream, keeping the read and write order consistent. As for that static read method, I don't know when Hadoop would call it; the docs suggest it, but things run fine without it.
package org.zsh.mr.test.sql.groupby;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class AvgStatWritable implements Writable {

    private int count;
    private double sum;

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(count);
        out.writeDouble(sum);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.count = in.readInt();
        this.sum = in.readDouble();
    }

    public static AvgStatWritable read(DataInput in) throws IOException {
        AvgStatWritable w = new AvgStatWritable();
        w.readFields(in);
        return w;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    public double getSum() {
        return sum;
    }

    public void setSum(double sum) {
        this.sum = sum;
    }
}
2. Declare in the Driver that the map output is this custom type, or it complains again...
job.setMapOutputValueClass(AvgStatWritable.class);
package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SQLDriver {

    /**
     * @param args
     * @throws IOException
     * @throws InterruptedException
     * @throws ClassNotFoundException
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "sql");
        job.setJarByClass(SQLDriver.class);
        job.setMapperClass(SQLMapper.class);
        // job.setMapOutputKeyClass(Object.class);
        job.setMapOutputValueClass(AvgStatWritable.class);
        job.setReducerClass(SQLReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
3. The Mapper emits the new type:
package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SQLMapper extends Mapper<Object, Text, Text, AvgStatWritable> {

    private Text outputKey = new Text();
    private AvgStatWritable outputValue = new AvgStatWritable();

    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        final String[] columns = value.toString().split(SQLConst.COLUMN_SEPERATOR);
        outputKey.set(columns[SQLConst.GROUBY_COLUMN_INDEX]);
        outputValue.setCount(1); // one input row
        outputValue.setSum(Double.valueOf(columns[SQLConst.AGG_COLUMN_INDEX]));
        context.write(outputKey, outputValue);
    }
}
4. The Reducer consumes the new type, and the aggregation logic changes accordingly:
package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SQLReducer extends Reducer<Text, AvgStatWritable, Text, DoubleWritable> {

    private DoubleWritable result = new DoubleWritable();

    protected void reduce(Text inputKey, Iterable<AvgStatWritable> inputValueItr, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        double sum = 0;
        for (AvgStatWritable avgStatWritable : inputValueItr) {
            sum += avgStatWritable.getSum();
            count += avgStatWritable.getCount();
        }
        if (count > 0) {
            result.set(sum / count);
            context.write(inputKey, result);
        }
    }
}
5. The constants again, for completeness:

package org.zsh.mr.test.sql.groupby;

public class SQLConst {
    public static final String COLUMN_SEPERATOR = ",";
    public static final int GROUBY_COLUMN_INDEX = 1;
    public static final int AGG_COLUMN_INDEX = 6;
}
6. Package as groupbysql_v3_MiddleAsObject.jar and run; the results are identical, so I won't paste them again.
Version 4: adding a Combiner to improve efficiency
With the groundwork done, add the Combiner to cut inter-node traffic.
1. Write a new reduce class whose input and output types are the same, and match the Mapper's output. Its logic adds up the counts and adds up the sums, which guarantees the final result is unaffected.
package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SQLCombiner extends Reducer<Text, AvgStatWritable, Text, AvgStatWritable> {

    @Override
    protected void reduce(Text inputKey, Iterable<AvgStatWritable> inputValues, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        double sum = 0;
        for (AvgStatWritable value : inputValues) {
            count += value.getCount();
            sum += value.getSum();
        }
        AvgStatWritable outputValue = new AvgStatWritable();
        outputValue.setCount(count);
        outputValue.setSum(sum);
        context.write(inputKey, outputValue);
    }
}
2. Declare the extra step in the Driver:
job.setCombinerClass(SQLCombiner.class);
package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SQLDriver {

    /**
     * @param args
     * @throws IOException
     * @throws InterruptedException
     * @throws ClassNotFoundException
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "sql");
        job.setJarByClass(SQLDriver.class);
        job.setMapperClass(SQLMapper.class);
        // job.setMapOutputKeyClass(Object.class);
        job.setMapOutputValueClass(AvgStatWritable.class);
        job.setCombinerClass(SQLCombiner.class);
        job.setReducerClass(SQLReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
3. Package as groupbysql_v4_MiddleAsObject-Combiner.jar and run. The results are identical, but the job report changes around the combiner: it shrinks the row count so much that the reducer is left with almost nothing to do. Reduce shuffle bytes drops from 1989006 bytes in the first version to 226 bytes now, cutting shuffle traffic by roughly four orders of magnitude; in a real distributed deployment that translates directly into network IO savings, which is exactly what we hoped for.
File System Counters
    FILE: Number of bytes read=28072
    FILE: Number of bytes written=588528
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
    HDFS: Number of bytes read=1094022
    HDFS: Number of bytes written=148
    HDFS: Number of read operations=13
    HDFS: Number of large read operations=0
    HDFS: Number of write operations=4
Map-Reduce Framework
    Map input records=10000
    Map output records=10000
    Map output bytes=200000
    Map output materialized bytes=226
    Input split bytes=114
    Combine input records=10000    // changed
    Combine output records=10      // changed
    Reduce input groups=10
    Reduce shuffle bytes=226
    Reduce input records=10
    Reduce output records=10
    Spilled Records=20
    Shuffled Maps =1
    Failed Shuffles=0
    Merged Map outputs=1
    GC time elapsed (ms)=16
    CPU time spent (ms)=0
    Physical memory (bytes) snapshot=0
    Virtual memory (bytes) snapshot=0
    Total committed heap usage (bytes)=543162368
Shuffle Errors
    BAD_ID=0
    CONNECTION=0
    IO_ERROR=0
    WRONG_LENGTH=0
    WRONG_MAP=0
    WRONG_REDUCE=0
File Input Format Counters
    Bytes Read=547011
File Output Format Counters
    Bytes Written=148
Version 5: choosing the group-by column dynamically
Version 5 paves the way for version 6. The program so far has some obvious shortcomings:
a. which column to group by as the key is hard-coded, which limits its usefulness;
b. which column feeds max/min/avg/sum is also hard-coded, ditto;
c. both key and value support only a single column, nowhere near the comfort of a database's select ... group by.
This section solves the dynamic parameter problem; the next solves multiple columns. The Driver's main function can receive arguments as usual, but the program runs distributed, and those distributed processes are launched by the YARN framework. I only told YARN which class to run, so how do parameters get across?
a. Job creation takes a Configuration parameter, and Configuration has set(String key, String value);
b. Mapper, Combiner and Reducer can all override the setup and cleanup methods, which can read that Configuration from their Context, so parameters can be passed across the cluster. Thanks, YARN, for thinking of it.
1. The Driver puts main's argument into the Configuration; for now this "sql" parameter is just a bare number.
package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SQLDriver {

    /**
     * @param args
     * @throws IOException
     * @throws InterruptedException
     * @throws ClassNotFoundException
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set(SQLConst.PARM_SQL, args[1]);
        Job job = Job.getInstance(conf, "sql");
        job.setJarByClass(SQLDriver.class);
        job.setMapperClass(SQLMapper.class);
        // job.setMapOutputKeyClass(Object.class);
        job.setMapOutputValueClass(AvgStatWritable.class);
        job.setCombinerClass(SQLCombiner.class);
        job.setReducerClass(SQLReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
2. The Mapper uses it; the Combiner and Reducer are oblivious to this change:
package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SQLMapper extends Mapper<Object, Text, Text, AvgStatWritable> {

    private int groupByColumnIndex = SQLConst.GROUBY_COLUMN_INDEX;

    private Text outputKey = new Text();
    private AvgStatWritable outputValue = new AvgStatWritable();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // the "sql" parameter is just a column number for now;
        // judging from the sample runs below, it is 1-based
        this.groupByColumnIndex = Integer.valueOf(
                context.getConfiguration().get(SQLConst.PARM_SQL)) - 1;
    }

    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        final String[] columns = value.toString().split(SQLConst.COLUMN_SEPERATOR);
        outputKey.set(columns[groupByColumnIndex]);
        outputValue.setCount(1);
        outputValue.setSum(Double.valueOf(columns[SQLConst.AGG_COLUMN_INDEX]));
        context.write(outputKey, outputValue);
    }
}
3. One more constant was added to serve as the configuration key:

package org.zsh.mr.test.sql.groupby;

public class SQLConst {
    public static final String COLUMN_SEPERATOR = ",";
    public static final int GROUBY_COLUMN_INDEX = 1;
    public static final int AGG_COLUMN_INDEX = 6;
    public static final String PARM_SQL = "zsh.sql.parmsql";
}
4. Compile and package as groupbysql_v5_DynamicSQL.jar, then run:
bin/hdfs dfs -rm -R -f /test/data/output5
bin/hadoop jar ../testlib/groupbysql_v5_DynamicSQL.jar org.zsh.mr.test.sql.groupby.SQLDriver /test/data/rawdata_10000.txt 3 /test/data/output5
bin/hdfs dfs -cat /test/data/output5/part-r-00000
company0 49.821
company1 49.569
company2 50.574
company3 49.518
company4 48.809
company5 48.219
company6 49.116
company7 49.328
company8 49.539
company9 50.807
bin/hdfs dfs -rm -R -f /test/data/output5
bin/hadoop jar ../testlib/groupbysql_v5_DynamicSQL.jar org.zsh.mr.test.sql.groupby.SQLDriver /test/data/rawdata_10000.txt 4 /test/data/output5
bin/hdfs dfs -cat /test/data/output5/part-r-00000
service0 49.625
service1 50.058
service2 48.916
service3 47.761
service4 52.22
service5 49.27
service6 49.535
service7 47.788
service8 50.317
service9 49.81
bin/hdfs dfs -rm -R -f /test/data/output5
bin/hadoop jar ../testlib/groupbysql_v5_DynamicSQL.jar org.zsh.mr.test.sql.groupby.SQLDriver /test/data/rawdata_10000.txt 2 /test/data/output5
bin/hdfs dfs -cat /test/data/output5/part-r-00000
streat0 49.393
streat1 49.485
streat2 50.448
streat3 48.694
streat4 48.87
streat5 50.446
streat6 49.081
streat7 49.144
streat8 48.76
streat9 50.979
The streat column values match the earlier runs; the logic is correct.
Version 6: fully dynamic grouping and aggregation columns
Distributed parameter passing works, so let's push it to the limit: pass in a whole quasi-SQL statement, e.g. "select 1,3,max(5),sum(6),avg(7) where 0=city8", with these conventions:
a. every number in the select clause is a column index into the csv file, counted from 0;
b. the where clause supports only and-separated equality expressions for now; the number before the equals sign is a column index, and what follows it is the literal to compare against;
c. only max, min, sum and avg are supported as functions for now;
d. pass-through columns, aggregated columns and filter columns may all be combined, several of each.
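The column-expression convention above (a bare number means pass-through, func(number) means aggregate) takes only a couple of indexOf calls to parse; here is a simplified standalone sketch of that idea, separate from the full SQLParser shown later:

```java
public class SelectExprParser {
    // result[0] = aggregation name ("" for a pass-through column)
    // result[1] = the input column index (Integer)
    public static Object[] parse(String expr) {
        expr = expr.trim();
        int open = expr.indexOf('(');
        int close = expr.indexOf(')');
        if (open > 0 && close > open + 1) { // e.g. "max(5)"
            return new Object[]{expr.substring(0, open).trim(),
                    Integer.valueOf(expr.substring(open + 1, close))};
        }
        return new Object[]{"", Integer.valueOf(expr)}; // e.g. "3"
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(parse("max(5)"))); // [max, 5]
        System.out.println(java.util.Arrays.toString(parse(" 3 ")));    // [, 3]
    }
}
```

A real parser would also validate the function name against the supported set, which the full version below does via its RowColumn type constants.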
Before writing this code I hesitated over whether test code deserved this much complexity, since the core ideas were already proven and most of this code has little to do with MapReduce itself. Then I stepped on a big landmine that flipped me head over heels twice :) and learned a great deal.
1. The Driver passes the sql statement straight to every Mapper, Combiner and Reducer as a parameter, and both the Key and the Value flowing from Mapper to Reducer are now custom objects.
package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.zsh.mr.test.sql.groupby.model.RowKey;
import org.zsh.mr.test.sql.groupby.model.RowValue;

public class SQLDriver {

    /**
     * @param args
     * @throws IOException
     * @throws InterruptedException
     * @throws ClassNotFoundException
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set(SQLConst.PARM_SQL, args[1]);
        Job job = Job.getInstance(conf, "sql");
        job.setJarByClass(SQLDriver.class);
        job.setMapperClass(SQLMapper.class);
        job.setMapOutputKeyClass(RowKey.class);
        job.setMapOutputValueClass(RowValue.class);
        job.setCombinerClass(SQLCombiner.class);
        job.setReducerClass(SQLReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Note the three key lines that differ:
conf.set(SQLConst.PARM_SQL, args[1]);
job.setMapOutputKeyClass(RowKey.class);
job.setMapOutputValueClass(RowValue.class);
2. The Mapper parses the SQL, filters rows against it, assembles the Key from the user-selected pass-through columns, and the Value from the user-selected aggregation columns:
package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.zsh.mr.test.sql.groupby.model.RowKey;
import org.zsh.mr.test.sql.groupby.model.RowValue;

public class SQLMapper extends Mapper<Object, Text, RowKey, RowValue> {

    private SQLParser sqlParser = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        this.sqlParser = new SQLParser(context.getConfiguration().get(SQLConst.PARM_SQL));
    }

    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        final String[] columns = value.toString().split(SQLConst.COLUMN_SEPERATOR);
        if (!sqlParser.isMatchCondition(columns)) {
            return; // filtered out by the where clause
        }
        context.write(sqlParser.generateRowKey(columns),
                sqlParser.generateRowValue(columns));
    }
}
All of the SQL complexity is encapsulated in SQLParser; at the mapper level we only see Key and Value. sqlParser.isMatchCondition(columns) filters the rows, dropping those that fail the condition, and adding richer where capabilities later only requires changing SQLParser.
3. The Combiner pre-aggregates the Values to relieve the network:
package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.mapreduce.Reducer;
import org.zsh.mr.test.sql.groupby.model.RowKey;
import org.zsh.mr.test.sql.groupby.model.RowValue;

public class SQLCombiner extends Reducer<RowKey, RowValue, RowKey, RowValue> {

    private SQLParser sqlParser = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        this.sqlParser = new SQLParser(context.getConfiguration().get(
                SQLConst.PARM_SQL));
    }

    @Override
    protected void reduce(RowKey inputKey, Iterable<RowValue> inputValueItr, Context context)
            throws IOException, InterruptedException {
        final RowValue resultRowValue = this.sqlParser.getEmptyRowValue();
        for (RowValue rowValue : inputValueItr) {
            resultRowValue.combine(rowValue);
        }
        context.write(inputKey, resultRowValue);
    }
}
I designed RowValue so that two of them can be combined directly, which keeps the top level nice and simple.
Landmine (a big one): the objects we pull out while iterating over inputValueItr are in fact all the same object; only its contents change on each iteration!!! To repeat: the rowValue you get in iteration 0 and in iteration N of this for loop is the same object holding different values!!!
My original logic took the first row, combined every subsequent row into it, and returned that first row, so the top level would not need to understand how a Row is constructed. As a result resultRowValue, a single object, kept being re-initialized by read calls and kept combining itself, producing completely wrong results. I'll come back to this in the RowValue class.
// the buggy version: keep a reference to the "first" value -- but it is the
// same shared object the iterator keeps rewriting
RowValue resultRowValue = null;
for (RowValue rowValue : inputValueItr) {
    if (null == resultRowValue) {
        resultRowValue = rowValue;
    } else {
        resultRowValue.combine(rowValue);
    }
}
context.write(inputKey, resultRowValue);
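The reuse trick can be reproduced outside Hadoop: build an Iterable that hands back one mutable holder whose contents change on every next() call, exactly the object-recycling pattern the reduce-side value iterator uses (a standalone sketch, not Hadoop code):

```java
import java.util.Iterator;

public class ReusedIteratorDemo {
    static class Holder {
        int value;
    }

    // like Hadoop's reduce value iterator, returns the SAME Holder object
    // each time and only mutates its payload
    public static Iterable<Holder> reusing(final int[] data) {
        return () -> new Iterator<Holder>() {
            private final Holder shared = new Holder();
            private int i = 0;

            public boolean hasNext() {
                return i < data.length;
            }

            public Holder next() {
                shared.value = data[i++];
                return shared;
            }
        };
    }

    public static void main(String[] args) {
        Holder first = null;
        for (Holder h : reusing(new int[]{10, 20, 30})) {
            if (first == null) first = h; // looks safe, but...
        }
        // "first" is the shared holder: it now carries the LAST value
        System.out.println(first.value); // 30, not 10
    }
}
```

The safe pattern is the one the fixed Combiner uses: start from a fresh accumulator of your own and fold each iterated value into it, never holding on to the iterator's object.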
4. The Reducer performs the final aggregation and writes the output:
package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.zsh.mr.test.sql.groupby.model.RowKey;
import org.zsh.mr.test.sql.groupby.model.RowValue;

public class SQLReducer extends Reducer<RowKey, RowValue, Text, Text> {

    private SQLParser sqlParser = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        this.sqlParser = new SQLParser(context.getConfiguration().get(
                SQLConst.PARM_SQL));
    }

    protected void reduce(RowKey inputKey, Iterable<RowValue> inputValueItr, Context context)
            throws IOException, InterruptedException {
        final RowValue resultRowValue = this.sqlParser.getEmptyRowValue();
        for (RowValue rowValue : inputValueItr) {
            resultRowValue.combine(rowValue);
        }
        // format columns in the sequence the sql specified
        Text resultKey = new Text(inputKey.getOuputString());
        Text resultValue = new Text(resultRowValue.getOuputString());
        context.write(resultKey, resultValue);
    }
}
The landmine here was exactly the same as in the Combiner; I located and fixed both together.
5. A closer look at how the SQL is parsed and how the Key and Value are built from a csv row. Roughly four steps:
a. SQL parsing: split out the select columns and where columns, and record whether each select column is passed through directly or aggregated first; this information lives in columnInfoList and whereConditions;
b. building a RowKey from an input row: walk columnInfoList, collect the pass-through columns, and put them straight into the RowKey;
c. building a RowValue from an input row: walk columnInfoList to find the aggregate-then-output columns, read the corresponding input columns, construct the intermediate object matching each aggregation method, and put them into the RowValue;
d. isMatchCondition checks whether an input row passes the filter; the mapper calls it directly.
package org.zsh.mr.test.sql.groupby;

import java.util.ArrayList;
import java.util.List;

import org.zsh.mr.test.sql.groupby.model.AvgColumn;
import org.zsh.mr.test.sql.groupby.model.MaxColumn;
import org.zsh.mr.test.sql.groupby.model.MinColumn;
import org.zsh.mr.test.sql.groupby.model.RowColumn;
import org.zsh.mr.test.sql.groupby.model.RowKey;
import org.zsh.mr.test.sql.groupby.model.RowValue;
import org.zsh.mr.test.sql.groupby.model.SumColumn;

public class SQLParser {

    private static final String TOKEN_SELECT = "select";
    private static final String TOKEN_WHERE = "where";
    private static final String TOKEN_AND = "and";
    private static final String TOKEN_EQUAL = "=";
    private static final String TOKEN_SPLIT = ",";

    private String sql;
    public List<ColumnInfo> columnInfoList;
    private List<EqualsCondition> whereConditions;
    private int maxColumnIndex = 0;

    public SQLParser(String sqlParm) {
        this.sql = sqlParm.toLowerCase().trim();
        this.columnInfoList = new ArrayList<ColumnInfo>();
        this.whereConditions = new ArrayList<EqualsCondition>();

        // substring of output columns and where conditions
        int selectIndex = this.sql.indexOf(TOKEN_SELECT);
        if (selectIndex < 0) {
            selectIndex = 0 - TOKEN_SELECT.length();
        }
        int whereIndex = this.sql.indexOf(TOKEN_WHERE);
        if (whereIndex <= 0) {
            whereIndex = this.sql.length();
        }
        final String selectColumnsSubStr = this.sql.substring(selectIndex + TOKEN_SELECT.length(), whereIndex);
        String whereConditionStr = "";
        if (this.sql.length() > (whereIndex + TOKEN_WHERE.length())) {
            whereConditionStr = this.sql.substring(whereIndex + TOKEN_WHERE.length());
        }

        // parse output column expression
        final String[] selectColumnStrs = selectColumnsSubStr.split(TOKEN_SPLIT);
        for (int colIndex = 0, len = selectColumnStrs.length; colIndex < len; colIndex++) {
            // parse column
            final ColumnInfo columnInfo = new ColumnInfo(colIndex, selectColumnStrs[colIndex].trim());
            this.columnInfoList.add(columnInfo);
            // set max input index
            this.maxColumnIndex = Math.max(maxColumnIndex, columnInfo.getInputColumnIndex());
        }

        // parse where condition expression
        // now only support a=x and b=y and c=m and d=n format as an example
        String[] whereAndStrs = whereConditionStr.split(TOKEN_AND);
        for (String andConditionStr : whereAndStrs) {
            String[] segments = andConditionStr.split(TOKEN_EQUAL);
            if ((null != segments) && (segments.length >= 2)) {
                whereConditions.add(new EqualsCondition(Integer.valueOf(segments[0].trim()), segments[1].trim()));
            }
        }
    }

    public boolean isMatchCondition(String[] columns) {
        boolean result = true;
        for (EqualsCondition condition : this.whereConditions) {
            result = result && condition.isMatch(columns);
        }
        return result;
    }

    public int getMaxColumnIndex() {
        return this.maxColumnIndex;
    }

    public RowKey generateRowKey(String[] columns) {
        final RowKey result = new RowKey();
        final ArrayList<String> rowKeyColumns = result.getColumns();
        for (ColumnInfo columnInfo : this.columnInfoList) {
            if (RowColumn.OUTPUT == columnInfo.aggType) {
                rowKeyColumns.add(columns[columnInfo.getInputColumnIndex()]);
            }
        }
        return result;
    }

    public RowValue generateRowValue(String[] columns) {
        final RowValue result = new RowValue();
        for (ColumnInfo columnInfo : this.columnInfoList) {
            switch (columnInfo.aggType) {
            case RowColumn.MAX:
                result.add(new MaxColumn(columns[columnInfo.inputColumnIndex]));
                break;
            case RowColumn.MIN:
                result.add(new MinColumn(columns[columnInfo.inputColumnIndex]));
                break;
            case RowColumn.SUM:
                result.add(new SumColumn(columns[columnInfo.inputColumnIndex]));
                break;
            case RowColumn.AVG:
                result.add(new AvgColumn(columns[columnInfo.inputColumnIndex]));
                break;
            default:
                // ignore it
                break;
            }
        }
        return result;
    }

    public RowValue getEmptyRowValue() {
        // generate empty input
        final String[] inputColumns = new String[this.maxColumnIndex + 1];
        for (int i = 0, len = inputColumns.length; i < len; i++) {
            inputColumns[i] = "";
        }
        return generateRowValue(inputColumns);
    }

    class ColumnInfo {
        private int outputColumnIndex = -1;
        private int inputColumnIndex = -1;
        private byte aggType = RowColumn.UNKNOW;

        public ColumnInfo(int outputColumnIndex, String expression) {
            this.outputColumnIndex = outputColumnIndex;
            // find index of char ( and )
            expression = expression.trim();
            int startIndex = expression.indexOf('(');
            int endIndex = expression.indexOf(')');
            if ((startIndex > 0) && (endIndex > (startIndex + 1))) {
                this.inputColumnIndex = Integer.valueOf(expression.substring(startIndex + 1, endIndex));
                String aggMethodStr = expression.substring(0, startIndex).trim();
                updateAggType("max", aggMethodStr, RowColumn.MAX);
                updateAggType("min", aggMethodStr, RowColumn.MIN);
                updateAggType("sum", aggMethodStr, RowColumn.SUM);
                updateAggType("avg", aggMethodStr, RowColumn.AVG);
            } else if ((startIndex < 0) && (endIndex < 0)) {
                this.inputColumnIndex = Integer.valueOf(expression);
                this.aggType = RowColumn.OUTPUT;
            } else {
                // Unknown, ignore it
                this.aggType = RowColumn.UNKNOW;
            }
        }

        private void updateAggType(String aggMethodStrConst, String inputAggMethodStr, byte relatedAggType) {
            if (aggMethodStrConst.equals(inputAggMethodStr)) {
                this.aggType = relatedAggType;
            }
        }

        public int getInputColumnIndex() {
            return inputColumnIndex;
        }

        public int getOutputColumnIndex() {
            return outputColumnIndex;
        }

        public byte getAggType() {
            return aggType;
        }
    }

    class EqualsCondition {
        private int columnIndex;
        private String expectedValue;

        public EqualsCondition(int columnIndex, String expectedValue) {
            super();
            this.columnIndex = columnIndex;
            this.expectedValue = expectedValue;
        }

        public boolean isMatch(String[] columns) {
            return expectedValue.equals(columns[columnIndex]);
        }

        public int getColumnIndex() {
            return columnIndex;
        }
    }
}
SQLParser is purely a utility class: it is never serialized or transmitted. Each Mapper, Combiner, and Reducer can construct its own instance in its setup method from the SQL statement passed in.
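The where-clause handling above is just two string splits plus an AND of equality checks. Here is a standalone plain-Java sketch of that logic; TOKEN_AND and TOKEN_EQUAL are not shown in the listing above, so the values " and " and "=" below are assumptions inferred from the sample queries (e.g. "2=company6 and 1=streat8"):

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of SQLParser's where-clause parsing; the token values are
// assumptions based on the sample queries, not taken from the original source.
public class WhereParseDemo {
    static final String TOKEN_AND = " and ";
    static final String TOKEN_EQUAL = "=";

    // parses "2=company6 and 1=streat8" into (columnIndex, expectedValue) pairs
    static List<String[]> parse(String whereConditionStr) {
        List<String[]> conditions = new ArrayList<String[]>();
        for (String andConditionStr : whereConditionStr.split(TOKEN_AND)) {
            String[] segments = andConditionStr.split(TOKEN_EQUAL);
            if (segments.length >= 2) {
                conditions.add(new String[] { segments[0].trim(), segments[1].trim() });
            }
        }
        return conditions;
    }

    // mirrors isMatchCondition: every equality must hold on the row's columns
    static boolean matches(List<String[]> conditions, String[] columns) {
        for (String[] c : conditions) {
            if (!c[1].equals(columns[Integer.parseInt(c[0])])) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String[]> conds = parse("2=company6 and 1=streat8");
        String[] row = { "city0", "streat8", "company6", "service1" };
        System.out.println(matches(conds, row)); // true
        row[2] = "company5";
        System.out.println(matches(conds, row)); // false
    }
}
```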
6. Next, look at what the Key and Value actually contain and how they are serialized. The Key is the list of values of the directly output columns; the Value is the set of intermediate values of the aggregated output columns.
package org.zsh.mr.test.sql.groupby.model;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.WritableComparable;

public class RowKey implements WritableComparable<RowKey> {
    private ArrayList<String> columns = new ArrayList<String>();

    public ArrayList<String> getColumns() {
        return columns;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.columns.size());
        for (String column : this.columns) {
            out.writeUTF(column);
        }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        final int size = in.readInt();
        this.columns = new ArrayList<String>(size);
        for (int i = 0; i < size; i++) {
            this.columns.add(in.readUTF());
        }
    }

    public static RowKey read(DataInput in) throws IOException {
        RowKey w = new RowKey();
        w.readFields(in);
        return w;
    }

    @Override
    public int compareTo(RowKey o) {
        final List<String> thisColumns = this.columns;
        final List<String> thatColumns = o.columns;
        final int size = Math.min(thisColumns.size(), thatColumns.size());
        int result = 0;
        for (int i = 0; i < size; i++) {
            result = thisColumns.get(i).compareTo(thatColumns.get(i));
            if (0 != result) {
                break;
            }
        }
        return result;
    }

    public String getOuputString() {
        final StringBuilder buffer = new StringBuilder();
        if (!this.columns.isEmpty()) {
            buffer.append(this.columns.get(0));
            for (int i = 1, size = this.columns.size(); i < size; i++) {
                buffer.append('\t');
                buffer.append(this.columns.get(i));
            }
        }
        return buffer.toString();
    }

    @Override
    public String toString() {
        return getOuputString();
    }
}
This class has three responsibilities:
a. It represents one output row of the group-by query. For select a,b,max(c),min(d), for example, the RowKey holds the combination of a and b;
b. It must be comparable with other RowKeys. The primary purpose is to let YARN decide which records belong to the same row so it can gather their RowValues for the Reducer; the secondary purpose is sorting, which ultimately serves the first;
c. It must support serialization and deserialization, as defined by the Writable interface, so it can be transferred between servers during the shuffle.
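The Writable contract in (c) can be exercised without a cluster: write and readFields must mirror each other exactly over a DataOutput/DataInput pair. A minimal round-trip of RowKey's wire layout (an int count followed by one writeUTF per column), using only java.io:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Round-trips RowKey's wire format (count + writeUTF per column) with plain
// java.io streams to show the write/readFields symmetry Writable requires.
public class RowKeyRoundTrip {
    static byte[] write(List<String> columns) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(columns.size());          // same layout as RowKey.write
        for (String column : columns) {
            out.writeUTF(column);
        }
        return bytes.toByteArray();
    }

    static List<String> read(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int size = in.readInt();               // same layout as RowKey.readFields
        List<String> columns = new ArrayList<String>(size);
        for (int i = 0; i < size; i++) {
            columns.add(in.readUTF());
        }
        return columns;
    }

    public static void main(String[] args) throws IOException {
        List<String> key = new ArrayList<String>();
        key.add("streat0");
        key.add("service0");
        System.out.println(read(write(key)).equals(key)); // true
    }
}
```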
package org.zsh.mr.test.sql.groupby.model;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Writable;

public class RowValue implements Writable, Cloneable {
    private List<RowColumn> columns = new ArrayList<RowColumn>();

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.columns.size());
        for (RowColumn column : this.columns) {
            out.writeByte(column.getAggType());
            column.write(out);
        }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // WARN: must clear here. The Hadoop framework reuses an old object of
        // this class and calls readFields on it again after map, then hands it
        // to the reducer or to output.
        this.columns.clear();
        final int size = in.readInt();
        for (int i = 0; i < size; i++) {
            final byte aggType = in.readByte();
            switch (aggType) {
            case RowColumn.MAX:
                this.columns.add(MaxColumn.read(in));
                break;
            case RowColumn.MIN:
                this.columns.add(MinColumn.read(in));
                break;
            case RowColumn.SUM:
                this.columns.add(SumColumn.read(in));
                break;
            case RowColumn.AVG:
                this.columns.add(AvgColumn.read(in));
                break;
            default:
                // ignore it, not supported
                break;
            }
        }
    }

    public static RowValue read(DataInput in) throws IOException {
        RowValue w = new RowValue();
        w.readFields(in);
        return w;
    }

    public void combine(RowValue parm) {
        // TODO: this really happens!!! Guard first, before any state is touched.
        if (this == parm) {
            this.columns.clear();
            throw new IllegalStateException("combine to self");
        }
        final int size = Math.min(this.columns.size(), parm.columns.size());
        for (int i = 0; i < size; i++) {
            this.columns.get(i).combine(parm.columns.get(i));
        }
    }

    public void add(RowColumn rowColumn) {
        this.columns.add(rowColumn);
    }

    public String getOuputString() {
        final StringBuilder buffer = new StringBuilder();
        if (!this.columns.isEmpty()) {
            buffer.append(this.columns.get(0).getResult());
            for (int i = 1, size = this.columns.size(); i < size; i++) {
                buffer.append('\t');
                buffer.append(this.columns.get(i).getResult());
            }
        }
        return buffer.toString();
    }

    @Override
    public String toString() {
        return getOuputString();
    }
}
RowValue's responsibilities:
a. It represents the aggregated columns of the select clause, e.g. the third and fourth columns of select a,b,max(c),min(d);
b. It can be used both for the Combine step and for the final Reduce;
c. It can be serialized so it can be transferred between servers.
Landmine: the read method must be reentrant. YARN constructs one instance of this object and reuses it over and over; each call to readFields makes it represent another row. That is why this example uses this.columns = new ArrayList<String>(size); inside the read method rather than initializing the list once at construction time and just calling add in read. My earlier code did exactly that, and the size of columns grew to an astronomical number.
I mentioned this landmine in the Combiner discussion, but I am fairly unhappy with it: for efficiency's sake the framework could have made object reuse an optional feature to switch on; there was no need to make it the default and have behavior contradict expectations. In a real project, if you don't know about this and only discover it in the debugging phase after a mountain of feature code is done, you have to write a pile of new code in the Combiner to produce empty RowValues whose sole job is to combine the other RowValues, and that rework can be huge.
Take this example: given reasonably long coding experience and a manageable code size, I did no class-diagram design at all. I wrote from the outermost layer inward, refining coarse to fine: first the mapper, combiner, and reducer; once those three were written I could derive the class definitions of SQLParser, RowKey, and RowValue; then I implemented their methods, producing the RowColumn interface definition along the way; and finally I wrote the concrete Max, Min, Avg, and Sum implementations.
Because I had assumed it was fine to combine the first row from inputValueItr with every subsequent row, I never considered producing an empty RowValue row at all. The abstraction layers are isolated top-down, so being forced to hand the Combiner and Reducer a completely empty RowValue meant changing everything from SQLParser and RowValue all the way down to the AvgColumn implementation. Fortunately I treated the empty string as a special value, which kept the classes from turning really ugly...
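The underlying object-reuse trap can be reproduced in plain Java without Hadoop. The sketch below simulates an iterator that hands back the same mutable holder on every next() call, the way Hadoop's value iterator does; Holder and the sample numbers are illustrative only. Folding into the "first" element yields garbage, while folding into a fresh accumulator (the role the empty RowValue plays) stays correct:

```java
import java.util.Iterator;

// Simulates Hadoop's reducer value iterator, which mutates and returns ONE
// shared holder object on every next() call. Holder and the data are
// illustrative, not Hadoop classes.
public class ReuseDemo {
    static class Holder {
        int sum;
    }

    // iterator that reuses a single Holder, like Hadoop's value iterable
    static Iterator<Holder> reusingIterator(final int[] data) {
        return new Iterator<Holder>() {
            private final Holder shared = new Holder();
            private int i = 0;

            public boolean hasNext() { return i < data.length; }

            public Holder next() {
                shared.sum = data[i++]; // overwrite the shared object in place
                return shared;
            }
        };
    }

    // WRONG: keeps a reference to the "first" value and folds into it; but the
    // first value IS the shared object, so each next() clobbers it first and
    // the fold ends up adding the last value to itself.
    static int sumIntoFirst(Iterator<Holder> it) {
        Holder first = it.next();
        while (it.hasNext()) {
            first.sum += it.next().sum;
        }
        return first.sum;
    }

    // RIGHT: fold into a fresh accumulator outside the iterator's own objects
    static int sumIntoFresh(Iterator<Holder> it) {
        int acc = 0;
        while (it.hasNext()) {
            acc += it.next().sum;
        }
        return acc;
    }

    public static void main(String[] args) {
        int[] data = { 1, 2, 4 };
        System.out.println(sumIntoFresh(reusingIterator(data))); // 7
        System.out.println(sumIntoFirst(reusingIterator(data))); // 8, not 7
    }
}
```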
7. Finally, a look at the intermediate data structures for the max, min, avg, and sum columns.
package org.zsh.mr.test.sql.groupby.model;

import org.apache.hadoop.io.Writable;

public interface RowColumn extends Writable {
    public static final byte MAX = 1;
    public static final byte MIN = 2;
    public static final byte SUM = 3;
    public static final byte AVG = 4;
    public static final byte OUTPUT = 5;
    public static final byte UNKNOW = 6;

    public int getAggType();

    public void combine(RowColumn rowColumn);

    public double getResult();
}
Each column must be serializable, combinable, and able to produce its output value.
package org.zsh.mr.test.sql.groupby.model;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class MaxColumn implements RowColumn {
    private double value = Double.MIN_VALUE;

    public MaxColumn() {
        super();
    }

    public MaxColumn(String value) {
        if ((null != value) && (!value.isEmpty())) {
            this.value = Double.valueOf(value);
        }
    }

    public static MaxColumn read(DataInput in) throws IOException {
        MaxColumn w = new MaxColumn();
        w.readFields(in);
        return w;
    }

    @Override
    public void combine(RowColumn parm) {
        final MaxColumn parmObj = (MaxColumn) parm;
        if (Double.MIN_VALUE == this.value) {
            this.value = parmObj.value;
        } else if (parmObj.value > this.value) {
            this.value = parmObj.value;
        }
    }

    @Override
    public int getAggType() {
        return RowColumn.MAX;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeDouble(this.value);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.value = in.readDouble();
    }

    @Override
    public double getResult() {
        return this.value;
    }
}
package org.zsh.mr.test.sql.groupby.model;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class MinColumn implements RowColumn {
    private double value = Double.MIN_VALUE;

    public MinColumn() {
        super();
    }

    public MinColumn(String value) {
        if ((null != value) && (!value.isEmpty())) {
            this.value = Double.valueOf(value);
        }
    }

    public static MinColumn read(DataInput in) throws IOException {
        MinColumn w = new MinColumn();
        w.readFields(in);
        return w;
    }

    @Override
    public void combine(RowColumn parm) {
        final MinColumn parmObj = (MinColumn) parm;
        if (Double.MIN_VALUE == this.value) {
            this.value = parmObj.value;
        } else if (parmObj.value < this.value) {
            this.value = parmObj.value;
        }
    }

    @Override
    public int getAggType() {
        return RowColumn.MIN;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeDouble(this.value);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.value = in.readDouble();
    }

    @Override
    public double getResult() {
        return this.value;
    }
}
package org.zsh.mr.test.sql.groupby.model;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class AvgColumn implements RowColumn {
    private int count = 0;
    private double sum = Double.MIN_VALUE;

    public AvgColumn() {
        super();
    }

    public AvgColumn(String value) {
        if ((null != value) && (!value.isEmpty())) {
            this.sum = Double.valueOf(value);
            this.count = 1;
        }
    }

    public static AvgColumn read(DataInput in) throws IOException {
        AvgColumn w = new AvgColumn();
        w.readFields(in);
        return w;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    public double getSum() {
        return sum;
    }

    public void setSum(double sum) {
        this.sum = sum;
    }

    @Override
    public void combine(RowColumn parm) {
        final AvgColumn parmObj = (AvgColumn) parm;
        if (Double.MIN_VALUE == parmObj.sum) {
            // parm is an empty placeholder; nothing to merge (copying its
            // empty state over this one would discard accumulated data)
            return;
        }
        if (Double.MIN_VALUE == this.sum) {
            this.count = parmObj.count;
            this.sum = parmObj.sum;
        } else {
            this.count += parmObj.count;
            this.sum += parmObj.sum;
        }
    }

    @Override
    public int getAggType() {
        return RowColumn.AVG;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(count);
        out.writeDouble(sum);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.count = in.readInt();
        this.sum = in.readDouble();
    }

    @Override
    public double getResult() {
        double result = 0;
        if (0 != this.count) {
            result = this.sum / this.count;
        }
        return result;
    }
}
package org.zsh.mr.test.sql.groupby.model;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class SumColumn implements RowColumn {
    private double value = Double.MIN_VALUE;

    public SumColumn() {
        super();
    }

    public SumColumn(String value) {
        if ((null != value) && (!value.isEmpty())) {
            this.value = Double.valueOf(value);
        }
    }

    public static SumColumn read(DataInput in) throws IOException {
        SumColumn w = new SumColumn();
        w.readFields(in);
        return w;
    }

    @Override
    public void combine(RowColumn parm) {
        final SumColumn parmObj = (SumColumn) parm;
        if (Double.MIN_VALUE == this.value) {
            this.value = parmObj.value;
        } else {
            this.value += parmObj.value;
        }
    }

    @Override
    public int getAggType() {
        return RowColumn.SUM;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeDouble(this.value);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.value = in.readDouble();
    }

    @Override
    public double getResult() {
        return this.value;
    }
}
These classes are fairly simple, so I won't go into detail.
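One detail worth calling out: average is the only aggregate here that cannot travel as a single number, which is why AvgColumn carries (sum, count) and divides only in getResult. A plain-Java sketch of that arithmetic:

```java
// Mirrors AvgColumn's idea in plain Java: carry (sum, count) through combine
// and divide only when the final result is requested.
public class AvgPair {
    double sum;
    int count;

    AvgPair(double value) { // one observed value
        this.sum = value;
        this.count = 1;
    }

    AvgPair() { // empty accumulator, like the one getEmptyRowValue produces
    }

    void combine(AvgPair other) {
        this.sum += other.sum;
        this.count += other.count;
    }

    double result() {
        return (0 == count) ? 0 : sum / count;
    }

    public static void main(String[] args) {
        // combiner on machine A saw {10, 20}; machine B saw {60}
        AvgPair a = new AvgPair(10);
        a.combine(new AvgPair(20));
        AvgPair b = new AvgPair(60);

        AvgPair total = new AvgPair(); // fresh accumulator at the reducer
        total.combine(a);
        total.combine(b);
        System.out.println(total.result()); // 30.0 == avg(10, 20, 60)
    }
}
```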
The total amount of code is getting large. There may well be an open-source library for this, but I don't know of one and didn't go looking; besides, no matter how good this got it would never beat simply using Hive or Impala, so the main purpose of writing it was to exercise MapReduce.
8. Compile and package this as groupbysql_v6_FullSql.jar, then run it:
bin/hdfs dfs -rm -R -f /test/data/output6
bin/hadoop jar ../testlib/groupbysql_v6_FullSql.jar org.zsh.mr.test.sql.groupby.SQLDriver /test/data/rawdata_10000.txt "select 1,avg(6)" /test/data/output6
bin/hdfs dfs -cat /test/data/output6/part-r-00000
streat0 49.393
streat1 49.485
streat2 50.448
streat3 48.694
streat4 48.87
streat5 50.446
streat6 49.081
streat7 49.144
streat8 48.76
streat9 50.979
The output matches version 1, so the feature works (getting this to run was a real battle with the object-reuse pit). Now for something more complex:
bin/hdfs dfs -rm -R -f /test/data/output6
bin/hadoop jar ../testlib/groupbysql_v6_FullSql.jar org.zsh.mr.test.sql.groupby.SQLDriver /test/data/rawdata_10000.txt "select 1,3,4,avg(6),min(5),max(5)" /test/data/output6
bin/hdfs dfs -getmerge /test/data/output6/part* output6.txt
less output6.txt
streat0 service0 1473085113700 98.0 74.0 74.0
streat0 service0 1473085113701 24.333333333333332 4.0 97.0
streat0 service0 1473085113705 50.2 7.0 89.0
streat0 service0 1473085113720 1.0 4.0 4.0
streat0 service0 1473085113721 54.888888888888886 9.0 81.0
streat0 service0 1473085113723 67.6 15.0 71.0
streat0 service0 1473085113724 60.2 1.0 56.0
streat0 service0 1473085113726 64.8 12.0 83.0
streat0 service0 1473085113730 44.5 21.0 96.0
streat0 service0 1473085113735 26.0 3.0 95.0
streat0 service0 1473085113736 50.833333333333336 2.0 99.0
...
bin/hdfs dfs -rm -R -f /test/data/output6
bin/hadoop jar ../testlib/groupbysql_v6_FullSql.jar org.zsh.mr.test.sql.groupby.SQLDriver /test/data/rawdata_10000.txt "select 0,3,max(5),min(5),avg(6),sum(7) where 2=company6 and 1=streat8" /test/data/output6
bin/hdfs dfs -getmerge /test/data/output6/part* output6.txt
cat output6.txt
city0 service0 97.0 8.0 56.9 356.0
city0 service1 82.0 12.0 61.0 514.0
city0 service2 91.0 0.0 52.7 523.0
city0 service3 93.0 4.0 43.3 522.0
city0 service4 99.0 18.0 56.3 461.0
city0 service5 94.0 3.0 41.8 601.0
city0 service6 98.0 4.0 53.1 463.0
city0 service7 96.0 10.0 43.4 650.0
city0 service8 95.0 18.0 38.2 360.0
city0 service9 96.0 6.0 46.1 549.0
At a glance the results look right; I had neither the energy nor the interest to verify them in detail. If you want to be sure, generate a 10-row input and compare by hand.
Done. Not that much code, yet it already has the skeleton of big-data SQL capability, and it served its mine-sweeping purpose. Satisfied.
Summary
1. YARN (MapReduce V2) is a framework for processing massive data, but it need not be limited to that: it is also a deployment and scheduling framework for distributed workloads. For example, store hundreds of millions of URLs in HDFS, have the Mapper read, download, and parse them, search for keywords (or do fancier pattern matching) based on parameters passed through the Configuration, then write the matching URLs and text fragments straight back to HDFS as key-value pairs: a large-scale distributed crawler.
2. A MapReduce job has exactly one Mapper, optionally one Reducer, and optionally one Combiner. Assuming the raw data is spread across 100 machines, the typical data flow is:
a. The mapper runs in parallel on the 100 machines and writes to local temporary files whose lines are sorted by key;
b. The combiner runs in parallel on the 100 machines, merging all local lines with the same key into one line;
c. With, say, 10 reducers configured, each key is partitioned to exactly one reducer; YARN moves the keys and values belonging to reducer X to the machine hosting it (10 machines in total) and writes them as temporary files;
d. On both the mapper and the reducer machines, the combiner may run again while the small temporary files are being sort-merged, shrinking the row count as early as possible;
e. Finally, the reducer merges all values of the same key (whatever slipped past the combiners) into one value and writes the output.
So a MapReduce job with a Combiner costs roughly 1 HDFS read, 5 local disk writes, 5 disk reads, 1 HDFS write, and 1 network transfer (not counting the HDFS reads and writes themselves). The overall cost is high, so where frequent precise reads and writes are needed, consider an external store such as Redis for passing and distributing data; YARN's file-level copying is transparent to the user but rather primitive.
3. The key-value types linking the Mapper and Reducer can be user-defined, but they must be declared on the job via setMapOutputKeyClass() and setMapOutputValueClass(), otherwise the job fails. A custom Key must implement the WritableComparable interface and a Value must implement Writable: both must be serializable, and the key must additionally be sortable.
4. Both the Combiner and the Reducer walk the multiple values of a single key through an Iterator parameter, and the value object handed out by that Iterator is reused on every step. Therefore: the class's read method must not accumulate; it must clear the object's state before assigning new values (e.g. never just call list.add(); clear the list first). In today's terms it must be reentrant: with the same input, calling it N times has the same effect as calling it once. Also, when combining or reducing you cannot hold on to the first row and merge later rows into it directly; you must create a new object or variable outside the iterator's own objects to carry the aggregation, otherwise that object ends up as the last iterated value combined with itself.
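Point 4 can be demonstrated with nothing but java.io streams: deserialize the same record twice into one reused object, once with a readFields that clears first and once with one that only appends. The classes below are standalone illustrations, not the actual RowValue:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Shows why readFields must be reentrant: Hadoop reuses one object and calls
// readFields on it once per record. These helpers are standalone sketches.
public class ReentrantReadDemo {
    static byte[] encode(String... columns) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(columns.length);
        for (String c : columns) {
            out.writeUTF(c);
        }
        return bytes.toByteArray();
    }

    static void readFields(List<String> target, byte[] record, boolean clearFirst)
            throws IOException {
        if (clearFirst) {
            target.clear(); // the fix: wipe state left over from the last record
        }
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(record));
        int size = in.readInt();
        for (int i = 0; i < size; i++) {
            target.add(in.readUTF());
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] record = encode("streat0", "service0");

        List<String> buggy = new ArrayList<String>();
        readFields(buggy, record, false);
        readFields(buggy, record, false); // reused object: columns pile up
        System.out.println(buggy.size()); // 4, not 2

        List<String> fixed = new ArrayList<String>();
        readFields(fixed, record, true);
        readFields(fixed, record, true);  // reentrant: same result every call
        System.out.println(fixed.size()); // 2
    }
}
```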
5. A Combiner can dramatically shrink the data shipped between nodes, with one precondition: combining first and then reducing must give exactly the same result as reducing directly. Otherwise, what use is the combiner's efficiency when the answer itself is wrong?
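The classic counterexample for point 5 is average: using "compute the average" itself as the combiner violates the precondition, which is exactly why AvgColumn carries (sum, count). A quick standalone check:

```java
// Why a naive "average" combiner breaks the combine-then-reduce precondition:
// averaging partial averages differs from averaging all values, while
// carrying (sum, count) keeps the result exact.
public class AvgCombinerCheck {
    // naive combiner output: the average of one partition
    static double avg(double... values) {
        double sum = 0;
        for (double v : values) {
            sum += v;
        }
        return sum / values.length;
    }

    public static void main(String[] args) {
        // partition A holds {1, 2}, partition B holds {6}
        double avgOfAvgs = avg(avg(1, 2), avg(6)); // (1.5 + 6) / 2 = 3.75
        double trueAvg = avg(1, 2, 6);             // 9 / 3 = 3.0
        System.out.println(avgOfAvgs == trueAvg);  // false: the combiner changed the answer

        // carrying (sum, count) instead keeps combine-then-reduce exact
        double sum = (1 + 2) + 6;
        int count = 2 + 1;
        System.out.println(sum / count == trueAvg); // true
    }
}
```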
6. The Configuration object on the Job can carry parameters to the distributed mappers, combiners, and reducers, which can fetch it in their setup methods; this preserves the flexibility of a single-machine application.
7. The map, combiner, and reducer may each emit nothing, or emit several rows per input row, so M:N transformations are supported.
8. Besides HDFS files, custom input/output classes can connect to other data sources; I haven't dug into that yet.
Zhang Shehui, September 11, 2016, at home.
Please credit the source when reposting.