您的位置:首页 > 大数据 > Hadoop

整合storm-hdfs过程中源码学习

2016-07-24 19:01 465 查看
前一段整合了stomr-hdfs,但是发现在原有的storm-hdfs-0.9.4.jar中的写入数据的逻辑不满足我们的需求,于是乎需要看源码,然后在源码的基础上改写源码,满足自己的需求。
整合storm-hdfs的过程,其实也就是编写storm的拓扑结构,然后调用storm-hdfs-0.9.4.jar中的hdfsBolt,通过配置hdfsBolt的一些与hdfs有关的参数,将数据写入到hdfs中。
配置的参数:
1、RecordFormat:定义字段定界符,你可以使用换行符\n或者制表符\t;
2、SyncPolicy:定义每次写入的tuple的数量;
3、FileRotationPolicy:定义写入的hdfs文件的轮转策略,你可以以时间轮转(TimedRotationPolicy)、大小轮转(FileSizeRotationPolicy)、不轮转(NoRotationPolicy);
4、FileNameFormat:定义写入文件的路径(withPath)和文件名的前后缀(withPrefix、withExtension);
5、withFsUrl:定义hdfs的地址。
hdfsBolt中写数据的源码:

[java]
view plain
copy

public void execute(Tuple tuple)  
 {  
   try  
   {  
     byte[] bytes = this.format.format(tuple);  //对每一条数据添加定界符  
     synchronized (this.writeLock)  
     {  
       this.out.write(bytes);     //调用输出流写数据  
       this.offset += bytes.length;  //更新写入文件的当前大小  
       if (this.syncPolicy.mark(tuple, this.offset))      //当数据条数满足所配的条数时,写入到hdfs  
       {         
         if ((this.out instanceof HdfsDataOutputStream)) {        
           ((HdfsDataOutputStream)this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));  
         } else {  
           this.out.hsync();  
         }  
         this.syncPolicy.reset();  
       }  
     }  
     this.collector.ack(tuple);  
     if (this.rotationPolicy.mark(tuple, this.offset))   //当当前文件大小等于所配轮转文件的大小,则轮转文件,重建新的写入文件  
     {  
       rotateOutputFile();  
       this.offset = 0L;  
       this.rotationPolicy.reset();  
     }  
   }  
   catch (IOException e)  
   {  
     LOG.warn("write/sync failed.", e);  
     this.collector.fail(tuple);  
   }  
 }  

hdfsBolt每次新建文件的方法:

[java]
view plain
copy

Path createOutputFile()  
    throws IOException  
  {  
    Path path = new Path(this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));  
    this.out = this.fs.create(path);  //新建一个输出流(对应一个新的文件)  
    return path;   //返回路径  
  }  

轮转文件的方法:

[java]
view plain
copy

protected void rotateOutputFile()  
    throws IOException  
  {  
    LOG.info("Rotating output file...");  
    long start = System.currentTimeMillis();  
    synchronized (this.writeLock)  
    {  
      closeOutputFile();    //关闭前一个文件的输出流  
      this.rotation += 1;  //轮转数加一(这里的轮转数会反应到文件名上)  
        
      Path newFile = createOutputFile();   //新建一个文件  
      LOG.info("Performing {} file rotation actions.", Integer.valueOf(this.rotationActions.size()));  
      for (RotationAction action : this.rotationActions) {  
        action.execute(this.fs, this.currentFile);  
      }  
      this.currentFile = newFile;  //更新当前写入文件的路径  
    }  
    long time = System.currentTimeMillis() - start;  
    LOG.info("File rotation took {} ms.", Long.valueOf(time));  
  }  

添加字段定界符源码:

[java]
view plain
copy

public byte[] format(Tuple tuple)  
  {  
    StringBuilder sb = new StringBuilder();  
    Fields fields = this.fields == null ? tuple.getFields() : this.fields;  
    int size = fields.size();  
    for (int i = 0; i < size; i++)  
    {  
      sb.append(tuple.getValueByField(fields.get(i)));  
      if (i != size - 1) {  
        sb.append(this.fieldDelimiter);  
      }  
    }  
    sb.append(this.recordDelimiter);   //添加定界符  
    return sb.toString().getBytes();  
  }  

