您的位置:首页 > 其它

MapReduce处理多个不同的输入文件

2016-03-25 21:45 253 查看
MultipleInputs类指定不同的输入文件路径以及输入文件格式

现有两份数据

phone

123,good number

124,common number

125,bad number

user

zhangsan,123

lisi,124

wangwu,125

现在需要把user和phone按照phone number连接起来。得到下面的结果

zhangsan,123,good number

lisi,124,common number

wangwu,125,bad number

分析思路(不同文件之间每行数据有相同的key,经过shuffle之后相同key的不同value就形成了一个集合,在reduce阶段对该集合里的value进行组合进而得到想要的结果)

还是相当于两张表的一对一join操作。join时对value设置个Bean(JavaBean实现writablecomparable接口),key为外键值
join的优化,详见http://blog.csdn.net/u010366796/article/details/44649933,设置KeyBean(外键和标识flag属性),进行排序

本例中将通过value进行排序,即在value的JavaBean中通过实现compareTo()方法定义排序规则,使得phone表(flag=0)位于首位。注意:MapReduce默认只对key排序,并不保证同一key下各value的先后顺序,因此reduce端应根据flag来区分两张表的记录(或者使用二次排序)。

1.对value实现JavaBean(实现writablecomparable接口)

view sourceprint?

01.
package

test.mr.multiinputs;

02.

03.
import

java.io.DataInput;

04.
import

java.io.DataOutput;

05.
import

java.io.IOException;

06.

07.
import

org.apache.hadoop.io.WritableComparable;

08.

09.
/*

10.
* 自定义的JavaBean

11.
*/

12.
public

class
FlagString
implements

WritableComparable<FlagString> {

13.
private

String value;

14.
private

int
flag;
// 标记 0:表示phone表 1:表示user表

15.

16.
public

FlagString() {

17.
super
();

18.
// TODO Auto-generated constructor stub

19.
}

20.

21.
public

FlagString(String value,
int

flag) {

22.
super
();

23.
this
.value = value;

24.
this
.flag = flag;

25.
}

26.

27.
public

String getValue() {

28.
return

value;

29.
}

30.

31.
public

void
setValue(String value) {

32.
this
.value = value;

33.
}

34.

35.
public

int
getFlag() {

36.
return

flag;

37.
}

38.

39.
public

void
setFlag(
int

flag) {

40.
this
.flag = flag;

41.
}

42.

43.
@Override

44.
public

void
write(DataOutput out)

throws
IOException {

45.
out.writeInt(flag);

46.
out.writeUTF(value);

47.

48.
}

49.

50.
@Override

51.
public

void
readFields(DataInput in)

throws
IOException {

52.
this
.flag = in.readInt();

53.
this
.value = in.readUTF();

54.
}

55.

56.
@Override

57.
public

int
compareTo(FlagStringo) {

58.
if

(
this
.flag >= o.getFlag()) {

59.
if

(
this
.flag > o.getFlag()) {

60.
return

1
;

61.
}

62.
}
else
{

63.
return

-
1
;

64.
}

65.
return

this
.value.compareTo(o.getValue());

66.
}

67.

68.
}


2.多map类,map1(实现对phone表文件操作)

view sourceprint?

01.
package

test.mr.multiinputs;

02.

03.
import

java.io.IOException;

04.

05.
import

org.apache.hadoop.io.LongWritable;

06.
import

org.apache.hadoop.io.Text;

07.
import

org.apache.hadoop.mapreduce.Mapper;

08.

09.
public

class
MultiMap1
extends

Mapper<LongWritable, Text, Text, FlagString> {

10.
private

String delimiter;
// 定义分隔符,由job端设置

11.

12.
@Override

13.
protected

void
setup(

14.
Mapper<LongWritable, Text, Text, FlagString>.Context context)

15.
throws

IOException, InterruptedException {

16.
delimiter = context.getConfiguration().get(
"delimiter"
,
","
);

17.
}

18.

19.
@Override

20.
protected

void
map(LongWritable key, Text value,

21.
Mapper<LongWritable, Text, Text, FlagString>.Context context)

22.
throws

IOException, InterruptedException {

23.
String line = value.toString().trim();

24.
if

(line.length() >
0
) {

25.
String[] str = line.split(delimiter);

26.
if

(str.length ==
2
) {

27.
context.write(
new

Text(str[
0
].trim()),

28.
new

FlagString(str[
1
].trim(),
0
));
// flag=0,表示phone表

29.
}

30.
}

31.
}

32.
}


