Mapreduce TopK
2015-06-01 10:22
351 查看
思想比较简单,就是每个通过map来获取当前的数据块中的的topk个数据,然后将他们以相同的key值放到reduce中,最后通过reduce来对这n*k个数据排序并获得topk个数据。具体的就是建立一个k个大小的数组,一开始初始化为都是100(假定这里的100是最大的数),然后往里面插数据小的数据即可。
PS:有几个小细节以及当时写代码的时候出错的地方。
1 map和reduce都是在每个键值对来的时候会被调用。当时觉得应该把这k的数组放在哪,以及怎么初始化。如果放在map方法里面,那每次都会被初始化,岂不是白搞了。如果把这数组当作局部变量,那肯定是不行的,因为当作局部变量就无法实现存放k个数据了。只能存放当前的数据。后来查了资料发现,有个setup这个函数,就是用于mapper中的某些数据的初始化,这样就可以把数组作为mapper的属性,然后在setup中进行初始化了。
2 当我全部遍历完这个数据分片的数据后,并且已经获得了当前mapper中的topk了,我如何把数据传到reducer呢,最理想的就是在遍历完后才把数据发送过去,但是以前都是处理一个键值对就发送一个,然后查了下,发现有个cleanup函数,就是用于mapper或者reducer结束后用的,那么就可以通过这个函数来发送键值对了。
3 这是个逻辑上的问题,我这里的topk是选最小的几个,然后当时写的是,先将数组排序,然后从前往后查询,如果发现value<list[i]那么就将该数组中数据替换,但是这个有问题,例如有这样的
45
21
75
94
1
34
56
7
67
按照我一开始的逻辑是,
45,100,100
21,100,100显然这一步就错了,应该是21,45,100所以应该是从后往前的查询,每次查询能替换的最大的数据,而不是从前往后的查询替换最小的数据
具体代码:
Map
[align=left] public void setup(Context context){[/align]
[align=left] Configuration conf=context.getConfiguration();[/align]
[align=left] int k=Integer.parseInt(conf.get( "k" ));[/align]
[align=left] list =new int[k];[/align]
[align=left] for (int i=0;i<k;i++){[/align]
[align=left] list [i]=100;[/align]
[align=left] }[/align]
[align=left] }[/align]
[align=left] [/align]
[align=left] public void cleanup(Context context) throws IOException, InterruptedException{[/align]
[align=left] for (int i=0;i< list. length ;i++){[/align]
[align=left] context.write( new IntWritable(0), new IntWritable( list[i]));[/align]
[align=left] System. out .println(" ");[/align]
[align=left] System. out .println("map is " + list[i]);[/align]
[align=left] System. out .println(" ");[/align]
[align=left] }[/align]
[align=left] }[/align]
[align=left] [/align]
[align=left] public void map(LongWritable ikey, Text ivalue, Context context)[/align]
[align=left] throws IOException, InterruptedException {[/align]
[align=left] Configuration conf=context.getConfiguration();[/align]
[align=left] int k=Integer.parseInt(conf.get( "k" ));[/align]
[align=left] int value=Integer.parseInt(ivalue.toString());[/align]
[align=left] [/align]
[align=left] Arrays. sort( list);[/align]
[align=left] System. out .println(" ");[/align]
[align=left] System. out .println("n is " + n);[/align]
[align=left] System. out .println(" ");[/align]
[align=left] [/align]
[align=left] for (int j=k-1;j>=0;j--){[/align]
[align=left] if (value<list [j]){[/align]
[align=left] list [j]=value;[/align]
[align=left] break ;[/align]
[align=left] }[/align]
[align=left] }[/align]
[align=left] [/align]
[align=left] [/align]
[align=left] }[/align]
[align=left] [/align]
[align=left]}[/align]
Reducer
[align=left] public void setup(Context context){[/align]
[align=left] Configuration conf=context.getConfiguration();[/align]
[align=left] int k=Integer.parseInt(conf.get( "k" ));[/align]
[align=left] list =new int[k];[/align]
[align=left] for (int i=0;i<k;i++){[/align]
[align=left] list [i]=100;[/align]
[align=left] }[/align]
[align=left] }[/align]
[align=left] [/align]
[align=left] public void cleanup(Context context) throws IOException, InterruptedException{[/align]
[align=left] Arrays. sort( list);[/align]
[align=left] for (int i=0;i< list. length ;i++){[/align]
[align=left] context.write( new IntWritable(i), new IntWritable( list[i]));[/align]
[align=left] }[/align]
[align=left] }[/align]
[align=left] [/align]
[align=left] [/align]
[align=left] public void reduce(IntWritable _key, Iterable<IntWritable> values, Context context)[/align]
[align=left] throws IOException, InterruptedException {[/align]
[align=left] // process values[/align]
[align=left] Configuration conf=context.getConfiguration();[/align]
[align=left] int k=Integer.parseInt(conf.get( "k" ));[/align]
[align=left] for (IntWritable val : values) {[/align]
[align=left] /*[/align]
[align=left] System.out.println(" ");[/align]
[align=left] System.out.println("value is "+val.get());[/align]
[align=left] System.out.println(" ");[/align]
[align=left] */[/align]
[align=left] Arrays. sort( list);[/align]
[align=left] [/align]
[align=left] for (int j=k-1;j>=0;j--){[/align]
[align=left] if (val.get()<list [j]){[/align]
[align=left] list [j]=val.get();[/align]
[align=left] break ;[/align]
[align=left] }[/align]
[align=left] }[/align]
[align=left] [/align]
[align=left] [/align]
[align=left] }[/align]
[align=left] }[/align]
PS:有几个小细节以及当时写代码的时候出错的地方。
1 map和reduce都是在每个键值对来的时候会被调用。当时觉得应该把这k的数组放在哪,以及怎么初始化。如果放在map方法里面,那每次都会被初始化,岂不是白搞了。如果把这数组当作局部变量,那肯定是不行的,因为当作局部变量就无法实现存放k个数据了。只能存放当前的数据。后来查了资料发现,有个setup这个函数,就是用于mapper中的某些数据的初始化,这样就可以把数组作为mapper的属性,然后在setup中进行初始化了。
2 当我全部遍历完这个数据分片的数据后,并且已经获得了当前mapper中的topk了,我如何把数据传到reducer呢,最理想的就是在遍历完后才把数据发送过去,但是以前都是处理一个键值对就发送一个,然后查了下,发现有个cleanup函数,就是用于mapper或者reducer结束后用的,那么就可以通过这个函数来发送键值对了。
3 这是个逻辑上的问题,我这里的topk是选最小的几个,然后当时写的是,先将数组排序,然后从前往后查询,如果发现value<list[i]那么就将该数组中数据替换,但是这个有问题,例如有这样的
45
21
75
94
1
34
56
7
67
按照我一开始的逻辑是,
45,100,100
21,100,100显然这一步就错了,应该是21,45,100所以应该是从后往前的查询,每次查询能替换的最大的数据,而不是从前往后的查询替换最小的数据
具体代码:
Map
[align=left] public void setup(Context context){[/align]
[align=left] Configuration conf=context.getConfiguration();[/align]
[align=left] int k=Integer.parseInt(conf.get( "k" ));[/align]
[align=left] list =new int[k];[/align]
[align=left] for (int i=0;i<k;i++){[/align]
[align=left] list [i]=100;[/align]
[align=left] }[/align]
[align=left] }[/align]
[align=left] [/align]
[align=left] public void cleanup(Context context) throws IOException, InterruptedException{[/align]
[align=left] for (int i=0;i< list. length ;i++){[/align]
[align=left] context.write( new IntWritable(0), new IntWritable( list[i]));[/align]
[align=left] System. out .println(" ");[/align]
[align=left] System. out .println("map is " + list[i]);[/align]
[align=left] System. out .println(" ");[/align]
[align=left] }[/align]
[align=left] }[/align]
[align=left] [/align]
[align=left] public void map(LongWritable ikey, Text ivalue, Context context)[/align]
[align=left] throws IOException, InterruptedException {[/align]
[align=left] Configuration conf=context.getConfiguration();[/align]
[align=left] int k=Integer.parseInt(conf.get( "k" ));[/align]
[align=left] int value=Integer.parseInt(ivalue.toString());[/align]
[align=left] [/align]
[align=left] Arrays. sort( list);[/align]
[align=left] System. out .println(" ");[/align]
[align=left] System. out .println("n is " + n);[/align]
[align=left] System. out .println(" ");[/align]
[align=left] [/align]
[align=left] for (int j=k-1;j>=0;j--){[/align]
[align=left] if (value<list [j]){[/align]
[align=left] list [j]=value;[/align]
[align=left] break ;[/align]
[align=left] }[/align]
[align=left] }[/align]
[align=left] [/align]
[align=left] [/align]
[align=left] }[/align]
[align=left] [/align]
[align=left]}[/align]
Reducer
[align=left] public void setup(Context context){[/align]
[align=left] Configuration conf=context.getConfiguration();[/align]
[align=left] int k=Integer.parseInt(conf.get( "k" ));[/align]
[align=left] list =new int[k];[/align]
[align=left] for (int i=0;i<k;i++){[/align]
[align=left] list [i]=100;[/align]
[align=left] }[/align]
[align=left] }[/align]
[align=left] [/align]
[align=left] public void cleanup(Context context) throws IOException, InterruptedException{[/align]
[align=left] Arrays. sort( list);[/align]
[align=left] for (int i=0;i< list. length ;i++){[/align]
[align=left] context.write( new IntWritable(i), new IntWritable( list[i]));[/align]
[align=left] }[/align]
[align=left] }[/align]
[align=left] [/align]
[align=left] [/align]
[align=left] public void reduce(IntWritable _key, Iterable<IntWritable> values, Context context)[/align]
[align=left] throws IOException, InterruptedException {[/align]
[align=left] // process values[/align]
[align=left] Configuration conf=context.getConfiguration();[/align]
[align=left] int k=Integer.parseInt(conf.get( "k" ));[/align]
[align=left] for (IntWritable val : values) {[/align]
[align=left] /*[/align]
[align=left] System.out.println(" ");[/align]
[align=left] System.out.println("value is "+val.get());[/align]
[align=left] System.out.println(" ");[/align]
[align=left] */[/align]
[align=left] Arrays. sort( list);[/align]
[align=left] [/align]
[align=left] for (int j=k-1;j>=0;j--){[/align]
[align=left] if (val.get()<list [j]){[/align]
[align=left] list [j]=val.get();[/align]
[align=left] break ;[/align]
[align=left] }[/align]
[align=left] }[/align]
[align=left] [/align]
[align=left] [/align]
[align=left] }[/align]
[align=left] }[/align]
相关文章推荐
- Linux 常用命令
- centos 终端 字体颜色
- popwind 如何使背景变暗
- eclipse配置tomcat7
- linux 后台运行,终止
- Linux 列出某个进程的具体的启动目录
- shell 日期加减运算
- [linux]文件的特殊权限 SetUid SetGid StickBit
- 国内外域名商.xyz域名总量TOP10统计报告(5月29日)
- 添加Linux系统调用(ubuntu, 3.13.0)
- linux手动安装sbt过程
- 使用python实现linux下守护进程(初学,瑕疵多)
- Web Deploy发布网站及常见问题解决方法(图文)
- Linux and Unix ip command
- kafka的监控工具--kafka web console安装
- IsoHash( Isotropic Hashing)
- 在 Linux 下体验谷歌 Material风格的GTK和图标主题Paper
- 在 Linux 下体验谷歌 Material风格的GTK和图标主题Paper
- storm自带例子详解 (三)——ExclamationTopology
- openwrt下安装和配置ser2net