MapReduce map side join实例
2016-01-06 21:48
831 查看
1.问题描述
现有一张大表(大概有2亿多条记录),存放的是机顶盒用户每天的播放记录,有所看的电视台名称和开始时间,但是没有节目名。还有一张小表(几十万条),数据是用爬虫获取的每天各个电视台的节目单信息。
现在需要将用户的播放记录与节目信息关联起来。即根据用户播放记录中的电视台名和开始时间确定节目名称。
2. reduce side join 还是 map side join？
由于小表的数据量比较小,完全可以放到内存中去,所以我们采用map side join:在继承了Mapper类的map类中,在setup方法中读取小表数据放到内存中,在map()方法中对每一条大表中的数据进行关联。这样关联完全在map端完成,省去了shuffle和reduce阶段,也避免了2亿多条大表记录在网络上传输。
3. 完整代码
main函数public class AddRelation2 { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{ Configuration conf = HBaseConfiguration.create(); conf.addResource(new Path("/usr/local/cluster/hadoop/etc/hadoop/core-site.xml")); conf.addResource(new Path("/usr/local/cluster/hadoop/etc/hadoop/hdfs-site.xml")); conf.addResource(new Path("/usr/local/cluster/hadoop/etc/hadoop/mapred-site.xml")); if(args.length !=1){ System.out.println("1 args <datapath>"); System.exit(2); } String cachefile = "hdfs://bigdata/tvInfo/tvinfo.txt";//缓存小表,以文本形式 Job job = Job.getInstance(conf); job.setJobName("iadd Relation 2"); job.setJarByClass(AddRelation2.class); FileInputFormat.addInputPath(job, new Path(args[0])); job.addCacheFile(new Path(cachefile).toUri()); job.setMapperClass(JoinMapper.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class); job.setOutputFormatClass(MultiTableOutputFormat.class); TableMapReduceUtil.addDependencyJars(job); TableMapReduceUtil.addDependencyJars(job.getConfiguration()); job.setNumReduceTasks(0); System.exit(job.waitForCompletion(true)?0:1); }
Mapper类
public class JoinMapper extends Mapper<LongWritable,Text,ImmutableBytesWritable,Put>{ static ArrayList<ArrayList<String>> cctv1 = new ArrayList<ArrayList<String>>(); static ArrayList<ArrayList<String>> cctv2 = new ArrayList<ArrayList<String>>(); static ArrayList<ArrayList<String>> cctv3 = new ArrayList<ArrayList<String>>(); static ArrayList<ArrayList<String>> cctv4 = new ArrayList<ArrayList<String>>(); static ArrayList<ArrayList<String>> cctv5 = new ArrayList<ArrayList<String>>(); static ArrayList<ArrayList<String>> cctv6 = new ArrayList<ArrayList<String>>(); static ArrayList<ArrayList<String>> cctv7 = new ArrayList<ArrayList<String>>(); static ArrayList<ArrayList<String>> cctv8 = new ArrayList<ArrayList<String>>(); static ArrayList<ArrayList<String>> cctv9 = new ArrayList<ArrayList<String>>(); static ArrayList<ArrayList<String>> cctv10 = new ArrayList<ArrayList<String>>(); static ArrayList<ArrayList<String>> cctv11 = new ArrayList<ArrayList<String>>(); static ArrayList<ArrayList<String>> cctv12 = new ArrayList<ArrayList<String>>(); public static final String usefulMenu="CCTV1CCTV2CCTV3CCTV4CCTV5CCTV6CCTV7CCTV8CCTV9CCTV10CCTV11CCTV12"; @Override protected void setup(Context context) throws IOException,InterruptedException { super.setup(context); Configuration conf = context.getConfiguration(); URI[] localCacheFiles = context.getCacheFiles();//获取缓存中的小表文件 // System.out.println("filename="+localCacheFiles[0]); // System.out.println("filePath="+localCacheFiles[0].getPath()); Path tvinfoSetPath = new Path(localCacheFiles[0]); FileSystem fs= FileSystem.get(conf); FSDataInputStream in = fs.open(tvinfoSetPath); BufferedReader br = new BufferedReader(new InputStreamReader(in)); readCacheFile(br); } private static void readCacheFile( BufferedReader br) throws IOException { BufferedReader reader = br; String line; while ((line = reader.readLine()) != null) { String[] detail = line.split("\\|"); ArrayList<String> temp = new ArrayList<String>(); temp.add(detail[3]+" 
"+detail[1]); // temp = date + menu temp.add(detail[2]); if(detail[0].equals("CCTV1")){ cctv1.add(temp); }else if(detail[0].equals("CCTV2")){ cctv2.add(temp); }else if(detail[0].equals("CCTV3")){ cctv3.add(temp); }else if(detail[0].equals("CCTV4")){ cctv4.add(temp); }else if(detail[0].equals("CCTV5")){ cctv5.add(temp); }else if(detail[0].equals("CCTV6")){ cctv6.add(temp); }else if(detail[0].equals("CCTV7")){ cctv7.add(temp); }else if(detail[0].equals("CCTV8")){ cctv8.add(temp); }else if(detail[0].equals("CCTV9")){ cctv9.add(temp); }else if(detail[0].equals("CCTV10")){ cctv10.add(temp); }else if(detail[0].equals("CCTV11")){ cctv11.add(temp); }else if(detail[0].equals("CCTV12")){ cctv12.add(temp); } } reader.close(); } public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{ String values = value.toString(); String detail[] = values.split("\\|"); String channelName = detail[7]; String collectTime = detail[0]; //collect_time String uid = detail[5]; //user_id String channelId = detail[8]; //channel_id String startTime = detail[9]; //start_time //user_id String temp =""; if(uid.length()>=4){ temp = uid.substring(0,4); } ArrayList<ArrayList<String>> cctvTemp = null; if(temp.equals("0531") && !uid.contains("test")){ //check the row belong to ji nan or not if(usefulMenu.contains(channelName)){ if(channelName.equals("CCTV1")){ cctvTemp = cctv1; }else if(channelName.equals("CCTV2")){ cctvTemp = cctv2; }else if(channelName.equals("CCTV3")){ cctvTemp = cctv3; }else if(channelName.equals("CCTV4")){ cctvTemp = cctv4; }else if(channelName.equals("CCTV5")){ cctvTemp = cctv5; }else if(channelName.equals("CCTV6")){ cctvTemp = cctv6; }else if(channelName.equals("CCTV7")){ cctvTemp = cctv7; }else if(channelName.equals("CCTV8")){ cctvTemp = cctv8; }else if(channelName.equals("CCTV9")){ cctvTemp = cctv9; }else if(channelName.equals("CCTV10")){ cctvTemp = cctv10; }else if(channelName.equals("CCTV11")){ cctvTemp = cctv11; }else 
if(channelName.equals("CCTV12")){ cctvTemp = cctv12; } if(cctvTemp!=null){ String menuName =""; try { menuName = formJoin(detail[9],cctvTemp); } catch (ParseException e) { // TODO Auto-generated catch block System.out.println("string date transform error"); e.printStackTrace(); } //channelid get the last 6 char int channelnum = channelId.length(); String channelId6 =""; if(channelnum>6){ channelId6 = channelId.substring(channelnum-6,channelnum); }else{ channelId6 = channelId; } //starttime get the last 8 char int startTimenum = startTime.length(); String startTime8 =""; if(startTimenum > 8){ startTime8 = startTime.substring(startTimenum-8,startTimenum); }else{ startTime8 = startTime; } byte[] time = Bytes.toBytes(collectTime); String hashPrefix = MD5Hash.getMD5AsHex(time).substring(0,8); //collect_time hash byte[] bytesMD52Date = Bytes.toBytes(hashPrefix); //change md5 to byte[] byte[] uidBytes = Bytes.toBytes(uid); //change user_id to byte[] byte[] channelidBytes = Bytes.toBytes(channelId6); //change the last 6 of channelid to byte[] byte[] starttimeBytes = Bytes.toBytes(startTime8); //change the last 8 of starttime to byte[] byte[] rowKeytemp =Bytes.add(bytesMD52Date, time, uidBytes); //md5+collect_time+user_id byte[] rowKey = Bytes.add(rowKeytemp,channelidBytes,starttimeBytes); //md5+collect_time+user_id+channel_id+start_time Put p1 = new Put(rowKey); p1.add(Bytes.toBytes("cf"), Bytes.toBytes("tvStation"), Bytes.toBytes(channelName)); p1.add(Bytes.toBytes("cf"), Bytes.toBytes("tvMenu"), Bytes.toBytes(menuName)); if(!p1.isEmpty()){ ImmutableBytesWritable ib = new ImmutableBytesWritable(); ib.set(Bytes.toBytes("play_record_file_jn_programme")); context.write(ib, p1); } } } } } public String formJoin(String time,ArrayList<ArrayList<String>> cctvTemp) throws ParseException{ String playTime = time; DateFormat df1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); DateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm"); Date playDate = null; playDate = 
df1.parse(playTime); long min = 36000000; int index = 0; for(int i=0;i<cctvTemp.size();i++){ String menuTime = cctvTemp.get(i).get(0); Date menuDate = df2.parse(menuTime); long temp = playDate.getTime()-menuDate.getTime(); if(temp>=0 && temp<min){ min = temp; index = i; } } String menuName = cctvTemp.get(index).get(1); return menuName; } }
相关文章推荐
- 2015‘12杭电校赛1003 The collector’s puzzle(双指针水题)
- php实现文件上传
- 车辆管理系统之分析信息建表(一)
- makefile--嵌套执行(四)
- ubuntu ns-3编译
- linux下使用无线网卡的命令行方法
- 再见2015,你好2016
- OpenStack 配置
- pragma code_seg
- 在Windows环境下使用MinGW编译Qt 4.8.6
- 堆和栈的区别
- TypeError: window.clipboardData is undefined
- poj3660
- 搜索命令之locate,whereis,which
- SQL循环语句
- ireport各个版本的下载地址
- 解决sqlite删除数据后,文件大小不变问题(VACUUM)
- Qt 工程 pro文件
- linux-scp命令
- face++ c++接口 以及 curl 库的使用,json库在另一个博客介绍