Hadoop: Test Notes on Splitting Files by a Custom Format
2016-01-25 18:46
Purpose

Split an input file by a custom delimiter format by overriding the getSplits method of FileInputFormat, using an XML file as the sample input.

Notes

Note: this is test code only, not a production-grade version! It merely demonstrates how to split a file by a custom format.

The XmlInputFormat class
```java
package boa.hadoop.xml;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An InputFormat that turns each start-tag..end-tag block of an XML file
 * into its own split.
 *
 * @author zhangdapeng
 * January 23, 2016
 */
public class XmlInputFormat extends FileInputFormat<LongWritable, Text> {

    private static final Logger log = LoggerFactory.getLogger(XmlInputFormat.class);

    public static final String START_TAG_KEY = "xmlinput.start";
    public static final String END_TAG_KEY = "xmlinput.end";

    private long end = -1;
    private FSDataInputStream fsin = null;

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) {
        try {
            return new XmlRecordReader((FileSplit) split, context.getConfiguration());
        } catch (IOException ioe) {
            log.warn("Error while creating XmlRecordReader", ioe);
            return null;
        }
    }

    /**
     * Scans each default split for start/end tag pairs and emits one
     * FileSplit per matched XML block.
     */
    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        List<InputSplit> defaultSplits = super.getSplits(job);
        Configuration conf = job.getConfiguration();
        List<InputSplit> result = new ArrayList<InputSplit>();
        byte[] startTag = conf.get(START_TAG_KEY).getBytes("UTF-8");
        byte[] endTag = conf.get(END_TAG_KEY).getBytes("UTF-8");
        long blockStart = -1;
        long blockEnd = -1;
        for (InputSplit genericSplit : defaultSplits) {
            FileSplit fileSplit = (FileSplit) genericSplit;
            Path file = fileSplit.getPath();
            // note: assumes the default split starts at offset 0
            // (fine for a small, single-block test file)
            end = fileSplit.getLength();
            FileSystem fs = file.getFileSystem(conf);
            try {
                fsin = fs.open(file);
                while (fsin.getPos() < end) {
                    blockStart = -1;
                    blockEnd = -1;
                    if (readUntilMatch(startTag, false)) {
                        blockStart = fsin.getPos() - startTag.length;
                        if (readUntilMatch(endTag, true)) {
                            blockEnd = fsin.getPos();
                            System.out.println(blockStart + "---" + blockEnd);
                        }
                    }
                    if (blockStart != -1 && blockEnd != -1 && blockStart < blockEnd) {
                        result.add(new FileSplit(file, blockStart, blockEnd - blockStart,
                                fileSplit.getLocations()));
                        System.out.println("Adding split start = " + blockStart
                                + " end = " + blockEnd);
                    }
                }
            } finally {
                if (fsin != null) {
                    fsin.close();
                }
            }
        }
        return result;
    }

    /**
     * Reads bytes until the given tag is fully matched. Returns false at end
     * of file, or (when scanning for a start tag) at the end of the split.
     */
    private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
        int i = 0;
        while (true) {
            int b = fsin.read();
            // end of file:
            if (b == -1) {
                return false;
            }
            // check if we're matching:
            if (b == match[i]) {
                i++;
                if (i >= match.length) {
                    return true;
                }
            } else {
                // restart the match, re-checking this byte against the first
                // pattern byte so a tag immediately preceded by its own first
                // character (e.g. "<<property>") is still caught
                i = (b == match[0]) ? 1 : 0;
            }
            // see if we've passed the stop point:
            if (!withinBlock && i == 0 && fsin.getPos() >= end) {
                return false;
            }
        }
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        boolean splitable = super.isSplitable(context, file);
        System.out.println(splitable);
        return splitable;
    }
}
```
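The heart of this class (and of the record reader below) is the byte-by-byte matching loop in readUntilMatch. A minimal standalone sketch of that loop, against an in-memory stream rather than HDFS (the TagScanDemo class is hypothetical, not part of the job), may make it easier to follow:

```java
package boa.hadoop.xml;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical demo: replays the readUntilMatch scanning loop on a byte stream.
public class TagScanDemo {

    // Consumes bytes until the pattern is fully matched; false at end of stream.
    static boolean readUntilMatch(InputStream in, byte[] match) throws IOException {
        int i = 0;
        int b;
        while ((b = in.read()) != -1) {
            if (b == match[i]) {
                if (++i == match.length) {
                    return true; // full tag matched
                }
            } else {
                // restart, re-checking this byte against the first pattern byte
                i = (b == match[0]) ? 1 : 0;
            }
        }
        return false;
    }

    public static void main(String[] args) throws IOException {
        byte[] xml = "<configuration><property><name>k</name></property></configuration>"
                .getBytes("UTF-8");
        InputStream in = new ByteArrayInputStream(xml);
        // prints true, leaving the stream positioned just past "<property>"
        System.out.println(readUntilMatch(in, "<property>".getBytes("UTF-8")));
    }
}
```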
The XmlRecordReader class:
```java
package boa.hadoop.xml;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * XmlRecordReader reads through a given XML document and outputs XML blocks
 * as records, as delimited by the configured start and end tags.
 */
public class XmlRecordReader extends RecordReader<LongWritable, Text> {

    public static final String START_TAG_KEY = "xmlinput.start";
    public static final String END_TAG_KEY = "xmlinput.end";

    private final byte[] startTag;
    private final byte[] endTag;
    private final long start;
    private final long end;
    private final FSDataInputStream fsin;
    private final DataOutputBuffer buffer = new DataOutputBuffer();
    private LongWritable currentKey;
    private Text currentValue;
    private FileSplit split = null;

    public XmlRecordReader(FileSplit split, Configuration conf) throws IOException {
        this.split = split;
        startTag = conf.get(START_TAG_KEY).getBytes("UTF-8");
        endTag = conf.get(END_TAG_KEY).getBytes("UTF-8");
        // open the file and seek to the start of the split
        start = split.getStart();
        end = start + split.getLength();
        Path file = split.getPath();
        FileSystem fs = file.getFileSystem(conf);
        fsin = fs.open(split.getPath());
        fsin.seek(start);
    }

    /**
     * Copies the next start-tag..end-tag block into value; the key is the
     * byte offset just past the end tag.
     */
    private boolean next(LongWritable key, Text value) throws IOException {
        if (fsin.getPos() < end && readUntilMatch(startTag, false)) {
            try {
                buffer.write(startTag);
                if (readUntilMatch(endTag, true)) {
                    key.set(fsin.getPos());
                    value.set(buffer.getData(), 0, buffer.getLength());
                    return true;
                }
            } finally {
                buffer.reset();
            }
        }
        return false;
    }

    @Override
    public void close() throws IOException {
        fsin.close();
    }

    @Override
    public float getProgress() throws IOException {
        return (fsin.getPos() - start) / (float) (end - start);
    }

    private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
        int i = 0;
        while (true) {
            int b = fsin.read();
            // end of file:
            if (b == -1) {
                return false;
            }
            // save to buffer:
            if (withinBlock) {
                buffer.write(b);
            }
            // check if we're matching:
            if (b == match[i]) {
                i++;
                if (i >= match.length) {
                    return true;
                }
            } else {
                // restart the match, re-checking this byte against the
                // first pattern byte (same fix as in XmlInputFormat)
                i = (b == match[0]) ? 1 : 0;
            }
            // see if we've passed the stop point:
            if (!withinBlock && i == 0 && fsin.getPos() >= end) {
                return false;
            }
        }
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return currentKey;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return currentValue;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // all initialization happens in the constructor, which already
        // receives the split and configuration
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        currentKey = new LongWritable();
        currentValue = new Text();
        return next(currentKey, currentValue);
    }
}
```
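Because all setup happens in the constructor, the reader can also be exercised on its own, without a MapReduce job. A minimal sketch, assuming a local copy of the test file (adjust the path for your machine):

```java
package boa.hadoop.xml;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Hypothetical local driver for XmlRecordReader.
public class XmlRecordReaderDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("xmlinput.start", "<property>");
        conf.set("xmlinput.end", "</property>");
        // assumed local test file path
        Path file = new Path("file:///home/hadoop/workspace/core-site.xml");
        long len = file.getFileSystem(conf).getFileStatus(file).getLen();
        // one split covering the whole file, with no preferred hosts
        FileSplit split = new FileSplit(file, 0, len, new String[0]);
        XmlRecordReader reader = new XmlRecordReader(split, conf);
        while (reader.nextKeyValue()) {
            System.out.println(reader.getCurrentKey() + " -> " + reader.getCurrentValue());
        }
        reader.close();
    }
}
```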
The HadoopPropertyXMLMapReduce class:
```java
package boa.hadoop.xml;

import static javax.xml.stream.XMLStreamConstants.CHARACTERS;
import static javax.xml.stream.XMLStreamConstants.START_ELEMENT;

import java.io.ByteArrayInputStream;
import java.io.IOException;

import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Map-only job that extracts <name>/<value> pairs from each <property>
 * block produced by XmlInputFormat.
 *
 * @author zhangdapeng
 * January 23, 2016
 */
public final class HadoopPropertyXMLMapReduce {

    private static final Logger log = LoggerFactory.getLogger(HadoopPropertyXMLMapReduce.class);

    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String document = value.toString();
            System.out.println("'" + document + "'");
            try {
                // parse the XML block with StAX, collecting the name/value text
                XMLStreamReader reader = XMLInputFactory.newInstance()
                        .createXMLStreamReader(new ByteArrayInputStream(document.getBytes("UTF-8")));
                String propertyName = "";
                String propertyValue = "";
                String currentElement = "";
                while (reader.hasNext()) {
                    int code = reader.next();
                    switch (code) {
                    case START_ELEMENT:
                        currentElement = reader.getLocalName();
                        break;
                    case CHARACTERS:
                        if (currentElement.equalsIgnoreCase("name")) {
                            propertyName += reader.getText();
                        } else if (currentElement.equalsIgnoreCase("value")) {
                            propertyValue += reader.getText();
                        }
                        break;
                    }
                }
                reader.close();
                // the output types are Text, so wrap the strings
                context.write(new Text(propertyName.trim()), new Text(propertyValue.trim()));
            } catch (Exception e) {
                log.error("Error processing '" + document + "'", e);
            }
        }
    }

    public static void main(String... args) throws Exception {
        // runJob(args[0], args[1]);
        runJob("/home/hadoop/workspace/core-site.xml", "/home/hadoop/workspace/output");
    }

    public static void runJob(String input, String output) throws Exception {
        Configuration conf = new Configuration();
        conf.set("key.value.separator.in.input.line", " ");
        conf.set("xmlinput.start", "<property>");
        conf.set("xmlinput.end", "</property>");

        Job job = Job.getInstance(conf);
        job.setJarByClass(HadoopPropertyXMLMapReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(Map.class);
        job.setInputFormatClass(XmlInputFormat.class);
        job.setNumReduceTasks(0); // map-only job
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        Path outPath = new Path(output);
        FileOutputFormat.setOutputPath(job, outPath);
        // remove any previous output so the job can rerun
        outPath.getFileSystem(conf).delete(outPath, true);

        job.waitForCompletion(true);
    }
}
```
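The job can be launched directly from the IDE via main, or submitted from the assembled jar. Since the pom below sets mainClass to boa.hadoop.Temperature, the driver class must be named explicitly; the jar name here is assumed from the pom coordinates:

```bash
hadoop jar target/hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar boa.hadoop.xml.HadoopPropertyXMLMapReduce
```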
Maven
```xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>boa</groupId>
    <artifactId>hadoop</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>hadoop</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <!-- replace with the class containing the jar's main method -->
                            <mainClass>boa.hadoop.Temperature</mainClass>
                        </manifest>
                        <manifestEntries>
                            <Class-Path>.</Class-Path>
                        </manifestEntries>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <!-- this is used for inheritance merges -->
                        <phase>package</phase>
                        <!-- merge the dependency jars during the package phase -->
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```
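With this pom, building the runnable fat jar is a single Maven invocation, since the assembly plugin's single goal is bound to the package phase:

```bash
mvn clean package
```

This produces target/hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar.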
Test file:
```xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>fs.default.name</name>
        <value>hdfs://localhost:8020</value>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/var/lib/hadoop-0.20/cache/${user.name}</value>
    </property>
    <!-- OOZIE proxy user setting -->
    <property>
        <name>hadoop.proxyuser.oozie.hosts</name>
        <value>*</value>
    </property>
    <property>
        <name>hadoop.proxyuser.oozie.groups</name>
        <value>*</value>
    </property>
</configuration>
```
Results
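Based on the mapper logic and the test file above, the map-only output (a single part-m file, with TextOutputFormat's default tab separator) should look roughly like:

```
fs.default.name	hdfs://localhost:8020
hadoop.tmp.dir	/var/lib/hadoop-0.20/cache/${user.name}
hadoop.proxyuser.oozie.hosts	*
hadoop.proxyuser.oozie.groups	*
```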