How to customize the separator MapReduce inserts between the output key and value
2014-07-23 13:23
Background: I am building a recommender system with Mahout, and RecommenderJob expects its input in the format userId,itemId,preference. Before calling Mahout, however, I run a MapReduce job to preprocess the data, and that job's output separates key and value with the default tab. My first idea was to pack the entire record into just the key (or just the value) and use NullWritable for the other side, but I couldn't get that to behave, so I fell back on the approach below.
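For what it's worth, the NullWritable route should work on stock Hadoop: TextOutputFormat's LineRecordWriter skips the separator entirely when either the key or the value is a NullWritable, so the reducer can join the fields itself. A minimal sketch of that variant, assuming the same Text/IntWritable word-count shapes as the example further down (my own illustration, not the code I actually ran):

```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Build the whole "key,count" line in the reducer and emit NullWritable
// as the value; TextOutputFormat then writes the line with no separator.
public class CsvLineReducer
    extends Reducer<Text, IntWritable, Text, NullWritable> {

  private final Text line = new Text();

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
      sum += val.get();
    }
    line.set(key.toString() + "," + sum);  // join the fields by hand
    context.write(line, NullWritable.get());
  }
}
```

The driver would then declare job.setOutputValueClass(NullWritable.class) and job.setMapOutputValueClass(IntWritable.class); the IntSumReducer below could still serve as the combiner. Since this variant didn't behave for me at the time, the rest of this post takes the separator-property route instead.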
The key configuration
conf.set("mapred.textoutputformat.ignoreseparator", "true"); conf.set("mapred.textoutputformat.separator", ",");由于公司的代码不能随便透露,所以我只能以wordcount为例了
```java
package com.panguoyuan.mapreduce;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper
      extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(2);  // debug print; shows up as the leading "2" in the run log
    Configuration conf = new Configuration();
    // Replace the default tab separator with a comma. These must be set
    // before the Job is created, because Job copies the Configuration.
    conf.set("mapred.textoutputformat.ignoreseparator", "true");
    conf.set("mapred.textoutputformat.separator", ",");
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    // conf.set("fs.hdfs.impl",
    //     org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```
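Two details about this driver are worth calling out. First, the two conf.set(...) calls must come before the Job is constructed: Job copies the Configuration, so anything set afterwards is silently ignored. Second, new Job(conf, ...) is deprecated on Hadoop 2.x; the equivalent there would be:

```java
// Hadoop 2.x style; behaves the same as new Job(conf, "word count").
Job job = Job.getInstance(conf, "word count");
```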
The run output
```
2
log4j:WARN No such property [maxBackupIndex] in org.apache.log4j.DailyRollingFileAppender.
log4j:WARN No such property [maxFileSize] in org.apache.log4j.DailyRollingFileAppender.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/F:/CDH4/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/F:/workspace1/recommendation/lib/mahout-examples-0.7-cdh4.1.2-job.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
[13:23:44,251][ WARN][main][org.apache.hadoop.conf.Configuration:808] - session.id is deprecated. Instead, use dfs.metrics.session-id
[13:23:44,261][ INFO][main][org.apache.hadoop.metrics.jvm.JvmMetrics:76] - Initializing JVM Metrics with processName=JobTracker, sessionId=
[13:23:44,356][ WARN][main][org.apache.hadoop.util.NativeCodeLoader:62] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[13:23:44,859][ WARN][main][org.apache.hadoop.mapred.JobClient:704] - Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
[13:23:45,028][ WARN][main][org.apache.hadoop.mapred.JobClient:830] - No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
[13:23:45,164][ INFO][main][org.apache.hadoop.mapreduce.lib.input.FileInputFormat:233] - Total input paths to process : 1
[13:23:46,269][ INFO][Thread-18][org.apache.hadoop.mapred.LocalJobRunner:192] - OutputCommitter set in config null
[13:23:46,270][ INFO][main][org.apache.hadoop.mapred.JobClient:1386] - Running job: job_local_0001
[13:23:46,298][ INFO][Thread-18][org.apache.hadoop.mapred.LocalJobRunner:210] - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
[13:23:46,371][ WARN][Thread-18][mapreduce.Counters:224] - Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead
[13:23:46,415][ INFO][Thread-18][org.apache.hadoop.mapred.Task:533] - Using ResourceCalculatorPlugin : null
[13:23:46,439][ INFO][Thread-18][org.apache.hadoop.mapred.MapTask:792] - io.sort.mb = 100
[13:23:46,555][ INFO][Thread-18][org.apache.hadoop.mapred.MapTask:804] - data buffer = 79691776/99614720
[13:23:46,556][ INFO][Thread-18][org.apache.hadoop.mapred.MapTask:805] - record buffer = 262144/327680
[13:23:46,761][ INFO][Thread-18][org.apache.hadoop.mapred.LocalJobRunner:370] -
[13:23:46,785][ INFO][Thread-18][org.apache.hadoop.mapred.MapTask:1131] - Starting flush of map output
[13:23:47,272][ INFO][main][org.apache.hadoop.mapred.JobClient:1399] - map 0% reduce 0%
[13:23:48,580][ INFO][Thread-18][org.apache.hadoop.mapred.MapTask:1311] - Finished spill 0
[13:23:48,589][ INFO][Thread-18][org.apache.hadoop.mapred.Task:846] - Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
[13:23:48,962][ INFO][Thread-18][org.apache.hadoop.mapred.LocalJobRunner:370] -
[13:23:48,964][ INFO][Thread-18][org.apache.hadoop.mapred.Task:958] - Task 'attempt_local_0001_m_000000_0' done.
[13:23:49,021][ WARN][Thread-18][mapreduce.Counters:224] - Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead
[13:23:49,184][ INFO][Thread-18][org.apache.hadoop.mapred.Task:533] - Using ResourceCalculatorPlugin : null
[13:23:49,184][ INFO][Thread-18][org.apache.hadoop.mapred.LocalJobRunner:370] -
[13:23:49,191][ INFO][Thread-18][org.apache.hadoop.mapred.Merger:390] - Merging 1 sorted segments
[13:23:49,331][ INFO][Thread-18][org.apache.hadoop.mapred.Merger:473] - Down to the last merge-pass, with 1 segments left of total size: 1546 bytes
[13:23:49,332][ INFO][Thread-18][org.apache.hadoop.mapred.LocalJobRunner:370] -
[13:23:49,337][ INFO][main][org.apache.hadoop.mapred.JobClient:1399] - map 100% reduce 0%
[13:23:49,436][ INFO][Thread-18][org.apache.hadoop.mapred.Task:846] - Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
[13:23:49,439][ INFO][Thread-18][org.apache.hadoop.mapred.LocalJobRunner:370] -
[13:23:49,440][ INFO][Thread-18][org.apache.hadoop.mapred.Task:999] - Task attempt_local_0001_r_000000_0 is allowed to commit now
[13:23:49,574][ INFO][Thread-18][org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter:173] - Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://10.71.197.94:8020/op1
[13:23:49,574][ INFO][Thread-18][org.apache.hadoop.mapred.LocalJobRunner:370] - reduce > reduce
[13:23:49,576][ INFO][Thread-18][org.apache.hadoop.mapred.Task:958] - Task 'attempt_local_0001_r_000000_0' done.
[13:23:50,337][ INFO][main][org.apache.hadoop.mapred.JobClient:1399] - map 100% reduce 100%
[13:23:50,337][ INFO][main][org.apache.hadoop.mapred.JobClient:1454] - Job complete: job_local_0001
[13:23:50,339][ INFO][main][org.apache.hadoop.mapred.JobClient:566] - Counters: 22
[13:23:50,339][ INFO][main][org.apache.hadoop.mapred.JobClient:568] -   File System Counters
[13:23:50,339][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     FILE: Number of bytes read=1898
[13:23:50,339][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     FILE: Number of bytes written=173904
[13:23:50,340][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     FILE: Number of read operations=0
[13:23:50,340][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     FILE: Number of large read operations=0
[13:23:50,340][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     FILE: Number of write operations=0
[13:23:50,340][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     HDFS: Number of bytes read=2188
[13:23:50,340][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     HDFS: Number of bytes written=1244
[13:23:50,340][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     HDFS: Number of read operations=11
[13:23:50,340][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     HDFS: Number of large read operations=0
[13:23:50,341][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     HDFS: Number of write operations=3
[13:23:50,341][ INFO][main][org.apache.hadoop.mapred.JobClient:568] -   Map-Reduce Framework
[13:23:50,341][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     Map input records=75
[13:23:50,341][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     Map output records=75
[13:23:50,341][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     Map output bytes=1394
[13:23:50,341][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     Input split bytes=101
[13:23:50,341][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     Combine input records=75
[13:23:50,342][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     Combine output records=75
[13:23:50,342][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     Reduce input groups=75
[13:23:50,342][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     Reduce shuffle bytes=0
[13:23:50,342][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     Reduce input records=75
[13:23:50,342][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     Reduce output records=75
[13:23:50,343][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     Spilled Records=150
[13:23:50,343][ INFO][main][org.apache.hadoop.mapred.JobClient:570] -     Total committed heap usage (bytes)=1065484288
```
And the result file in HDFS. Note the comma before the count on every line, where the default would have been a tab:

```
[root@master 2014-07-10]# hadoop fs -cat /op1/part-r-00000 | more
abrt:x:173:,1
adm:x:4:root,adm,daemon,1
apache:x:48:,1
audio:x:63:,1
avahi-autoipd:x:170:,1
avahi:x:70:,1
bin:x:1:root,bin,daemon,1
cdrom:x:11:,1
cloudera-scm:x:490:,1
daemon:x:2:root,bin,daemon,1
dbus:x:81:,1
desktop_admin_r:x:499:,1
desktop_user_r:x:498:,1
dialout:x:18:,1
dip:x:40:,1
disk:x:6:root,1
floppy:x:19:,1
flume:x:480:,1
ftp:x:50:,1
fuse:x:493:,1
games:x:20:,1
gdm:x:42:,1
gopher:x:30:,1
hadoop:x:502:hdfs,mapred,yarn,1
```