
Studying the storm-hdfs source code while integrating it

A while back I integrated storm-hdfs, but the write logic in the stock storm-hdfs-0.9.4.jar HdfsBolt did not meet our requirements, so I had to read the source code and then modify it to fit our needs.
Integrating storm-hdfs essentially means building a Storm topology that uses the HdfsBolt from storm-hdfs-0.9.4.jar; by configuring a few HDFS-related parameters on the bolt, the tuples are written into HDFS.
The parameters to configure (a wiring sketch follows the list):
1. RecordFormat: defines the field and record delimiters, for example a newline \n or a tab \t;
2. SyncPolicy: defines how many tuples are written before each sync to HDFS;
3. FileRotationPolicy: defines how the HDFS output file is rotated; you can rotate by time (TimedRotationPolicy), by size (FileSizeRotationPolicy), or not at all (NoRotationPolicy);
4. FileNameFormat: defines the output path (withPath) and the file name prefix and extension (withPrefix, withExtension);
5. withFsUrl: defines the HDFS address.
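For reference, here is a minimal wiring sketch in the style of the storm-hdfs README; the HDFS URL, output path, and thresholds are placeholder values, and the imports assume the usual org.apache.storm.hdfs.bolt package layout of the 0.9.x module:

import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

RecordFormat format = new DelimitedRecordFormat()
        .withFieldDelimiter("\t");                         // tab between fields
SyncPolicy syncPolicy = new CountSyncPolicy(1000);         // sync to HDFS every 1000 tuples
FileRotationPolicy rotationPolicy =
        new FileSizeRotationPolicy(5.0f, Units.MB);        // rotate the file once it reaches 5 MB
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withPath("/storm/")                               // placeholder output directory
        .withPrefix("data-")
        .withExtension(".txt");

HdfsBolt hdfsBolt = new HdfsBolt()
        .withFsUrl("hdfs://namenode:8020")                 // placeholder NameNode address
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);
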
The write path in HdfsBolt (the execute() method):
public void execute(Tuple tuple)
  {
    try
    {
      byte[] bytes = this.format.format(tuple);  // apply the record format (field/record delimiters) to the tuple
      synchronized (this.writeLock)
      {
        this.out.write(bytes);        // write the bytes to the output stream
        this.offset += bytes.length;  // update the current size of the file being written
        if (this.syncPolicy.mark(tuple, this.offset))      // once the configured tuple count is reached, sync to HDFS
        {
          if ((this.out instanceof HdfsDataOutputStream)) {
            ((HdfsDataOutputStream)this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
          } else {
            this.out.hsync();
          }
          this.syncPolicy.reset();
        }
      }
      this.collector.ack(tuple);
      if (this.rotationPolicy.mark(tuple, this.offset))   // once the current file reaches the configured rotation size, rotate and start a new file
      {
        rotateOutputFile();
        this.offset = 0L;
        this.rotationPolicy.reset();
      }
    }
    catch (IOException e)
    {
      LOG.warn("write/sync failed.", e);
      this.collector.fail(tuple);
    }
  }


The method HdfsBolt uses to create a new output file:

Path createOutputFile()
    throws IOException
  {
    Path path = new Path(this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
    this.out = this.fs.create(path);  // open a new output stream (which corresponds to a new file)
    return path;                      // return the path of the new file
  }


The file rotation method:

protected void rotateOutputFile()
    throws IOException
  {
    LOG.info("Rotating output file...");
    long start = System.currentTimeMillis();
    synchronized (this.writeLock)
    {
      closeOutputFile();   // close the output stream of the previous file
      this.rotation += 1;  // increment the rotation counter (it is reflected in the file name)
      
      Path newFile = createOutputFile();   // create a new file
      LOG.info("Performing {} file rotation actions.", Integer.valueOf(this.rotationActions.size()));
      for (RotationAction action : this.rotationActions) {
        action.execute(this.fs, this.currentFile);
      }
      this.currentFile = newFile;  // update the path of the file currently being written
    }
    long time = System.currentTimeMillis() - start;
    LOG.info("File rotation took {} ms.", Long.valueOf(time));
  }
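
The rotationActions loop above runs every RotationAction registered on the bolt against the file that has just been closed. For example, the MoveFileAction shipped with storm-hdfs moves each rotated file to another directory; below is a hedged sketch reusing the wiring from the earlier example, with a placeholder destination path:

import org.apache.storm.hdfs.common.rotation.MoveFileAction;

// Reusing format, syncPolicy, rotationPolicy and fileNameFormat from the wiring sketch above
HdfsBolt hdfsBolt = new HdfsBolt()
        .withFsUrl("hdfs://namenode:8020")                 // placeholder NameNode address
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy)
        .addRotationAction(new MoveFileAction().toDestination("/archived/"));  // placeholder destination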


The source that appends the field and record delimiters (DelimitedRecordFormat.format):

public byte[] format(Tuple tuple)
  {
    StringBuilder sb = new StringBuilder();
    Fields fields = this.fields == null ? tuple.getFields() : this.fields;
    int size = fields.size();
    for (int i = 0; i < size; i++)
    {
      sb.append(tuple.getValueByField(fields.get(i)));
      if (i != size - 1) {
        sb.append(this.fieldDelimiter);   // separate fields with the field delimiter
      }
    }
    sb.append(this.recordDelimiter);      // terminate the record with the record delimiter
    return sb.toString().getBytes();
  }
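
With the default delimiters of DelimitedRecordFormat (a comma between fields and a newline at the end of each record), a tuple with fields ("word", "count") and values ("storm", 5) would therefore be serialized as the line "storm,5". Both delimiters can be overridden when building the format; the tab below is just an illustration:

RecordFormat format = new DelimitedRecordFormat()
        .withFieldDelimiter("\t")     // tab between fields instead of the default comma
        .withRecordDelimiter("\n");   // newline at the end of each record (the default)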


The CountSyncPolicy source (CountSyncPolicy implements the SyncPolicy interface methods):

public class CountSyncPolicy
  implements SyncPolicy
{
  private int count;            // configured number of tuples per sync
  private int executeCount = 0; // number of tuples processed since the last sync
  
  public CountSyncPolicy(int count)
  {
    this.count = count;
  }
  
  public boolean mark(Tuple tuple, long offset)    // checks whether the number of tuples buffered in the output stream has reached the configured count
  {
    this.executeCount += 1;
    return this.executeCount >= this.count;
  }
  
  public void reset()
  {
    this.executeCount = 0;    // reset the counter to zero after each sync
  }
}
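
Since SyncPolicy is just this two-method interface (mark/reset), tailoring the flush behavior, which was the reason for digging into this source in the first place, comes down to providing another implementation. Below is a hypothetical sketch, not part of storm-hdfs, that syncs when either a tuple count or a time interval is reached; the class name and thresholds are made up for illustration:

import backtype.storm.tuple.Tuple;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

public class CountOrTimeSyncPolicy implements SyncPolicy {
  private final int count;
  private final long intervalMs;
  private int executeCount = 0;
  private long lastSync = System.currentTimeMillis();

  public CountOrTimeSyncPolicy(int count, long intervalMs) {
    this.count = count;
    this.intervalMs = intervalMs;
  }

  @Override
  public boolean mark(Tuple tuple, long offset) {
    this.executeCount += 1;
    // sync when either the tuple count or the elapsed-time threshold is reached
    return this.executeCount >= this.count
        || System.currentTimeMillis() - this.lastSync >= this.intervalMs;
  }

  @Override
  public void reset() {
    this.executeCount = 0;
    this.lastSync = System.currentTimeMillis();
  }
}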


The FileSizeRotationPolicy source (FileSizeRotationPolicy implements the FileRotationPolicy interface methods):

public class FileSizeRotationPolicy
  implements FileRotationPolicy
{
  private static final Logger LOG = LoggerFactory.getLogger(FileSizeRotationPolicy.class);
  private long maxBytes;  // file size at which the file is considered full
  
  public static enum Units   // size units used when configuring rotation
  {
    KB((long)Math.pow(2.0D, 10.0D)),  MB((long)Math.pow(2.0D, 20.0D)),  GB((long)Math.pow(2.0D, 30.0D)),  TB((long)Math.pow(2.0D, 40.0D));
    
    private long byteCount;
    
    private Units(long byteCount)
    {
      this.byteCount = byteCount;
    }
    
    public long getByteCount()
    {
      return this.byteCount;
    }
  }
  
  private long lastOffset = 0L;
  private long currentBytesWritten = 0L;
  
  public FileSizeRotationPolicy(float count, Units units)
  {
    this.maxBytes = ((long)(count * (float)units.getByteCount())); // compute the rotation size in bytes from the count and the unit
  }
  
  public boolean mark(Tuple tuple, long offset)   // decides whether the file should be rotated
  {
    long diff = offset - this.lastOffset;
    this.currentBytesWritten += diff;
    this.lastOffset = offset;
    return this.currentBytesWritten >= this.maxBytes;
  }
  
  public void reset()    // reset after a rotation
  {
    this.currentBytesWritten = 0L;  // bytes written to the current file
    this.lastOffset = 0L;           // offset recorded after the last write
  }
}
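
As a quick worked example of the constructor arithmetic: FileSizeRotationPolicy(5.0f, Units.MB) sets maxBytes to 5 * 2^20 = 5,242,880 bytes, so mark() starts returning true once roughly 5 MB have been written to the current file:

// Rotate roughly every 5 MB: maxBytes = 5.0f * 1,048,576 = 5,242,880 bytes
FileRotationPolicy rotationPolicy =
        new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);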


The DefaultFileNameFormat source (DefaultFileNameFormat implements the FileNameFormat interface methods):

public class DefaultFileNameFormat
  implements FileNameFormat
{
  private String componentId;        // id of the component (bolt) in the topology
  private int taskId;                // task id
  private String path = "/storm";    // directory the files are written to
  private String prefix = "";        // file name prefix
  private String extension = ".txt"; // file name extension
  
  public DefaultFileNameFormat withPrefix(String prefix)
  {
    this.prefix = prefix;
    return this;
  }
  
  public DefaultFileNameFormat withExtension(String extension)
  {
    this.extension = extension;
    return this;
  }
  
  public DefaultFileNameFormat withPath(String path)
  {
    this.path = path;
    return this;
  }
  
  public void prepare(Map conf, TopologyContext topologyContext)
  {
    this.componentId = topologyContext.getThisComponentId();
    this.taskId = topologyContext.getThisTaskId();
  }
  
  public String getName(long rotation, long timeStamp)  // builds the name of the file being written
  {
    return this.prefix + this.componentId + "-" + this.taskId + "-" + rotation + "-" + timeStamp + this.extension;
  }
  
  public String getPath()
  {
    return this.path;
  }
}
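
To make the naming concrete, here is an illustrative combination; the component id, task id, and timestamp below are placeholders:

FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withPath("/storm/")      // placeholder output directory
        .withPrefix("data-")
        .withExtension(".txt");
// getName(rotation, timeStamp) then yields names of the form
//   data-<componentId>-<taskId>-<rotation>-<timeStamp>.txt
// e.g. data-hdfs-bolt-3-2-1449553612345.txt for component "hdfs-bolt", task 3, rotation 2.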