
Hadoop: Test Notes on Splitting Files by a Custom Format

2016-01-25 18:46

Purpose

Split a file by a custom record format by overriding the getSplits method of FileInputFormat; an XML file is used as the sample input.

Notes

Note: this is test code only, not a production-quality implementation! It merely demonstrates how to split a file by a custom format.
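The core trick, used by both classes below, is a byte-level scan: read the stream one byte at a time and advance a match counter against the start tag, then collect bytes until the end tag matches. The following standalone sketch is not part of the original code (the class and method names are made up for illustration); it shows the same matching loop on an in-memory stream:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

/** Minimal illustration of scanning a byte stream for <property>...</property> blocks. */
public class TagScanDemo {

    /** Reads until all bytes of 'match' have been seen; returns false at end of stream. */
    static boolean readUntilMatch(InputStream in, byte[] match, StringBuilder out) throws IOException {
        int i = 0;
        int b;
        while ((b = in.read()) != -1) {
            if (out != null) {
                out.append((char) b);      // collect bytes while inside a block
            }
            if (b == match[i]) {
                if (++i >= match.length) {
                    return true;           // full tag matched
                }
            } else {
                i = 0;                     // mismatch: restart the tag match
            }
        }
        return false;                      // end of stream
    }

    public static void main(String[] args) throws IOException {
        String xml = "<configuration><property><name>a</name><value>1</value></property></configuration>";
        byte[] start = "<property>".getBytes(StandardCharsets.UTF_8);
        byte[] end = "</property>".getBytes(StandardCharsets.UTF_8);
        InputStream in = new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8));

        while (readUntilMatch(in, start, null)) {      // skip ahead to a start tag
            // the start tag's bytes were already consumed, so re-add them by hand
            StringBuilder block = new StringBuilder("<property>");
            if (readUntilMatch(in, end, block)) {      // collect up to and including the end tag
                System.out.println(block);             // one <property>...</property> record
            }
        }
    }
}

Running it prints the single <property>...</property> block; the Hadoop classes below apply exactly this loop, first at split level and then at record level.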

The XmlInputFormat class

package boa.hadoop.xml;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author zhangdapeng
 *
 *         2016-01-23
 */
public class XmlInputFormat extends FileInputFormat<LongWritable, Text> {
    private static final Logger log = LoggerFactory.getLogger(XmlInputFormat.class);

    public static final String START_TAG_KEY = "xmlinput.start";
    public static final String END_TAG_KEY = "xmlinput.end";
    private long end = -1;
    private FSDataInputStream fsin = null;

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
        try {
            return new XmlRecordReader((FileSplit) split, context.getConfiguration());
        } catch (IOException ioe) {
            log.warn("Error while creating XmlRecordReader", ioe);
            return null;
        }
        // return new LineRecordReader();
    }

    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        List<InputSplit> defaultSplits = super.getSplits(job);
        Configuration conf = job.getConfiguration();
        List<InputSplit> result = new ArrayList<InputSplit>();
        byte[] startTag = conf.get(START_TAG_KEY).getBytes("UTF-8");
        byte[] endTag = conf.get(END_TAG_KEY).getBytes("UTF-8");
        long blockStart = -1;
        long blockEnd = -1;
        for (InputSplit genericSplit : defaultSplits) {
            FileSplit fileSplit = (FileSplit) genericSplit;
            Path file = fileSplit.getPath();
            end = fileSplit.getLength();
            FileSystem fs = file.getFileSystem(job.getConfiguration());
            try {
                fsin = fs.open(file);
                while (fsin.getPos() < end) {
                    blockStart = -1;
                    blockEnd = -1;
                    if (readUntilMatch(startTag, false)) {
                        blockStart = fsin.getPos() - startTag.length;
                        if (readUntilMatch(endTag, true)) {
                            blockEnd = fsin.getPos();
                            System.out.println(blockStart + "---" + blockEnd);
                        }
                    }
                    if (blockStart != -1 && blockEnd != -1 && blockStart < blockEnd) {
                        result.add(new FileSplit(file, blockStart, blockEnd - blockStart, fileSplit.getLocations()));
                        System.out.println("Adding split start = " + blockStart + " end = " + blockEnd);
                    }
                }
            } finally {
                fsin.close();
            }
        }

        return result;
    }

    private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
        int i = 0;
        while (true) {
            int b = fsin.read();
            // end of file:
            if (b == -1) {
                return false;
            }
            // check if we're matching:
            if (b == match[i]) {
                i++;
                if (i >= match.length) {
                    return true;
                }
            } else {
                i = 0;
            }
            // see if we've passed the stop point:
            if (!withinBlock && i == 0 && fsin.getPos() >= end) {
                return false;
            }
        }
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        System.out.println(super.isSplitable(context, file));
        return super.isSplitable(context, file);
    }

}
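One thing worth noting about getSplits above: it emits a separate FileSplit for every matched <property>...</property> block and keeps the scan state (end, fsin) in instance fields. That is convenient for demonstrating custom split boundaries on a small file, but one map task per tiny block would be far too fine-grained for real data, which is part of why this is labelled test-only code.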


The XmlRecordReader class:

package boa.hadoop.xml;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * XMLRecordReader class to read through a given xml document to output xml
 * blocks as records as specified by the start tag and end tag
 */
public class XmlRecordReader extends RecordReader<LongWritable, Text> {
    public static final String START_TAG_KEY = "xmlinput.start";
    public static final String END_TAG_KEY = "xmlinput.end";

    private final byte[] startTag;
    private final byte[] endTag;
    private final long start;
    private final long end;
    private final FSDataInputStream fsin;
    private final DataOutputBuffer buffer = new DataOutputBuffer();
    private LongWritable currentKey;
    private Text currentValue;
    FileSplit split = null;

    public XmlRecordReader(FileSplit split, Configuration conf) throws IOException {
        this.split = split;
        startTag = conf.get(START_TAG_KEY).getBytes("UTF-8");
        endTag = conf.get(END_TAG_KEY).getBytes("UTF-8");
        // open the file and seek to the start of the split
        start = split.getStart();
        end = start + split.getLength();
        Path file = split.getPath();
        FileSystem fs = file.getFileSystem(conf);
        fsin = fs.open(split.getPath());
        fsin.seek(start);
    }

    private boolean next(LongWritable key, Text value) throws IOException {
        if (fsin.getPos() < end && readUntilMatch(startTag, false)) {
            try {
                buffer.write(startTag);
                if (readUntilMatch(endTag, true)) {
                    key.set(fsin.getPos());
                    // buffer.writeBytes(split.toString());
                    // System.out.println(split.toString());
                    value.set(buffer.getData(), 0, buffer.getLength());
                    return true;
                }
            } finally {
                buffer.reset();
            }
        }
        return false;
    }

    @Override
    public void close() throws IOException {
        fsin.close();
    }

    @Override
    public float getProgress() throws IOException {
        return (fsin.getPos() - start) / (float) (end - start);
    }

    private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
        int i = 0;
        while (true) {
            int b = fsin.read();
            // end of file:
            if (b == -1) {
                return false;
            }
            // save to buffer:
            if (withinBlock) {
                buffer.write(b);
            }

            // check if we're matching:
            if (b == match[i]) {
                i++;
                if (i >= match.length) {
                    return true;
                }
            } else {
                i = 0;
            }
            // see if we've passed the stop point:
            if (!withinBlock && i == 0 && fsin.getPos() >= end) {
                return false;
            }
        }
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return currentKey;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return currentValue;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        currentKey = new LongWritable();
        currentValue = new Text();
        return next(currentKey, currentValue);
    }
}
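Before wiring the reader into a job, it can be exercised locally without a cluster. The sketch below is not from the original post (the class name and temp-file handling are made up for illustration); it writes a tiny XML file to the local file system, wraps it in a single FileSplit, and prints every <property> block the reader returns:

package boa.hadoop.xml;

import java.io.File;
import java.io.PrintWriter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/** Hypothetical local smoke test for XmlRecordReader (illustration only). */
public class XmlRecordReaderLocalTest {
    public static void main(String[] args) throws Exception {
        // write a small test file to the local file system
        File xml = File.createTempFile("xml-reader-test", ".xml");
        try (PrintWriter out = new PrintWriter(xml, "UTF-8")) {
            out.println("<configuration>");
            out.println("<property><name>a</name><value>1</value></property>");
            out.println("<property><name>b</name><value>2</value></property>");
            out.println("</configuration>");
        }

        Configuration conf = new Configuration();
        conf.set(XmlRecordReader.START_TAG_KEY, "<property>");
        conf.set(XmlRecordReader.END_TAG_KEY, "</property>");

        // one split covering the whole local file; no block locations needed here
        FileSplit split = new FileSplit(new Path(xml.toURI()), 0, xml.length(), new String[0]);
        XmlRecordReader reader = new XmlRecordReader(split, conf);
        while (reader.nextKeyValue()) {
            System.out.println(reader.getCurrentKey() + " -> " + reader.getCurrentValue());
        }
        reader.close();
    }
}

Each printed key is the stream position just after the matching </property> tag, mirroring key.set(fsin.getPos()) in next().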


The HadoopPropertyXMLMapReduce class (mapper and job driver):

package boa.hadoop.xml;

import static javax.xml.stream.XMLStreamConstants.CHARACTERS;
import static javax.xml.stream.XMLStreamConstants.START_ELEMENT;

import java.io.ByteArrayInputStream;
import java.io.IOException;

import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * @author zhangdapeng
 *
 *         2016-01-23
 */
public final class HadoopPropertyXMLMapReduce {
    private static final Logger log = LoggerFactory.getLogger(HadoopPropertyXMLMapReduce.class);

    public static class Map extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String document = value.toString();
            System.out.println("'" + document + "'");
            try {
                XMLStreamReader reader = XMLInputFactory.newInstance()
                        .createXMLStreamReader(new ByteArrayInputStream(document.getBytes("UTF-8")));
                String propertyName = "";
                String propertyValue = "";
                String currentElement = "";
                while (reader.hasNext()) {
                    int code = reader.next();
                    switch (code) {
                    case START_ELEMENT:
                        currentElement = reader.getLocalName();
                        break;
                    case CHARACTERS:
                        if (currentElement.equalsIgnoreCase("name")) {
                            propertyName += reader.getText();
                        } else if (currentElement.equalsIgnoreCase("value")) {
                            propertyValue += reader.getText();
                        }
                        break;
                    }
                }
                reader.close();
                // emit Text (not String) so the types match setOutputKeyClass/setOutputValueClass
                context.write(new Text(propertyName.trim()), new Text(propertyValue.trim()));
            } catch (Exception e) {
                log.error("Error processing '" + document + "'", e);
            }
        }
    }

    public static void main(String... args) throws Exception {
        // runJob(args[0], args[1]);
        runJob("/home/hadoop/workspace/core-site.xml", "/home/hadoop/workspace/output");
    }

    public static void runJob(String input, String output) throws Exception {
        Configuration conf = new Configuration();
        conf.set("key.value.separator.in.input.line", " ");
        conf.set("xmlinput.start", "<property>");
        conf.set("xmlinput.end", "</property>");

        Job job = Job.getInstance(conf);
        job.setJarByClass(HadoopPropertyXMLMapReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(Map.class);
        job.setInputFormatClass(XmlInputFormat.class);
        job.setNumReduceTasks(0);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        Path outPath = new Path(output);
        FileOutputFormat.setOutputPath(job, outPath);
        outPath.getFileSystem(conf).delete(outPath, true);

        job.waitForCompletion(true);
    }
}


Maven

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>boa</groupId>
    <artifactId>hadoop</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>hadoop</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <!-- Replace this with the class containing the jar's main method -->
                            <mainClass>boa.hadoop.Temperature</mainClass>
                        </manifest>
                        <manifestEntries>
                            <Class-Path>.</Class-Path>
                        </manifestEntries>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
                        <phase>package</phase> <!-- merge the dependency jars during the package phase -->
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>


Test file:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <property>
        <name>fs.default.name</name>
        <value>hdfs://localhost:8020</value>
    </property>

    <property>
        <name>hadoop.tmp.dir</name>
        <value>/var/lib/hadoop-0.20/cache/${user.name}</value>
    </property>

    <!-- OOZIE proxy user setting -->
    <property>
        <name>hadoop.proxyuser.oozie.hosts</name>
        <value>*</value>
    </property>
    <property>
        <name>hadoop.proxyuser.oozie.groups</name>
        <value>*</value>
    </property>

</configuration>
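For reference, tracing the code by hand rather than quoting an actual run: every <property> block becomes its own split and map task, so with zero reducers and TextOutputFormat the output directory should contain several part-m-* files whose combined, tab-separated content looks roughly like:

fs.default.name    hdfs://localhost:8020
hadoop.tmp.dir    /var/lib/hadoop-0.20/cache/${user.name}
hadoop.proxyuser.oozie.hosts    *
hadoop.proxyuser.oozie.groups    *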


Results
