整合storm-hdfs过程中源码学习
2015-12-08 12:56
501 查看
前一段整合了stomr-hdfs,但是发现在原有的storm-hdfs-0.9.4.jar中的写入数据的逻辑不满足我们的需求,于是乎需要看源码,然后在源码的基础上改写源码,满足自己的需求。
整合storm-hdfs的过程,其实也就是编写storm的拓扑结构,然后调用storm-hdfs-0.9.4.jar中的hdfsBolt,通过配置hdfsBolt的一些与hdfs有关的参数,将数据写入到hdfs中。
配置的参数:
1、RecordFormat:定义字段定界符,你可以使用换行符\n或者制表符\t;
2、SyncPolicy:定义每次写入的tuple的数量;
3、FileRotationPolicy:定义写入的hdfs文件的轮转策略,你可以以时间轮转(TimedRotationPolicy)、大小轮转(FileSizeRotationPolicy)、不轮转(NoRotationPolicy);
4、FileNameFormat:定义写入文件的路径(withPath)和文件名的前后缀(withPrefix、withExtension);
5、withFsUrl:定义hdfs的地址。
hdfsBolt中写数据的源码:
hdfsBolt每次新建文件的方法:
轮转文件的方法:
添加字段定界符源码:
CountSyncPolicy源码,CountSyncPolicy实现SyncPolicy的接口方法:
FileSizeRotationPolicy的源码(FileSizeRotationPolicy实现FileRotationPolicy的接口方法):
DefaultFileNameFormat的源码(DefaultFileNameFormat实现FileNameFormat的接口方法):
整合storm-hdfs的过程,其实也就是编写storm的拓扑结构,然后调用storm-hdfs-0.9.4.jar中的hdfsBolt,通过配置hdfsBolt的一些与hdfs有关的参数,将数据写入到hdfs中。
配置的参数:
1、RecordFormat:定义字段定界符,你可以使用换行符\n或者制表符\t;
2、SyncPolicy:定义每次写入的tuple的数量;
3、FileRotationPolicy:定义写入的hdfs文件的轮转策略,你可以以时间轮转(TimedRotationPolicy)、大小轮转(FileSizeRotationPolicy)、不轮转(NoRotationPolicy);
4、FileNameFormat:定义写入文件的路径(withPath)和文件名的前后缀(withPrefix、withExtension);
5、withFsUrl:定义hdfs的地址。
hdfsBolt中写数据的源码:
public void execute(Tuple tuple) { try { byte[] bytes = this.format.format(tuple); //对每一条数据添加定界符 synchronized (this.writeLock) { this.out.write(bytes); //调用输出流写数据 this.offset += bytes.length; //更新写入文件的当前大小 if (this.syncPolicy.mark(tuple, this.offset)) //当数据条数满足所配的条数时,写入到hdfs { if ((this.out instanceof HdfsDataOutputStream)) { ((HdfsDataOutputStream)this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH)); } else { this.out.hsync(); } this.syncPolicy.reset(); } } this.collector.ack(tuple); if (this.rotationPolicy.mark(tuple, this.offset)) //当当前文件大小等于所配轮转文件的大小,则轮转文件,重建新的写入文件 { rotateOutputFile(); this.offset = 0L; this.rotationPolicy.reset(); } } catch (IOException e) { LOG.warn("write/sync failed.", e); this.collector.fail(tuple); } }
hdfsBolt每次新建文件的方法:
Path createOutputFile() throws IOException { Path path = new Path(this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis())); this.out = this.fs.create(path); //新建一个输出流(对应一个新的文件) return path; //返回路径 }
轮转文件的方法:
protected void rotateOutputFile() throws IOException { LOG.info("Rotating output file..."); long start = System.currentTimeMillis(); synchronized (this.writeLock) { closeOutputFile(); //关闭前一个文件的输出流 this.rotation += 1; //轮转数加一(这里的轮转数会反应到文件名上) Path newFile = createOutputFile(); //新建一个文件 LOG.info("Performing {} file rotation actions.", Integer.valueOf(this.rotationActions.size())); for (RotationAction action : this.rotationActions) { action.execute(this.fs, this.currentFile); } this.currentFile = newFile; //更新当前写入文件的路径 } long time = System.currentTimeMillis() - start; LOG.info("File rotation took {} ms.", Long.valueOf(time)); }
添加字段定界符源码:
public byte[] format(Tuple tuple) { StringBuilder sb = new StringBuilder(); Fields fields = this.fields == null ? tuple.getFields() : this.fields; int size = fields.size(); for (int i = 0; i < size; i++) { sb.append(tuple.getValueByField(fields.get(i))); if (i != size - 1) { sb.append(this.fieldDelimiter); } } sb.append(this.recordDelimiter); //添加定界符 return sb.toString().getBytes(); }
CountSyncPolicy源码,CountSyncPolicy实现SyncPolicy的接口方法:
public class CountSyncPolicy implements SyncPolicy { private int count; //配置的每次写入tuple数量 private int executeCount = 0; //当前已经执行的tuple的数量 public CountSyncPolicy(int count) { this.count = count; } public boolean mark(Tuple tuple, long offset) //判断当前写入输出流缓存中的数量是否超过每次写入数量 { this.executeCount += 1; return this.executeCount >= this.count; } public void reset() { this.executeCount = 0; //重置方法,每次写入后,执行重置方法归零 } }
FileSizeRotationPolicy的源码(FileSizeRotationPolicy实现FileRotationPolicy的接口方法):
public class FileSizeRotationPolicy implements FileRotationPolicy { private static final Logger LOG = LoggerFactory.getLogger(FileSizeRotationPolicy.class); private long maxBytes; //文件写满的大小 public static enum Units { //文件切换轮转的大小单位 KB(Math.pow(2.0D, 10.0D)), MB(Math.pow(2.0D, 20.0D)), GB(Math.pow(2.0D, 30.0D)), TB(Math.pow(2.0D, 40.0D)); private long byteCount; private Units(long byteCount) { this.byteCount = byteCount; } public long getByteCount() { return this.byteCount; } } private long lastOffset = 0L; private long currentBytesWritten = 0L; public FileSizeRotationPolicy(float count, Units units) { this.maxBytes = ((count * (float)units.getByteCount())); //根据切换文件的单位来计算文件写满该有的大小 } public boolean mark(Tuple tuple, long offset) //文件是否切换的判断方法 { long diff = offset - this.lastOffset; this.currentBytesWritten += diff; this.lastOffset = offset; return this.currentBytesWritten >= this.maxBytes; } public void reset() //重置方法 { this.currentBytesWritten = 0L; //当前文件已写的大小 this.lastOffset = 0L; //一次写入后的offset值 } }
DefaultFileNameFormat的源码(DefaultFileNameFormat实现FileNameFormat的接口方法):
public class DefaultFileNameFormat implements FileNameFormat { private String componentId; private int taskId; //任务名id private String path = "/storm"; //写入的目录路径 private String prefix = ""; //文件名前缀 private String extension = ".txt";//文件名后缀 public DefaultFileNameFormat withPrefix(String prefix) { this.prefix = prefix; return this; } public DefaultFileNameFormat withExtension(String extension) { this.extension = extension; return this; } public DefaultFileNameFormat withPath(String path) { this.path = path; return this; } public void prepare(Map conf, TopologyContext topologyContext) { this.componentId = topologyContext.getThisComponentId(); this.taskId = topologyContext.getThisTaskId(); } public String getName(long rotation, long timeStamp) //得到写入文件的文件名 { return this.prefix + this.componentId + "-" + this.taskId + "-" + rotation + "-" + timeStamp + this.extension; } public String getPath() { return this.path; } }
相关文章推荐
- 上传本地文件到hdfs
- hdfs的常用命令
- HDFS 上传文件的不平衡,Balancer问题是过慢
- HDFS(1)
- hdfs 机架感知和复制因子的设置
- Avro技术应用_5. 利用 Camus 来将 Avro 数据从 Kafka 拷贝到 HDFS -- 待完善
- 从本地上传到hdfs上出现异常
- HDFS的命令行操作
- hadoop 优化之container
- HDFS小文件处理解决方案总结+facebook(HayStack) + 淘宝(TFS)
- HDFS TFS
- hdfs文件操作操作示例,包括上传文件到HDFS上、从HDFS上下载文件和删除HDFS上的文件
- Hadoop之yarn和mapreduce
- hdfs配置文件详解(转载)
- eclipse远程连接hadoop-笔记2
- flume 收集日志到HDFS
- HDFS操作
- eclipse远程连接hadoop-笔记
- spring hadoop系列(六)---HbaseSystemException
- hdfs 删除和新增节点