mapreduce——join算法的代码实现
需求:有user数据文件:user.csv
u001,senge,18,angelababy
u002,laozhao,48,ruhua
u003,xiaoxu,16,chunge
u004,laoyang,28,zengge
u005,nana,14,huangbo
有订单数据文件:order.dat.1 order.dat.2 order.dat.3
order001,u001
order002,u001
order003,u005
order004,u002
order005,u003
order006,u004
需要对上述两类数据进行连接:
order001,u001,senge,18,angelababy
order002,u001,senge,18,angelababy
order003,u005,nana,14,huangbo
order004,u002,laozhao,48,ruhua
order005,u003,xiaoxu,16,chunge
order006,u004,laoyang,28,zengge
思路:
map端:
不管maptask读到的是哪个文件,我们都可以在setup方法中通过context拿到当前输入切片(FileSplit)所属的文件名,从而在map方法中区分这条数据来自order文件还是user文件
对于order数据,map中切分字段,封装为一个JoinBean,并打上表标记:order(user相关字段用占位值"NULL"和-1填充)
对于user数据,map中切分字段,封装为一个JoinBean,并打上表标记:user(orderId字段用占位值"NULL"填充)
然后,以uid作为key,以joinbean作为value返回
reduce端:
用迭代器迭代出一组相同uid的所有数据joinbean,然后判断
如果标记字段为order,则拷贝一份后加入一个ArrayList<JoinBean>中(迭代器会复用同一个对象,所以必须拷贝)
如果标记字段为user,则拷贝到一个单独的JoinBean对象(userBean)中
然后,遍历arraylist,对里面的每一个JoinBean填充userBean中的user数据,然后输出这个joinBean即可
代码实现
JoinBean
public class JoinBean implements Writable { private String orderId; private String userId; private String userName; private int userAge; private String userFriend; private String tableName; public void set(String orderId, String userId, String userName, int userAge, String userFriend,String tableName) { this.orderId = orderId; this.userId = userId; this.userName = userName; this.userAge = userAge; this.userFriend = userFriend; this.tableName=tableName; } public String getOrderId() { return orderId; } public void setOrderId(String orderId) { this.orderId = orderId; } public String getUserId() { return userId; } public void setUserId(String userId) { this.userId = userId; } public String getUserName() { return userName; } public void setUserName(String userName) { this.userName = userName; } public int getUserAge() { return userAge; } public void setUserAge(int userAge) { this.userAge = userAge; } public String getUserFriend() { return userFriend; } public void setUserFriend(String userFriend) { this.userFriend = userFriend; } public String getTableName() { return tableName; } public void setTableName(String tableName) { this.tableName = tableName; } @Override public String toString() { return this.orderId+","+this.userId+","+this.userAge +","+this.userName+","+this.userFriend; } public void write(DataOutput dataOutput) throws IOException { dataOutput.writeUTF(this.orderId); dataOutput.writeUTF(this.userId); dataOutput.writeUTF(this.userName); dataOutput.writeInt(this.userAge); dataOutput.writeUTF(this.userFriend); dataOutput.writeUTF(this.tableName); } public void readFields(DataInput dataInput) throws IOException { this.orderId=dataInput.readUTF(); this.userId=dataInput.readUTF(); this.userName=dataInput.readUTF(); this.userAge=dataInput.readInt(); this.userFriend=dataInput.readUTF(); this.tableName=dataInput.readUTF(); } }
ReduceSideJoin
public class ReduceSideJoin { public static class ReduceSideJoinMapper extends Mapper<LongWritable,Text,Text,JoinBean>{ String fileName =null; JoinBean bean = new JoinBean(); Text k=new Text(); /** * maptask在做数据处理的时候,会先调用一次setup(只会调用一次) * @param context * @throws IOException * @throws InterruptedException */ @Override protected void setup(Context context) throws IOException, InterruptedException { FileSplit inputSplit = (FileSplit) context.getInputSplit(); fileName=inputSplit.getPath().getName(); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split(","); if (fileName.startsWith("order")){ bean.set(fields[0],fields[1],"NULL",-1,"NULL","order"); }else{ bean.set("NULL",fields[0],fields[1],Integer.parseInt(fields[2]),fields[3],"user"); } k.set(bean.getUserId()); context.write(k,bean); } } public static class ReducerSideJoinReduce extends Reducer<Text,JoinBean,JoinBean,NullWritable>{ @Override protected void reduce(Text key, Iterable<JoinBean> beans, Context context) throws IOException, InterruptedException { ArrayList<JoinBean> orderList = new ArrayList<JoinBean>(); JoinBean userBean=null; try { for (JoinBean bean:beans){ //区分两类数据 if("order".equals(bean.getTableName())){ JoinBean newBean = new JoinBean(); BeanUtils.copyProperties(newBean,bean); orderList.add(newBean); }else{ userBean=new JoinBean(); BeanUtils.copyProperties(userBean,bean); } } //拼接数据userBean数据加入到orderBean for(JoinBean bean:orderList){ bean.setUserAge(userBean.getUserAge()); bean.setUserFriend(userBean.getUserFriend()); bean.setUserName(userBean.getUserName()); context.write(bean,NullWritable.get()); System.out.println(userBean.getUserFriend()); } } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args)throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //动态获取jar包在哪里 job.setJarByClass(ReduceSideJoin.class); 
//2.封装参数:本次job所要调用的mapper实现类 job.setMapperClass(ReduceSideJoinMapper.class); job.setReducerClass(ReducerSideJoinReduce.class); //3.封装参数:本次job的Mapper实现类产生的数据key,value的类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(JoinBean.class); //4.封装参数:本次Reduce返回的key,value数据类型 job.setOutputKeyClass(JoinBean.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job,new Path("F:\\mrdata\\join\\input")); FileOutputFormat.setOutputPath(job,new Path("F:\\mrdata\\join\\out")); boolean res = job.waitForCompletion(true); System.exit(res ? 0:-1); } }
测试数据
order.txt.1
order001,u001
order002,u001
order003,u005
order004,u002
order005,u003
order006,u004
order.txt.2
order001,u001
order002,u001
order003,u005
order004,u002
order005,u003
order006,u004
user.txt
u001,senge,18,angelababy
u002,laozhao,48,ruhua
u003,xiaoxu,16,chunge
u004,laoyang,28,zengge
u005,nana,14,huangbo
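结果输出(每行的字段顺序由JoinBean.toString()决定:orderId,userId,userAge,userName,userFriend,即年龄在姓名之前;由于order.txt.1和order.txt.2内容相同,每条订单会出现两次)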
order002,u001,18,senge,angelababy
order001,u001,18,senge,angelababy
order002,u001,18,senge,angelababy
order001,u001,18,senge,angelababy
order004,u002,48,laozhao,ruhua
order004,u002,48,laozhao,ruhua
order005,u003,16,xiaoxu,chunge
order005,u003,16,xiaoxu,chunge
order006,u004,28,laoyang,zengge
order006,u004,28,laoyang,zengge
order003,u005,14,nana,huangbo
order003,u005,14,nana,huangbo
(adsbygoogle = window.adsbygoogle || []).push({});
- 仿QQ切换按钮,纯代码实现,告别图片
- asp.net利用对象数组实现xml序列化的代码实例
- JS根据年月获得当月天数的实现代码
- Python 实现网络爬虫 抓取静态网页【代码】
- Master-Detail Application的纯代码实现
- 用.net实现QQ的原代码!
- 算法谜题——夜过吊桥代码实现
- java实现字符串转换成可执行代码
- 100行代码实现了多线程,批量写入,文件分块的日志方法
- PowerShell:30行代码轻松实现SQL Server数据库容量监控
- 基于用户的协同过滤代码具体实现细节
- JQuery获取当前屏幕的高度宽度的实现代码
- 工厂模式及C++代码实现
- js实现局部页面打印预览原理及示例代码
- FreeMarker自定义指令--代码实现
- Group Anagrams java代码实现
- 使用wsimport命令生成webService客户端,实现天气预报,代码实例(源码 DEMO)
- 少侠学代码系列(二)->JS实现
- javascript实现Ajax代码