
Writing a MapReduce Program (a Simple Called-Number Analysis Program)

2014-03-17 23:26
Since Hadoop 2.2.0 does not yet have a usable Eclipse plugin, the current workflow is to write the code in Eclipse and then move it into the Hadoop environment for execution.

Preparation:

1. Set up the Hadoop environment, create a project, and add all of Hadoop's jar files to the project's Build Path;

2. Construct the data set: each line consists of two numbers, the caller and the callee. Generate random test data and put the resulting file into HDFS;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.Random;

public class GenerateTestData {

    public static void writeToFile(String fileName) throws Exception {
        OutputStream out = new FileOutputStream(new File(fileName));
        BufferedOutputStream bo = new BufferedOutputStream(out);
        Random rd1 = new Random();

        try {
            for (int i = 0; i < 10000; i++) {
                // Caller: a random 9-digit number starting with 1
                StringBuilder sb = new StringBuilder();
                sb.append(1);
                for (int j = 1; j < 9; j++) {
                    sb.append(rd1.nextInt(10));
                }
                sb.append(" ");

                // Callee: one of a few well-known service numbers,
                // or (default branch) another random 9-digit number
                switch (rd1.nextInt(10)) {
                case 1:
                    sb.append("10086");
                    break;
                case 2:
                    sb.append("110");
                    break;
                case 3:
                    sb.append("120");
                    break;
                case 4:
                    sb.append("119");
                    break;
                case 5:
                    sb.append("114");
                    break;
                case 6:
                    sb.append("17951");
                    break;
                case 7:
                    sb.append("10010");
                    break;
                case 8:
                    sb.append("13323567897");
                    break;
                default:
                    sb.append(1);
                    for (int j = 1; j < 9; j++) {
                        sb.append(rd1.nextInt(10));
                    }
                    break;
                }

                sb.append("\r\n");
                bo.write(sb.toString().getBytes());
            }
        } finally {
            // Close (and implicitly flush) the stream; without this the
            // buffered tail of the file can be lost
            bo.close();
        }
    }

    public static void main(String[] args) {
        try {
            writeToFile("d://helloa.txt");
            System.out.println("finish!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
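As a quick sanity check on the generated data, every line should split on a single space into exactly two tokens: a 9-digit caller starting with 1, followed by the callee. A minimal sketch of such a check (plain Java, no Hadoop dependencies; the class and method names are illustrative, not from the original program):

```java
public class LineFormatCheck {
    // Mirrors the generator's output: "caller callee" separated by one space
    static boolean isValid(String line) {
        String[] arr = line.split(" ");
        return arr.length == 2
                && arr[0].length() == 9
                && arr[0].startsWith("1")
                && arr[0].chars().allMatch(Character::isDigit);
    }

    public static void main(String[] args) {
        System.out.println(isValid("134567890 10086")); // true
        System.out.println(isValid("malformed-line"));  // false
    }
}
```

Lines that fail this shape are exactly the ones the mapper below skips via its LINESKIP counter.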


The MapReduce program is as follows. It was written with reference to Hadoop: The Definitive Guide and still uses the older API style:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FirstTest extends Configured implements Tool {

    // Counts input lines that could not be parsed
    enum Counter {
        LINESKIP,
    }

    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            try {
                // Invert "caller callee" so the callee becomes the key
                String[] arr = line.split(" ");
                context.write(new Text(arr[1]), new Text(arr[0]));
            } catch (Exception e) {
                context.getCounter(Counter.LINESKIP).increment(1);
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Concatenate all callers of this callee, separated by "|"
            StringBuilder out = new StringBuilder();
            for (Text t : values) {
                out.append(t.toString()).append("|");
            }
            context.write(key, new Text(out.toString()));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        // This constructor is deprecated in Hadoop 2.x;
        // Job.getInstance(conf, name) is the replacement
        Job job = new Job(conf, "First Map-Reduce Program");
        job.setJarByClass(getClass());

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new FirstTest(), args);
        System.exit(exitCode);
    }
}
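To preview what the job produces without a cluster, the map and reduce logic above can be simulated with a plain in-memory map: invert each line to (callee, caller), group by callee, then join the callers with "|". A sketch (class name and sample lines are illustrative; Hadoop does not guarantee the value order within a key that a LinkedHashMap shows here):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LocalSimulation {
    public static void main(String[] args) {
        String[] lines = {
            "134567890 10086",
            "156781234 110",
            "187654321 10086",
        };

        // "Map" phase: invert "caller callee" to (callee -> caller);
        // "Reduce" phase: concatenate callers with "|"
        Map<String, StringBuilder> grouped = new LinkedHashMap<>();
        for (String line : lines) {
            String[] arr = line.split(" ");
            grouped.computeIfAbsent(arr[1], k -> new StringBuilder())
                   .append(arr[0]).append("|");
        }

        for (Map.Entry<String, StringBuilder> e : grouped.entrySet()) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }
}
```

Each output line has the callee as the key and the pipe-separated list of its callers as the value, matching the Text/Text output types of the reducer.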


After compiling and building the jar file on Linux, run it in the Hadoop environment:

hadoop jar FirstTest.jar /input/helloa.txt /output

Problems to watch out for:

1. Because the program was written in Eclipse with a package declaration, packaging it on Linux directly with jar cvfm abc.jar .. caused Hadoop to keep reporting that the main class could not be found when running the jar;

2. When compiling on Linux, FirstTest.java was placed under HADOOP_CLASSPATH. Running hadoop jar FirstTest.jar /input/helloa.txt /output from that directory reported that the FirstTest$Map class could not be found; after moving the generated FirstTest.jar to a different directory, it ran normally.