Source-code notes from integrating storm-hdfs
A while ago I integrated storm-hdfs, but found that the data-writing logic in the stock storm-hdfs-0.9.4.jar did not satisfy our requirements. So I had to read the source, then rewrite it on top of the original to fit our needs.
Integrating storm-hdfs essentially means writing a Storm topology and wiring in the HdfsBolt from storm-hdfs-0.9.4.jar; by configuring the bolt's HDFS-related parameters, the data gets written into HDFS.
The configurable parameters:
1. RecordFormat: defines the field and record delimiters; for example, a tab (\t) between fields and a newline (\n) at the end of each record;
2. SyncPolicy: defines how often buffered data is synced to HDFS, e.g., after every N tuples;
3. FileRotationPolicy: defines the rotation policy for the HDFS files being written; you can rotate by time (TimedRotationPolicy), by size (FileSizeRotationPolicy), or not at all (NoRotationPolicy);
4. FileNameFormat: defines the output path (withPath) and the file name's prefix and extension (withPrefix, withExtension);
5. withFsUrl: defines the HDFS address, as used in the sketch below.
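Putting these together, a minimal wiring sketch (assuming the standard storm-hdfs 0.9.x API and the backtype-era package layout; the HDFS URL and paths are placeholders):
[java]
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

// Tab between fields; DelimitedRecordFormat appends \n after each record by default.
RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("\t");
// Sync buffered data to HDFS after every 1000 tuples.
SyncPolicy syncPolicy = new CountSyncPolicy(1000);
// Rotate to a new file once the current one reaches 5 MB.
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
// Files are created under /storm/data/ (placeholder path).
FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/storm/data/");
HdfsBolt bolt = new HdfsBolt()
        .withFsUrl("hdfs://namenode:8020")   // placeholder HDFS address
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);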
The source of HdfsBolt's write path:
[java]
public void execute(Tuple tuple)
{
    try
    {
        byte[] bytes = this.format.format(tuple); // serialize the tuple, appending the delimiters
        synchronized (this.writeLock)
        {
            this.out.write(bytes); // write through the output stream
            this.offset += bytes.length; // track the current size of the file being written
            if (this.syncPolicy.mark(tuple, this.offset)) // once the configured tuple count is reached, sync to HDFS
            {
                if ((this.out instanceof HdfsDataOutputStream)) {
                    ((HdfsDataOutputStream)this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
                } else {
                    this.out.hsync();
                }
                this.syncPolicy.reset();
            }
        }
        this.collector.ack(tuple);
        if (this.rotationPolicy.mark(tuple, this.offset)) // once the file reaches the configured rotation size, roll to a new file
        {
            rotateOutputFile();
            this.offset = 0L;
            this.rotationPolicy.reset();
        }
    }
    catch (IOException e)
    {
        LOG.warn("write/sync failed.", e);
        this.collector.fail(tuple);
    }
}
The method HdfsBolt uses to create each new file:
[java]
Path createOutputFile()
    throws IOException
{
    Path path = new Path(this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
    this.out = this.fs.create(path); // open a new output stream (backed by a new file)
    return path; // return the new file's path
}
The file rotation method:
[java]
protected void rotateOutputFile()
    throws IOException
{
    LOG.info("Rotating output file...");
    long start = System.currentTimeMillis();
    synchronized (this.writeLock)
    {
        closeOutputFile(); // close the previous file's output stream
        this.rotation += 1; // bump the rotation counter (it shows up in the file name)
        Path newFile = createOutputFile(); // create a new file
        LOG.info("Performing {} file rotation actions.", Integer.valueOf(this.rotationActions.size()));
        for (RotationAction action : this.rotationActions) {
            action.execute(this.fs, this.currentFile); // run each registered action against the just-closed file
        }
        this.currentFile = newFile; // point at the new file
    }
    long time = System.currentTimeMillis() - start;
    LOG.info("File rotation took {} ms.", Long.valueOf(time));
}
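The rotationActions loop above runs every RotationAction registered on the bolt against the file that was just closed. A common use is moving completed files out of the write directory so downstream jobs never see a half-written file; a sketch using the MoveFileAction that ships with storm-hdfs (the destination path is a placeholder):
[java]
import org.apache.storm.hdfs.common.rotation.MoveFileAction;

// After each rotation, move the finished file into a separate "done" directory.
bolt.addRotationAction(new MoveFileAction().toDestination("/storm/done/"));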
The source that appends the field delimiters:
[java]
public byte[] format(Tuple tuple)
{
    StringBuilder sb = new StringBuilder();
    Fields fields = this.fields == null ? tuple.getFields() : this.fields;
    int size = fields.size();
    for (int i = 0; i < size; i++)
    {
        sb.append(tuple.getValueByField(fields.get(i)));
        if (i != size - 1) {
            sb.append(this.fieldDelimiter); // delimiter between fields
        }
    }
    sb.append(this.recordDelimiter); // record delimiter at the end of each line
    return sb.toString().getBytes();
}
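Since RecordFormat boils down to this single method, replacing the delimited layout with your own is straightforward. A minimal sketch of a hypothetical JSON-per-line format (the class name and layout are illustrative, not part of storm-hdfs; values are not escaped):
[java]
import backtype.storm.tuple.Tuple;
import org.apache.storm.hdfs.bolt.format.RecordFormat;

public class JsonRecordFormat implements RecordFormat {
    @Override
    public byte[] format(Tuple tuple) {
        // Emit each tuple as a one-line JSON object keyed by its field names.
        StringBuilder sb = new StringBuilder("{");
        int size = tuple.getFields().size();
        for (int i = 0; i < size; i++) {
            String field = tuple.getFields().get(i);
            sb.append('"').append(field).append("\":\"")
              .append(tuple.getValueByField(field)).append('"');
            if (i != size - 1) {
                sb.append(',');
            }
        }
        sb.append("}\n");
        return sb.toString().getBytes();
    }
}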
CountSyncPolicy source; CountSyncPolicy implements the SyncPolicy interface:
[java]
public class CountSyncPolicy
    implements SyncPolicy
{
    private int count; // configured number of tuples per sync
    private int executeCount = 0; // tuples processed since the last sync

    public CountSyncPolicy(int count)
    {
        this.count = count;
    }

    public boolean mark(Tuple tuple, long offset) // true once enough tuples have been buffered to warrant a sync
    {
        this.executeCount += 1;
        return this.executeCount >= this.count;
    }

    public void reset()
    {
        this.executeCount = 0; // called after each sync so counting starts over
    }
}
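If syncing every N tuples is the wrong granularity, the same two-method interface supports other strategies. A hedged sketch of a hypothetical time-based policy (not part of storm-hdfs-0.9.4; the class name is illustrative):
[java]
import backtype.storm.tuple.Tuple;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

public class TimedSyncPolicy implements SyncPolicy {
    private final long intervalMillis; // sync at most this often
    private long lastSync = System.currentTimeMillis();

    public TimedSyncPolicy(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    @Override
    public boolean mark(Tuple tuple, long offset) {
        // Request a sync once the interval has elapsed, regardless of tuple count.
        return System.currentTimeMillis() - lastSync >= intervalMillis;
    }

    @Override
    public void reset() {
        lastSync = System.currentTimeMillis(); // restart the clock after each sync
    }
}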
FileSizeRotationPolicy source (FileSizeRotationPolicy implements the FileRotationPolicy interface):
[java]
public class FileSizeRotationPolicy
    implements FileRotationPolicy
{
    private static final Logger LOG = LoggerFactory.getLogger(FileSizeRotationPolicy.class);
    private long maxBytes; // size at which the file is considered full

    public static enum Units
    { // units for the rotation size
        KB((long)Math.pow(2.0D, 10.0D)), MB((long)Math.pow(2.0D, 20.0D)), GB((long)Math.pow(2.0D, 30.0D)), TB((long)Math.pow(2.0D, 40.0D));

        private long byteCount;

        private Units(long byteCount)
        {
            this.byteCount = byteCount;
        }

        public long getByteCount()
        {
            return this.byteCount;
        }
    }

    private long lastOffset = 0L;
    private long currentBytesWritten = 0L;

    public FileSizeRotationPolicy(float count, Units units)
    {
        this.maxBytes = (long)(count * (float)units.getByteCount()); // convert the configured size into bytes via the unit
    }

    public boolean mark(Tuple tuple, long offset) // decides whether the file should be rotated
    {
        long diff = offset - this.lastOffset;
        this.currentBytesWritten += diff;
        this.lastOffset = offset;
        return this.currentBytesWritten >= this.maxBytes;
    }

    public void reset() // called after each rotation
    {
        this.currentBytesWritten = 0L; // bytes written to the current file
        this.lastOffset = 0L; // offset as of the last write
    }
}
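For example, constructing the policy with 5.0f and Units.MB gives maxBytes = 5 × 2^20 = 5,242,880 bytes, so the bolt rotates once roughly 5 MB have been written:
[java]
// Rotate after ~5 MB: maxBytes = (long)(5.0f * 1048576) = 5242880
FileRotationPolicy rotationPolicy =
        new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);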
DefaultFileNameFormat source (DefaultFileNameFormat implements the FileNameFormat interface):
[java]
public class DefaultFileNameFormat
    implements FileNameFormat
{
    private String componentId;
    private int taskId; // id of the task writing the file
    private String path = "/storm"; // output directory
    private String prefix = ""; // file name prefix
    private String extension = ".txt"; // file name extension

    public DefaultFileNameFormat withPrefix(String prefix)
    {
        this.prefix = prefix;
        return this;
    }

    public DefaultFileNameFormat withExtension(String extension)
    {
        this.extension = extension;
        return this;
    }

    public DefaultFileNameFormat withPath(String path)
    {
        this.path = path;
        return this;
    }

    public void prepare(Map conf, TopologyContext topologyContext)
    {
        this.componentId = topologyContext.getThisComponentId();
        this.taskId = topologyContext.getThisTaskId();
    }

    public String getName(long rotation, long timeStamp) // builds the name of the file being written
    {
        return this.prefix + this.componentId + "-" + this.taskId + "-" + rotation + "-" + timeStamp + this.extension;
    }

    public String getPath()
    {
        return this.path;
    }
}
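With the defaults, for example, a bolt whose component id is "hdfs-bolt" running as task 3 would, after one rotation, name its second file something like hdfs-bolt-3-1-1453738977455.txt (prefix + componentId + taskId + rotation + timestamp + extension) under /storm; the timestamp here is illustrative.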
http://blog.csdn.net/u014039577/article/details/50215913