
How to customize the separator MapReduce inserts between the output key and value

2014-07-23 13:23
Background: I was building a recommender system with Mahout, and RecommenderJob expects its input in the format userId,itemId,preference. Before calling Mahout, though, I run my own MapReduce preprocessing jobs, and their output puts a tab separator between key and value. My first idea was to pack the whole record into just the key (or just the value) and use NullWritable for the other half, but that didn't work out well for me, so I switched to the approach below.
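For reference, the NullWritable idea would look roughly like the sketch below (a hypothetical reducer, not my real job): build the full "userId,itemId,preference" line yourself, emit it as the key, and pass NullWritable as the value, in which case TextOutputFormat should write the key alone with no separator.

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical reducer: concatenate the whole output record into the key and
// emit NullWritable as the value; TextOutputFormat skips the separator when
// either side is a NullWritable, so no tab appears in the output line.
public class CsvLineReducer extends Reducer<Text, Text, Text, NullWritable> {
	private final Text line = new Text();

	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		for (Text value : values) {
			line.set(key.toString() + "," + value.toString());
			context.write(line, NullWritable.get());
		}
	}
}

(The driver would also need job.setOutputValueClass(NullWritable.class).) Since that route didn't work well for me, I set the separator on the job configuration instead: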

conf.set("mapred.textoutputformat.ignoreseparator", "true");
conf.set("mapred.textoutputformat.separator", ",");
Since I can't share my company's code, I'll use WordCount as the example.

package com.panguoyuan.mapreduce;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

	public static class TokenizerMapper extends
			Mapper<Object, Text, Text, IntWritable> {

		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();

		@Override
		public void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {
			StringTokenizer itr = new StringTokenizer(value.toString());
			while (itr.hasMoreTokens()) {
				word.set(itr.nextToken());
				context.write(word, one);
			}
		}
	}

	public static class IntSumReducer extends
			Reducer<Text, IntWritable, Text, IntWritable> {
		private IntWritable result = new IntWritable();

		@Override
		public void reduce(Text key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable val : values) {
				sum += val.get();
			}
			result.set(sum);
			context.write(key, result);
		}
	}

	public static void main(String[] args) throws Exception {
		System.out.println(2); // leftover debug print; accounts for the "2" at the top of the output below
		Configuration conf = new Configuration();
		// Tell TextOutputFormat to separate key and value with "," instead of the default tab.
		conf.set("mapred.textoutputformat.ignoreseparator", "true");
		conf.set("mapred.textoutputformat.separator", ",");
	    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
	    if (otherArgs.length != 2) {
	      System.err.println("Usage: wordcount <in> <out>");
	      System.exit(2);
	    }
//	    conf.set("fs.hdfs.impl", 
//	            org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()
//	    	    );
	    Job job = new Job(conf, "word count");
	    job.setJarByClass(WordCount.class);
	    job.setMapperClass(TokenizerMapper.class);
	    job.setCombinerClass(IntSumReducer.class);
	    job.setReducerClass(IntSumReducer.class);
	    job.setOutputKeyClass(Text.class);
	    job.setOutputValueClass(IntWritable.class);
	    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
	    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
	    System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
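For anyone on Hadoop 2.x, here is a minimal driver sketch under that assumption (WordCountV2 is a hypothetical name; it reuses the TokenizerMapper and IntSumReducer classes above):

package com.panguoyuan.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountV2 {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		// Hadoop 2.x property name for the TextOutputFormat key/value separator.
		conf.set("mapreduce.output.textoutputformat.separator", ",");
		// Job.getInstance replaces the deprecated new Job(conf, name) constructor.
		Job job = Job.getInstance(conf, "word count");
		job.setJarByClass(WordCountV2.class);
		job.setMapperClass(WordCount.TokenizerMapper.class);
		job.setCombinerClass(WordCount.IntSumReducer.class);
		job.setReducerClass(WordCount.IntSumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

Everything else about the job is unchanged; only the property key and the Job factory method differ.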


Console output from the run:

2
log4j:WARN No such property [maxBackupIndex] in org.apache.log4j.DailyRollingFileAppender.
log4j:WARN No such property [maxFileSize] in org.apache.log4j.DailyRollingFileAppender.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/F:/CDH4/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/F:/workspace1/recommendation/lib/mahout-examples-0.7-cdh4.1.2-job.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
[13:23:44,251][ WARN][main][org.apache.hadoop.conf.Configuration:808] - session.id is deprecated. Instead, use dfs.metrics.session-id
[13:23:44,261][ INFO][main][org.apache.hadoop.metrics.jvm.JvmMetrics:76] - Initializing JVM Metrics with processName=JobTracker, sessionId=
[13:23:44,356][ WARN][main][org.apache.hadoop.util.NativeCodeLoader:62] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[13:23:44,859][ WARN][main][org.apache.hadoop.mapred.JobClient:704] - Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
[13:23:45,028][ WARN][main][org.apache.hadoop.mapred.JobClient:830] - No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
[13:23:45,164][ INFO][main][org.apache.hadoop.mapreduce.lib.input.FileInputFormat:233] - Total input paths to process : 1
[13:23:46,269][ INFO][Thread-18][org.apache.hadoop.mapred.LocalJobRunner:192] - OutputCommitter set in config null
[13:23:46,270][ INFO][main][org.apache.hadoop.mapred.JobClient:1386] - Running job: job_local_0001
[13:23:46,298][ INFO][Thread-18][org.apache.hadoop.mapred.LocalJobRunner:210] - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
[13:23:46,371][ WARN][Thread-18][mapreduce.Counters:224] - Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead
[13:23:46,415][ INFO][Thread-18][org.apache.hadoop.mapred.Task:533] -  Using ResourceCalculatorPlugin : null
[13:23:46,439][ INFO][Thread-18][org.apache.hadoop.mapred.MapTask:792] - io.sort.mb = 100
[13:23:46,555][ INFO][Thread-18][org.apache.hadoop.mapred.MapTask:804] - data buffer = 79691776/99614720
[13:23:46,556][ INFO][Thread-18][org.apache.hadoop.mapred.MapTask:805] - record buffer = 262144/327680
[13:23:46,761][ INFO][Thread-18][org.apache.hadoop.mapred.LocalJobRunner:370] - 
[13:23:46,785][ INFO][Thread-18][org.apache.hadoop.mapred.MapTask:1131] - Starting flush of map output
[13:23:47,272][ INFO][main][org.apache.hadoop.mapred.JobClient:1399] -  map 0% reduce 0%
[13:23:48,580][ INFO][Thread-18][org.apache.hadoop.mapred.MapTask:1311] - Finished spill 0
[13:23:48,589][ INFO][Thread-18][org.apache.hadoop.mapred.Task:846] - Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
[13:23:48,962][ INFO][Thread-18][org.apache.hadoop.mapred.LocalJobRunner:370] - 
[13:23:48,964][ INFO][Thread-18][org.apache.hadoop.mapred.Task:958] - Task 'attempt_local_0001_m_000000_0' done.
[13:23:49,021][ WARN][Thread-18][mapreduce.Counters:224] - Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead
[13:23:49,184][ INFO][Thread-18][org.apache.hadoop.mapred.Task:533] -  Using ResourceCalculatorPlugin : null
[13:23:49,184][ INFO][Thread-18][org.apache.hadoop.mapred.LocalJobRunner:370] - 
[13:23:49,191][ INFO][Thread-18][org.apache.hadoop.mapred.Merger:390] - Merging 1 sorted segments
[13:23:49,331][ INFO][Thread-18][org.apache.hadoop.mapred.Merger:473] - Down to the last merge-pass, with 1 segments left of total size: 1546 bytes
[13:23:49,332][ INFO][Thread-18][org.apache.hadoop.mapred.LocalJobRunner:370] - 
[13:23:49,337][ INFO][main][org.apache.hadoop.mapred.JobClient:1399] -  map 100% reduce 0%
[13:23:49,436][ INFO][Thread-18][org.apache.hadoop.mapred.Task:846] - Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
[13:23:49,439][ INFO][Thread-18][org.apache.hadoop.mapred.LocalJobRunner:370] - 
[13:23:49,440][ INFO][Thread-18][org.apache.hadoop.mapred.Task:999] - Task attempt_local_0001_r_000000_0 is allowed to commit now
[13:23:49,574][ INFO][Thread-18][org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter:173] - Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://10.71.197.94:8020/op1
[13:23:49,574][ INFO][Thread-18][org.apache.hadoop.mapred.LocalJobRunner:370] - reduce > reduce
[13:23:49,576][ INFO][Thread-18][org.apache.hadoop.mapred.Task:958] - Task 'attempt_local_0001_r_000000_0' done.
[13:23:50,337][ INFO][main][org.apache.hadoop.mapred.JobClient:1399] -  map 100% reduce 100%
[13:23:50,337][ INFO][main][org.apache.hadoop.mapred.JobClient:1454] - Job complete: job_local_0001
[13:23:50,339][ INFO][main][org.apache.hadoop.mapred.JobClient:566] - Counters: 22
[13:23:50,339][ INFO][main][org.apache.hadoop.mapred.JobClient:568] -   File System Counters
[13:23:50,339][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     FILE: Number of bytes read=1898
[13:23:50,339][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     FILE: Number of bytes written=173904
[13:23:50,340][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     FILE: Number of read operations=0
[13:23:50,340][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     FILE: Number of large read operations=0
[13:23:50,340][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     FILE: Number of write operations=0
[13:23:50,340][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     HDFS: Number of bytes read=2188
[13:23:50,340][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     HDFS: Number of bytes written=1244
[13:23:50,340][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     HDFS: Number of read operations=11
[13:23:50,340][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     HDFS: Number of large read operations=0
[13:23:50,341][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     HDFS: Number of write operations=3
[13:23:50,341][ INFO][main][org.apache.hadoop.mapred.JobClient:568] -   Map-Reduce Framework
[13:23:50,341][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     Map input records=75
[13:23:50,341][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     Map output records=75
[13:23:50,341][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     Map output bytes=1394
[13:23:50,341][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     Input split bytes=101
[13:23:50,341][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     Combine input records=75
[13:23:50,342][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     Combine output records=75
[13:23:50,342][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     Reduce input groups=75
[13:23:50,342][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     Reduce shuffle bytes=0
[13:23:50,342][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     Reduce input records=75
[13:23:50,342][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     Reduce output records=75
[13:23:50,343][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     Spilled Records=150
[13:23:50,343][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     Total committed heap usage (bytes)=1065484288
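Checking the result file in HDFS: each line now has a comma, not a tab, between the key and the value.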
[root@master 2014-07-10]# hadoop fs -cat /op1/part-r-00000 |more
abrt:x:173:,1
adm:x:4:root,adm,daemon,1
apache:x:48:,1
audio:x:63:,1
avahi-autoipd:x:170:,1
avahi:x:70:,1
bin:x:1:root,bin,daemon,1
cdrom:x:11:,1
cloudera-scm:x:490:,1
daemon:x:2:root,bin,daemon,1
dbus:x:81:,1
desktop_admin_r:x:499:,1
desktop_user_r:x:498:,1
dialout:x:18:,1
dip:x:40:,1
disk:x:6:root,1
floppy:x:19:,1
flume:x:480:,1
ftp:x:50:,1
fuse:x:493:,1
games:x:20:,1
gdm:x:42:,1
gopher:x:30:,1
hadoop:x:502:hdfs,mapred,yarn,1