2.map2(实现对user表文件操作)

view sourceprint?

01.
package

test.mr.multiinputs;

02.

03.
import

java.io.IOException;

04.

05.
import

org.apache.hadoop.io.LongWritable;

06.
import

org.apache.hadoop.io.Text;

07.
import

org.apache.hadoop.mapreduce.Mapper;

08.

09.
public

class
MultiMap2
extends

Mapper<LongWritable, Text, Text, FlagString> {

10.
private

String delimiter;
// 设置分隔符

11.

12.
@Override

13.
protected

void
setup(

14.
Mapper<LongWritable, Text, Text, FlagString>.Context context)

15.
throws

IOException, InterruptedException {

16.
delimiter = context.getConfiguration().get(
"delimiter"
,
","
);

17.
}

18.

19.
@Override

20.
protected

void
map(LongWritable key, Text value,

21.
Mapper<LongWritable, Text, Text, FlagString>.Context context)

22.
throws

IOException, InterruptedException {

23.
String line = value.toString().trim();

24.
if

(line.length() >
0
) {

25.
String[] str = line.split(delimiter);

26.
if

(str.length ==
2
) {

27.
context.write(
new

Text(str[
1
].trim()),

28.
new

FlagString(str[
0
].trim(),
1
));
// flag=1为user表

29.
}

30.
}

31.
}

32.
}


3.reduce类

view sourceprint?

01.
package

test.mr.multiinputs;

02.

03.
import

java.io.IOException;

04.

05.
import

org.apache.hadoop.io.NullWritable;

06.
import

org.apache.hadoop.io.Text;

07.
import

org.apache.hadoop.mapreduce.Reducer;

08.

09.
public

class
MultiRedu
extends

Reducer<Text, FlagString, NullWritable, Text> {

10.
private

String delimiter;
// 设置分隔符

11.

12.
@Override

13.
protected

void
setup(

14.
Reducer<Text, FlagString, NullWritable, Text>.Context context)

15.
throws

IOException, InterruptedException {

16.
delimiter = context.getConfiguration().get(
"delimiter"
,
","
);

17.
}

18.

19.
@Override

20.
protected

void
reduce(Text key, Iterable<FlagString> values,

21.
Reducer<Text, FlagString, NullWritable, Text>.Context context)

22.
throws

IOException, InterruptedException {

23.
// 最后输出的格式为: uservalue,key,phonevalue

24.
String phoneValue =
""
;

25.
String userValue =
""
;

26.
int

num =
0
;

27.
for

(FlagStringvalue : values) {

28.
// 第一个即为phone表

29.
if

(num ==
0
) {

30.
phoneValue = value.getValue();

31.
num++;

32.
}
else
{

33.
userValue = value.getValue();

34.
context.write(NullWritable.get(),

35.
new

Text(userValue + key.toString() + phoneValue));

36.
}

37.
}

38.
}

39.
}


4.job类(关键!!实现多文件的输入格式等)

view sourceprint?

001.
package

test.mr.multiinputs;

002.

003.
import

org.apache.hadoop.conf.Configuration;

004.
import

org.apache.hadoop.fs.Path;

005.
import

org.apache.hadoop.io.NullWritable;

006.
import

org.apache.hadoop.io.Text;

007.
import

org.apache.hadoop.mapreduce.Job;

008.
import

org.apache.hadoop.mapreduce.lib.input.MultipleInputs;

009.
import

org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

010.
import

org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

011.
import

org.apache.hadoop.util.Tool;

012.
import

org.apache.hadoop.util.ToolRunner;

013.

014.
/*

015.
* MultipleInputs类指定不同的输入文件路径以及输入文化格式

016.
现有两份数据

017.
phone

018.
123,good number

019.
124,common number

020.
123,bad number

021.

022.
user

023.
zhangsan,123

024.
lisi,124

025.
wangwu,125

026.

027.
现在需要把user和phone按照phone number连接起来。得到下面的结果

028.
zhangsan,123,good number

029.
lisi,123,common number

030.
wangwu,125,bad number

031.
*/

