电话号码上传下载流量的hadoop代码简单实现
2017-05-09 11:22
453 查看
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
public class DataCount {
public static class DCMapper extends Mapper<LongWritable, Text, Text, DataBean>{
@Override
protected void map(LongWritable key, Text value,Context context){
//key 为一个long型的数字。value为一个字符串,context为上下文的内容
String line = value.toString();
//先把hadoop里面的字符串转换成可以Java识别的字符串
String fileds[] = line.split("/t")
//按照空格分割里面的字符串
String tel = fileds[1]
//拿出里面的电话号码那一行
long up = long.parseLong(fileds[8]);
//电话上网上传流量数据的取出,需要解析成long型的数据
long down = long.parseLong(fileds[9]);
//电话上网下载流量数据的取出。也是需要解析成long型的数据
DataBean bean = new DataBean(tel,up,down)
//新建一个对象。用构造方法来存储需要的显示的信息
电话号码 : 上传流量 :下载流量
context.write(new Text(tel),bean);
//把电话号码作为key 把bean作为value输出
}
//以上为mapper方法
}
public static class DCReducer extends Reducer<Text,DataBean,Text,DataBean>{
@Override
protected void reduce (Text key,Iterable<DataBean>values,Context context){
long up_sum = 0;
long down_sum = 0;
//定义2个计数器
for(DataBean bean :values){
up_sum += bean.getUpPayLoad();
down_sum += bean.getDownPayLoad()
//累加上传和下行流量
}
DataBean bean = new DataBean("",up_sum,down_sum);
context.write(key,bean);
//把电话号码 和bean输出
}
}
public static void mian(String[] args){
Configuration conf = new Configuration()
//新建一个配置文件
Job job = Job.getInstance(conf);
//hadoop里面的自带的函数必须用Job
job.setJar(DataCount.class);
//这句话必须加进去
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DataBean.class);
job.setMapperClass(DCMapper.class);
FileInputFormat,setInputPaths(job,new Path(""));
job.setReducerClass(DCReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DataBean.class);
FileInputFormat.setOutputPaths(job,new Path(""));
job.wait(true);
}
//下面的是写一个DataBean
//定义一些属性和方法
public class DataInfo implements Writable{
private String tel;
private long upPayLoad;
private long downPayLoad;
private long totalPayLoad;
public DataInfo(){}
public DataInfo(String tel, long upPayLoad, long downPayLoad) {
this.tel = tel;
this.upPayLoad = upPayLoad;
this.downPayLoad = downPayLoad;
this.totalPayLoad = upPayLoad + downPayLoad;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(tel);
out.writeLong(upPayLoad);
out.writeLong(downPayLoad);
out.writeLong(totalPayLoad);
}
@Override
public void readFields(DataInput in) throws IOException {
this.tel = in.readUTF();
this.upPayLoad = in.readLong();
this.downPayLoad = in.readLong();
this.totalPayLoad = in.readLong();
}
@Override
public String toString() {
return upPayLoad + "\t" + downPayLoad + "\t" + totalPayLoad;
}
public String getTel() {
return tel;
}
public void setTel(String tel) {
this.tel = tel;
}
public long getUpPayLoad() {
return upPayLoad;
}
public void setUpPayLoad(long upPayLoad) {
this.upPayLoad = upPayLoad;
}
public long getDownPayLoad() {
return downPayLoad;
}
public void setDownPayLoad(long downPayLoad) {
this.downPayLoad = downPayLoad;
}
public long getTotalPayLoad() {
return totalPayLoad;
}
public void setTotalPayLoad(long totalPayLoad) {
this.totalPayLoad = totalPayLoad;
}
}
}
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
public class DataCount {
public static class DCMapper extends Mapper<LongWritable, Text, Text, DataBean>{
@Override
protected void map(LongWritable key, Text value,Context context){
//key 为一个long型的数字。value为一个字符串,context为上下文的内容
String line = value.toString();
//先把hadoop里面的字符串转换成可以Java识别的字符串
String fileds[] = line.split("/t")
//按照空格分割里面的字符串
String tel = fileds[1]
//拿出里面的电话号码那一行
long up = long.parseLong(fileds[8]);
//电话上网上传流量数据的取出,需要解析成long型的数据
long down = long.parseLong(fileds[9]);
//电话上网下载流量数据的取出。也是需要解析成long型的数据
DataBean bean = new DataBean(tel,up,down)
//新建一个对象。用构造方法来存储需要的显示的信息
电话号码 : 上传流量 :下载流量
context.write(new Text(tel),bean);
//把电话号码作为key 把bean作为value输出
}
//以上为mapper方法
}
public static class DCReducer extends Reducer<Text,DataBean,Text,DataBean>{
@Override
protected void reduce (Text key,Iterable<DataBean>values,Context context){
long up_sum = 0;
long down_sum = 0;
//定义2个计数器
for(DataBean bean :values){
up_sum += bean.getUpPayLoad();
down_sum += bean.getDownPayLoad()
//累加上传和下行流量
}
DataBean bean = new DataBean("",up_sum,down_sum);
context.write(key,bean);
//把电话号码 和bean输出
}
}
public static void mian(String[] args){
Configuration conf = new Configuration()
//新建一个配置文件
Job job = Job.getInstance(conf);
//hadoop里面的自带的函数必须用Job
job.setJar(DataCount.class);
//这句话必须加进去
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DataBean.class);
job.setMapperClass(DCMapper.class);
FileInputFormat,setInputPaths(job,new Path(""));
job.setReducerClass(DCReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DataBean.class);
FileInputFormat.setOutputPaths(job,new Path(""));
job.wait(true);
}
//下面的是写一个DataBean
//定义一些属性和方法
public class DataInfo implements Writable{
private String tel;
private long upPayLoad;
private long downPayLoad;
private long totalPayLoad;
public DataInfo(){}
public DataInfo(String tel, long upPayLoad, long downPayLoad) {
this.tel = tel;
this.upPayLoad = upPayLoad;
this.downPayLoad = downPayLoad;
this.totalPayLoad = upPayLoad + downPayLoad;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(tel);
out.writeLong(upPayLoad);
out.writeLong(downPayLoad);
out.writeLong(totalPayLoad);
}
@Override
public void readFields(DataInput in) throws IOException {
this.tel = in.readUTF();
this.upPayLoad = in.readLong();
this.downPayLoad = in.readLong();
this.totalPayLoad = in.readLong();
}
@Override
public String toString() {
return upPayLoad + "\t" + downPayLoad + "\t" + totalPayLoad;
}
public String getTel() {
return tel;
}
public void setTel(String tel) {
this.tel = tel;
}
public long getUpPayLoad() {
return upPayLoad;
}
public void setUpPayLoad(long upPayLoad) {
this.upPayLoad = upPayLoad;
}
public long getDownPayLoad() {
return downPayLoad;
}
public void setDownPayLoad(long downPayLoad) {
this.downPayLoad = downPayLoad;
}
public long getTotalPayLoad() {
return totalPayLoad;
}
public void setTotalPayLoad(long totalPayLoad) {
this.totalPayLoad = totalPayLoad;
}
}
}
相关文章推荐
- 通过linux的tc工具简单实现上传和下载的流量控制
- Spring MVC代码实例系列-10:Spring MVC实现简单的文件上传和下载
- JavaWeb 文件的上传和下载功能简单实现代码
- Asp.net 2.0 用 FileUpload 控件实现多文件上传 用户控件(示例代码下载)
- Asp.net 2.0 用 FileUpload 控件实现多文件上传 用户控件(示例代码下载)
- 一个简单的FTP客户端 实现上传下载
- [导入]AjaxPanel自定义控件实现页面无刷新数据交互(做了个示例程序, 效果确实比较Cool, 用法非常简单! )(示例代码下载)
- AjaxPanel自定义控件实现页面无刷新数据交互(做了个示例程序, 效果确实比较Cool, 用法非常简单! )(示例代码下载)
- AjaxPanel自定义控件实现页面无刷新数据交互(做了个示例程序, 效果确实比较Cool, 用法非常简单! )(示例代码下载)
- AjaxPanel自定义控件实现页面无刷新数据交互(做了个示例程序, 效果确实比较Cool, 用法非常简单! )(示例代码下载)
- Asp.net 2.0 用 FileUpload 控件实现多文件上传 用户控件(示例代码下载)
- AjaxPanel自定义控件实现页面无刷新数据交互(做了个示例程序, 效果确实比较Cool, 用法非常简单! )(示例代码下载)
- [导入]AjaxPanel自定义控件实现页面无刷新数据交互(做了个示例程序, 效果确实比较Cool, 用法非常简单! )(示例代码下载)
- Asp.net 2.0 用 FileUpload 控件实现多文件上传 用户控件(示例代码下载)
- php 实现简单的图片上传代码
- Asp.net 2.0 用 FileUpload 控件实现多文件上传 用户控件(示例代码下载)
- Asp.net中文件上传下载的简单实现
- AjaxPanel自定义控件实现页面无刷新数据交互(做了个示例程序, 效果确实比较Cool, 用法非常简单! )(示例代码下载)
- AjaxPanel自定义控件实现页面无刷新数据交互(做了个示例程序, 效果确实比较Cool, 用法非常简单! )(示例代码下载)
- Asp.net 2.0 用 FileUpload 控件实现多文件上传 用户控件(示例代码下载)