MapReduce生成HFile文件,再使用BulkLoad导入HBase中(完全分布式运行)
2014-05-09 21:16
405 查看
声明:若要转载,请标明出处.
前提:在对于大量的数据导入到HBase中,如果一条一条进行插入,则太耗时了,所以可以先采用MapReduce生成HFile文件,然后使用BulkLoad导入HBase中.
引用:
一、这种方式有很多的优点:
1.如果我们一次性入库hbase巨量数据,处理速度慢不说,还特别占用Region资源,一个比较高效便捷的方法就是使用“BulkLoading”方法,即HBase提供的HFileOutputFormat类。
2.它是利用hbase的数据信息按照特定格式存储在hdfs内这一原理,直接生成这种hdfs内存储的数据格式文件,然后上传至合适位置,即完成巨量数据快速入库的办法。配合mapreduce完成,高效便捷,而且不占用region资源,增添负载。
二、这种方式也有很大的限制:
1.仅适合初次数据导入,即表内数据为空,或者每次入库表内都无数据的情况。
2.HBase集群与Hadoop集群为同一集群,即HBase所基于的HDFS为生成HFile的MR的集群.
本文代码采用Eclipse编辑器(Linux环境下)
一.网上的大部分代码都是或多或少有问题,比如他们或者不是运行在集群上,或者运行时有问题,后面会对产生哪些问题进行说明,先不说这么多了,先上代码吧.
二.源代码(注:作者亲测运行在集群上成功,集群基于Ubuntu12.04,Hadoop-1.2.1与HBase-0.98,使用自带的ZooKeeper)
1.MapReduce生产HFile文件
首先,需要导入的数据的表格(BigClientEnergyInfo表)有四个列族,每个列族下面有一些列,这些信息都使用常量配置类CONSTANT_HADOOP与CONSTANT_HBASE进行说明,如下:
接着,使用创建一个生成四个列族的HFile的MapRedJob,每个列族一个Job,源代码如下(类BigClientEnergyInfoHFileGenerator):
其中有三点需要特别注意:
(1)
生成HFile程序说明:
①.最终输出结果,无论是map还是reduce,输出部分key和value的类型必须是:<ImmutableBytesWritable,KeyValue>或者<ImmutableBytesWritable,Put>。
②.最终输出部分,Value类型是KeyValue或Put,对应的Sorter分别是KeyValueSortReducer或PutSortReducer。
③.MR例子中job.setOutputFormatClass(HFileOutputFormat.class);HFileOutputFormat只适合一次对单列族组织成HFile文件。好像最新的版本可以多个列族.
④.MR例子中HFileOutputFormat.configureIncrementalLoad(job,table);自动对job进行配置。TotalOrderPartitioner是需要先对key进行整体排序,然后划分到每个reduce中,保证每一个reducer中的的key最小最大值区间范围,是不会有交集的。因为入库到HBase的时候,作为一个整体的Region,key是绝对有序的。
⑤.MR例子中最后生成HFile存储在HDFS上,输出路径下的子目录是各个列族。如果对HFile进行入库HBase,相当于moveHFile到HBase的Region中,HFile子目录的列族内容没有了。
然后,使用BulkLoad工具将HFile文件导入HBase中,源代码如下(类BigClientEnergyInfoHFileLoader):
最后,使用一个Driver类,先创建HTable,然后调用上面的两个类,源代码如下(类BigClientEnergyInfoLoadDriver):
说明:因为在创建HBase表的时候,默认只有一个Region,只有等到这个Region的大小超过一定的阈值之后,才会进行split,所以为了利用完全分布式加快生成HFile和导入HBase中以及数据负载均衡,所以需要在创建表的时候预先创建分区,可以查阅相关资料(关于HBase调优的资料),而进行分区时要利用startKey与endKey进行rowKey区间划分(因为导入HBase中,需要rowKey整体有序),所以在导入之前,自己先写一个MapReduce的Job求最小与最大的rowKey,
即startKey与endKey.
特别注意:对HBase进行操作时,在获取HBaseconf时,即publicstaticConfigurationHBASE_CONFIG=HBaseConfiguration.create();的时候,一定要进行如下设置:
到这里就基本大功告成了.可以通过node1:50030查看job的运行情况,通过node1:60010查看HBase的相关情况.
下一篇将讲述中间遇到的问题以及解决办法.
前提:在对于大量的数据导入到HBase中,如果一条一条进行插入,则太耗时了,所以可以先采用MapReduce生成HFile文件,然后使用BulkLoad导入HBase中.
引用:
一、这种方式有很多的优点:
1.如果我们一次性入库hbase巨量数据,处理速度慢不说,还特别占用Region资源,一个比较高效便捷的方法就是使用“BulkLoading”方法,即HBase提供的HFileOutputFormat类。
2.它是利用hbase的数据信息按照特定格式存储在hdfs内这一原理,直接生成这种hdfs内存储的数据格式文件,然后上传至合适位置,即完成巨量数据快速入库的办法。配合mapreduce完成,高效便捷,而且不占用region资源,增添负载。
二、这种方式也有很大的限制:
1.仅适合初次数据导入,即表内数据为空,或者每次入库表内都无数据的情况。
2.HBase集群与Hadoop集群为同一集群,即HBase所基于的HDFS为生成HFile的MR的集群.
本文代码采用Eclipse编辑器(Linux环境下)
一.网上的大部分代码都是或多或少有问题,比如他们或者不是运行在集群上,或者运行时有问题,后面会对产生哪些问题进行说明,先不说这么多了,先上代码吧.
二.源代码(注:作者亲测运行在集群上成功,集群基于Ubuntu12.04,Hadoop-1.2.1与HBase-0.98,使用自带的ZooKeeper)
1.MapReduce生产HFile文件
首先,需要导入的数据的表格(BigClientEnergyInfo表)有四个列族,每个列族下面有一些列,这些信息都使用常量配置类CONSTANT_HADOOP与CONSTANT_HBASE进行说明,如下:
packagecn.hey.loaddata2hbase; /** * *@authorHeYong *@version1 *@time2014-05-09 * */ publicclassCONSTANT_HADOOP{ //大客户表BigClientEnergyInfo的HFile生成Job名字 publicstaticfinalStringBigClientEnergyInfo_JobName="BigClientEnergyInfo_HFileGenerator_Job"; //大客户表BigClientEnergyInfo的输入原始文本信息的HDFS路径 publicstaticfinalStringBigClientEnergyInfo_inDir="hdfs://node1:49000/user/hadoop/input/BigClientEnergyInfo/"; //大客户表BigClientEnergyInfo的HFile文件的输出HDFS路径 publicstaticfinalStringBigClientEnergyInfo_HFile_outDir="hdfs://node1:49000/user/hadoop/output/BigClientEnergyInfo/"; //说明:因为在创建HBase表的时候,默认只有一个Region,只有等到这个Region的大小超过一定的阈值之后,才会进行split //所以为了利用完全分布式加快生成HFile和导入HBase中以及数据负载均衡,所以需要在创建表的时候预先进行分区, //而进行分区时要利用startKey与endKey进行rowKey区间划分(因为导入HBase中,需要rowKey整体有序),所以在导入之前,自己先写一个MapReduce的Job求最小与最大的rowKey //即startKey与endKey //获取最大rowKey与最小rowKey的Job名字 publicstaticfinalStringGetMaxAndMinRowKey_JobName="GetMaxAndMinRowKey_Job"; //大客户表BigClientEnergyInfo的输入原始文本信息的HDFS路径 publicstaticfinalStringGetMaxAndMinRowKey_inDir="hdfs://node1:49000/user/hadoop/input/BigClientEnergyInfo/"; //最大rowKey与最小rowKey的输出HDFS路径 publicstaticfinalStringGetMaxAndMinRowKey_outDir="hdfs://node1:49000/user/hadoop/output/GetMaxAndMinRowKey/"; }
packagecn.hey.loaddata2hbase; importjava.util.LinkedList; importjava.util.List; importorg.apache.hadoop.hbase.client.HTable; /** * *@authorHeYong *@version1 *@time2014-05-09 * */ publicclassCONSTANT_HBASE{ publicstaticfinallongtimeStamp=System.currentTimeMillis(); //表集合 publicstaticList<HTable>htables=newLinkedList<HTable>(); publicstaticfinalString[]TableNames={"BigClientEnergyInfo"}; /** *大客户表信息 */ //列族信息 publicstaticfinalString[]TB0_FamilyNames={"DateTime","MeterEnergy","ObjInfo","ClientInfo"}; //第1个列族中的列 publicstaticfinalString[]TB0_FN0ColNames={"DATETIME"}; //第2个列族中的列 publicstaticfinalString[]TB0_FN1ColNames={"DT","OBJ_ID","E0","E1","E2","E3","E4","E5"}; //第3个列族中的列 publicstaticfinalString[]TB0_FN2ColNames={"STAT_TYPE","CITY_NO","OBJ_ID","OBJ_NAME","LAYER","LAYER_ID","OBJ_TYPE","TYPE_VALUE", "TYPE_VALUE_GROUP","SORT","SYS_ID","STATION_NO","FLAG"}; //第4个列族中的列 publicstaticfinalString[]TB0_FN3ColNames={"CITY_NO","CONSUMERID","CONSUMERNAME","CUSTOMERTYPE","USERSTATUS","USERADDR","ZONEID","INDUSTRYTYPE", "LINKMAN","LINKPHONE","USETYPE","LINEID"}; //列族信息集合 publicstaticfinalString[][]TB0_FNColNames={TB0_FN0ColNames,TB0_FN1ColNames,TB0_FN2ColNames,TB0_FN3ColNames}; //每个列族的列索引 publicstaticfinalint[]FNColIndex={1,2,10,23}; }
接着,使用创建一个生成四个列族的HFile的MapRedJob,每个列族一个Job,源代码如下(类BigClientEnergyInfoHFileGenerator):
其中有三点需要特别注意:
(1)
//特别注意:一定要设置,不然会报cannotreadpartitionerfile错误 conf.set("fs.default.name","node1:49000");(2)
//特别注意:一定要设置,不然不会运行在集群上 conf.set("mapred.job.tracker","node1:49001");(3)
//特别注意:对相关Class文件以及依赖的jar包(如HBase的jar,)进行打包,这是运行在集群上必须要做的一步,不然集群找不到相关的Mapper等类文件 Filejarpath; try{ jarpath=JarTools.makeJar("bin"); conf.set("mapred.jar",jarpath.toString()); }catch(Exceptione){ logger.error("进行jar打包出错!"); e.printStackTrace(); return; }特别注意:因为我这里是对工程下的bin目录里面的内容进行打包,所以需要把依赖的jar包先放入bin文件夹中,再BulidPath->AddtoBuildPath,不然会出现在运行时,依赖的包中的类找不到,如HBase包中的ImmutableBytesWritable类等.当然你也可以放在别的目录下,然后进行打包,反正需要将相关Class文件与依赖的jar包进行打包.这里自己写了一个JarTools类进行对指定文件夹下面的内容进行打包
packagecn.hey.loaddata2hbase; importjava.io.File; importjava.io.IOException; importjava.net.URI; importjava.util.ArrayList; importjava.util.Iterator; importjava.util.List; importorg.apache.hadoop.conf.Configuration; importorg.apache.hadoop.fs.FileSystem; importorg.apache.hadoop.fs.Path; importorg.apache.hadoop.hbase.KeyValue; importorg.apache.hadoop.hbase.client.HTable; importorg.apache.hadoop.hbase.io.ImmutableBytesWritable; importorg.apache.hadoop.hbase.mapreduce.HFileOutputFormat; importorg.apache.hadoop.hbase.mapreduce.KeyValueSortReducer; importorg.apache.hadoop.hbase.util.Bytes; importorg.apache.hadoop.io.LongWritable; importorg.apache.hadoop.io.Text; importorg.apache.hadoop.mapreduce.Job; importorg.apache.hadoop.mapreduce.Mapper; importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat; importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat; importorg.apache.log4j.LogManager; importorg.apache.log4j.Logger; importcn.hey.file.FileOperation; importcn.hey.hbase.HbaseOperation; importcn.hey.utils.JarTools; /** * *@authorHeYong *@version1 *@time2014-05-09 * */ publicclassBigClientEnergyInfoHFileGenerator{ publicstaticLoggerlogger=LogManager.getLogger(BigClientEnergyInfoHFileGenerator.class); /** * *@paramargs第一个元素表示第几个表,第二个元素表示该表的列族个数 *@throwsIOException *@throwsInterruptedException *@throwsClassNotFoundException *@throwsException */ publicstaticvoidmain(String[]args)throwsIOException,InterruptedException,ClassNotFoundException,Exception{ if(args.length<2){ logger.error("参数个数不对!"); return; } inttableIndex=Integer.parseInt(args[0]); intfamilyNum=Integer.parseInt(args[1]); intindex=0; longbeginTime=0,endTime=0; while(index<familyNum){ beginTime=System.currentTimeMillis(); GeneratorJob(tableIndex,index); endTime=System.currentTimeMillis(); FileOperation.append2File(System.getProperty("user.dir")+File.separator+"file"+File.separator+"runTime1.txt",(((endTime-beginTime)/(1.0*60*1000)))+"\n"); ++index; } FileOperation.append2File(System.getProperty("user.dir")+File.separator+"file"+File.separator+"runTime1.txt","-----------------------------"); } publicstaticclassHFileGenerateMapperextends Mapper<LongWritable,Text,ImmutableBytesWritable,KeyValue>{ privatestaticintfamilyIndex=0; privatestaticConfigurationconf=null; @Override protectedvoidsetup(Contextcontext)throwsIOException, InterruptedException{ conf=context.getConfiguration(); familyIndex=conf.getInt("familyIndex",0); } @Override protectedvoidmap(LongWritablekey,Textvalue,Contextcontext) throwsIOException,InterruptedException{ ImmutableBytesWritablerowkey=newImmutableBytesWritable( value.toString().split(",")[0].getBytes()); List<KeyValue>list=null; list=createKeyValue(value.toString()); Iterator<KeyValue>it=list.iterator(); while(it.hasNext()){ KeyValuekv=newKeyValue(); kv=it.next(); if(kv!=null){ context.write(rowkey,kv); } } } privateList<KeyValue>createKeyValue(Stringstr){ List<KeyValue>list=newArrayList<KeyValue>(CONSTANT_HBASE.TB0_FNColNames[familyIndex].length); String[]values=str.toString().split(","); String[]qualifiersName=CONSTANT_HBASE.TB0_FNColNames[familyIndex]; for(inti=0;i<qualifiersName.length;i++){ Stringrowkey=values[0]; Stringfamily=CONSTANT_HBASE.TB0_FamilyNames[familyIndex]; Stringqualifier=qualifiersName[i]; Stringvalue_str=values[i+CONSTANT_HBASE.FNColIndex[familyIndex]]; KeyValuekv=newKeyValue(Bytes.toBytes(rowkey), Bytes.toBytes(family),Bytes.toBytes(qualifier), CONSTANT_HBASE.timeStamp,Bytes.toBytes(value_str)); list.add(kv); } returnlist; } } //测试Mapper,用来进行测试的,后面没有用到 publicstaticclassHFileMapperextendsMapper<LongWritable,Text,ImmutableBytesWritable,KeyValue>{ protectedvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{ String[]values=value.toString().split("",-1); byte[]rkey=Bytes.toBytes(values[0]);//rowkey byte[]family=Bytes.toBytes("info");//列族 byte[]column=Bytes.toBytes("name");//列 byte[]val=Bytes.toBytes(values[1]);//值 //PuttmpPut=newPut(subject); ImmutableBytesWritablerowKey=newImmutableBytesWritable(rkey); KeyValuekvProtocol=newKeyValue(rkey,family,column,val); context.write(rowKey,kvProtocol); } } /** * *@paramtableIndex表示第几个表(从0开始),具体参见CONSTANT_HBASE类 *@paramfamilyIndex表示该表的第几个列族(从0开始),具体参见CONSTANT_HBASE类 *@throwsIOException */ publicstaticvoidGeneratorJob(inttableIndex,intfamilyIndex)throwsIOException{ Configurationconf=HbaseOperation.HBASE_CONFIG; //特别注意:一定要设置,不然会爆cannotreadpartitionerfile错误 conf.set("fs.default.name","node1:49000"); //特别注意:一定要设置,不然不会运行在集群上 conf.set("mapred.job.tracker","node1:49001"); //特别注意:对相关Class以及依赖的jar包(如HBase的jar)进行打包,这是运行在集群上必须要做的一步,不然集群找不到相关的Mapper等类文件 Filejarpath; try{ jarpath=JarTools.makeJar("bin"); conf.set("mapred.jar",jarpath.toString()); }catch(Exceptione){ logger.error("进行jar打包出错!"); e.printStackTrace(); return; } //设置job Jobjob=newJob(conf,CONSTANT_HADOOP.BigClientEnergyInfo_JobName); job.setJarByClass(BigClientEnergyInfoHFileGenerator.class); //设置Map任务输出Key-Value类型,一定要为该类型,Value可以改为HBase的Put类型 job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(KeyValue.class); //设置Mapper与Reducer类 job.setMapperClass(HFileGenerateMapper.class); job.setReducerClass(KeyValueSortReducer.class); //不需要设置,系统会根据相关信息调用HFileOutputFormat //job.setOutputFormatClass(HFileOutputFormat.class); //不需要设置,系统会根据表的Region数创建多少Reducer //job.setNumReduceTasks(4); //job.setPartitionerClass(org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner.class); HTabletable=newHTable(conf,CONSTANT_HBASE.TableNames[tableIndex]); HFileOutputFormat.configureIncrementalLoad(job,table); //设置数据输入输出目录 Stringstr_inPath=CONSTANT_HADOOP.BigClientEnergyInfo_inDir; Stringstr_outPath=CONSTANT_HADOOP.BigClientEnergyInfo_HFile_outDir+CONSTANT_HBASE.TB0_FamilyNames[familyIndex]; //创建HDFS对象 FileSystemfs=FileSystem.get(URI.create(str_inPath),conf); //如果输出路径存在就先删掉,因为不允许输出路径事先存在 PathoutPath=newPath(str_outPath); if(fs.exists(outPath)) fs.delete(outPath,true); FileInputFormat.addInputPath(job,newPath(str_inPath)); FileOutputFormat.setOutputPath(job,newPath(str_outPath)); try{ job.waitForCompletion(true); }catch(InterruptedExceptione){ logger.info(CONSTANT_HADOOP.BigClientEnergyInfo_JobName+"任务运行出错!"); e.printStackTrace(); }catch(ClassNotFoundExceptione){ logger.info(CONSTANT_HADOOP.BigClientEnergyInfo_JobName+"任务运行出错!"); e.printStackTrace(); } } }
生成HFile程序说明:
①.最终输出结果,无论是map还是reduce,输出部分key和value的类型必须是:<ImmutableBytesWritable,KeyValue>或者<ImmutableBytesWritable,Put>。
②.最终输出部分,Value类型是KeyValue或Put,对应的Sorter分别是KeyValueSortReducer或PutSortReducer。
③.MR例子中job.setOutputFormatClass(HFileOutputFormat.class);HFileOutputFormat只适合一次对单列族组织成HFile文件。好像最新的版本可以多个列族.
④.MR例子中HFileOutputFormat.configureIncrementalLoad(job,table);自动对job进行配置。TotalOrderPartitioner是需要先对key进行整体排序,然后划分到每个reduce中,保证每一个reducer中的的key最小最大值区间范围,是不会有交集的。因为入库到HBase的时候,作为一个整体的Region,key是绝对有序的。
⑤.MR例子中最后生成HFile存储在HDFS上,输出路径下的子目录是各个列族。如果对HFile进行入库HBase,相当于moveHFile到HBase的Region中,HFile子目录的列族内容没有了。
然后,使用BulkLoad工具将HFile文件导入HBase中,源代码如下(类BigClientEnergyInfoHFileLoader):
packagecn.hey.loaddata2hbase; importjava.io.File; importorg.apache.hadoop.fs.Path; importorg.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; importorg.apache.log4j.LogManager; importorg.apache.log4j.Logger; importcn.hey.file.FileOperation; importcn.hey.hbase.HbaseOperation; /** * *@authorHeYong *@version1 *@time2014-05-09 * */ publicclassBigClientEnergyInfoHFileLoader{ publicstaticLoggerlogger=LogManager.getLogger(HFileLoader.class); publicstaticvoidmain(String[]args)throwsException{ if(args.length<2){ logger.error("参数个数不对!"); return; } LoadIncrementalHFilesloader=newLoadIncrementalHFiles( HbaseOperation.HBASE_CONFIG); inttableIndex=Integer.parseInt(args[0]); intfamilyNum=Integer.parseInt(args[1]); inti=0; longbeginTime=0,endTime=0; while(i<familyNum){ beginTime=System.currentTimeMillis(); Stringstr_outPath=CONSTANT_HADOOP.str_outPath+CONSTANT_HBASE.TB0_FamilyNames[i]; loader.doBulkLoad(newPath(str_outPath),CONSTANT_HBASE.htables.get(tableIndex)); endTime=System.currentTimeMillis(); //将用时相关写入文件 FileOperation.append2File(System.getProperty("user.dir")+File.separator+"file"+File.separator+"runTime2.txt",(((endTime-beginTime)/(1.0*60*1000)))+"\n"); ++i; } FileOperation.append2File(System.getProperty("user.dir")+File.separator+"file"+File.separator+"runTime2.txt","------------------------"); } }
最后,使用一个Driver类,先创建HTable,然后调用上面的两个类,源代码如下(类BigClientEnergyInfoLoadDriver):
说明:因为在创建HBase表的时候,默认只有一个Region,只有等到这个Region的大小超过一定的阈值之后,才会进行split,所以为了利用完全分布式加快生成HFile和导入HBase中以及数据负载均衡,所以需要在创建表的时候预先创建分区,可以查阅相关资料(关于HBase调优的资料),而进行分区时要利用startKey与endKey进行rowKey区间划分(因为导入HBase中,需要rowKey整体有序),所以在导入之前,自己先写一个MapReduce的Job求最小与最大的rowKey,
即startKey与endKey.
packagecn.hey.loaddata2hbase; importjava.io.IOException; importjava.util.ArrayList; importjava.util.List; importorg.apache.hadoop.hbase.client.HTable; importorg.apache.log4j.LogManager; importorg.apache.log4j.Logger; importcn.hey.hbase.HbaseOperation; importcn.hey.hdfs.HDFSOperation; /** * *@authorHeYong *@version1 *@time2014-05-09 * */ publicclassBigClientEnergyInfoLoadDriver{ protectedstaticLoggerlogger=LogManager.getLogger(BigClientEnergyInfoLoadDriver.class); /** *@paramargs *@throwsClassNotFoundException *@throwsInterruptedException *@throwsIOException */ publicstaticvoidmain(String[]args)throwsIOException,InterruptedException,ClassNotFoundException{ //首先删除在CONSTANT_HBASE类中的第0个表,即BigClientEnergyInfo表 dropHTable(0); /** *说明:因为在创建HBase表的时候,默认只有一个Region,只有等到这个Region的大小超过一定的阈值之后,才会进行split, *所以为了利用完全分布式加快生成HFile和导入HBase中以及数据负载均衡,所以需要在创建表的时候预先创建分区,可以查阅相关资料(关于HBase调优的资料), *而进行分区时要利用startKey与endKey进行rowKey区间划分(因为导入HBase中,需要rowKey整体有序),所以在导入之前,自己先写一个MapReduce的Job求最小与最大的rowKey, *即startKey与endKey. * */ //调用GetMaxAndMinRowKeyDriver.获取startKey与endKey GetMaxAndMinRowKeyDriver.main(null); //读取startKey与endKey,readHDFSFile方法即读取指定HDFS文件中的内容,每一行作为一个字符串 List<String>strList=HDFSOperation.readHDFSFile(CONSTANT_HADOOP.GetMaxAndMinRowKey_outDir+"part-r-00000"); if(strList==null||strList.size()<2){ logger.info("startKey与endKey读取失败!"); return; } StringstartKey=strList.get(0); StringendKey=strList.get(1); if(startKey==null||"".equals(startKey)||endKey==null||"".equals(endKey)){ logger.info("startKey或endKey为空!"); return; } args=newString[2]; //第0个表,表的索引,即表BigClientEnergyInfo args[0]="0"; //该表所拥有的列族的数目 args[1]=""+CONSTANT_HBASE.TB0_FamilyNames.length; //创建第0个表,即大客户表 booleanflag=false; try{ //创建表时预先创建的Region个数 intnumPreRegions=7; flag=createHTable(0,startKey,endKey,numPreRegions); }catch(IOExceptione1){ e1.printStackTrace(); } if(flag){ //产生该表的HFile文件 try{ BigClientEnergyInfoHFileGenerator.main(args); }catch(IOExceptione){ e.printStackTrace(); }catch(InterruptedExceptione){ e.printStackTrace(); }catch(ClassNotFoundExceptione){ e.printStackTrace(); }catch(Exceptione){ e.printStackTrace(); } //将HFile导入HBase中 try{ HFileLoader.main(args); }catch(Exceptione){ e.printStackTrace(); } } } /** * *@paramindex第几个表 *@paramstartKey创建预先分区的startKey *@paramendKey创建预先分区的endKey *@paramnumRegions创建预先分区个数 *@return是否创建成功 *@throwsIOException */ publicstaticbooleancreateHTable(intindex,StringstartKey,StringendKey,intnumRegions)throwsIOException{ if(index<0||index>=CONSTANT_HBASE.TableNames.length){ logger.error("表下标越界!"); returnfalse; } if(startKey==null||"".equals(startKey)){ logger.error("startKey不能为空!"); returnfalse; } if(endKey==null||"".equals(endKey)){ logger.error("endKey不能为空!"); returnfalse; } if(numRegions<0){ logger.error("分区个数<0!"); returnfalse; } List<String>list=newArrayList<String>(); StringtableName=CONSTANT_HBASE.TableNames[index]; for(StringfamilyName:CONSTANT_HBASE.TB0_FamilyNames){ list.add(familyName); } if(HbaseOperation.createTable(tableName,list,startKey,endKey,numRegions)){ logger.info("创建HTable:"+tableName+"成功"); } HTabletable=newHTable(HbaseOperation.HBASE_CONFIG,tableName); CONSTANT_HBASE.htables.add(table); returntrue; } publicstaticvoiddropHTable(intindex){ StringtableName=CONSTANT_HBASE.TableNames[index]; HbaseOperation.dropTable(tableName); } }
注:HbaseOperation.createTable方法,即创建表,HbaseOperation.dropTable方法,即删除表,源代码如下: /** *创建表 * *@paramtableName *@paramfamily列族集名称 *@paramStringstartKey,StringendKey,intnumRegions预先分区相关信息 */ publicstaticbooleancreateTable(StringtableName,List<String>family,StringstartKey,StringendKey,intnumRegions){ try{ hBaseAdmin=newHBaseAdmin(HBASE_CONFIG); //如果表已存在,则返回 if(hBaseAdmin.tableExists(tableName)){ //hBaseAdmin.disableTable(tableName); //hBaseAdmin.deleteTable(tableName); logger.info("表:"+tableName+"已经存在!"); returnfalse; } HTableDescriptortableDescriptor=newHTableDescriptor(tableName); for(Stringname:family){ tableDescriptor.addFamily(newHColumnDescriptor(name)); } hBaseAdmin.createTable(tableDescriptor,Bytes.toBytes(startKey),Bytes.toBytes(endKey),numRegions); }catch(MasterNotRunningExceptione){ e.printStackTrace(); }catch(ZooKeeperConnectionExceptione){ e.printStackTrace(); }catch(IOExceptione){ e.printStackTrace(); } returntrue; }
/** *删除一张表 * *@paramtableName表名 */ publicstaticvoiddropTable(StringtableName){ if(tableName==null||"".equals(tableName)){ logger.error("表名不能为空!"); return; } try{ hBaseAdmin=newHBaseAdmin(HBASE_CONFIG); hBaseAdmin.disableTable(tableName); hBaseAdmin.deleteTable(tableName); }catch(MasterNotRunningExceptione){ e.printStackTrace(); }catch(ZooKeeperConnectionExceptione){ e.printStackTrace(); }catch(IOExceptione){ e.printStackTrace(); } }
特别注意:对HBase进行操作时,在获取HBaseconf时,即publicstaticConfigurationHBASE_CONFIG=HBaseConfiguration.create();的时候,一定要进行如下设置:
static{ //设置HMaster HBASE_CONFIG.set("hbase.zookeeper.master","node1:60000"); //设置Zookeeper集群 HBASE_CONFIG.set("hbase.zookeeper.quorum","node2,node3,node4,node5,node6,node7,node8"); }
不然会出现RegionServer的Zookeeper连接不上HMaster,千万要注意.
到这里就基本大功告成了.可以通过node1:50030查看job的运行情况,通过node1:60010查看HBase的相关情况.
下一篇将讲述中间遇到的问题以及解决办法.
相关文章推荐
- MapReduce生成HFile文件,再使用BulkLoad导入HBase中(完全分布式运行)
- mapreduce生成HFile通过bulkload入hbase库问题
- HBase数据迁移(2)- 使用bulk load 工具从TSV文件中导入数据 .
- 非mapreduce生成Hfile,然后导入hbase当中
- 非mapreduce生成Hfile,然后导入hbase当中
- 转载----非mapreduce生成Hfile,然后导入hbase当中
- HBase数据迁移(2)- 使用bulk load 工具从TSV文件中导入数据
- HBase数据迁移(2)- 使用bulk load 工具从TSV文件中导入数据
- 非mapreduce生成Hfile,然后导入hbase当中
- 非mapreduce生成Hfile,然后导入hbase当中
- hbase 学习(十二)非mapreduce生成Hfile,然后导入hbase当中
- 基于MapReduce,使用bulkload方式像hbase导入数据
- 转载----非mapreduce生成Hfile,然后导入hbase当中
- 用MR生成HFile文件格式后,数据批量导入HBase
- HBase导入大数据三大方式之(三)——mapreduce+completebulkload 方式
- MapReduce生成HFile入库到HBase
- 对于HBase的MapReduce性能提升方案之BulkLoad
- HBase数据导入之completebulkload方式
- MapReduce生成HFile入库到HBase
- HBase数据快速导入之ImportTsv&Bulkload