CountSyncPolicy源码,CountSyncPolicy实现SyncPolicy的接口方法:

[java]
view plain
copy

public class CountSyncPolicy  
  implements SyncPolicy  
{  
  private int count;       //配置的每次写入tuple数量  
  private int executeCount = 0; //当前已经执行的tuple的数量  
    
  public CountSyncPolicy(int count)  
  {  
    this.count = count;  
  }  
    
  public boolean mark(Tuple tuple, long offset)    //判断当前写入输出流缓存中的数量是否超过每次写入数量  
  {  
    this.executeCount += 1;  
    return this.executeCount >= this.count;  
  }  
    
  public void reset()  
  {  
    this.executeCount = 0;    //重置方法,每次写入后,执行重置方法归零  
  }  
}  

FileSizeRotationPolicy的源码(FileSizeRotationPolicy实现FileRotationPolicy的接口方法):

[java]
view plain
copy

public class FileSizeRotationPolicy  
  implements FileRotationPolicy  
{  
  private static final Logger LOG = LoggerFactory.getLogger(FileSizeRotationPolicy.class);  
  private long maxBytes;  //文件写满的大小  
    
  public static enum Units  
  {                                                                                                                                           //文件切换轮转的大小单位  
    KB(Math.pow(2.0D, 10.0D)),  MB(Math.pow(2.0D, 20.0D)),  GB(Math.pow(2.0D, 30.0D)),  TB(Math.pow(2.0D, 40.0D));  
      
    private long byteCount;  
      
    private Units(long byteCount)  
    {  
      this.byteCount = byteCount;  
    }  
      
    public long getByteCount()  
    {  
      return this.byteCount;  
    }  
  }  
    
  private long lastOffset = 0L;  
  private long currentBytesWritten = 0L;  
    
  public FileSizeRotationPolicy(float count, Units units)  
  {  
    this.maxBytes = ((count * (float)units.getByteCount())); //根据切换文件的单位来计算文件写满该有的大小  
  }  
    
  public boolean mark(Tuple tuple, long offset)   //文件是否切换的判断方法  
  {  
    long diff = offset - this.lastOffset;  
    this.currentBytesWritten += diff;  
    this.lastOffset = offset;  
    return this.currentBytesWritten >= this.maxBytes;  
  }  
    
  public void reset()    //重置方法  
  {  
    this.currentBytesWritten = 0L;  //当前文件已写的大小  
    this.lastOffset = 0L;  //一次写入后的offset值  
  }  
}  

DefaultFileNameFormat的源码(DefaultFileNameFormat实现FileNameFormat的接口方法):

[java]
view plain
copy

public class DefaultFileNameFormat  
  implements FileNameFormat  
{  
  private String componentId;  
  private int taskId;         //任务名id  
  private String path = "/storm"; //写入的目录路径  
  private String prefix = "";   //文件名前缀  
  private String extension = ".txt";//文件名后缀  
    
  public DefaultFileNameFormat withPrefix(String prefix)  
  {  
    this.prefix = prefix;  
    return this;  
  }  
    
  public DefaultFileNameFormat withExtension(String extension)  
  {  
    this.extension = extension;  
    return this;  
  }  
    
  public DefaultFileNameFormat withPath(String path)  
  {  
    this.path = path;  
    return this;  
  }  
    
  public void prepare(Map conf, TopologyContext topologyContext)  
  {  
    this.componentId = topologyContext.getThisComponentId();  
    this.taskId = topologyContext.getThisTaskId();  
  }  
    
  public String getName(long rotation, long timeStamp)  //得到写入文件的文件名  
  {  
    return this.prefix + this.componentId + "-" + this.taskId + "-" + rotation + "-" + timeStamp + this.extension;  
  }  
    
  public String getPath()  
  {  
    return this.path;  
  }  
}  
http://blog.csdn.net/u014039577/article/details/50215913
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: