您的位置:首页 > 产品设计 > UI/UE

Hadoop Core 学习笔记(一) SequenceFile文件写入和读取Writable数据

2012-02-01 14:55 609 查看
本博客属原创文章,转载请注明出处:http://guoyunsky.iteye.com/blogs/1265944

     刚接触Hadoop时,对SequenceFile和Writable还产生了一点联想,以为是什么神奇的东西.后来才明白,它们不过是Hadoop自己定义的一套IO协议,用于Hadoop自身的输入输出.这里介绍下如何向sequence file写入Writable数据,以及如何从中读出.

     Writable可以理解为要传输的数据,在Java里就相当于一个对象,只是放到Hadoop中需要一套协议来对这个对象进行序列化和反序列化.于是有了 public void write(DataOutput out) throws IOException 和 public void readFields(DataInput in) throws IOException 这两个方法,一个负责写入,一个负责读取.这样这些对象才可以在整个Hadoop集群中无障碍地通行.至于Hadoop为什么要另起炉灶自己搞一套序列化的东西,之前也看过一些介绍,但还没有心得,日后再慢慢领会.

      所以这个例子就是自己构造一个Writable对象,然后写入到sequence file,再从中读出.最后将读出的数据与写入的数据进行对比,检查是否正确.具体看代码吧:

 

 

package com.guoyun.hadoop.io.study;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class SequenceFileStudy {

/**
 * A user record (id, name, age) that can be written to and read from Hadoop
 * data streams.
 *
 * <p>Serialized layout, in order: {@code long userId}, {@code String userName}
 * (written with {@code DataOutput.writeUTF}), {@code int userAge}.
 *
 * <p>Identity is defined by {@code userId} alone: {@code equals},
 * {@code hashCode} and {@code compareTo} all ignore the name and the age.
 *
 * <p>{@code Comparable} is deliberately left raw so that the
 * {@code compareTo(Object)} signature callers compile against is unchanged.
 */
public static class UserWritable implements Writable,Comparable{
    private long userId;
    private String userName;
    private int userAge;

    /** Creates an empty record; required so the record can be filled by {@link #readFields}. */
    public UserWritable() {
        super();
    }

    /**
     * Creates a fully populated record.
     *
     * @param userId   user id, also the identity of the record
     * @param userName user name
     * @param userAge  user age
     */
    public UserWritable(long userId, String userName, int userAge) {
        super();
        this.userId = userId;
        this.userName = userName;
        this.userAge = userAge;
    }

    public long getUserId() {
        return userId;
    }

    public void setUserId(long userId) {
        this.userId = userId;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public int getUserAge() {
        return userAge;
    }

    public void setUserAge(int userAge) {
        this.userAge = userAge;
    }

    /**
     * Serializes this record as id, name, age.
     *
     * @param out destination stream
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(this.userId);
        // writeUTF rejects null, and a record built with the no-arg constructor has
        // no name yet; an unset name is therefore written as the empty string.
        out.writeUTF(this.userName == null ? "" : this.userName);
        out.writeInt(this.userAge);
    }

    /**
     * Overwrites all fields of this record from the stream, in the same order
     * {@link #write} produced them.
     *
     * @param in source stream
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.userId=in.readLong();
        this.userName=in.readUTF();
        this.userAge=in.readInt();
    }

    /** Returns the id, name and age separated by tab characters. */
    @Override
    public String toString() {
        return this.userId+"\t"+this.userName+"\t"+this.userAge;
    }

    /**
     * Compares by {@code userId} only.
     *
     * @return {@code true} if {@code obj} is a {@code UserWritable} with the same id
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof UserWritable)) {
            return false;
        }
        return ((UserWritable) obj).getUserId() == this.getUserId();
    }

    /**
     * Orders by {@code userId} only, consistently with {@link #equals}.
     *
     * @return a negative value, zero, or a positive value when this id is less
     *         than, equal to, or greater than the other id; {@code -1} when
     *         {@code obj} is not a {@code UserWritable} (including {@code null})
     */
    @Override
    public int compareTo(Object obj) {
        if (!(obj instanceof UserWritable)) {
            // Preserved from the earlier implementation: no exception is thrown for a
            // foreign type or null; this record is simply reported as the smaller one.
            return -1;
        }
        long otherId = ((UserWritable) obj).userId;
        if (this.userId < otherId) {
            return -1;
        }
        if (this.userId > otherId) {
            return 1;
        }
        // Equal ids must compare as 0, otherwise sorting and sorted collections
        // see two equal records as different.
        return 0;
    }

    /**
     * Hash derived from {@code userId} only: the low 32 bits of the id with the
     * sign bit cleared, so the result is never negative.
     */
    @Override
    public int hashCode() {
        return (int)this.userId&Integer.MAX_VALUE;
    }

}

/**
 * Writes every user in {@code datas} to a sequence file, using the user id as
 * the record key and the user itself as the record value.
 *
 * <p>An {@link IOException} raised while opening the file or appending a record
 * is printed with {@code printStackTrace()} and not rethrown. The writer is
 * passed to {@code IOUtils.closeStream} in a {@code finally} block.
 *
 * @param filePath path of the sequence file to write
 * @param conf     Hadoop configuration used to obtain the file system and create the writer
 * @param datas    users to append, in the collection's iteration order
 */
public static void write2SequenceFile(String filePath,Configuration conf,Collection<UserWritable> datas){
    // A single key object is reused for all records; it is refilled before each append.
    final LongWritable reusableKey=new LongWritable(0);
    SequenceFile.Writer out=null;

    try {
        final FileSystem fileSystem=FileSystem.get(conf);
        final Path target=new Path(filePath);
        out=SequenceFile.createWriter(fileSystem, conf, target, LongWritable.class, UserWritable.class);

        for(UserWritable record:datas){
            final long id=record.getUserId();
            reusableKey.set(id);
            out.append(reusableKey, record);
        }
    } catch (IOException ioFailure) {
        ioFailure.printStackTrace();
    } finally {
        IOUtils.closeStream(out);
    }
}

/**
 * Reads all {@code UserWritable} values from a sequence file.
 *
 * <p>Reading is best effort: any exception is printed with
 * {@code printStackTrace()} and the records read up to that point are
 * returned. The returned list is never {@code null}; it is empty when the file
 * could not be opened at all.
 *
 * @param sequeceFilePath path of the sequence file to read
 * @param conf            Hadoop configuration used to obtain the file system and open the reader
 * @return the values in file order; possibly empty or partial after a failure
 */
public static List<UserWritable> readSequenceFile(String sequeceFilePath,Configuration conf){
    // Created before any I/O so that a failure while opening the file yields an
    // empty list instead of null.
    List<UserWritable> result=new ArrayList<UserWritable>();
    SequenceFile.Reader reader=null;

    try {
        FileSystem fs=FileSystem.get(conf);
        Path path=new Path(sequeceFilePath);
        reader=new SequenceFile.Reader(fs, path, conf);
        // The key object is instantiated from the key class recorded in the file;
        // it only receives each record's key and is not returned.
        Writable key=(Writable)ReflectionUtils.newInstance(reader.getKeyClass(), conf);
        UserWritable value=new UserWritable();
        while(reader.next(key, value)){
            result.add(value);
            // next() fills the object it is given, so every record needs its own instance.
            value=new UserWritable();
        }
    } catch (Exception e) {
        // Covers IOException as well as runtime failures; both were already
        // handled identically (logged, not rethrown).
        e.printStackTrace();
    } finally {
        IOUtils.closeStream(reader);
    }
    return result;
}

/**
 * Builds the configuration used by this example: the job tracker is set to
 * {@code local} and the default file system to {@code file:///}.
 *
 * @return a new configuration with those two properties set
 */
private  static Configuration getDefaultConf(){
    final Configuration localConf=new Configuration();
    localConf.set("fs.default.name", "file:///");
    localConf.set("mapred.job.tracker", "local");
    // Disabled codec setting kept for reference:
    // localConf.set("io.compression.codecs", "com.hadoop.compression.lzo.LzoCodec");
    return localConf;
}

/**
 * Demo entry point: generates up to ten random users, writes them to a
 * sequence file, reads the file back and prints every record. Records found in
 * the generated set go to stdout, unexpected ones to stderr.
 *
 * @param args unused
 */
public static void main(String[] args) {
    String filePath="data/user.sequence"; // location of the sequence file
    Configuration conf=getDefaultConf();

    // Generate the data. Users are equal when their ids are equal, so the set
    // silently drops a user whose random id collides with an earlier one.
    Set<UserWritable> users=new HashSet<UserWritable>();
    for(int i=1;i<=10;i++){
        long userId=i+(int)(Math.random()*100000);
        int userAge=(int)(Math.random()*50)+10;
        users.add(new UserWritable(userId,"name-"+(i+1),userAge));
    }

    // Write to the sequence file, then read it back.
    write2SequenceFile(filePath,conf,users);
    List<UserWritable> readDatas=readSequenceFile(filePath,conf);

    // A failed read may hand back null; stop here instead of hitting a
    // NullPointerException in the loop below.
    if(readDatas==null){
        System.err.println("Failed to read sequence file: "+filePath);
        return;
    }

    // Check every record against the generated set (matching is by user id) and print it.
    for(UserWritable u:readDatas){
        if(users.contains(u)){
            System.out.println(u.toString());
        }else{
            System.err.println("Error data:"+u.toString());
        }
    }
}

}


 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  hadoop string path null file user
相关文章推荐