
Hadoop MapReduce Study Notes: SQL Queries


Preface

IT software architecture has evolved from single machines and small-scale distributed systems all the way to cloud deployments built on virtualization, and the technology keeps changing by the day. Our product is therefore moving toward a cloud architecture, i.e. getting massive computing power out of clusters of cheap x86 servers with acceptable reliability. On that road the Apache Hadoop big-data stack is unavoidable, hence these notes.

I have dabbled in big-data technology over the past two years; last year I spent about a month building a prototype on Spark and the results were unsatisfying. That made it clear my fundamentals were weak and a systematic study was overdue, so over the past month of (mostly overtime-free) evenings at home I started from the ancestors, HDFS and MapReduce. As expected I stepped on a few landmines, some of which are rarely written about, so I am sharing them both to reinforce my own memory and to save fellow engineers some time.

 

Landmine: something I thought should work one way but in fact must be done another way to work at all. Each one is flagged with the word "landmine"; the last one cost me no fewer than three precious evenings to track down.

 

Basic Operations

These operations come up constantly, so they are listed up front to avoid repeating them in later sections.

1. How to install an HDFS and YARN (MapReduce v2) environment: just follow the manual. I used the single-node pseudo-distributed setup. Note that the ssh key step is mandatory.

Official guide

http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html

Starting and stopping. Each requires typing the user password three times, to start or stop the namenode, the datanode and the secondary namenode:

sbin/start-dfs.sh

sbin/stop-dfs.sh

 

Example

zhangshehui@DELL:~/workspace/hadoop/hadoop-2.6.4$ sbin/start-dfs.sh
Starting namenodes on [localhost]
zhangshehui@localhost's password:
localhost: starting namenode, logging to /home/zhangshehui/workspace/hadoop/hadoop-2.6.4/logs/hadoop-zhangshehui-namenode-DELL.out
zhangshehui@localhost's password:
localhost: starting datanode, logging to /home/zhangshehui/workspace/hadoop/hadoop-2.6.4/logs/hadoop-zhangshehui-datanode-DELL.out
Starting secondary namenodes [0.0.0.0]
zhangshehui@0.0.0.0's password:
0.0.0.0: starting secondarynamenode, logging to /home/zhangshehui/workspace/hadoop/hadoop-2.6.4/logs/hadoop-zhangshehui-secondarynamenode-DELL.out


 

Landmine: by default HDFS puts all of its files under the operating system's tmp directory, so they disappear after a reboot and you have to re-format to initialize again. The fix is to move these directories somewhere sensible; the relevant configuration items are:

hadoop.tmp.dir in etc/hadoop/core-site.xml, which holds temporary files

dfs.namenode.name.dir, dfs.namenode.checkpoint.dir and dfs.datanode.data.dir in etc/hadoop/hdfs-site.xml, which are respectively the file-name database, the file-operation log and the file data path
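For instance, a minimal hdfs-site.xml fragment along these lines does the job (the directories are only an illustration, any durable local path works):

<configuration>
<property>
<name>dfs.namenode.name.dir</name>
<value>/home/zhangshehui/hadoopdata/name</value>
</property>
<property>
<name>dfs.namenode.checkpoint.dir</name>
<value>/home/zhangshehui/hadoopdata/checkpoint</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>/home/zhangshehui/hadoopdata/data</value>
</property>
</configuration>

hadoop.tmp.dir is set the same way in core-site.xml.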

 

2. How to list folders on HDFS and delete them. In the commands below xxx is the actual path; paths are Unix style, starting from the root /.

bin/hdfs dfs -ls /xxx

bin/hdfs dfs -rm /xxx

bin/hdfs dfs -rm -R -f /xxx

 

3. How to download a file from HDFS into a local directory

bin/hdfs dfs -get /xxx

 

4. MapReduce does not write its HDFS output as the single big file we might expect; each reducer writes its own small file whose name starts with part, and merging them is up to us. If you only want to download the result locally for inspection, the command below merges while downloading; otherwise you have to write another MapReduce job with a single reducer that merges them into one HDFS file (still named part-something) and then rename it. On reflection this is reasonable: if a crowd of reducers had to queue up to write one file, throughput would suffer badly.

bin/hdfs dfs -getmerge  /xxx/part*  localfilename

 

5. How to submit a MapReduce job

bin/hadoop  jar  X.jar  MainClassFullName  MainMethodParm1  MainMethodParm2 ...

 

Every run also ends with a report of each stage's input and output volumes, which is very helpful for troubleshooting:

File System Counters
FILE: Number of bytes read=3998984
FILE: Number of bytes written=6549132
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=12075444
HDFS: Number of bytes written=15798
HDFS: Number of read operations=13
HDFS: Number of large read operations=0
HDFS: Number of write operations=4
Map-Reduce Framework
Map input records=100000
Map output records=100000
Map output bytes=1789000
Map output materialized bytes=1989006
Input split bytes=108
Combine input records=0
Combine output records=0
Reduce input groups=1000
Reduce shuffle bytes=1989006
Reduce input records=100000
Reduce output records=1000
Spilled Records=200000
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=15
CPU time spent (ms)=0
Physical memory (bytes) snapshot=0
Virtual memory (bytes) snapshot=0
Total committed heap usage (bytes)=601882624
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=6037722
File Output Format Counters
Bytes Written=15798

 

Version 0: generating stub data

WordCount is the official example, but its logic is far too simple. I want to build something like a database query: two-dimensional data stored as csv, plus quasi-SQL queries such as select a,b,max(c),min(d),avg(e),sum(f) where g=8 and h=10.

This program generates the csv file that backs the tests. It takes the file path and the row count as main() arguments and writes randomly generated rows to HDFS in a loop.

package org.zsh.hdfs.test;

import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class StubDataGenerator {

public static void generateData(String strPath, final int size) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);

Path path = new Path(strPath);
OutputStream out = null;
if(fs.exists(path)){
out = fs.append(path, 4);
}else{
out = fs.create(path);
}

PrintWriter writer = new PrintWriter(new OutputStreamWriter(out));

for(int i=0, progress=-1; i < size; i++){
// write one random csv row
writer.println(getLine(i));

// print a rough progress percentage so long runs show signs of life
int newProgress = (i * 100) / size;
if(newProgress > progress){
progress = newProgress;
System.out.print(progress + "% ");
}
}
System.out.println();

writer.close();
out.close();

}

public static String getLine(int i){
// city, streat, company, service, time, indicatorDelay, indicatorJitter, indicatorLostRatio
int city =i/10000;
int streat = (i%10000)/1000;
int company = (i%1000)/100;
int service = (i%100)/10;
int indicatorDelay = (int)(100*Math.random());
int indicatorJitter = (int)(100*Math.random());
int indicatorLostRatio = (int)(100*Math.random());

final StringBuilder buffer = new StringBuilder();
buffer.append("city");
buffer.append(city);
buffer.append(",");
buffer.append("streat");
buffer.append(streat);
buffer.append(",");
buffer.append("company");
buffer.append(company);
buffer.append(",");
buffer.append("service");
buffer.append(service);
buffer.append(",");
buffer.append(System.currentTimeMillis());
buffer.append(",");
buffer.append(indicatorDelay);
buffer.append(",");
buffer.append(indicatorJitter);
buffer.append(",");
buffer.append(indicatorLostRatio);

return buffer.toString();
}

/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
generateData(args[0], Integer.valueOf(args[1]));
}

}


Compile it into StubDataGenerator.jar and run the following command to generate the file

bin/hadoop jar StubDataGenerator.jar org.zsh.hdfs.test.StubDataGenerator /test/data/rawdata_10000.txt 10000

 

Then confirm the file was really created

bin/hdfs dfs -ls /test/data

 

You can also download it and check that the content looks right

bin/hdfs dfs -get /test/data/rawdata_10000.txt

less rawdata_10000.txt

city0,streat0,company0,service0,1473085113700,74,98,0

city0,streat0,company0,service0,1473085113701,65,61,44

city0,streat0,company0,service0,1473085113701,43,45,49

city0,streat0,company0,service0,1473085113701,73,59,33

city0,streat0,company0,service0,1473085113701,79,4,27

city0,streat0,company0,service0,1473085113701,15,10,69

city0,streat0,company0,service0,1473085113701,4,3,42

city0,streat0,company0,service0,1473085113701,97,3,52

city0,streat0,company0,service0,1473085113701,15,23,43

city0,streat0,company0,service0,1473085113701,92,11,43

city0,streat0,company0,service1,1473085113701,47,22,70

city0,streat0,company0,service1,1473085113701,73,99,22

city0,streat0,company0,service1,1473085113701,74,29,42

city0,streat0,company0,service1,1473085113702,40,86,92

city0,streat0,company0,service1,1473085113702,9,79,79

...

 

Version 1: group by column 1 and average column 6

1. First, a main() that kicks everything off; in Hadoop terminology this is the Driver.

package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SQLDriver {

/**
* @param args
* @throws IOException
* @throws InterruptedException
* @throws ClassNotFoundException
*/
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "sql");

job.setJarByClass(SQLDriver.class);

job.setMapperClass(SQLMapper.class);
job.setReducerClass(SQLReducer.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

System.exit(job.waitForCompletion(true) ? 0 : 1);
}

}


This code is basically copied from the official Apache documentation; only later did I gradually work out that:

a. Job is the heart of everything: you set a pile of parameters and then start it with waitForCompletion;

b. the MapperClass converts the input, line by line, into a data structure convenient for later computation;

c. the ReducerClass produces the final result from that structure;

d. FileInputFormat (and its output counterpart) is a bit of a grab bag: the two static methods above take the job as a parameter and end up writing their settings into the job's conf. Their main duties are reading the input from HDFS, splitting it, and writing the final output back to HDFS. A quick walk through the code shows that MapReduce input and output do not have to be HDFS, or even files at all; files are merely the most common default implementation, and the "move computation rather than data" logic is returned together with the input split structures.

 

2. The Mapper. Split each input value string on commas, take column GROUBY_COLUMN_INDEX as the key and column AGG_COLUMN_INDEX as the value, then emit the key-value pair.

package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SQLMapper extends Mapper<Object, Text, Text, DoubleWritable> {
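// Sketch: a minimal body consistent with the description above -- split the csv line,
// emit the GROUBY_COLUMN_INDEX column as key and the AGG_COLUMN_INDEX column as value.
// Field names below are illustrative, not the original ones.
private Text outputKey = new Text();
private DoubleWritable outputValue = new DoubleWritable();

@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String[] columns = value.toString().split(SQLConst.COLUMN_SEPERATOR);
outputKey.set(columns[SQLConst.GROUBY_COLUMN_INDEX]);
outputValue.set(Double.parseDouble(columns[SQLConst.AGG_COLUMN_INDEX]));
context.write(outputKey, outputValue);
}
}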


A few things are worth noting here:

a. the custom class is a subclass of Mapper, and Mapper is surprisingly a Class rather than an Interface, which I personally find questionable;

b. Object, Text, Text, DoubleWritable are the input key, input value, output key and output value types. MapReduce works with key-value pairs as its basic data structure from start to finish; that is a hard constraint, so if your business logic does not fit, either reshape it into this model or pick another framework;

c. for one input line you may emit nothing at all, or several lines, which gives you filtering and row splitting;

d. keys and values are not primitive types such as int or String; it "seems" you have to use Hadoop's own data types everywhere, and custom types have to implement the corresponding interfaces.

 

3. The Reducer. The logic here is averaging: the framework gathers all the values of one key into an iterator for me, I walk it to get the row count and the sum, divide sum by count to get the average, and write the input key and the average as the output.

package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SQLReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
private DoubleWritable result = new DoubleWritable();

protected void reduce(Text inputKey, Iterable<DoubleWritable> inputValueItr,
Context context) throws IOException, InterruptedException {
int count =0;
double sum = 0;
for(DoubleWritable inputIntWritable : inputValueItr){
sum+=inputIntWritable.get();
count++;
}
if(count > 0){
result.set(sum/count);
context.write(inputKey, result);
}
}

}

 

4. The constants file, nothing special

package org.zsh.mr.test.sql.groupby;

public class SQLConst {

public static final String COLUMN_SEPERATOR = ",";
public static final int GROUBY_COLUMN_INDEX=1;
public static final int AGG_COLUMN_INDEX=6;

}


5. Compile and package as groupbysql_v1_simpleAllDouble.jar and run

bin/hdfs dfs -rm -R -f /test/data/output1

 

bin/hadoop jar ../testlib/groupbysql_v1_simpleAllDouble.jar org.zsh.mr.test.sql.groupby.SQLDriver /test/data/rawdata_10000.txt /test/data/output1

 

zhangshehui@DELL:~/workspace/hadoop/hadoop-2.6.4$ bin/hdfs dfs -ls /test/data/output1

Found 2 items

-rw-r--r--   1 zhangshehui supergroup          0 2016-09-10 00:35 /test/data/output1/_SUCCESS

-rw-r--r--   1 zhangshehui supergroup        148 2016-09-10 00:35 /test/data/output1/part-r-00000

 

zhangshehui@DELL:~/workspace/hadoop/hadoop-2.6.4$ bin/hdfs dfs -cat /test/data/output1/part-r-00000

streat0 49.393

streat1 49.485

streat2 50.448

streat3 48.694

streat4 48.87

streat5 50.446

streat6 49.081

streat7 49.144

streat8 48.76

streat9 50.979

The input is random numbers in the range 0 to 100, so averages around 50 are exactly what we expect; the logic is correct.

 

Version 2: different data types for input and output

Version 1 works, but one small thing stuck in my throat: my input is really int while the output is double. If the map output and the reduce input both use IntWritable, it tells me DoubleWritable is expected. Does that mean that once the job sets

job.setOutputValueClass(DoubleWritable.class);

map and reduce are forever stuck emitting only that one type? That would be far too restrictive.

 

A search on Baidu revealed that one more parameter has to be set in the driver!

job.setMapOutputValueClass(IntWritable.class);

 

With that, the three main classes look like this:

1. The driver additionally sets the mapper's output type

package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SQLDriver {

/**
* @param args
* @throws IOException
* @throws InterruptedException
* @throws ClassNotFoundException
*/
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "sql");

job.setJarByClass(SQLDriver.class);

job.setMapperClass(SQLMapper.class);
// job.setMapOutputKeyClass(Object.class);
job.setMapOutputValueClass(IntWritable.class);

job.setReducerClass(SQLReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

System.exit(job.waitForCompletion(true) ? 0 : 1);
}

}


2. The mapper's output value changes to IntWritable

package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SQLMapper extends Mapper<Object, Text, Text, IntWritable> {
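// Sketch of the body: same as version 1 except the value is now an IntWritable.
// Field names are illustrative.
private Text outputKey = new Text();
private IntWritable outputValue = new IntWritable();

@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String[] columns = value.toString().split(SQLConst.COLUMN_SEPERATOR);
outputKey.set(columns[SQLConst.GROUBY_COLUMN_INDEX]);
outputValue.set(Integer.parseInt(columns[SQLConst.AGG_COLUMN_INDEX]));
context.write(outputKey, outputValue);
}
}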

3. The Reducer's input value changes to IntWritable

package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SQLReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {
private DoubleWritable result = new DoubleWritable();

protected void reduce(Text inputKey, Iterable<IntWritable> inputValueItr,
Context context) throws IOException, InterruptedException {
int count =0;
double sum = 0;
for(IntWritable inputIntWritable : inputValueItr){
sum+=inputIntWritable.get();
count++;
}
if(count > 0){
result.set(sum/count);
context.write(inputKey, result);
}
}

}

4. The constant definitions stay as they were

package org.zsh.mr.test.sql.groupby;

public class SQLConst {

public static final String COLUMN_SEPERATOR = ",";
public static final int GROUBY_COLUMN_INDEX=1;
public static final int AGG_COLUMN_INDEX=6;

}


Compile and package as groupbysql_v2_IntToDouble.jar and run

bin/hdfs dfs -rm -R -f /test/data/output2

 

bin/hadoop jar ../testlib/groupbysql_v2_IntToDouble.jar org.zsh.mr.test.sql.groupby.SQLDriver /test/data/rawdata_10000.txt /test/data/output2

 

zhangshehui@DELL:~/workspace/hadoop/hadoop-2.6.4$ bin/hdfs dfs -ls /test/data/output2

Found 2 items

-rw-r--r--   1 zhangshehui supergroup          0 2016-09-10 00:53 /test/data/output2/_SUCCESS

-rw-r--r--   1 zhangshehui supergroup        148 2016-09-10 00:53 /test/data/output2/part-r-00000

 

zhangshehui@DELL:~/workspace/hadoop/hadoop-2.6.4$ bin/hdfs dfs -cat /test/data/output2/part-r-00000

streat0 49.393

streat1 49.485

streat2 50.448

streat3 48.694

streat4 48.87

streat5 50.446

streat6 49.081

streat7 49.144

streat8 48.76

streat9 50.979

The results are identical to version 1; the logic is correct.

 

Landmine: you are forced to declare the Mapper's output types or you get an error, yet in principle this setting is completely unnecessary. A job has exactly one map and one reduce, and their input and output types could all be discovered via reflection, so why must they be set by hand? As absurd as having to show a senior-citizen card to ride the bus for free!

 

Version 3: passing a custom object from the Mapper to the Reducer

Version 3 paves the way for version 4, and the story starts with how MapReduce distributes data:

a. the typical pipeline is input - split - map - shuffle - reduce - output;

b. the shuffle groups rows by key and moves, over the network, all the rows of one key onto a single server;

c. if server A has 1 GB of raw data destined for server M, YARN lets us pre-aggregate on server A first, shrinking it (to, say, 100 MB or 10 MB) before shipping it to M, which drastically reduces network load; that mechanism is the Combiner.

 

What does this have to do with custom value types? Because not every algorithm can be aggregated in two or more stages. Take the maximum age in a province: each city computes its own maximum and the province takes the maximum of those maxima, and everything is fine. Now take the average age: throwing the average of a city of thirty million people together with the average of a city of one million and averaging them again gives a nonsensical answer.

There is a way out: each city reports both its population and its total age, the province sums the populations and sums the total ages, and total age divided by total population gives the exact average. But then what a city reports to the province is no longer a single number, it is two; and once you consider all the other formulas that cannot be enumerated, the reported payload clearly has to be user-definable.
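A quick illustration with made-up numbers: a city of 30,000,000 people with average age 40 and a city of 1,000,000 people with average age 20. Averaging the averages gives (40 + 20) / 2 = 30, while the true average is (30,000,000 × 40 + 1,000,000 × 20) / 31,000,000 ≈ 39.4; carrying the pairs (population, total age) = (30,000,000, 1,200,000,000) and (1,000,000, 20,000,000) and dividing the summed totals reproduces the correct 39.4.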

 

Look again at the definitions of Mapper and Reducer: they are generic and give nothing away

org.apache.hadoop.mapreduce.Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>

Looking at the stock examples instead, IntWritable, Text and friends all implement the Writable interface without exception, and some also implement Comparable. Combined with posts read earlier: the key is used for sorting, and both key and value are shipped between nodes, so Writable handles serialization while Comparable is what lets rows with the same key be grouped together.

 

Let's get to work.

1. First, define the intermediate avg structure that holds the row count and the sum. All it has to support is reading itself from and writing itself to a stream, with the read and write order kept consistent. As for the static read method, I am not sure when Hadoop actually calls it; the documentation suggests it, but everything runs without it.

package org.zsh.mr.test.sql.groupby;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class AvgStatWritable implements Writable {
private int count;
private double sum;

@Override
public void write(DataOutput out) throws IOException {
out.writeInt(count);
out.writeDouble(sum);
}

@Override
public void readFields(DataInput in) throws IOException {
this.count = in.readInt();
this.sum = in.readDouble();
}

public static AvgStatWritable read(DataInput in) throws IOException {
AvgStatWritable w = new AvgStatWritable();
w.readFields(in);
return w;
}

public int getCount() {
return count;
}

public void setCount(int count) {
this.count = count;
}

public double getSum() {
return sum;
}

public void setSum(double sum) {
this.sum = sum;
}

}

 

2. Declare in the Driver that the map output is this custom type, otherwise it just errors out...

job.setMapOutputValueClass(AvgStatWritable.class);

package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SQLDriver {

/**
* @param args
* @throws IOException
* @throws InterruptedException
* @throws ClassNotFoundException
*/
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "sql");

job.setJarByClass(SQLDriver.class);

job.setMapperClass(SQLMapper.class);
// job.setMapOutputKeyClass(Object.class);
job.setMapOutputValueClass(AvgStatWritable.class);

job.setReducerClass(SQLReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

System.exit(job.waitForCompletion(true) ? 0 : 1);
}

}

 

3. The Mapper's output switches to the new type

package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SQLMapper extends Mapper<Object, Text, Text, AvgStatWritable> {
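// Sketch of the body: each input row becomes an AvgStatWritable carrying count=1 and
// sum=the aggregated column. Field names are illustrative.
private Text outputKey = new Text();

@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String[] columns = value.toString().split(SQLConst.COLUMN_SEPERATOR);
outputKey.set(columns[SQLConst.GROUBY_COLUMN_INDEX]);

AvgStatWritable outputValue = new AvgStatWritable();
outputValue.setCount(1);
outputValue.setSum(Double.parseDouble(columns[SQLConst.AGG_COLUMN_INDEX]));
context.write(outputKey, outputValue);
}
}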

 

4. The Reducer's input switches to the new type, and of course the aggregation logic changes with it

package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SQLReducer extends Reducer<Text, AvgStatWritable, Text, DoubleWritable> {
private DoubleWritable result = new DoubleWritable();

protected void reduce(Text inputKey,
Iterable<AvgStatWritable> inputValueItr, Context context)
throws IOException, InterruptedException {
int count = 0;
double sum = 0;
for (AvgStatWritable avgStatWritable : inputValueItr) {
sum += avgStatWritable.getSum();
count += avgStatWritable.getCount();
}
if (count > 0) {
result.set(sum / count);
context.write(inputKey, result);
}
}

}


 

5. The constants, included here for completeness

package org.zsh.mr.test.sql.groupby;

public class SQLConst {

public static final String COLUMN_SEPERATOR = ",";
public static final int GROUBY_COLUMN_INDEX=1;
public static final int AGG_COLUMN_INDEX=6;

}


6. Package as groupbysql_v3_MiddleAsObject.jar and run; the results are exactly the same, so they are not pasted again.

 

Version 4: adding a Combiner step for efficiency

With the groundwork in place, add a Combiner to cut the volume of data transferred between nodes.

1. Write a new Reduce-style class whose input and output types are identical, and identical to the Mapper's output. Its logic adds up the counts and adds up the sums, which guarantees the final result is unaffected.

package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SQLCombiner extends Reducer<Text, AvgStatWritable, Text, AvgStatWritable> {

@Override
protected void reduce(Text inputKey, Iterable<AvgStatWritable> inputValues,
Context context) throws IOException, InterruptedException {
int count = 0;
double sum = 0;
for (AvgStatWritable value : inputValues) {
count += value.getCount();
sum += value.getSum();
}

AvgStatWritable outputValue = new AvgStatWritable();
outputValue.setCount(count);
outputValue.setSum(sum);
context.write(inputKey, outputValue);
}

}

 

2. Declare in the Driver that this step exists:

job.setCombinerClass(SQLCombiner.class);

package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SQLDriver {

/**
* @param args
* @throws IOException
* @throws InterruptedException
* @throws ClassNotFoundException
*/
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "sql");

job.setJarByClass(SQLDriver.class);

job.setMapperClass(SQLMapper.class);
// job.setMapOutputKeyClass(Object.class);
job.setMapOutputValueClass(AvgStatWritable.class);

job.setCombinerClass(SQLCombiner.class);

job.setReducerClass(SQLReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

System.exit(job.waitForCompletion(true) ? 0 : 1);
}

}

 

3. Package as groupbysql_v4_MiddleAsObject-Combiner.jar and run; the results are exactly the same, but the run report changes around the combiner: it shrinks the row count so much that the reducer has almost nothing left to do. Reduce shuffle bytes drops from 1,989,006 bytes in the first version to 226 bytes now; in a real distributed environment that cuts network IO by roughly four orders of magnitude, which is exactly what we were after.

File System Counters
FILE: Number of bytes read=28072
FILE: Number of bytes written=588528
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=1094022
HDFS: Number of bytes written=148
HDFS: Number of read operations=13
HDFS: Number of large read operations=0
HDFS: Number of write operations=4
Map-Reduce Framework
Map input records=10000
Map output records=10000
Map output bytes=200000
Map output materialized bytes=226
Input split bytes=114
Combine input records=10000  // changed
Combine output records=10  // changed
Reduce input groups=10
Reduce shuffle bytes=226
Reduce input records=10
Reduce output records=10
Spilled Records=20
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=16
CPU time spent (ms)=0
Physical memory (bytes) snapshot=0
Virtual memory (bytes) snapshot=0
Total committed heap usage (bytes)=543162368
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=547011
File Output Format Counters
Bytes Written=148

 

Version 5: specifying the group-by column dynamically

Version 5 scouts the way for version 6. The program so far has several obvious shortcomings:

a. the column used as the group-by key is hard-coded, which is far too limiting;

b. the column fed into max/min/avg/sum is hard-coded too;

c. both the key and the value support only a single column, nowhere near the comfort of a database's select ... group by.

 

This section solves dynamic parameter passing; the next one tackles multiple columns. The main() in the Driver receives arguments as usual, but the program runs distributed, and the distributed processes are all launched by the YARN framework. I only told YARN which classes to use, so how do the parameters get over there?

a. when creating the job there is a Configuration argument, and Configuration supports set(String key, String value);

b. Mapper, Combiner and Reducer can all override the setup and cleanup methods, which give access to that Configuration (via the Context), so parameters can be passed around in a distributed way. Thanks, YARN, for thinking of it.

 

1. In the Driver, put the main() argument into the Configuration; for now this "sql" parameter is merely a number.

package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SQLDriver {

/**
* @param args
* @throws IOException
* @throws InterruptedException
* @throws ClassNotFoundException
*/
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
conf.set(SQLConst.PARM_SQL, args[1]);
Job job = Job.getInstance(conf, "sql");

job.setJarByClass(SQLDriver.class);

job.setMapperClass(SQLMapper.class);
// job.setMapOutputKeyClass(Object.class);
job.setMapOutputValueClass(AvgStatWritable.class);

job.setCombinerClass(SQLCombiner.class);

job.setReducerClass(SQLReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[2]));

System.exit(job.waitForCompletion(true) ? 0 : 1);
}

}


2. Use it in the Map; the Combiner and the Reducer are completely oblivious to this change.

package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SQLMapper extends Mapper<Object, Text, Text, AvgStatWritable> {
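// Sketch of the body: setup() reads the group-by column index from the Configuration
// (the "sql" parameter is just a number at this stage); map() then works like version 3.
// Field names are illustrative.
private Text outputKey = new Text();
private int groupByColumnIndex = SQLConst.GROUBY_COLUMN_INDEX;

@Override
protected void setup(Context context) throws IOException, InterruptedException {
this.groupByColumnIndex = Integer.valueOf(context.getConfiguration().get(SQLConst.PARM_SQL));
}

@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String[] columns = value.toString().split(SQLConst.COLUMN_SEPERATOR);
outputKey.set(columns[groupByColumnIndex]);

AvgStatWritable outputValue = new AvgStatWritable();
outputValue.setCount(1);
outputValue.setSum(Double.parseDouble(columns[SQLConst.AGG_COLUMN_INDEX]));
context.write(outputKey, outputValue);
}
}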


3. One more constant is added to serve as the configuration key

package org.zsh.mr.test.sql.groupby;

public class SQLConst {

public static final String COLUMN_SEPERATOR = ",";
public static final int GROUBY_COLUMN_INDEX=1;
public static final int AGG_COLUMN_INDEX=6;

public static final String PARM_SQL = "zsh.sql.parmsql";

}


 

4. Compile and package as groupbysql_v5_DynamicSQL.jar and run as follows

bin/hdfs dfs -rm -R -f /test/data/output5

bin/hadoop jar ../testlib/groupbysql_v5_DynamicSQL.jar org.zsh.mr.test.sql.groupby.SQLDriver /test/data/rawdata_10000.txt 3  /test/data/output5

bin/hdfs dfs -cat /test/data/output5/part-r-00000

company0 49.821

company1 49.569

company2 50.574

company3 49.518

company4 48.809

company5 48.219

company6 49.116

company7 49.328

company8 49.539

company9 50.807

 

bin/hdfs dfs -rm -R -f /test/data/output5

bin/hadoop jar ../testlib/groupbysql_v5_DynamicSQL.jar org.zsh.mr.test.sql.groupby.SQLDriver /test/data/rawdata_10000.txt 4  /test/data/output5

bin/hdfs dfs -cat /test/data/output5/part-r-00000

service0 49.625

service1 50.058

service2 48.916

service3 47.761

service4 52.22

service5 49.27

service6 49.535

service7 47.788

service8 50.317

service9 49.81

 

bin/hdfs dfs -rm -R -f /test/data/output5

bin/hadoop jar ../testlib/groupbysql_v5_DynamicSQL.jar org.zsh.mr.test.sql.groupby.SQLDriver /test/data/rawdata_10000.txt 2  /test/data/output5

bin/hdfs dfs -cat /test/data/output5/part-r-00000

streat0 49.393

streat1 49.485

streat2 50.448

streat3 48.694

streat4 48.87

streat5 50.446

streat6 49.081

streat7 49.144

streat8 48.76

streat9 50.979

Grouping by the streat column gives the same numbers as before, so the logic is fine.

Version 6: fully dynamic group-by columns and aggregation columns

Distributed parameter passing works, so let's push it all the way and pass in a full quasi-SQL statement such as "select 1,3,max(5),sum(6),avg(7) where 0=city8", with these conventions:

a. every number in the select clause is a column index into the csv file, counting from 0;

b. the where clause supports, for now, only equality expressions joined by and; the number before the equals sign is a column index, the text after it is the value to compare against;

c. the supported functions are, for now, max, min, sum and avg;

d. directly output columns, aggregated columns and filter columns all support combinations of multiple columns.

 

Before writing this code I hesitated over whether test code deserved this much complexity, since the core ideas were already verified and most of this code has little to do with MapReduce itself. And then I stepped squarely on a big landmine that flipped me head over heels twice :) so it paid off handsomely.

 

1. The Driver passes the sql statement as a parameter to every Mapper, Combiner and Reducer, and both the Key and the Value going from Mapper to Reducer are now custom objects

package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.zsh.mr.test.sql.groupby.model.RowKey;
import org.zsh.mr.test.sql.groupby.model.RowValue;

public class SQLDriver {

/**
* @param args
* @throws IOException
* @throws InterruptedException
* @throws ClassNotFoundException
*/
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {

Configuration conf = new Configuration();
conf.set(SQLConst.PARM_SQL, args[1]);
Job job = Job.getInstance(conf, "sql");

job.setJarByClass(SQLDriver.class);

job.setMapperClass(SQLMapper.class);
job.setMapOutputKeyClass(RowKey.class);
job.setMapOutputValueClass(RowValue.class);

job.setCombinerClass(SQLCombiner.class);

job.setReducerClass(SQLReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[2]));

System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}


Note the three lines that make the key difference here:

conf.set(SQLConst.PARM_SQL, args[1]);

job.setMapOutputKeyClass(RowKey.class);

job.setMapOutputValueClass(RowValue.class);

 

2. The Mapper parses the SQL, filters rows according to it, assembles the Key from the columns the user selected for direct output, and assembles the Value from the columns to be aggregated

package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.zsh.mr.test.sql.groupby.model.RowKey;
import org.zsh.mr.test.sql.groupby.model.RowValue;

public class SQLMapper extends Mapper<Object, Text, RowKey, RowValue> {
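// Sketch of the body matching the description in the surrounding text: setup() builds
// the SQLParser from the statement carried in the Configuration; map() filters the row
// and emits the generated RowKey/RowValue. Variable names are illustrative.
private SQLParser sqlParser = null;

@Override
protected void setup(Context context) throws IOException, InterruptedException {
this.sqlParser = new SQLParser(context.getConfiguration().get(SQLConst.PARM_SQL));
}

@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String[] columns = value.toString().split(SQLConst.COLUMN_SEPERATOR);

// drop rows that do not satisfy the where conditions
if (!sqlParser.isMatchCondition(columns)) {
return;
}

context.write(sqlParser.generateRowKey(columns), sqlParser.generateRowValue(columns));
}
}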


All of the SQL complexity is encapsulated in SQLParser; at the mapper level only the Key and the Value are visible. sqlParser.isMatchCondition(columns) filters the rows, and a row that fails the conditions is returned from without emitting anything. Adding richer where capabilities later only requires changing SQLParser.

 

3. The Combiner pre-aggregates the Values to ease the pressure on the network

package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.mapreduce.Reducer;
import org.zsh.mr.test.sql.groupby.model.RowKey;
import org.zsh.mr.test.sql.groupby.model.RowValue;

public class SQLCombiner extends Reducer<RowKey, RowValue, RowKey, RowValue> {

private SQLParser sqlParser = null;

@Override
protected void setup(Context context) throws IOException,
InterruptedException {
this.sqlParser = new SQLParser(context.getConfiguration().get(
SQLConst.PARM_SQL));
}

@Override
protected void reduce(RowKey inputKey, Iterable<RowValue> inputValueItr,
Context context) throws IOException, InterruptedException {

final RowValue resultRowValue = this.sqlParser.getEmptyRowValue();

for (RowValue rowValue : inputValueItr) {
resultRowValue.combine(rowValue);
}

context.write(inputKey, resultRowValue);
}

}


I designed RowValue so that two instances can be combined directly, which keeps the top level simple and clear.

 

Landmine: a big one. The objects we get while iterating over inputValueItr here are in fact one and the same object; only its contents change on each iteration!!! To repeat: the rowValue obtained in iteration 0 and in iteration N of this for loop is the same object, just with different values!!!

 

My original logic was the following: take the first row and combine all subsequent rows into it, then return the first row, so that the top level would not need to understand how a Row is constructed. The result: the single resultRowValue object kept being re-initialized by the read method and kept combining with itself, so the results were completely wrong. More on this inside the RowValue class below.

RowValue resultRowValue = null;
for (RowValue rowValue : inputValueItr) {
if(null == resultRowValue ){
resultRowValue = rowValue;
}else{
resultRowValue.combine(rowValue);
}
}
context.write(inputKey, resultRowValue);


 

4. The Reducer performs the final aggregation step and writes the output

package org.zsh.mr.test.sql.groupby;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.zsh.mr.test.sql.groupby.model.RowKey;
import org.zsh.mr.test.sql.groupby.model.RowValue;

public class SQLReducer extends Reducer<RowKey, RowValue, Text, Text> {

private SQLParser sqlParser = null;

@Override
protected void setup(Context context) throws IOException,
InterruptedException {
this.sqlParser = new SQLParser(context.getConfiguration().get(
SQLConst.PARM_SQL));
}

protected void reduce(RowKey inputKey, Iterable<RowValue> inputValueItr,
Context context) throws IOException, InterruptedException {

final RowValue resultRowValue = this.sqlParser.getEmptyRowValue();

for (RowValue rowValue : inputValueItr) {
resultRowValue.combine(rowValue);
}

// format columns in the order specified by the sql
Text resultKey = new Text(inputKey.getOuputString());
Text resultValue = new Text(resultRowValue.getOuputString());
context.write(resultKey, resultValue);
}

}


The landmine stepped on here is exactly the same as in the Combiner; they were tracked down and fixed together.

 

5. Now let's open up how the SQL is parsed and how a csv row turns into a Key and a Value. There are roughly four steps to describe at a high level:

a. SQL parsing: split out the select columns and the where columns, and determine whether each select column is output directly or aggregated first; this information is stored in columnInfoList and whereConditions;

b. building a RowKey from an input row: walk columnInfoList, collect the columns that are output directly, and put them straight into the RowKey;

c. building a RowValue from an input row: walk columnInfoList, find the columns that are aggregated before output, read the corresponding input columns, construct the intermediate object for the aggregation function, and put it into the RowValue;

d. isMatchCondition checks whether an input row satisfies the filter conditions and is called directly by the mapper.

package org.zsh.mr.test.sql.groupby;

import java.util.ArrayList;
import java.util.List;

import org.zsh.mr.test.sql.groupby.model.AvgColumn;
import org.zsh.mr.test.sql.groupby.model.MaxColumn;
import org.zsh.mr.test.sql.groupby.model.MinColumn;
import org.zsh.mr.test.sql.groupby.model.RowColumn;
import org.zsh.mr.test.sql.groupby.model.RowKey;
import org.zsh.mr.test.sql.groupby.model.RowValue;
import org.zsh.mr.test.sql.groupby.model.SumColumn;

public class SQLParser {
private static final String TOKEN_SELECT = "select";
private static final String TOKEN_WHERE = "where";
private static final String TOKEN_AND = "and";
private static final String TOKEN_EQUAL = "=";
private static final String TOKEN_SPLIT = ",";

private String sql;

public List<ColumnInfo> columnInfoList; // = new ArrayList<ColumnInfo>();
private List<EqualsCondition> whereConditions; // = new ArrayList<EqualsCondition>();
private int maxColumnIndex = 0;

public SQLParser(String sqlParm) {
this.sql = sqlParm.toLowerCase().trim();
this.columnInfoList = new ArrayList<ColumnInfo>();
this.whereConditions = new ArrayList<EqualsCondition>();

// substring of output columns and where conditions
int selectIndex = this.sql.indexOf(TOKEN_SELECT);
if (selectIndex < 0) {
selectIndex = 0 - TOKEN_SELECT.length();
}

int whereIndex = this.sql.indexOf(TOKEN_WHERE);
if (whereIndex <= 0) {
whereIndex = this.sql.length();
}

final String selectColumnsSubStr = this.sql.substring(selectIndex
+ TOKEN_SELECT.length(), whereIndex);
String whereConditionStr = "";
if (this.sql.length() > (whereIndex + TOKEN_WHERE.length())) {
whereConditionStr = this.sql.substring(whereIndex
+ TOKEN_WHERE.length());
}

// parse output column expression
final String[] selectColumnStrs = selectColumnsSubStr
.split(TOKEN_SPLIT);
for (int colIndex = 0, len = selectColumnStrs.length; colIndex < len; colIndex++) {
// parse column
final ColumnInfo columnInfo = new ColumnInfo(colIndex,
selectColumnStrs[colIndex].trim());
this.columnInfoList.add(columnInfo);

// set max input index
this.maxColumnIndex = Math.max(maxColumnIndex,
columnInfo.getInputColumnIndex());
}

// parse where condition expression
// now only support a=x and b=y and c=m and d=n format as an example
String[] whereAndStrs = whereConditionStr.split(TOKEN_AND);
for (String andConditionStr : whereAndStrs) {
String[] segments = andConditionStr.split(TOKEN_EQUAL);
if ((null != segments) && (segments.length >= 2)) {
whereConditions.add(new EqualsCondition(Integer
.valueOf(segments[0].trim()), segments[1].trim()));
}
}
}

public boolean isMatchCondition(String[] columns) {
boolean result = true;

for (EqualsCondition condition : this.whereConditions) {
result = result && condition.isMatch(columns);
}

return result;
}

public int getMaxColumnIndex() {
return this.maxColumnIndex;
}

public RowKey generateRowKey(String[] columns) {
final RowKey result = new RowKey();
final ArrayList<String> rowKeyColumns = result.getColumns();

for (ColumnInfo columnInfo : this.columnInfoList) {
if (RowColumn.OUTPUT == columnInfo.aggType) {
rowKeyColumns.add(columns[columnInfo.getInputColumnIndex()]);
}
}

return result;
}

public RowValue generateRowValue(String[] columns) {
final RowValue result = new RowValue();

for (ColumnInfo columnInfo : this.columnInfoList) {
switch (columnInfo.aggType) {
case RowColumn.MAX:
result.add(new MaxColumn(columns[columnInfo.inputColumnIndex]));
break;
case RowColumn.MIN:
result.add(new MinColumn(columns[columnInfo.inputColumnIndex]));
break;
case RowColumn.SUM:
result.add(new SumColumn(columns[columnInfo.inputColumnIndex]));
break;
case RowColumn.AVG:
result.add(new AvgColumn(columns[columnInfo.inputColumnIndex]));
break;
default:
// ignore it
break;
}
}

return result;
}

public RowValue getEmptyRowValue() {
// generate empty input
final String[] inputColumns = new String[this.maxColumnIndex + 1];
for (int i = 0, len = inputColumns.length; i < len; i++) {
inputColumns[i] = "";
}
return generateRowValue(inputColumns);
}

class ColumnInfo {
private int outputColumnIndex = -1;
private int inputColumnIndex = -1;
private byte aggType = RowColumn.UNKNOW;

public ColumnInfo(int outputColumnIndex, String expression) {
this.outputColumnIndex = outputColumnIndex;

// find index of char ( and )
expression = expression.trim();
int startIndex = expression.indexOf('(');
int endIndex = expression.indexOf(')');
if ((startIndex > 0) && (endIndex > (startIndex + 1))) {
this.inputColumnIndex = Integer.valueOf(expression.substring(
startIndex + 1, endIndex));

String aggMethodStr = expression.substring(0, startIndex)
.trim();
updateAggType("max", aggMethodStr, RowColumn.MAX);
updateAggType("min", aggMethodStr, RowColumn.MIN);
updateAggType("sum", aggMethodStr, RowColumn.SUM);
updateAggType("avg", aggMethodStr, RowColumn.AVG);
} else if ((startIndex < 0) && (endIndex < 0)) {
this.inputColumnIndex = Integer.valueOf(expression);
this.aggType = RowColumn.OUTPUT;
} else {
// Unknown, ignore it
this.aggType = RowColumn.UNKNOW;
}
}

private void updateAggType(String aggMethodStrConst,
String inputAggMethodStr, byte relatedAggType) {
if (aggMethodStrConst.equals(inputAggMethodStr)) {
this.aggType = relatedAggType;
}
}

public int getInputColumnIndex() {
return inputColumnIndex;
}

public int getOutputColumnIndex() {
return outputColumnIndex;
}

public byte getAggType() {
return aggType;
}
}

class EqualsCondition {
private int columnIndex;
private String expectedValue;

public EqualsCondition(int columnIndex, String expectedValue) {
super();
this.columnIndex = columnIndex;
this.expectedValue = expectedValue;
}

public boolean isMatch(String[] columns) {
return expectedValue.equals(columns[columnIndex]);
}

public int getColumnIndex() {
return columnIndex;
}
}

}
 

SQLParser is purely a utility class; it is neither serialized nor transferred. Each Mapper, Combiner and Reducer constructs its own instance in its setup method from the sql statement passed in.

 

6. Now for what is inside the Key and the Value and how they are serialized. The Key is the list of values of the directly output columns; the Value is the set of intermediate values of the aggregated columns.

package org.zsh.mr.test.sql.groupby.model;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.WritableComparable;

public class RowKey implements WritableComparable<RowKey> {
private ArrayList<String> columns = new ArrayList<String>();

public ArrayList<String> getColumns() {
return columns;
}

@Override
public void write(DataOutput out) throws IOException {
out.writeInt(this.columns.size());
for (String column : this.columns) {
out.writeUTF(column);
}
}

@Override
public void readFields(DataInput in) throws IOException {
final int size = in.readInt();
this.columns = new ArrayList<String>(size);
for (int i = 0; i < size; i++) {
this.columns.add(in.readUTF());
}
}

public static RowKey read(DataInput in) throws IOException {
RowKey w = new RowKey();
w.readFields(in);
return w;
}

@Override
public int compareTo(RowKey o) {
final List<String> thisColumns = this.columns;
final List<String> thatColumns = o.columns;

final int size = Math.min(thisColumns.size(), thatColumns.size());
int result = 0;
for (int i = 0; i < size; i++) {
result = thisColumns.get(i).compareTo(thatColumns.get(i));
if (0 != result) {
break;
}
}

return result;
}

public String getOuputString() {
final StringBuilder buffer = new StringBuilder();

if (!this.columns.isEmpty()) {
buffer.append(this.columns.get(0));
for (int i = 1, size = this.columns.size(); i < size; i++) {
buffer.append('\t');
buffer.append(this.columns.get(i));
}
}

return buffer.toString();
}

@Override
public String toString() {
return getOuputString();
}
}
 

This class has three responsibilities:

a. it represents one output row of the group by query: for select a,b,max(c),min(d), the RowKey holds the combination of a and b;

b. it must be comparable with other RowKeys, first so that YARN can decide which rows belong to the same group and gather their RowValues for the Reducer, and second so that YARN can sort them, although the sorting ultimately serves the first purpose;

c. it must support serialization and deserialization, as defined by the Writable interface, so it can be shipped between servers during the shuffle.

 

package org.zsh.mr.test.sql.groupby.model;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Writable;

public class RowValue implements Writable, Cloneable {
private List<RowColumn> columns = new ArrayList<RowColumn>();

@Override
public void write(DataOutput out) throws IOException {
out.writeInt(this.columns.size());
for (RowColumn column : this.columns) {
out.writeByte(column.getAggType());
column.write(out);
}
}

@Override
public void readFields(DataInput in) throws IOException {

// WARN: must clear here. The hadoop framework reuses an old object of this class
// and calls readFields again after map, then hands it to the reducer or the output.
this.columns.clear();

final int size = in.readInt();
for (int i = 0; i < size; i++) {
final byte aggType = in.readByte();
switch (aggType) {
case RowColumn.MAX:
this.columns.add(MaxColumn.read(in));
break;
case RowColumn.MIN:
this.columns.add(MinColumn.read(in));
break;
case RowColumn.SUM:
this.columns.add(SumColumn.read(in));
break;
case RowColumn.AVG:
this.columns.add(AvgColumn.read(in));
break;
default:
// ignore it, do not support
break;
}
}
}

public static RowValue read(DataInput in) throws IOException {
RowValue w = new RowValue();
w.readFields(in);
return w;
}

public void combine(RowValue parm) {
final int size = Math.min(this.columns.size(), parm.columns.size());
for (int i = 0; i < size; i++) {
this.columns.get(i).combine(parm.columns.get(i));
}

// TODO, really happen!!!
if(this == parm){
this.columns.clear();
throw new IllegalStateException("combine to self");
}
}

public void add(RowColumn rowColumn) {
this.columns.add(rowColumn);
}

public String getOuputString() {
final StringBuilder buffer = new StringBuilder();

if (!this.columns.isEmpty()) {
buffer.append(this.columns.get(0).getResult());
for (int i = 1, size = this.columns.size(); i < size; i++) {
buffer.append('\t');
buffer.append(this.columns.get(i).getResult());
}
}

return buffer.toString();
}

@Override
public String toString() {
return getOuputString();
}
}

RowValue's responsibilities:

a. it represents the aggregated columns of the select clause, e.g. the third and fourth columns of select a,b,max(c),min(d);

b. it is what gets combined, both in the Combiner and in the final Reduce;

c. it can be serialized so it can travel between servers.

 

Landmine: the read method must be reentrant. YARN constructs one such object and reuses it over and over; after each call to read it represents another row, again and again. That is why this example uses this.columns = new ArrayList<String>(size); inside the read method, instead of initializing the list in the constructor and just calling add inside read. My earlier code did the latter, and the size of columns grew to an astronomical number.


 

This landmine was already mentioned at the Combiner, but I am still unhappy about it: for efficiency's sake the framework could have made this an optional feature to switch on instead of a default, because the default makes behavior differ from expectations. In a real project, if you do not know about this and only discover it during debugging after a mountain of feature code has been written, you may have to write a pile of new code in the Combiner just to produce the empty RowValue used to combine the others, and that effort could be enormous.

 

Take this example: given reasonably long coding experience and a manageable amount of code, I did no class-diagram design at all. I started from the outermost layer and refined top-down: first the mapper, combiner and reducer; once those three were written, the class definitions of SQLParser, RowKey and RowValue fell out; then their method implementations, which in turn produced the RowColumn interface definition; and finally the concrete Max, Min, Avg and Sum implementations.

Because I had assumed that combining the first row of inputValueItr with all subsequent rows would work, I had never considered producing an empty RowValue at all, and the abstraction layers are isolated top-down. So when the Combiner and the Reducer suddenly need a completely empty RowValue, I have to change everything from SQLParser through RowValue down to the AvgColumn implementation; only by treating the empty string as a special value did the classes avoid getting really ugly...

 

7. Finally, the intermediate data structures for the max, min, avg and sum columns

package org.zsh.mr.test.sql.groupby.model;

import org.apache.hadoop.io.Writable;

public interface RowColumn extends Writable {
public static final byte MAX = 1;
public static final byte MIN = 2;
public static final byte SUM = 3;
public static final byte AVG = 4;
public static final byte OUTPUT = 5;
public static final byte UNKNOW = 6;

public int getAggType();

public void combine(RowColumn rowColumn);

public double getResult();

}


It can be serialized, combined and turned into output.

package org.zsh.mr.test.sql.groupby.model;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class MaxColumn implements RowColumn {
private double value = Double.MIN_VALUE;

public MaxColumn() {
super();
}

public MaxColumn(String value) {
if((null != value) && (! value.isEmpty())){
this.value = Double.valueOf(value);
}
}

public static MaxColumn read(DataInput in) throws IOException {
MaxColumn w = new MaxColumn();
w.readFields(in);
return w;
}

@Override
public void combine(RowColumn parm) {
final MaxColumn parmObj = (MaxColumn) parm;
if (Double.MIN_VALUE == this.value) {
this.value = parmObj.value;
} else if (parmObj.value > this.value) {
this.value = parmObj.value;
}
}

@Override
public int getAggType() {
return RowColumn.MAX;
}

@Override
public void write(DataOutput out) throws IOException {
out.writeDouble(this.value);
}

@Override
public void readFields(DataInput in) throws IOException {
this.value = in.readDouble();
}

@Override
public double getResult() {
return this.value;
}

}

package org.zsh.mr.test.sql.groupby.model;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class MinColumn implements RowColumn {
private double value = Double.MIN_VALUE;

public MinColumn() {
super();
}

public MinColumn(String value) {
if((null != value) && (! value.isEmpty())){
this.value = Double.valueOf(value);
}
}

public static MinColumn read(DataInput in) throws IOException {
MinColumn w = new MinColumn();
w.readFields(in);
return w;
}

@Override
public void combine(RowColumn parm) {
final MinColumn parmObj = (MinColumn) parm;
if (Double.MIN_VALUE == this.value) {
this.value = parmObj.value;
} else if (parmObj.value < this.value) {
this.value = parmObj.value;
}
}

@Override
public int getAggType() {
return RowColumn.MIN;
}

@Override
public void write(DataOutput out) throws IOException {
out.writeDouble(this.value);
}

@Override
public void readFields(DataInput in) throws IOException {
this.value = in.readDouble();
}

@Override
public double getResult() {
return this.value;
}
}


package org.zsh.mr.test.sql.groupby.model;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class AvgColumn implements RowColumn {
private int count = 0;
private double sum = Double.MIN_VALUE;

public AvgColumn() {
super();
}

public AvgColumn(String value) {
if ((null != value) && (!value.isEmpty())) {
this.sum = Double.valueOf(value);
this.count = 1;
}
}

public static AvgColumn read(DataInput in) throws IOException {
AvgColumn w = new AvgColumn();
w.readFields(in);
return w;
}

public int getCount() {
return count;
}

public void setCount(int count) {
this.count = count;
}

public double getSum() {
return sum;
}

public void setSum(double sum) {
this.sum = sum;
}

@Override
public void combine(RowColumn parm) {
final AvgColumn parmObj = (AvgColumn) parm;
if (Double.MIN_VALUE == this.sum) {
this.count = parmObj.count;
this.sum = parmObj.sum;
} else {
this.count += parmObj.count;
this.sum += parmObj.sum;
}
}

@Override
public int getAggType() {
return RowColumn.AVG;
}

@Override
public void write(DataOutput out) throws IOException {
out.writeInt(count);
out.writeDouble(sum);
}

@Override
public void readFields(DataInput in) throws IOException {
this.count = in.readInt();
this.sum = in.readDouble();
}

@Override
public double getResult() {
double result = 0;

if (0 != this.count) {
result = this.sum / this.count;
}

return result;
}

}


package org.zsh.mr.test.sql.groupby.model;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class SumColumn implements RowColumn {
private double value = Double.MIN_VALUE;

public SumColumn() {
super();
}

public SumColumn(String value) {
if ((null != value) && (!value.isEmpty())) {
this.value = Double.valueOf(value);
}
}

public static SumColumn read(DataInput in) throws IOException {
SumColumn w = new SumColumn();
w.readFields(in);
return w;
}

@Override
public void combine(RowColumn parm) {
final SumColumn parmObj = (SumColumn) parm;
if (Double.MIN_VALUE == this.value) {
this.value = parmObj.value;
} else {
this.value += parmObj.value;
}
}

@Override
public int getAggType() {
return RowColumn.SUM;
}

@Override
public void write(DataOutput out) throws IOException {
out.writeDouble(this.value);
}

@Override
public void readFields(DataInput in) throws IOException {
this.value = in.readDouble();
}

@Override
public double getResult() {
return this.value;
}
}
 

These classes are all fairly simple and need no further commentary.

 

That is quite a bit of code. There may well be open-source libraries for this, but I do not know of any and did not go looking; and however good this gets, it will never beat simply using Hive or Impala. The point of writing it was to exercise MapReduce.

 

8. Compile and package as groupbysql_v6_FullSql.jar and run it

bin/hdfs dfs -rm -R -f /test/data/output6

bin/hadoop jar ../testlib/groupbysql_v6_FullSql.jar org.zsh.mr.test.sql.groupby.SQLDriver /test/data/rawdata_10000.txt  "select 1,avg(6)"  /test/data/output6

bin/hdfs dfs -cat /test/data/output6/part-r-00000

streat0 49.393

streat1 49.485

streat2 50.448

streat3 48.694

streat4 48.87

streat5 50.446

streat6 49.081

streat7 49.144

streat8 48.76

streat9 50.979

The output matches version 1, so it works (getting it to that point was a real struggle with the object-reuse pit). Now let's try something more complex:

bin/hdfs dfs -rm -R -f /test/data/output6

bin/hadoop jar ../testlib/groupbysql_v6_FullSql.jar org.zsh.mr.test.sql.groupby.SQLDriver /test/data/rawdata_10000.txt  "select 1,3,4,avg(6),min(5),max(5)"  /test/data/output6

bin/hdfs dfs -getmerge /test/data/output6/part* output6.txt

less output6.txt

streat0 service0        1473085113700   98.0    74.0    74.0

streat0 service0        1473085113701   24.333333333333332      4.0     97.0

streat0 service0        1473085113705   50.2    7.0     89.0

streat0 service0        1473085113720   1.0     4.0     4.0

streat0 service0        1473085113721   54.888888888888886      9.0     81.0

streat0 service0        1473085113723   67.6    15.0    71.0

streat0 service0        1473085113724   60.2    1.0     56.0

streat0 service0        1473085113726   64.8    12.0    83.0

streat0 service0        1473085113730   44.5    21.0    96.0

streat0 service0        1473085113735   26.0    3.0     95.0

streat0 service0        1473085113736   50.833333333333336      2.0     99.0

...

 

 

bin/hdfs dfs -rm -R -f /test/data/output6

bin/hadoop jar ../testlib/groupbysql_v6_FullSql.jar org.zsh.mr.test.sql.groupby.SQLDriver /test/data/rawdata_10000.txt  "select 0,3,max(5),min(5),avg(6),sum(7) where 2=company6 and 1=streat8"  /test/data/output6

bin/hdfs dfs -getmerge /test/data/output6/part* output6.txt

cat output6.txt

city0 service0 97.0 8.0 56.9 356.0

city0 service1 82.0 12.0 61.0 514.0

city0 service2 91.0 0.0 52.7 523.0

city0 service3 93.0 4.0 43.3 522.0

city0 service4 99.0 18.0 56.3 461.0

city0 service5 94.0 3.0 41.8 601.0

city0 service6 98.0 4.0 53.1 463.0

city0 service7 96.0 10.0 43.4 650.0

city0 service8 95.0 18.0 38.2 360.0

city0 service9 96.0 6.0 46.1 549.0

At a glance this matches expectations; I had neither the energy nor the interest to verify the exact numbers, but if you want to, generate an input of only 10 rows and compare by hand.

And that's it. Not much code, yet it already has the rough shape of big-data SQL, and the mine-sweeping goal was met. Satisfied.

 

Summary

1. YARN (MapReduce v2) is a framework for processing huge amounts of data, but it need not stop there: it is really a framework for deploying and scheduling distributed work. For example, store hundreds of millions of URLs on HDFS, read them in the Mapper, download and parse them, search for keywords or fancier patterns passed in via the Configuration, then write the matching URLs and text fragments straight back to HDFS as key-value pairs, and you have a large-scale distributed crawler.

 

2. A MapReduce job has exactly one Mapper, optionally one Reducer and optionally one Combiner. Assuming the raw data is spread over 100 machines, the typical data flow is:

a. the mapper runs in parallel on the 100 machines and writes local temporary files whose lines are sorted by key;

b. the combiner runs in parallel on the 100 machines and collapses, locally, all lines with the same key into one line;

c. with, say, 10 reducers configured, every key is partitioned to one reducer; YARN moves the keys and values belonging to each reducer to the machine that reducer runs on (10 machines in total) and writes them as temporary files;

d. on both the mapper and the reducer machines, the combiner can run again while the small temporary files are being sort-merged, shrinking the row count as early as possible;

e. finally, the reducer merges all the values of one key (whatever slipped past the combiner) into one value and writes the output.

So a single MR job with a Combiner goes through roughly 1 HDFS read, 5 local disk writes, 5 local disk reads, 1 HDFS write and 1 network transfer (not counting the HDFS reads and writes). The overall cost is high, so for workloads that read and write precisely and frequently, consider an external store such as Redis for handing data around; YARN's file-level copying is transparent to the user but rather primitive.

 

3. The key-value types linking the Mapper and the Reducer can be custom classes, but they must be declared explicitly on the job via setMapOutputKeyClass() and setMapOutputValueClass(), otherwise you get an error. A custom Key must implement the WritableComparable interface and a Value must implement Writable: both must support serialization, and the Key must additionally support ordering.

 

4. Both the Combiner and the Reducer walk the values of a single key through an Iterator parameter, and the value object is reused across the iterations of that loop. Consequently: the class's read method must not accumulate; it has to reset the object's state before assigning it (for example, never just call list.add(), clear the list first). Nowadays this is called being reentrant: calling it N times with the same input has the same effect as calling it once. Likewise, when combining in the combiner or reducer you cannot reuse the first element and merge the rest into it; you must create a fresh object or variables outside the iterator to hold the aggregate, otherwise what you end up with is the iterator's last object combined with itself.
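A minimal sketch of the safe pattern inside reduce() or the combiner (MyWritable is only a stand-in for whatever value type is in use, assumed to have a combine() like the ones above):

MyWritable acc = new MyWritable();   // fresh accumulator, never a reference to an iterator element
for (MyWritable v : values) {
acc.combine(v);   // v is the same recycled object every iteration, only its fields change
}
context.write(key, acc);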

 

5. A Combiner can drastically shrink the amount of data shipped between nodes, with one precondition: combining first and then reducing must produce exactly the same result as reducing directly. If correctness is broken, what good is the combiner's efficiency?

 

6. The Configuration object inside the Job can carry parameters to the distributed mappers, combiners and reducers, which can fetch it in their setup methods; that preserves the flexibility of a single-machine application.

 

7. map, combiner and reducer may all emit nothing, or emit several rows for one input row, so M:N transformations are supported.

 

8. Besides HDFS files, you can plug in custom input and output classes to connect to other data sources; I have not dug into that yet.

Zhang Shehui, September 11, 2016, written at home.

Please credit the source when reposting.

 

 

 

 