
Generating HFiles with MapReduce for Bulk Loading into HBase, with Source Code Analysis

2017-05-17 13:32
Original article: http://blog.pureisle.net/archives/1950.html

Loading a huge amount of data into HBase in one go through the normal write path is not only slow, it also eats up Region resources. A far more efficient and convenient approach is "Bulk Loading", i.e. the HFileOutputFormat class provided by HBase.

It builds on the fact that HBase stores its data on HDFS in a specific file format: we generate files directly in that on-disk format and then move them to the proper location, completing a fast bulk import of massive data. Done with MapReduce, it is efficient and convenient, and it neither ties up Region resources nor adds extra load.

Below is a sample MapReduce program; the data source is an HBase table, and the output path for the generated files is user-defined:
public class HFileOutput {

  // inputTable, outputTable, INPUT_FAMILY and OUTPUT_FAMILY are assumed to be
  // defined elsewhere (the table names and column families of your own data).

  // Job configuration
  public static Job configureJob(Configuration conf) throws IOException {
    Job job = new Job(conf, "countUnite1");
    job.setJarByClass(HFileOutput.class);
    //job.setNumReduceTasks(2);
    //job.setOutputKeyClass(ImmutableBytesWritable.class);
    //job.setOutputValueClass(KeyValue.class);
    //job.setOutputFormatClass(HFileOutputFormat.class);

    Scan scan = new Scan();
    scan.setCaching(10);
    scan.addFamily(INPUT_FAMILY);
    TableMapReduceUtil.initTableMapperJob(inputTable, scan,
        HFileOutputMapper.class, ImmutableBytesWritable.class, LongWritable.class, job);
    // If no reducer is defined here, one is picked automatically
    // (KeyValueSortReducer.class or PutSortReducer.class, depending on the map output value class)
    job.setReducerClass(HFileOutputReducer.class);
    //job.setOutputFormatClass(HFileOutputFormat.class);
    HFileOutputFormat.configureIncrementalLoad(job, new HTable(conf, outputTable));
    HFileOutputFormat.setOutputPath(job, new Path());   // fill in your own output path
    //FileOutputFormat.setOutputPath(job, new Path());  // equivalent to the line above
    return job;
  }

  public static class HFileOutputMapper extends
      TableMapper<ImmutableBytesWritable, LongWritable> {
    public void map(ImmutableBytesWritable key, Result values,
        Context context) throws IOException, InterruptedException {
      // mapper logic goes here; emit <row key, value>, e.g.:
      context.write(new ImmutableBytesWritable(/* row key bytes */), new LongWritable(/* value */));
    }
  }

  public static class HFileOutputReducer extends
      Reducer<ImmutableBytesWritable, LongWritable, ImmutableBytesWritable, KeyValue> {
    public void reduce(ImmutableBytesWritable key, Iterable<LongWritable> values,
        Context context) throws IOException, InterruptedException {
      // reducer logic goes here; row, tmp and count come from that logic
      KeyValue kv = new KeyValue(row, OUTPUT_FAMILY, tmp[1].getBytes(),
          Bytes.toBytes(count));
      context.write(key, kv);
    }
  }
}

Note that regardless of whether the map or the reduce stage produces the final output, the output key and value types must be <ImmutableBytesWritable, KeyValue> or <ImmutableBytesWritable, Put>. Otherwise you will get an error like this:

java.lang.IllegalArgumentException: Can't read partitions file

...

Caused by: java.io.IOException: wrong key class: org.apache.hadoop.io.*** is not class org.apache.hadoop.hbase.io.ImmutableBytesWritable
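To stay within those types, you can for example have the mapper emit Put directly. Here is a minimal sketch (the column family "cf", qualifier "q" and the mapper class name are made up for illustration): with Put as the map output value class and no reducer set, configureIncrementalLoad wires in PutSortReducer, which turns the Puts into sorted KeyValues for HFileOutputFormat.

public static class PutOutputMapper extends
    TableMapper<ImmutableBytesWritable, Put> {
  public void map(ImmutableBytesWritable key, Result values,
      Context context) throws IOException, InterruptedException {
    // build a Put for the same row key; family/qualifier/value are placeholders
    Put put = new Put(key.get());
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes(1L));
    context.write(key, put);
  }
}

// In the driver, declare Put as the map output value class and set no reducer:
// TableMapReduceUtil.initTableMapperJob(inputTable, scan,
//     PutOutputMapper.class, ImmutableBytesWritable.class, Put.class, job);
// HFileOutputFormat.configureIncrementalLoad(job, new HTable(conf, outputTable));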

In the job configuration above, it does not really matter whether you keep the commented-out lines: as the source code (pasted at the end) shows, the configureIncrementalLoad method already takes care of all the fixed configuration, and only the non-fixed parts need to be set by hand. The number of reduce tasks (setNumReduceTasks) is configured automatically from the number of regions.
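Put differently, when the mapper already emits <ImmutableBytesWritable, KeyValue> (or Put), a minimal driver only needs roughly the following; MyKeyValueMapper, the table names and outputDir are placeholders in this sketch:

Job job = new Job(conf, "bulkload-prepare");
job.setJarByClass(MyKeyValueMapper.class);

Scan scan = new Scan();
TableMapReduceUtil.initTableMapperJob(inputTable, scan,
    MyKeyValueMapper.class, ImmutableBytesWritable.class, KeyValue.class, job);

// configureIncrementalLoad sets the partitioner, output key/value classes,
// HFileOutputFormat, the sort reducer and the reduce task count (one per region).
HFileOutputFormat.configureIncrementalLoad(job, new HTable(conf, outputTable));
FileOutputFormat.setOutputPath(job, new Path(outputDir));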

The code for loading the generated files into HBase is:
public class TestLoadIncrementalHFileToHBase {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        byte[] TABLE = Bytes.toBytes(args[0]);           // target table name
        HTable table = new HTable(conf, TABLE);
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        loader.doBulkLoad(new Path(args[1]), table);     // directory of generated HFiles
    }
}

Alternatively, HBase ships a ready-made bulk-load entry point in its jar; usage:

hadoop jar hbase-VERSION.jar completebulkload /myoutput mytable;

Finally, when you run your program you may hit this problem:

FAILED Error: java.lang.ClassNotFoundException: com.google.common.util.concurrent.ThreadFactoryBuilder

It means you need to add a jar to the classpath, located at HBASE_HOME/bin/guava-r09.jar; add it and you are good to go.
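One way to add it (assuming the path above matches your installation) is to put the jar on the Hadoop classpath before submitting the job, for example:

export HADOOP_CLASSPATH=$HBASE_HOME/bin/guava-r09.jar:$HADOOP_CLASSPATH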

Lastly, let's take a look at the source code of the HFileOutputFormat class, pasted below:
/**

 * Writes HFiles. Passed KeyValues must arrive in order.

 * Currently, can only write files to a single column family at a

 * time.  Multiple column families requires coordinating keys cross family.

 * Writes current time as the sequence id for the file. Sets the major compacted

 * attribute on created hfiles. Calling write(null,null) will forceably roll

 * all HFiles being written.

 * @see KeyValueSortReducer

 */

public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {

  static Log LOG = LogFactory.getLog(HFileOutputFormat.class);

  static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";

  TimeRangeTracker trt = new TimeRangeTracker();

 

  public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(final TaskAttemptContext context)

  throws IOException, InterruptedException {

    // Get the path of the temporary output file

    final Path outputPath = FileOutputFormat.getOutputPath(context);

    final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();

    final Configuration conf = context.getConfiguration();

    final FileSystem fs = outputdir.getFileSystem(conf);

    // These configs. are from hbase-*.xml

    final long maxsize = conf.getLong("hbase.hregion.max.filesize",

        HConstants.DEFAULT_MAX_FILE_SIZE);

    final int blocksize = conf.getInt("hbase.mapreduce.hfileoutputformat.blocksize",

        HFile.DEFAULT_BLOCKSIZE);

    // Invented config.  Add to hbase-*.xml if other than default compression.

    final String defaultCompression = conf.get("hfile.compression",

        Compression.Algorithm.NONE.getName());

 

    // create a map from column family to the compression algorithm

    final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf);

 

    return new RecordWriter<ImmutableBytesWritable, KeyValue>() {

      // Map of families to writers and how much has been output on the writer.

      private final Map<byte [], WriterLength> writers =

        new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);

      private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;

      private final byte [] now = Bytes.toBytes(System.currentTimeMillis());

      private boolean rollRequested = false;

 

      public void write(ImmutableBytesWritable row, KeyValue kv)

      throws IOException {

        // null input == user explicitly wants to flush

        if (row == null && kv == null) {

          rollWriters();

          return;

        }

 

        byte [] rowKey = kv.getRow();

        long length = kv.getLength();

        byte [] family = kv.getFamily();

        WriterLength wl = this.writers.get(family);

 

        // If this is a new column family, verify that the directory exists

        if (wl == null) {

          fs.mkdirs(new Path(outputdir, Bytes.toString(family)));

        }

 

        // If any of the HFiles for the column families has reached

        // maxsize, we need to roll all the writers

        if (wl != null && wl.written + length >= maxsize) {

          this.rollRequested = true;

        }

 

        // This can only happen once a row is finished though

        if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {

          rollWriters();

        }

 

        // create a new HLog writer, if necessary

        if (wl == null || wl.writer == null) {

          wl = getNewWriter(family, conf);

        }

 

        // we now have the proper HLog writer. full steam ahead

        kv.updateLatestStamp(this.now);

        trt.includeTimestamp(kv);

        wl.writer.append(kv);

        wl.written += length;

 

        // Copy the row so we know when a row transition.

        this.previousRow = rowKey;

      }

 

      private void rollWriters() throws IOException {

        for (WriterLength wl : this.writers.values()) {

          if (wl.writer != null) {

            LOG.info("Writer=" + wl.writer.getPath() +

                ((wl.written == 0)? "": ", wrote=" + wl.written));

            close(wl.writer);

          }

          wl.writer = null;

          wl.written = 0;

        }

        this.rollRequested = false;

      }

 

      /* Create a new HFile.Writer.

       * @param family

       * @return A WriterLength, containing a new HFile.Writer.

       * @throws IOException

       */

      private WriterLength getNewWriter(byte[] family, Configuration conf)

          throws IOException {

        WriterLength wl = new WriterLength();

        Path familydir = new Path(outputdir, Bytes.toString(family));

        String compression = compressionMap.get(family);

        compression = compression == null ? defaultCompression : compression;

        wl.writer =

          HFile.getWriterFactory(conf).createWriter(fs,

          StoreFile.getUniqueFile(fs, familydir), blocksize,

          compression, KeyValue.KEY_COMPARATOR);

        this.writers.put(family, wl);

        return wl;

      }

 

      private void close(final HFile.Writer w) throws IOException {

        if (w != null) {

          w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,

              Bytes.toBytes(System.currentTimeMillis()));

          w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,

              Bytes.toBytes(context.getTaskAttemptID().toString()));

          w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,

              Bytes.toBytes(true));

          w.appendFileInfo(StoreFile.TIMERANGE_KEY,

              WritableUtils.toByteArray(trt));

          w.close();

        }

      }

 

      public void close(TaskAttemptContext c)

      throws IOException, InterruptedException {

        for (WriterLength wl: this.writers.values()) {

          close(wl.writer);

        }

      }

    };

  }

 

  /*

   * Data structure to hold a Writer and amount of data written on it.

   */

  static class WriterLength {

    long written = 0;

    HFile.Writer writer = null;

  }

 

  /**

   * Return the start keys of all of the regions in this table,

   * as a list of ImmutableBytesWritable.

   */

  private static List<ImmutableBytesWritable> getRegionStartKeys(HTable table)

  throws IOException {

    byte[][] byteKeys = table.getStartKeys();

    ArrayList<ImmutableBytesWritable> ret =

      new ArrayList<ImmutableBytesWritable>(byteKeys.length);

    for (byte[] byteKey : byteKeys) {

      ret.add(new ImmutableBytesWritable(byteKey));

    }

    return ret;

  }

 

  /**

   * Write out a SequenceFile that can be read by TotalOrderPartitioner

   * that contains the split points in startKeys.

   * @param partitionsPath output path for SequenceFile

   * @param startKeys the region start keys

   */

  private static void writePartitions(Configuration conf, Path partitionsPath,

      List<ImmutableBytesWritable> startKeys) throws IOException {

    if (startKeys.isEmpty()) {

      throw new IllegalArgumentException("No regions passed");

    }

 

    // We're generating a list of split points, and we don't ever

    // have keys < the first region (which has an empty start key)

    // so we need to remove it. Otherwise we would end up with an

    // empty reducer with index 0

    TreeSet<ImmutableBytesWritable> sorted =

      new TreeSet<ImmutableBytesWritable>(startKeys);

 

    ImmutableBytesWritable first = sorted.first();

    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {

      throw new IllegalArgumentException(

          "First region of table should have empty start key. Instead has: "

          + Bytes.toStringBinary(first.get()));

    }

    sorted.remove(first);

 

    // Write the actual file

    FileSystem fs = partitionsPath.getFileSystem(conf);

    SequenceFile.Writer writer = SequenceFile.createWriter(fs,

        conf, partitionsPath, ImmutableBytesWritable.class, NullWritable.class);

 

    try {

      for (ImmutableBytesWritable startKey : sorted) {

        writer.append(startKey, NullWritable.get());

      }

    } finally {

      writer.close();

    }

  }

 

  /**

   * Configure a MapReduce Job to perform an incremental load into the given

   * table. This

   * <ul>

   *   <li>Inspects the table to configure a total order partitioner</li>

   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>

   *   <li>Sets the number of reduce tasks to match the current number of regions</li>

   *   <li>Sets the output key/value class to match HFileOutputFormat's requirements</li>

   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or

   *     PutSortReducer)</li>

   * </ul> 

   * The user should be sure to set the map output value class to either KeyValue or Put before

   * running this function.

   */

  public static void configureIncrementalLoad(Job job, HTable table)

  throws IOException {

    Configuration conf = job.getConfiguration();

    Class<? extends Partitioner> topClass;

    try {

      topClass = getTotalOrderPartitionerClass();

    } catch (ClassNotFoundException e) {

      throw new IOException("Failed getting TotalOrderPartitioner", e);

    }

    job.setPartitionerClass(topClass);

    job.setOutputKeyClass(ImmutableBytesWritable.class);

    job.setOutputValueClass(KeyValue.class);

    job.setOutputFormatClass(HFileOutputFormat.class);

 

    // Based on the configured map output class, set the correct reducer to properly

    // sort the incoming values.

    // TODO it would be nice to pick one or the other of these formats.

    if (KeyValue.class.equals(job.getMapOutputValueClass())) {

      job.setReducerClass(KeyValueSortReducer.class);

    } else if (Put.class.equals(job.getMapOutputValueClass())) {

      job.setReducerClass(PutSortReducer.class);

    } else {

      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());

    }

 

    LOG.info("Looking up current regions for table " + table);

    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);

    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +

        "to match current region count");

    job.setNumReduceTasks(startKeys.size());

 

    Path partitionsPath = new Path(job.getWorkingDirectory(),

        "partitions_" + System.currentTimeMillis());

    LOG.info("Writing partition information to " + partitionsPath);

 

    FileSystem fs = partitionsPath.getFileSystem(conf);

    writePartitions(conf, partitionsPath, startKeys);

    partitionsPath.makeQualified(fs);

 

    URI cacheUri;

    try {

      // Below we make explicit reference to the bundled TOP.  Its cheating.

      // We are assume the define in the hbase bundled TOP is as it is in

      // hadoop (whether 0.20 or 0.22, etc.)

      cacheUri = new URI(partitionsPath.toString() + "#" +

        org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner.DEFAULT_PATH);

    } catch (URISyntaxException e) {

      throw new IOException(e);

    }

    DistributedCache.addCacheFile(cacheUri, conf);

    DistributedCache.createSymlink(conf);

 

    // Set compression algorithms based on column families

    configureCompression(table, conf);

 

    TableMapReduceUtil.addDependencyJars(job);

    LOG.info("Incremental table output configured.");

  }

 

  /**

   * If > hadoop 0.20, then we want to use the hadoop TotalOrderPartitioner.

   * If 0.20, then we want to use the TOP that we have under hadoopbackport.

   * This method is about hbase being able to run on different versions of

   * hadoop.  In 0.20.x hadoops, we have to use the TOP that is bundled with

   * hbase.  Otherwise, we use the one in Hadoop.

   * @return Instance of the TotalOrderPartitioner class

   * @throws ClassNotFoundException If can't find a TotalOrderPartitioner.

   */

  private static Class<? extends Partitioner> getTotalOrderPartitionerClass()

  throws ClassNotFoundException {

    Class<? extends Partitioner> clazz = null;

    try {

      clazz = (Class<? extends Partitioner>) Class.forName("org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner");

    } catch (ClassNotFoundException e) {

      clazz =

        (Class<? extends Partitioner>) Class.forName("org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner");

    }

    return clazz;

  }

 

  /**

   * Run inside the task to deserialize column family to compression algorithm

   * map from the

   * configuration.

   * 

   * Package-private for unit tests only.

   * 

   * @return a map from column family to the name of the configured compression

   *         algorithm

   */

  static Map<byte[], String> createFamilyCompressionMap(Configuration conf) {

    Map<byte[], String> compressionMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);

    String compressionConf = conf.get(COMPRESSION_CONF_KEY, "");

    for (String familyConf : compressionConf.split("&")) {

      String[] familySplit = familyConf.split("=");

      if (familySplit.length != 2) {

        continue;

      }

 

      try {

        compressionMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),

            URLDecoder.decode(familySplit[1], "UTF-8"));

      } catch (UnsupportedEncodingException e) {

        // will not happen with UTF-8 encoding

        throw new AssertionError(e);

      }

    }

    return compressionMap;

  }

 

  /**

   * Serialize column family to compression algorithm map to configuration.

   * Invoked while configuring the MR job for incremental load.

   * 

   * Package-private for unit tests only.

   * 

   * @throws IOException

   *           on failure to read column family descriptors

   */

  static void configureCompression(HTable table, Configuration conf) throws IOException {

    StringBuilder compressionConfigValue = new StringBuilder();

    HTableDescriptor tableDescriptor = table.getTableDescriptor();

    if(tableDescriptor == null){

      // could happen with mock table instance

      return;

    }

    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();

    int i = 0;

    for (HColumnDescriptor familyDescriptor : families) {

      if (i++ > 0) {

        compressionConfigValue.append('&');

      }

      compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));

      compressionConfigValue.append('=');

      compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getCompression().getName(), "UTF-8"));

    }

    // Get rid of the last ampersand

    conf.set(COMPRESSION_CONF_KEY, compressionConfigValue.toString());

  }

}
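A quick note on configureCompression and createFamilyCompressionMap above: the per-family compression settings are round-tripped through a single configuration value ("hbase.hfileoutputformat.families.compression"), with each entry URL-encoded as family=algorithm and the pairs joined by '&'. For a hypothetical table with families cf1 (GZ compression) and cf2 (no compression), the stored value would look roughly like:

cf1=gz&cf2=none

getNewWriter then looks up the family in this map and falls back to the "hfile.compression" default when the family is not present.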