Fast Data Import into HBase -- BulkLoad
2017-06-30 09:23
Apache HBase is a distributed, column-oriented open-source database that gives us random, real-time access to big data. But how do we load data into HBase efficiently? HBase supports several import methods. The most direct is to use TableOutputFormat in a MapReduce job, or to use the standard client API, but neither is very efficient.
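For contrast, the standard client API route looks roughly like the following minimal sketch (assuming the temperature table defined later in this post and the HBase 1.x client API it uses). Every Put travels the full RegionServer write path (WAL, then MemStore), which is exactly the overhead BulkLoad sidesteps:

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class PutExample {
    public static void main(String[] args) throws Exception {
        try (Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Table table = connection.getTable(TableName.valueOf("temperature"))) {
            // One round trip through the RegionServer write path per batch of Puts
            Put put = new Put(Bytes.toBytes("AQW00061705"));  // rowkey (illustrative)
            put.addColumn(Bytes.toBytes("date"), Bytes.toBytes("month"), Bytes.toBytes("06"));
            table.put(put);
        }
    }
}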
BulkLoad instead uses a MapReduce job to write out table data in HBase's internal storage format, and then loads the generated StoreFiles directly into the cluster.
Compared with going through the HBase API, bulk loading consumes far less CPU and network bandwidth.
The BulkLoad process consists of three main steps:
1. Extract the data from the source (typically text files or another database) and upload it to HDFS
This step is outside HBase's scope. Whatever the source, the data simply has to be on HDFS before the next step begins (a small upload sketch follows this list).
2. Prepare the data with a MapReduce job
This step requires a MapReduce job. In most cases we write the map function ourselves, while the reduce function is supplied by HBase and needs no work on our part. The job must emit the rowkey as the output key and a KeyValue, Put, or Delete as the output value, and it uses HFileOutputFormat2 to generate the HBase data files. For the import to be efficient, HFileOutputFormat2 must be configured so that each output file fits within a single region; to achieve this, the job uses Hadoop's TotalOrderPartitioner to partition the output along the table's key boundaries. HFileOutputFormat2's configureIncrementalLoad() method performs all of this configuration automatically, as the driver code below shows.
3. Tell the RegionServers where the data is and load it
This is the simplest step. It typically uses LoadIncrementalHFiles (better known as the completebulkload tool): pass it the HDFS location of the files and it will have the RegionServers load the data into the appropriate regions.
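The upload in step 1 is not covered by the code in this post; a minimal sketch of it, with purely illustrative local and HDFS paths:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UploadToHdfs {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(); // picks up core-site.xml / hdfs-site.xml
        FileSystem fs = FileSystem.get(conf);
        // Copy the local source file into the job's input directory (paths are illustrative)
        fs.copyFromLocalFile(new Path("/tmp/sample.txt"), new Path("/user/input/sample.txt"));
    }
}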
The diagram below lays out the whole process.
Image from How-to: Use HBase Bulk Loading, and Why
Note: before running a BulkLoad, create an empty table in HBase with the same name and structure as the one referenced in the program.
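A minimal sketch of that table creation, using the same 1.x-era Java client API as the rest of this post (the HBase shell one-liner create 'temperature', 'date', 'tempPerHour' achieves the same):

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateTable {
    public static void main(String[] args) throws Exception {
        try (Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Admin admin = connection.getAdmin()) {
            // Table name and column families must match what the MapReduce job writes
            HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf("temperature"));
            descriptor.addFamily(new HColumnDescriptor("date"));        // column family 1
            descriptor.addFamily(new HColumnDescriptor("tempPerHour")); // column family 2
            if (!admin.tableExists(descriptor.getTableName())) {
                admin.createTable(descriptor);
            }
        }
    }
}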
The Java implementation follows:
BulkLoadDriver.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Created by shaobo on 15-6-9.
 */
public class BulkLoadDriver extends Configured implements Tool {

    private static final String DATA_SEPERATOR = "\\s+";
    private static final String TABLE_NAME = "temperature";      // table name
    private static final String COLUMN_FAMILY_1 = "date";        // column family 1
    private static final String COLUMN_FAMILY_2 = "tempPerHour"; // column family 2

    public static void main(String[] args) {
        try {
            int response = ToolRunner.run(HBaseConfiguration.create(), new BulkLoadDriver(), args);
            if (response == 0) {
                System.out.println("Job is successfully completed...");
            } else {
                System.out.println("Job failed...");
            }
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }

    public int run(String[] args) throws Exception {
        String outputPath = args[1];
        // Set job parameters
        Configuration configuration = getConf();
        configuration.set("data.seperator", DATA_SEPERATOR);
        configuration.set("hbase.table.name", TABLE_NAME);
        configuration.set("COLUMN_FAMILY_1", COLUMN_FAMILY_1);
        configuration.set("COLUMN_FAMILY_2", COLUMN_FAMILY_2);
        Job job = Job.getInstance(configuration, "Bulk Loading HBase Table::" + TABLE_NAME);
        job.setJarByClass(BulkLoadDriver.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class); // map output key class
        job.setMapOutputValueClass(Put.class);                  // map output value class
        job.setMapperClass(BulkLoadMapper.class);               // map function
        FileInputFormat.addInputPaths(job, args[0]);            // input path
        FileSystem fs = FileSystem.get(configuration);
        Path output = new Path(outputPath);
        if (fs.exists(output)) {
            fs.delete(output, true); // delete the output path if it already exists
        }
        FileOutputFormat.setOutputPath(job, output);            // output path
        Connection connection = ConnectionFactory.createConnection(configuration);
        TableName tableName = TableName.valueOf(TABLE_NAME);
        HFileOutputFormat2.configureIncrementalLoad(job, connection.getTable(tableName),
                connection.getRegionLocator(tableName));
        job.waitForCompletion(true);
        if (job.isSuccessful()) {
            HFileLoader.doBulkLoad(outputPath, TABLE_NAME); // load the generated HFiles
            return 0;
        } else {
            return 1;
        }
    }

}
BulkLoadMapper.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Created by shaobo on 15-6-9.
 */
public class BulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    private String hbaseTable;
    private String dataSeperator;
    private String columnFamily1;
    private String columnFamily2;

    public void setup(Context context) {
        Configuration configuration = context.getConfiguration(); // read the job parameters
        hbaseTable = configuration.get("hbase.table.name");
        dataSeperator = configuration.get("data.seperator");
        columnFamily1 = configuration.get("COLUMN_FAMILY_1");
        columnFamily2 = configuration.get("COLUMN_FAMILY_2");
    }

    public void map(LongWritable key, Text value, Context context) {
        try {
            String[] values = value.toString().split(dataSeperator);
            // The first field is the rowkey; emit it as the output key with the Put as the value
            ImmutableBytesWritable rowKey = new ImmutableBytesWritable(values[0].getBytes());
            Put put = new Put(Bytes.toBytes(values[0]));
            put.addColumn(Bytes.toBytes(columnFamily1), Bytes.toBytes("month"), Bytes.toBytes(values[1]));
            put.addColumn(Bytes.toBytes(columnFamily1), Bytes.toBytes("day"), Bytes.toBytes(values[2]));
            for (int i = 3; i < values.length; ++i) {
                put.addColumn(Bytes.toBytes(columnFamily2), Bytes.toBytes("hour : " + i), Bytes.toBytes(values[i]));
            }
            context.write(rowKey, put);
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }

}
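For reference, the mapper assumes whitespace-separated input lines whose first field is a station ID used as the rowkey (the job log below shows keys such as AQW00061705), followed by month, day, and one temperature reading per remaining field. A hypothetical input line:

AQW00061705 06 14 22.1 21.8 21.5 20.9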
HFileLoader.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

/**
 * Created by shaobo on 15-6-9.
 */
public class HFileLoader {

    public static void doBulkLoad(String pathToHFile, String tableName) {
        try {
            Configuration configuration = new Configuration();
            HBaseConfiguration.addHbaseResources(configuration);
            LoadIncrementalHFiles loadFfiles = new LoadIncrementalHFiles(configuration);
            HTable hTable = new HTable(configuration, tableName); // the target table
            loadFfiles.doBulkLoad(new Path(pathToHFile), hTable); // load the HFiles
            System.out.println("Bulk Load Completed..");
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }

}
Compile and package the program, then submit it to Hadoop:
HADOOP_CLASSPATH=$(hbase mapredcp):/path/to/hbase/conf hadoop jar BulkLoad.jar inputpath outputpath
For the usage of this command, see 44. HBase, MapReduce, and the CLASSPATH in the HBase Reference Guide.
The job run looks like this:
15/06/14 14:31:07 INFO mapreduce.HFileOutputFormat2: Looking up current regions for table temperature (the table name)
15/06/14 14:31:07 INFO mapreduce.HFileOutputFormat2: Configuring 1 reduce partitions to match current region count
15/06/14 14:31:07 INFO mapreduce.HFileOutputFormat2: Writing partition information to /home/shaobo/hadoop/tmp/partitions_5d464f1e-d412-4dbe-bb98-367f8431bdc9
15/06/14 14:31:07 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
15/06/14 14:31:07 INFO compress.CodecPool: Got brand-new compressor [.deflate]
15/06/14 14:31:08 INFO mapreduce.HFileOutputFormat2: Incremental table temperature (the table name) output configured.
15/06/14 14:31:08 INFO client.RMProxy: Connecting to ResourceManager at localhost/127.0.0.1:8032
15/06/14 14:31:15 INFO input.FileInputFormat: Total input paths to process : 2
15/06/14 14:31:15 INFO mapreduce.JobSubmitter: number of splits:2
15/06/14 14:31:16 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1434262360688_0002
15/06/14 14:31:17 INFO impl.YarnClientImpl: Submitted application application_1434262360688_0002
15/06/14 14:31:17 INFO mapreduce.Job: The url to track the job: http://shaobo-ThinkPad-E420:8088/proxy/application_1434262360688_0002/
15/06/14 14:31:17 INFO mapreduce.Job: Running job: job_1434262360688_0002
15/06/14 14:31:28 INFO mapreduce.Job: Job job_1434262360688_0002 running in uber mode : false
15/06/14 14:31:28 INFO mapreduce.Job: map 0% reduce 0%
15/06/14 14:32:24 INFO mapreduce.Job: map 49% reduce 0%
15/06/14 14:32:37 INFO mapreduce.Job: map 67% reduce 0%
15/06/14 14:32:43 INFO mapreduce.Job: map 100% reduce 0%
15/06/14 14:33:39 INFO mapreduce.Job: map 100% reduce 67%
15/06/14 14:33:42 INFO mapreduce.Job: map 100% reduce 70%
15/06/14 14:33:45 INFO mapreduce.Job: map 100% reduce 88%
15/06/14 14:33:48 INFO mapreduce.Job: map 100% reduce 100%
15/06/14 14:33:52 INFO mapreduce.Job: Job job_1434262360688_0002 completed successfully
... ... ...
15/06/14 14:34:02 WARN mapreduce.LoadIncrementalHFiles: Skipping non-directory hdfs://localhost:9000/user/output/_SUCCESS
15/06/14 14:34:03 INFO hfile.CacheConfig: CacheConfig:disabled
15/06/14 14:34:03 INFO hfile.CacheConfig: CacheConfig:disabled
15/06/14 14:34:07 INFO mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://localhost:9000/user/output/date/c64cd2524fba48738bab26630d550b61 first=AQW00061705 last=USW00094910
15/06/14 14:34:07 INFO mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://localhost:9000/user/output/tempPerHour/43af29456913444795a820544691eb3d first=AQW00061705 last=USW00094910
Bulk Load Completed..
Job is successfully completed...
The third step of the BulkLoad process can also be run from the command line after the MapReduce job has generated the HBase data files; it does not have to be wired into the MapReduce program:
$ hadoop jar hbase-server-VERSION.jar completebulkload [-c /path/to/hbase/config/hbase-site.xml] outputpath tablename
If the following exception appears when the job is submitted:
15/06/16 11:41:06 INFO mapreduce.Job: Job job_1434420992867_0003 failed with state FAILED due to: Application application_1434420992867_0003 failed 2 times due to AM Container for appattempt_1434420992867_0003_000002 exited with exitCode: -1000
For more detailed output, check application tracking page:http://cdh1:8088/proxy/application_1434420992867_0003/Then, click on links to logs of each attempt.
Diagnostics: Rename cannot overwrite non empty destination directory /data/yarn/nm/usercache/hdfs/filecache/16
java.io.IOException: Rename cannot overwrite non empty destination directory /data/yarn/nm/usercache/hdfs/filecache/16
    at org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:716)
    at org.apache.hadoop.fs.FilterFs.renameInternal(FilterFs.java:228)
    at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:659)
    at org.apache.hadoop.fs.FileContext.rename(FileContext.java:909)
    at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:364)
    at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:60)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Failing this attempt. Failing the application.
15/06/16 11:41:06 INFO mapreduce.Job: Counters: 0
The fix is to delete the files under /data/yarn/nm/usercache/hdfs/filecache on the cdh2 and cdh3 machines. See http://stackoverflow.com/questions/30857413/hadoop-complains-about-attempting-to-overwrite-nonempty-destination-directory
References:
http://hbase.apache.org/book.html#arch.bulk.load
http://blog.cloudera.com/blog/2013/09/how-to-use-hbase-bulk-loading-and-why/