您的位置:首页 > 其它

MapReduce map side join实例

2016-01-06 21:48 831 查看

1.问题描述

现有一张大表(大约 2 亿多条记录),存放机顶盒用户每天的播放记录,包含所看电视台的名称和开始时间,但没有节目名称。

还有一张小表(几十万条),数据是用爬虫获取的每天各个电视台的节目单信息。

现在需要将用户的播放记录与节目信息关联起来。即根据用户播放记录中的电视台名和开始时间确定节目名称。

2. reduce side join 还是 map side join?

由于小表的数据量较小,完全可以放入内存,所以我们采用 map side join:在驱动程序中通过 job.addCacheFile() 把小表加入分布式缓存,在继承了 Mapper 类的 map 类的 setup() 方法中读取小表数据并放入内存,然后在 map() 方法中将大表的每一条记录与之关联。

3.完整代码

main函数

public class AddRelation2 {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
Configuration conf = HBaseConfiguration.create();
conf.addResource(new Path("/usr/local/cluster/hadoop/etc/hadoop/core-site.xml"));
conf.addResource(new Path("/usr/local/cluster/hadoop/etc/hadoop/hdfs-site.xml"));
conf.addResource(new Path("/usr/local/cluster/hadoop/etc/hadoop/mapred-site.xml"));
if(args.length !=1){
System.out.println("1 args <datapath>");
System.exit(2);
}
String cachefile = "hdfs://bigdata/tvInfo/tvinfo.txt";//缓存小表,以文本形式
Job job = Job.getInstance(conf);
job.setJobName("iadd Relation 2");
job.setJarByClass(AddRelation2.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
job.addCacheFile(new Path(cachefile).toUri());
job.setMapperClass(JoinMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
job.setOutputFormatClass(MultiTableOutputFormat.class);
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.addDependencyJars(job.getConfiguration());

job.setNumReduceTasks(0);
System.exit(job.waitForCompletion(true)?0:1);

}


Mapper类

/**
 * Map-side join mapper.
 *
 * <p>{@link #setup} loads the small programme-listing table from the distributed cache into
 * per-channel in-memory lists. {@link #map} matches each play record of the big table to the
 * programme that started most recently (within {@link #MAX_LOOKBACK_MS}) on the same channel
 * before the record's start time. It then emits an HBase {@link Put} whose output key names the
 * destination table {@code play_record_file_jn_programme}.
 */
public class JoinMapper extends Mapper<LongWritable,Text,ImmutableBytesWritable,Put>{

    // Programme lists per channel. Each entry is a 2-element list:
    //   [0] = "<date> <time>" programme start, parsed with pattern yyyy-MM-dd HH:mm
    //   [1] = programme name
    // NOTE(review): these are static and therefore shared by every mapper instance in the same
    // JVM; they are kept as package-visible statics for compatibility with existing references.
    static ArrayList<ArrayList<String>> cctv1 = new ArrayList<ArrayList<String>>();
    static ArrayList<ArrayList<String>> cctv2 = new ArrayList<ArrayList<String>>();
    static ArrayList<ArrayList<String>> cctv3 = new ArrayList<ArrayList<String>>();
    static ArrayList<ArrayList<String>> cctv4 = new ArrayList<ArrayList<String>>();
    static ArrayList<ArrayList<String>> cctv5 = new ArrayList<ArrayList<String>>();
    static ArrayList<ArrayList<String>> cctv6 = new ArrayList<ArrayList<String>>();
    static ArrayList<ArrayList<String>> cctv7 = new ArrayList<ArrayList<String>>();
    static ArrayList<ArrayList<String>> cctv8 = new ArrayList<ArrayList<String>>();
    static ArrayList<ArrayList<String>> cctv9 = new ArrayList<ArrayList<String>>();
    static ArrayList<ArrayList<String>> cctv10 = new ArrayList<ArrayList<String>>();
    static ArrayList<ArrayList<String>> cctv11 = new ArrayList<ArrayList<String>>();
    static ArrayList<ArrayList<String>> cctv12 = new ArrayList<ArrayList<String>>();

    /** Concatenation of the supported channel names; retained for compatibility. */
    public static final String usefulMenu="CCTV1CCTV2CCTV3CCTV4CCTV5CCTV6CCTV7CCTV8CCTV9CCTV10CCTV11CCTV12";

    /** Exact channel name to its programme list; replaces the 12-way if/else chains. */
    private static final java.util.Map<String, ArrayList<ArrayList<String>>> CHANNELS =
            new java.util.HashMap<String, ArrayList<ArrayList<String>>>();

    static {
        CHANNELS.put("CCTV1", cctv1);
        CHANNELS.put("CCTV2", cctv2);
        CHANNELS.put("CCTV3", cctv3);
        CHANNELS.put("CCTV4", cctv4);
        CHANNELS.put("CCTV5", cctv5);
        CHANNELS.put("CCTV6", cctv6);
        CHANNELS.put("CCTV7", cctv7);
        CHANNELS.put("CCTV8", cctv8);
        CHANNELS.put("CCTV9", cctv9);
        CHANNELS.put("CCTV10", cctv10);
        CHANNELS.put("CCTV11", cctv11);
        CHANNELS.put("CCTV12", cctv12);
    }

    /** A programme matches only if it started at most this long (10 hours) before the play time. */
    private static final long MAX_LOOKBACK_MS = 36000000L;

    /** Minimum number of '|'-separated fields a play record needs (index 9 is read). */
    private static final int PLAY_RECORD_FIELDS = 10;

    /** Minimum number of '|'-separated fields a programme line needs (index 3 is read). */
    private static final int PROGRAMME_FIELDS = 4;

    private static final byte[] FAMILY = Bytes.toBytes("cf");
    private static final byte[] QUALIFIER_STATION = Bytes.toBytes("tvStation");
    private static final byte[] QUALIFIER_MENU = Bytes.toBytes("tvMenu");
    private static final byte[] OUTPUT_TABLE = Bytes.toBytes("play_record_file_jn_programme");
    private static final String COUNTER_GROUP = "JoinMapper";

    // SimpleDateFormat is not thread-safe, so each mapper instance gets its own pair instead of
    // allocating two new formatters for every record.
    private final DateFormat playFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private final DateFormat menuFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm");

    /**
     * Loads the first distributed-cache file (the programme listing) into the channel lists.
     *
     * @throws IOException if no cache file was registered or the file cannot be read
     */
    @Override
    protected void setup(Context context) throws IOException,InterruptedException {
        super.setup(context);
        Configuration conf = context.getConfiguration();
        URI[] cacheFiles = context.getCacheFiles();
        if (cacheFiles == null || cacheFiles.length == 0) {
            throw new IOException("No cache file registered; the programme table must be added with Job.addCacheFile");
        }
        Path tvinfoSetPath = new Path(cacheFiles[0]);
        // Resolve the FileSystem from the path's own scheme/authority rather than the default FS,
        // so a fully qualified hdfs:// URI works even when fs.defaultFS points elsewhere.
        FileSystem fs = tvinfoSetPath.getFileSystem(conf);
        // NOTE(review): assumes the listing file is UTF-8 encoded (it contains Chinese programme names).
        BufferedReader br = new BufferedReader(
                new InputStreamReader(fs.open(tvinfoSetPath), java.nio.charset.StandardCharsets.UTF_8));
        readCacheFile(br);
    }

    /**
     * Parses '|'-separated programme lines into the per-channel lists and always closes the reader.
     *
     * <p>Field layout as used here: [0] channel name, [1] start time (HH:mm), [2] programme name,
     * [3] date (yyyy-MM-dd). Blank or truncated lines and unknown channels are skipped.
     */
    private static void readCacheFile( BufferedReader br) throws IOException {
        try (BufferedReader reader = br) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] detail = line.split("\\|");
                if (detail.length < PROGRAMME_FIELDS) {
                    continue; // e.g. a trailing empty line; previously crashed the task
                }
                ArrayList<ArrayList<String>> programmes = CHANNELS.get(detail[0]);
                if (programmes == null) {
                    continue; // not one of the supported channels
                }
                ArrayList<String> entry = new ArrayList<String>(2);
                entry.add(detail[3] + " " + detail[1]); // "date time" of the programme start
                entry.add(detail[2]);                   // programme name
                programmes.add(entry);
            }
        }
    }

    /**
     * Joins one '|'-separated play record with the programme listing and emits a Put.
     *
     * <p>Only records whose user id starts with "0531" and does not contain "test" are kept
     * (the original comment describes these as Jinan users). Records on unsupported channels are
     * dropped. Records with too few fields are counted under counter group "JoinMapper" instead of
     * failing the task.
     *
     * <p>Row key: first 8 hex chars of MD5(collect_time) + collect_time + user_id + last 6 chars of
     * channel_id + last 8 chars of start_time.
     */
    @Override
    public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
        String[] detail = value.toString().split("\\|");
        if (detail.length < PLAY_RECORD_FIELDS) {
            context.getCounter(COUNTER_GROUP, "MALFORMED_PLAY_RECORDS").increment(1);
            return;
        }
        String collectTime = detail[0];   // collect_time
        String uid = detail[5];           // user_id
        String channelName = detail[7];
        String channelId = detail[8];     // channel_id
        String startTime = detail[9];     // start_time

        if (!uid.startsWith("0531") || uid.contains("test")) {
            return;
        }
        ArrayList<ArrayList<String>> cctvTemp = CHANNELS.get(channelName);
        if (cctvTemp == null) {
            return;
        }

        String menuName = "";
        try {
            menuName = formJoin(startTime, cctvTemp);
        } catch (ParseException e) {
            context.getCounter(COUNTER_GROUP, "DATE_PARSE_ERRORS").increment(1);
            System.out.println("string date transform error");
            e.printStackTrace();
        }

        byte[] time = Bytes.toBytes(collectTime);
        byte[] hashPrefix = Bytes.toBytes(MD5Hash.getMD5AsHex(time).substring(0, 8));
        byte[] rowKeyHead = Bytes.add(hashPrefix, time, Bytes.toBytes(uid));
        byte[] rowKey = Bytes.add(rowKeyHead,
                Bytes.toBytes(lastChars(channelId, 6)),
                Bytes.toBytes(lastChars(startTime, 8)));

        Put p1 = new Put(rowKey);
        p1.add(FAMILY, QUALIFIER_STATION, Bytes.toBytes(channelName));
        p1.add(FAMILY, QUALIFIER_MENU, Bytes.toBytes(menuName));
        // The key names the destination table; the Put always has two columns, so it is never empty.
        context.write(new ImmutableBytesWritable(OUTPUT_TABLE), p1);
    }

    /** Returns the last {@code n} characters of {@code s}, or {@code s} itself if it is not longer. */
    private static String lastChars(String s, int n) {
        int len = s.length();
        return len > n ? s.substring(len - n, len) : s;
    }

    /**
     * Finds the programme that started most recently at or before {@code time}, looking back at
     * most {@link #MAX_LOOKBACK_MS}.
     *
     * @param time play start time, formatted yyyy-MM-dd HH:mm:ss
     * @param cctvTemp programme entries for one channel ([0] start yyyy-MM-dd HH:mm, [1] name)
     * @return the matching programme name, or "" if no programme started within the window
     *         (previously the first programme of the list was returned, or an exception thrown
     *         for an empty list)
     * @throws ParseException if {@code time} or a programme start cannot be parsed
     */
    public String formJoin(String time,ArrayList<ArrayList<String>> cctvTemp) throws ParseException{
        long playMillis = playFormat.parse(time).getTime();
        long bestGap = MAX_LOOKBACK_MS;
        String menuName = "";
        // NOTE(review): every programme start is re-parsed for every record; pre-parsing in
        // setup() would remove most of the per-record cost.
        // Index loop (not an iterator) because the lists are shared statics.
        for (int i = 0; i < cctvTemp.size(); i++) {
            ArrayList<String> entry = cctvTemp.get(i);
            long gap = playMillis - menuFormat.parse(entry.get(0)).getTime();
            // Strict '<' keeps the first of equally close programmes, as before.
            if (gap >= 0 && gap < bestGap) {
                bestGap = gap;
                menuName = entry.get(1);
            }
        }
        return menuName;
    }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: