
Hadoop & Hadoop Streaming: Custom Output Formats

2013-09-23 22:30
When using Hadoop to process large numbers of log files, it is often useful to write malformed or otherwise non-conforming log records to a separate directory so they can be inspected later. This post gives a simple example and briefly explains how to use this approach with Hadoop Streaming to send badly formatted log records to a custom output path.

In the example, the class MultiFilesOutput extends MultipleTextOutputFormat<Text, Text>. You can define your own key types and have records routed to different output directories based on them.

package MultiFormats;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

/**
 * @author xyl
 */
public class MultiFilesOutput extends MultipleTextOutputFormat<Text, Text> {

    /**
     * Use the key as part of the path for the final output results.
     * For now we define just four key types:
     *   err_log:  log lines we cannot parse
     *   err_oth:  other unexpected log formats
     *   pvnormal: well-formed lines used for the PV result
     *   uvnormal: well-formed lines used for the UV result
     */
    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, String leaf) {
        String keyType = key.toString();
        if (keyType.equals("err_log")) {
            return new Path("err_log", leaf).toString();
        }
        if (keyType.equals("err_oth")) {
            return new Path("err_oth", leaf).toString();
        }
        if (keyType.equals("pvnormal")) {
            return new Path("pvnormal", leaf).toString();
        }
        return new Path("uvnormal", leaf).toString();
    }

    /**
     * When actually writing the data, discard the key since it is already in
     * the file path.
     */
    @Override
    protected Text generateActualKey(Text key, Text value) {
        return null;
    }
}
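In a Streaming job, the key that generateFileNameForKeyValue sees is whatever the reducer writes before the first tab on each output line. The original mapper.py and reducer.py are not shown in the post; purely as a hypothetical sketch of the key convention (the record layouts are assumptions), a reducer that counts PV per URL, counts UV per URL, and passes bad records straight through might look roughly like this:

#!/usr/bin/env python
# Hypothetical reducer.py sketch (not from the original post).
# The field before the first tab on each output line is the key that
# MultiFilesOutput inspects, so it must be one of:
# err_log, err_oth, pvnormal, uvnormal.
import sys
from collections import defaultdict

pv_counts = defaultdict(int)    # page views per URL (pvnormal records)
uv_visitors = defaultdict(set)  # distinct visitor ids per URL (uvnormal records)

for line in sys.stdin:
    key, _, value = line.rstrip("\n").partition("\t")
    if key in ("err_log", "err_oth"):
        # Pass bad records through unchanged; the key alone decides whether
        # they end up under err_log/ or err_oth/ in the output directory.
        sys.stdout.write("%s\t%s\n" % (key, value))
    elif key == "pvnormal":
        # Assumed layout: the value is the URL itself.
        pv_counts[value] += 1
    elif key == "uvnormal":
        # Assumed layout: the value is "url<TAB>visitor_id".
        url, _, visitor = value.partition("\t")
        uv_visitors[url].add(visitor)

for url, n in pv_counts.items():
    sys.stdout.write("pvnormal\t%s\t%d\n" % (url, n))
for url, visitors in uv_visitors.items():
    sys.stdout.write("uvnormal\t%s\t%d\n" % (url, len(visitors)))

Because generateActualKey returns null, only the part after the first tab is written to the files under pvnormal/, uvnormal/, err_log/ and err_oth/.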


In Hadoop Streaming, the extension jar is used as follows:

INPUT=/user/flume/${PATH}
OUTPUT=/user/dplog/errlog/${DATE}

# JOBNAME: the MapReduce job name shown in the job tracker; it helps to include the job owner's name
JOBNAME="${DATE}-pvTest(xyl)"

######################################################################
# Check if output directory already exists
# --------------------------------------------------------------------
#if $(/opt/hadoop/bin/hadoop fs -test -d ${OUTPUT})
#then
#    echo "Output Directory" $OUTPUT "already exists."
# Other action code can be added at here
#    exit 100
#else
#    echo "Output Directory will be created!"
#fi
/opt/hadoop/bin/hadoop fs -rmr ${OUTPUT}
######################################################################
# Main hadoop streaming process
# --------------------------------------------------------------------
/opt/hadoop/bin/hadoop jar ${STREAMING_JAR} \
-libjars /opt/home/hadoop/xyl/Jars/CustomOutputFormats.jar \
-Dmapred.job.name="${JOBNAME}" \
-Dmapred.reduce.tasks=1 \
-input ${INPUT} \
-output ${OUTPUT} \
-mapper mapper.py \
-reducer reducer.py \
-file /opt/home/hadoop/xyl/mappers/mapper.py \
-file /opt/home/hadoop/xyl/reducers/reducer.py \
-outputformat MultiFormats.MultiFilesOutput \
;


Here CustomOutputFormats.jar is the name of the generated jar file, and MultiFormats.MultiFilesOutput corresponds to the package name and class name.
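For completeness, here is a hypothetical mapper.py that produces the tagged records consumed by the reducer sketch above. Any line that cannot be parsed is tagged err_log so that it lands under err_log/ for later inspection; the assumed field layout of the log lines is made up for illustration.

#!/usr/bin/env python
# Hypothetical mapper.py sketch (not from the original post).
# Tags every input line with one of the four key types expected by
# MultiFilesOutput; the assumed log layout "timestamp<TAB>url<TAB>visitor_id"
# is only for illustration.
import sys

for raw in sys.stdin:
    raw = raw.rstrip("\n")
    fields = raw.split("\t")
    if len(fields) != 3:
        # Unparseable line: keep the original text so it can be examined later.
        sys.stdout.write("err_log\t%s\n" % raw)
        continue
    timestamp, url, visitor = fields
    if not url.startswith("/"):
        # Parseable, but not a request we care about: route it to err_oth/.
        sys.stdout.write("err_oth\t%s\n" % raw)
        continue
    sys.stdout.write("pvnormal\t%s\n" % url)
    sys.stdout.write("uvnormal\t%s\t%s\n" % (url, visitor))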