032.
public

class
MultiMapMain
extends

Configuration
implements
Tool {

033.
private

String input1 =
null
;
// 定义的多个输入文件

034.
private

String input2 =
null
;

035.
private

String output =
null
;

036.
private

String delimiter =
null
;

037.

038.
@Override

039.
public

void
setConf(Configurationconf) {

040.

041.
}

042.

043.
@Override

044.
public

ConfigurationgetConf() {

045.
return

new
Configuration();

046.
}

047.

048.
@Override

049.
public

int
run(String[] args)
throws

Exception {

050.
setArgs(args);

051.
checkParam();
// 对参数进行检测

052.

053.
Configurationconf =
new
Configuration();

054.
Job job =
new
Job(conf);

055.
job.setJarByClass(MultiMapMain.
class
);

056.

057.
job.setMapOutputKeyClass(Text.
class
);

058.
job.setMapOutputValueClass(FlagString.
class
);

059.

060.
job.setReducerClass(MultiRedu.
class
);

061.
job.setOutputKeyClass(NullWritable.
class
);

062.
job.setOutputValueClass(Text.
class
);

063.

064.
// MultipleInputs类添加文件路径

065.
MultipleInputs.addInputPath(job,
new
Path(input1),

066.
TextInputFormat.
class
, MultiMap1.
class
);

067.
MultipleInputs.addInputPath(job,
new
Path(input2),

068.
TextInputFormat.
class
, MultiMap2.
class
);

069.

070.
FileOutputFormat.setOutputPath(job,
new
Path(output));

071.
job.waitForCompletion(
true
);

072.
return

0
;

073.
}

074.

075.
private

void
checkParam() {

076.
if

(input1 ==
null
||
""
.equals(input1.trim())) {

077.
System.out.println(
"no input phone-data path"
);

078.
userMaunel();

079.
System.exit(-
1
);

080.
}

081.
if

(input2 ==
null
||
""
.equals(input2.trim())) {

082.
System.out.println(
"no input user-data path"
);

083.
userMaunel();

084.
System.exit(-
1
);

085.
}

086.
if

(output ==
null
||
""
.equals(output.trim())) {

087.
System.out.println(
"no output path"
);

088.
userMaunel();

089.
System.exit(-
1
);

090.
}

091.
if

(delimiter ==
null
||
""
.equals(delimiter.trim())) {

092.
System.out.println(
"no delimiter"
);

093.
userMaunel();

094.
System.exit(-
1
);

095.
}

096.

097.
}

098.

099.
// 用户手册

100.
private

void
userMaunel() {

101.
System.err.println(
"Usage:"
);

102.
System.err.println(
"-i1 input    phone data path."
);

103.
System.err.println(
"-i2 input    user data path."
);

104.
System.err.println(
"-o output    output data path."
);

105.
System.err.println(
"-delimiter data delimiter    default comma."
);

106.
}

107.

108.
// 对属性进行赋值

109.
// 设置输入的格式:-i1 xxx(输入目录) -i2 xxx(输入目录) -o xxx(输出目录) -delimiter x(分隔符)

110.
private

void
setArgs(String[] args) {

111.
for

(
int
i =
0
; i < args.length; i++) {

112.
if

(
"-i1"
.equals(args[i])) {

113.
input1 = args[++i];
// 将input1赋值为第一个文件的输入路径

114.
}
else
if
(
"-i2"
.equals(args[i])) {

115.
input2 = args[++i];

116.
}
else
if
(
"-o"
.equals(args[i])) {

117.
output = args[++i];

118.
}
else
if
(
"-delimiter"
.equals(args[i])) {

119.
delimiter = args[++i];

120.
}

121.
}

122.
}

123.

124.
public

static
void
main(String[] args)
throws
Exception {

125.
Configurationconf =
new
Configuration();

126.
ToolRunner.run(conf,
new
MultiMapMain(), args);
// 调用run方法

127.
}

128.
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: