MapReduce处理多个不同的出入文件
2016-03-25 21:45
253 查看
MultipleInputs类指定不同的输入文件路径以及输入文化格式
现有两份数据
phone
123,good number
124,common number
125,bad number
user
zhangsan,123
lisi,124
wangwu,125
现在需要把user和phone按照phone number连接起来。得到下面的结果
zhangsan,123,good number
lisi,123,common number
wangwu,125,bad number
分析思路(不同文件之间每行数据有相同的kay,在map阶段相同key的不同value就形成了一个集合,在map阶段对该集合里的value进行组合进而得到想要的结果)
还是相当于两张表的一对一join操作。join时对value设置个Bean(JavaBean实现writablecomparable接口),key为外键值
join的优化,详见http://blog.csdn.net/u010366796/article/details/44649933,设置KeyBean(外健和标识flag属性),进行排序
本例中将通过value进行排序,即在value的JavaBean中通过实习CompareTo()方法,完成排序,使得phone表位于首位
1.对value实现JavaBean(实现writablecomparable接口)
view sourceprint?
2.多map类,map1(实现对phone表文件操作)
view sourceprint?
2.map2(实现对user表文件操作)
view sourceprint?
3.reduce类
view sourceprint?
4.job类(关键!!实现多文件的输入格式等)
view sourceprint?
现有两份数据
phone
123,good number
124,common number
125,bad number
user
zhangsan,123
lisi,124
wangwu,125
现在需要把user和phone按照phone number连接起来。得到下面的结果
zhangsan,123,good number
lisi,123,common number
wangwu,125,bad number
分析思路(不同文件之间每行数据有相同的kay,在map阶段相同key的不同value就形成了一个集合,在map阶段对该集合里的value进行组合进而得到想要的结果)
还是相当于两张表的一对一join操作。join时对value设置个Bean(JavaBean实现writablecomparable接口),key为外键值
join的优化,详见http://blog.csdn.net/u010366796/article/details/44649933,设置KeyBean(外健和标识flag属性),进行排序
本例中将通过value进行排序,即在value的JavaBean中通过实习CompareTo()方法,完成排序,使得phone表位于首位
1.对value实现JavaBean(实现writablecomparable接口)
view sourceprint?
01.
package
test.mr.multiinputs;
02.
03.
import
java.io.DataInput;
04.
import
java.io.DataOutput;
05.
import
java.io.IOException;
06.
07.
import
org.apache.hadoop.io.WritableComparable;
08.
09.
/*
10.
* 自定义的JavaBean
11.
*/
12.
public
class
FlagString
implements
WritableComparable<FlagString> {
13.
private
String value;
14.
private
int
flag;
// 标记 0:表示phone表 1:表示user表
15.
16.
public
FlagString() {
17.
super
();
18.
// TODO Auto-generated constructor stub
19.
}
20.
21.
public
FlagString(String value,
int
flag) {
22.
super
();
23.
this
.value = value;
24.
this
.flag = flag;
25.
}
26.
27.
public
String getValue() {
28.
return
value;
29.
}
30.
31.
public
void
setValue(String value) {
32.
this
.value = value;
33.
}
34.
35.
public
int
getFlag() {
36.
return
flag;
37.
}
38.
39.
public
void
setFlag(
int
flag) {
40.
this
.flag = flag;
41.
}
42.
43.
@Override
44.
public
void
write(DataOutput out)
throws
IOException {
45.
out.writeInt(flag);
46.
out.writeUTF(value);
47.
48.
}
49.
50.
@Override
51.
public
void
readFields(DataInput in)
throws
IOException {
52.
this
.flag = in.readInt();
53.
this
.value = in.readUTF();
54.
}
55.
56.
@Override
57.
public
int
compareTo(FlagStringo) {
58.
if
(
this
.flag >= o.getFlag()) {
59.
if
(
this
.flag > o.getFlag()) {
60.
return
1
;
61.
}
62.
}
else
{
63.
return
-
1
;
64.
}
65.
return
this
.value.compareTo(o.getValue());
66.
}
67.
68.
}
2.多map类,map1(实现对phone表文件操作)
view sourceprint?
01.
package
test.mr.multiinputs;
02.
03.
import
java.io.IOException;
04.
05.
import
org.apache.hadoop.io.LongWritable;
06.
import
org.apache.hadoop.io.Text;
07.
import
org.apache.hadoop.mapreduce.Mapper;
08.
09.
public
class
MultiMap1
extends
Mapper<LongWritable, Text, Text, FlagString> {
10.
private
String delimiter;
// 定义分隔符,由job端设置
11.
12.
@Override
13.
protected
void
setup(
14.
Mapper<LongWritable, Text, Text, FlagString>.Context context)
15.
throws
IOException, InterruptedException {
16.
delimiter = context.getConfiguration().get(
"delimiter"
,
","
);
17.
}
18.
19.
@Override
20.
protected
void
map(LongWritable key, Text value,
21.
Mapper<LongWritable, Text, Text, FlagString>.Context context)
22.
throws
IOException, InterruptedException {
23.
String line = value.toString().trim();
24.
if
(line.length() >
0
) {
25.
String[] str = line.split(delimiter);
26.
if
(str.length ==
2
) {
27.
context.write(
new
Text(str[
0
].trim()),
28.
new
FlagString(str[
1
].trim(),
0
));
// flag=0,表示phone表
29.
}
30.
}
31.
}
32.
}
2.map2(实现对user表文件操作)
view sourceprint?
01.
package
test.mr.multiinputs;
02.
03.
import
java.io.IOException;
04.
05.
import
org.apache.hadoop.io.LongWritable;
06.
import
org.apache.hadoop.io.Text;
07.
import
org.apache.hadoop.mapreduce.Mapper;
08.
09.
public
class
MultiMap2
extends
Mapper<LongWritable, Text, Text, FlagString> {
10.
private
String delimiter;
// 设置分隔符
11.
12.
@Override
13.
protected
void
setup(
14.
Mapper<LongWritable, Text, Text, FlagString>.Context context)
15.
throws
IOException, InterruptedException {
16.
delimiter = context.getConfiguration().get(
"delimiter"
,
","
);
17.
}
18.
19.
@Override
20.
protected
void
map(LongWritable key, Text value,
21.
Mapper<LongWritable, Text, Text, FlagString>.Context context)
22.
throws
IOException, InterruptedException {
23.
String line = value.toString().trim();
24.
if
(line.length() >
0
) {
25.
String[] str = line.split(delimiter);
26.
if
(str.length ==
2
) {
27.
context.write(
new
Text(str[
1
].trim()),
28.
new
FlagString(str[
0
].trim(),
1
));
// flag=1为user表
29.
}
30.
}
31.
}
32.
}
3.reduce类
view sourceprint?
01.
package
test.mr.multiinputs;
02.
03.
import
java.io.IOException;
04.
05.
import
org.apache.hadoop.io.NullWritable;
06.
import
org.apache.hadoop.io.Text;
07.
import
org.apache.hadoop.mapreduce.Reducer;
08.
09.
public
class
MultiRedu
extends
Reducer<Text, FlagString, NullWritable, Text> {
10.
private
String delimiter;
// 设置分隔符
11.
12.
@Override
13.
protected
void
setup(
14.
Reducer<Text, FlagString, NullWritable, Text>.Context context)
15.
throws
IOException, InterruptedException {
16.
delimiter = context.getConfiguration().get(
"delimiter"
,
","
);
17.
}
18.
19.
@Override
20.
protected
void
reduce(Text key, Iterable<FlagString> values,
21.
Reducer<Text, FlagString, NullWritable, Text>.Context context)
22.
throws
IOException, InterruptedException {
23.
// 最后输出的格式为: uservalue,key,phonevalue
24.
String phoneValue =
""
;
25.
String userValue =
""
;
26.
int
num =
0
;
27.
for
(FlagStringvalue : values) {
28.
// 第一个即为phone表
29.
if
(num ==
0
) {
30.
phoneValue = value.getValue();
31.
num++;
32.
}
else
{
33.
userValue = value.getValue();
34.
context.write(NullWritable.get(),
35.
new
Text(userValue + key.toString() + phoneValue));
36.
}
37.
}
38.
}
39.
}
4.job类(关键!!实现多文件的输入格式等)
view sourceprint?
001.
package
test.mr.multiinputs;
002.
003.
import
org.apache.hadoop.conf.Configuration;
004.
import
org.apache.hadoop.fs.Path;
005.
import
org.apache.hadoop.io.NullWritable;
006.
import
org.apache.hadoop.io.Text;
007.
import
org.apache.hadoop.mapreduce.Job;
008.
import
org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
009.
import
org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
010.
import
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
011.
import
org.apache.hadoop.util.Tool;
012.
import
org.apache.hadoop.util.ToolRunner;
013.
014.
/*
015.
* MultipleInputs类指定不同的输入文件路径以及输入文化格式
016.
现有两份数据
017.
phone
018.
123,good number
019.
124,common number
020.
123,bad number
021.
022.
user
023.
zhangsan,123
024.
lisi,124
025.
wangwu,125
026.
027.
现在需要把user和phone按照phone number连接起来。得到下面的结果
028.
zhangsan,123,good number
029.
lisi,123,common number
030.
wangwu,125,bad number
031.
*/
032.
public
class
MultiMapMain
extends
Configuration
implements
Tool {
033.
private
String input1 =
null
;
// 定义的多个输入文件
034.
private
String input2 =
null
;
035.
private
String output =
null
;
036.
private
String delimiter =
null
;
037.
038.
@Override
039.
public
void
setConf(Configurationconf) {
040.
041.
}
042.
043.
@Override
044.
public
ConfigurationgetConf() {
045.
return
new
Configuration();
046.
}
047.
048.
@Override
049.
public
int
run(String[] args)
throws
Exception {
050.
setArgs(args);
051.
checkParam();
// 对参数进行检测
052.
053.
Configurationconf =
new
Configuration();
054.
Job job =
new
Job(conf);
055.
job.setJarByClass(MultiMapMain.
class
);
056.
057.
job.setMapOutputKeyClass(Text.
class
);
058.
job.setMapOutputValueClass(FlagString.
class
);
059.
060.
job.setReducerClass(MultiRedu.
class
);
061.
job.setOutputKeyClass(NullWritable.
class
);
062.
job.setOutputValueClass(Text.
class
);
063.
064.
// MultipleInputs类添加文件路径
065.
MultipleInputs.addInputPath(job,
new
Path(input1),
066.
TextInputFormat.
class
, MultiMap1.
class
);
067.
MultipleInputs.addInputPath(job,
new
Path(input2),
068.
TextInputFormat.
class
, MultiMap2.
class
);
069.
070.
FileOutputFormat.setOutputPath(job,
new
Path(output));
071.
job.waitForCompletion(
true
);
072.
return
0
;
073.
}
074.
075.
private
void
checkParam() {
076.
if
(input1 ==
null
||
""
.equals(input1.trim())) {
077.
System.out.println(
"no input phone-data path"
);
078.
userMaunel();
079.
System.exit(-
1
);
080.
}
081.
if
(input2 ==
null
||
""
.equals(input2.trim())) {
082.
System.out.println(
"no input user-data path"
);
083.
userMaunel();
084.
System.exit(-
1
);
085.
}
086.
if
(output ==
null
||
""
.equals(output.trim())) {
087.
System.out.println(
"no output path"
);
088.
userMaunel();
089.
System.exit(-
1
);
090.
}
091.
if
(delimiter ==
null
||
""
.equals(delimiter.trim())) {
092.
System.out.println(
"no delimiter"
);
093.
userMaunel();
094.
System.exit(-
1
);
095.
}
096.
097.
}
098.
099.
// 用户手册
100.
private
void
userMaunel() {
101.
System.err.println(
"Usage:"
);
102.
System.err.println(
"-i1 input phone data path."
);
103.
System.err.println(
"-i2 input user data path."
);
104.
System.err.println(
"-o output output data path."
);
105.
System.err.println(
"-delimiter data delimiter default comma."
);
106.
}
107.
108.
// 对属性进行赋值
109.
// 设置输入的格式:-i1 xxx(输入目录) -i2 xxx(输入目录) -o xxx(输出目录) -delimiter x(分隔符)
110.
private
void
setArgs(String[] args) {
111.
for
(
int
i =
0
; i < args.length; i++) {
112.
if
(
"-i1"
.equals(args[i])) {
113.
input1 = args[++i];
// 将input1赋值为第一个文件的输入路径
114.
}
else
if
(
"-i2"
.equals(args[i])) {
115.
input2 = args[++i];
116.
}
else
if
(
"-o"
.equals(args[i])) {
117.
output = args[++i];
118.
}
else
if
(
"-delimiter"
.equals(args[i])) {
119.
delimiter = args[++i];
120.
}
121.
}
122.
}
123.
124.
public
static
void
main(String[] args)
throws
Exception {
125.
Configurationconf =
new
Configuration();
126.
ToolRunner.run(conf,
new
MultiMapMain(), args);
// 调用run方法
127.
}
128.
}
相关文章推荐
- Windows 7 Ultimate(旗舰版)SP1 32/64位官方原版下载(2011年5月12日更新版)
- ExtJS获取父子、兄弟容器元素方法
- 20145214 《Java程序设计》第4周学习总结
- objective-C学习笔记(九)ARC
- 设置一个虚拟域名使内网其他用户也可以通过虚拟域名访问
- 用shell脚本监控进程是否存在 不存在则启动的实例
- poj3904 Sky Code【容斥原理】
- Ubuntu下编译linux内核,报"mkimage" command not found错的解决
- 无人机矩阵操作系统
- PHP include 和 require 语句
- 在Java中使用Json
- 群发微信图文消息,但是正文中的图片却不显示
- KMP训练五题
- linker command failed with exit code 1 (use -v to see invocation)
- Python reverse order
- Java集合类框架的实践经验
- Linux USB 驱动开发(一)—— USB设备基础概念
- win7 win8 C程序图形化界面
- 把数组排成最小的数
- 关键字