
A Hadoop Interview Question

2015-08-23 16:51
Implement the following logic using Hive or a custom MapReduce job.

product_no lac_id moment start_time user_id county_id staytime city_id

13429100031 22554 8 2013-03-11 08:55:19.151754088 571 571 282 571

13429100082 22540 8 2013-03-11 08:58:20.152622488 571 571 270 571

13429100082 22691 8 2013-03-11 08:56:37.149593624 571 571 103 571

13429100087 22705 8 2013-03-11 08:56:51.139539816 571 571 220 571

13429100087 22540 8 2013-03-11 08:55:45.150276800 571 571 66 571

13429100082 22540 8 2013-03-11 08:55:38.140225200 571 571 133 571

13429100140 26642 9 2013-03-11 09:02:19.151754088 571 571 18 571

13429100082 22691 8 2013-03-11 08:57:32.151754088 571 571 287 571

13429100189 22558 8 2013-03-11 08:56:24.139539816 571 571 48 571

13429100349 22503 8 2013-03-11 08:54:30.152622440 571 571 211 571

Field descriptions:

product_no: the user's phone number;

lac_id: the base station the user is connected to;

start_time: the time at which the user entered this base station's coverage;

staytime: how long the user stayed at this base station.

Requirement:

lac_id and start_time together give the user's location at a given moment, and staytime gives how long the user lingered at each base station. Following each user's trajectory, merge the staytime of consecutive records at the same base station.

The final output is, for each user, the dwell time at each base station, ordered by time. For example, user 13429100082 has two adjacent records at station 22691 (staytime 287 and 103), which merge into a single record with staytime 390.

Expected output (example):

13429100082 22540 8 2013-03-11 08:58:20.152622488 571 571 270 571

13429100082 22691 8 2013-03-11 08:56:37.149593624 571 571 390 571

13429100082 22540 8 2013-03-11 08:55:38.140225200 571 571 133 571

13429100087 22705 8 2013-03-11 08:56:51.139539816 571 571 220 571

13429100087 22540 8 2013-03-11 08:55:45.150276800 571 571 66 571

Approach: define a custom map output key consisting of product_no + start_time; sort first by product_no and, for equal product_no, by start_time in descending order.

Define a custom grouping comparator so that all records with the same product_no fall into the same reduce group.

Within a group, the staytime values of adjacent records with the same lac_id are summed.
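
Before the full job, a minimal Hadoop-free sketch of that composite-key ordering may help (the Key class here is a hypothetical stand-in for the real map output key, purely to illustrate product_no ascending, start_time descending):

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class SortSketch {
	// Hypothetical stand-in for the composite map output key.
	static class Key {
		long productNo, startTime;
		Key(long p, long s) { productNo = p; startTime = s; }
		public String toString() { return productNo + "\t" + startTime; }
	}

	public static void main(String[] args) {
		List<Key> keys = new ArrayList<Key>();
		keys.add(new Key(13429100082L, 3));
		keys.add(new Key(13429100087L, 1));
		keys.add(new Key(13429100082L, 1));
		keys.add(new Key(13429100082L, 2));
		// product_no ascending; for equal product_no, start_time descending,
		// mirroring MyWritable.compareTo in the job below.
		Collections.sort(keys, new Comparator<Key>() {
			public int compare(Key a, Key b) {
				if (a.productNo != b.productNo)
					return Long.compare(a.productNo, b.productNo);
				return Long.compare(b.startTime, a.startTime);
			}
		});
		for (Key k : keys) {
			System.out.println(k);
		}
		// Prints 13429100082 with times 3, 2, 1 first, then 13429100087.
	}
}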



package hivemr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HiveMR {

	public static void main(String[] args) throws Exception {
		Job job = Job.getInstance(new Configuration(),
				HiveMR.class.getSimpleName());
		job.setJarByClass(HiveMR.class);

		// args[0] = input path, args[1] = output path
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(MyWritable.class);
		job.setMapOutputValueClass(Text.class);

		// Group map-output keys by product_no only (see MyGroupingComparator).
		job.setGroupingComparatorClass(MyGroupingComparator.class);

		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

	/**
	 * Custom grouping comparator: records with the same product_no fall into
	 * the same reduce group; start_time is ignored. A long takes 8 bytes in
	 * the serialized key, so only the leading 8 bytes (product_no) are compared.
	 */
	private static class MyGroupingComparator implements RawComparator<MyWritable> {

		public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
			// Compare only the first 8 bytes: the serialized product_no.
			return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
		}

		public int compare(MyWritable o1, MyWritable o2) {
			return Long.compare(o1.product_no, o2.product_no);
		}
	}

	public static class MyMapper extends
			Mapper<LongWritable, Text, MyWritable, Text> {
		MyWritable k2 = new MyWritable();
		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, MyWritable, Text>.Context context)
				throws IOException, InterruptedException {
			// Input fields are tab-separated, in the column order shown above.
			String line = value.toString();
			String[] split = line.split("\t");

			String product_no = split[0];
			String lac_id = split[1];
			String moment = split[2];
			String start_time = split[3];
			String user_id = split[4];
			String county_id = split[5];
			String staytime = split[6];
			String city_id = split[7];

			k2.set(product_no, start_time);
			context.write(k2, new Text(product_no+ "\t"
					+lac_id + "\t"
					+ moment + "\t" + start_time + "\t" + user_id
					+ "\t" + county_id + "\t" + staytime + "\t" +city_id));

		}
	}

	public static class MyReducer extends Reducer<MyWritable, Text, Text, Text> {

		@Override
		protected void reduce(MyWritable k2, Iterable<Text> v2s,
				Reducer<MyWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {

			String beforeid = null; // lac_id of the run currently being merged
			String start = null;    // start_time of the current run
			int time = 0;           // accumulated staytime of the current run

			String product_no = null;
			String lac_id = null;
			String moment = null;
			String start_time = null;
			String user_id = null;
			String county_id = null;
			String staytime = null;
			String city_id = null;

			for (Text text : v2s) {

				String[] split = text.toString().split("\t");

				product_no = split[0];
				lac_id = split[1];
				moment = split[2];
				start_time = split[3];
				user_id = split[4];
				county_id = split[5];
				staytime = split[6];
				city_id = split[7];

				if (lac_id.equals(beforeid)) {
					// Same station as the previous record: extend the current
					// run. Records arrive in descending start_time order, so
					// the run's start becomes this earlier record's start_time.
					start = start_time;
					time = time + Integer.parseInt(staytime);
				} else if (beforeid == null) {
					// First record of the group: open a new run.
					start = start_time;
					beforeid = lac_id;
					time = Integer.parseInt(staytime);
				} else {
					// Station changed: emit the finished run, then open a new one.
					context.write(new Text(product_no), new Text(beforeid + "\t"
							+ moment + "\t" + start + "\t" + user_id
							+ "\t" + county_id + "\t" + time + "\t" + city_id));
					beforeid = lac_id;
					start = start_time;
					time = Integer.parseInt(staytime);
				}

			}
			// Emit the last run after the loop ends.
			context.write(new Text(product_no), new Text(lac_id + "\t"
					+ moment + "\t" + start + "\t" + user_id
					+ "\t" + county_id + "\t" + time + "\t" + city_id));
		}
	}

	/**
	 * Custom map output key: product_no + start_time. Sorts by product_no
	 * ascending and, for equal product_no, by start_time descending. Because
	 * start_time is part of the key, records with the same product_no would
	 * otherwise never be grouped together, hence the custom grouping
	 * comparator above.
	 */
	public static class MyWritable implements WritableComparable<MyWritable> {
		public long product_no;
		public long start_time;
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

		public void write(DataOutput out) throws IOException {
			out.writeLong(product_no);
			out.writeLong(start_time);
		}

		public void set(String product_no, String start_time) {
			this.product_no = Long.parseLong(product_no);
			try {
				// Drop the fractional seconds; the pattern has no sub-second field.
				this.start_time = sdf.parse(start_time.split("\\.")[0]).getTime();
			} catch (ParseException e) {
				e.printStackTrace();
			}
		}

		public void readFields(DataInput in) throws IOException {
			this.product_no = in.readLong();
			this.start_time = in.readLong();
		}

		public int compareTo(MyWritable o) {
			if (this.product_no == o.product_no) {
				// Descending start_time; Long.compare avoids the int overflow
				// that casting a long difference to int can cause.
				return Long.compare(o.start_time, this.start_time);
			}
			return Long.compare(this.product_no, o.product_no);
		}

		@Override
		public int hashCode() {
			// The default HashPartitioner partitions on hashCode(); hash on
			// product_no only so that all of a user's records reach the same
			// reducer even when there is more than one reduce task.
			return Long.valueOf(product_no).hashCode();
		}

		@Override
		public boolean equals(Object obj) {
			return obj instanceof MyWritable
					&& ((MyWritable) obj).product_no == this.product_no;
		}

		@Override
		public String toString() {
			return this.product_no + "\t" + this.start_time;
		}

	}
}
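
A side note on the grouping comparator's magic number 8: MyWritable.write() serializes two longs back to back, and DataOutput.writeLong() is big-endian, so bytes 0-7 of the serialized key are exactly product_no. A standalone sketch (plain java.io, no Hadoop needed) to check that assumption:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class KeyLayoutSketch {
	public static void main(String[] args) throws IOException {
		// Serialize two longs exactly as MyWritable.write() does.
		ByteArrayOutputStream bos = new ByteArrayOutputStream();
		DataOutputStream out = new DataOutputStream(bos);
		out.writeLong(13429100082L);   // product_no
		out.writeLong(1362963319000L); // start_time (arbitrary epoch millis)
		byte[] bytes = bos.toByteArray();

		// DataOutput.writeLong() is big-endian, so the first 8 bytes
		// hold product_no; rebuild the long from them to confirm.
		long productNo = 0;
		for (int i = 0; i < 8; i++) {
			productNo = (productNo << 8) | (bytes[i] & 0xFF);
		}
		System.out.println(productNo); // prints 13429100082
	}
}

Because product_no is always positive, the unsigned lexicographic comparison done by WritableComparator.compareBytes() over these 8 big-endian bytes orders keys the same way as a numeric comparison of the longs.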
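
And a quick Hadoop-free replay of the reducer's merging logic on user 13429100082's four records (listed in descending start_time order, as the shuffle delivers them), to sanity-check the algorithm against the expected output above:

import java.util.Arrays;
import java.util.List;

public class MergeSketch {
	public static void main(String[] args) {
		// {lac_id, start_time, staytime}, sorted by start_time descending.
		List<String[]> rows = Arrays.asList(
				new String[]{"22540", "08:58:20", "270"},
				new String[]{"22691", "08:57:32", "287"},
				new String[]{"22691", "08:56:37", "103"},
				new String[]{"22540", "08:55:38", "133"});

		String beforeId = null, start = null;
		int time = 0;
		for (String[] r : rows) {
			String lacId = r[0], startTime = r[1];
			int stay = Integer.parseInt(r[2]);
			if (lacId.equals(beforeId)) {
				start = startTime;      // run extends to this earlier record
				time += stay;
			} else {
				if (beforeId != null) { // emit the finished run
					System.out.println(beforeId + "\t" + start + "\t" + time);
				}
				beforeId = lacId;
				start = startTime;
				time = stay;
			}
		}
		System.out.println(beforeId + "\t" + start + "\t" + time);
		// Output:
		// 22540  08:58:20  270
		// 22691  08:56:37  390
		// 22540  08:55:38  133
	}
}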


Result

[screenshot of the job output omitted]
