Bulk loading (bulkload) data into HBase
2015-08-17 17:07
HBase provides a Java API for table operations, but that API inserts data one row at a time, which is far too slow for loading large volumes of data.
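For contrast, here is a minimal sketch of that single-row insert path, using the client API of this HBase generation (the table name, row key and cell value are hypothetical); with the default auto-flush, every put() is a separate RPC round trip to a RegionServer, which is exactly the per-row overhead that bulk loading avoids:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class SingleRowInsert {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "can_data");  // hypothetical table name
        Put put = new Put(Bytes.toBytes("row-1"));    // hypothetical row key
        put.add(Bytes.toBytes("data"), Bytes.toBytes("type"), Bytes.toBytes("example"));
        // One RPC per row: acceptable for trickle writes, not for mass imports.
        table.put(put);
        table.close();
    }
}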
For bulk imports, the most efficient approach by far is to generate HBase's internal storage format, the HFile, directly and then load those files into HBase; this bypasses the normal write path (WAL and MemStore) entirely.
The steps are as follows:
1. Export the source data as HFiles with a MapReduce job
HBaseBulkLoadDriver.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * HBase bulk load tool.
 * @author lijiang
 * @date 2015/7/28 16:25
 */
public class HBaseBulkLoadDriver extends Configured implements Tool {

    private static final String DATA_SEPARATOR = "\t";
    public static final String COLUMN_FAMILY = "data";

    @Override
    public int run(String[] args) throws Exception {
        // Usage: <input path> <HFile output path> <table name>
        if (args.length != 3) {
            System.err.println("Usage: HBaseBulkLoadDriver <input path> <output path> <table name>");
            return -1;
        }
        int result = 0;
        String inputPath = args[0];
        String outputPath = args[1];
        String tableName = args[2];
        Configuration configuration = getConf();
        configuration.set("DATA_SEPARATOR", DATA_SEPARATOR);
        configuration.set("TABLE_NAME", tableName);
        configuration.set("COLUMN_FAMILY", COLUMN_FAMILY);

        Job job = Job.getInstance(configuration);
        job.setJarByClass(HBaseBulkLoadDriver.class);
        job.setJobName("Bulk Loading HBase Table::" + tableName);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        // Set the mapper
        job.setMapperClass(CanMapper.class);
        FileInputFormat.addInputPaths(job, inputPath);
        // Remove any leftover output from a previous run
        FileSystem.get(configuration).delete(new Path(outputPath), true);
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        // Wires up the reducer, partitioner and output format needed to produce
        // HFiles aligned with the table's current region boundaries
        HFileOutputFormat2.configureIncrementalLoad(job, new HTable(configuration, tableName));
        job.waitForCompletion(true);
        if (job.isSuccessful()) {
            // Once the MapReduce job finishes, bulk-load the generated HFiles
            HBaseBulkLoad.doBulkLoad(outputPath, tableName);
        } else {
            result = -1;
        }
        return result;
    }

    public static void main(String[] args) {
        try {
            int response = ToolRunner.run(HBaseConfiguration.create(), new HBaseBulkLoadDriver(), args);
            if (response == 0) {
                System.out.println("Job is successfully completed...");
            } else {
                System.out.println("Job failed...");
            }
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }
}
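Assuming the classes are packaged into a job jar (the jar name, paths and table name below are hypothetical), the driver would be launched along these lines, passing the input directory, the HFile output directory and the target table:

hadoop jar hbase-bulkload.jar HBaseBulkLoadDriver /user/hadoop/can_input /user/hadoop/can_hfiles can_data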
CanMapper.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author lijiang
 * @date 2015/8/14 10:50
 */
public class CanMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    /** Field separator of the input data */
    protected String dataSeparator;

    /** Column family */
    protected String columnFamily;

    @Override
    protected void setup(Context context) {
        Configuration configuration = context.getConfiguration();
        dataSeparator = configuration.get("DATA_SEPARATOR");
        columnFamily = configuration.get("COLUMN_FAMILY");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        try {
            String[] values = value.toString().split(dataSeparator);
            // tick (field 6) serves as the row key
            String rowKey = values[6];
            Put put = new Put(Bytes.toBytes(rowKey));
            put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("type"), Bytes.toBytes(values[0]));
            put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("value"), Bytes.toBytes(values[1]));
            put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("typeTag"), Bytes.toBytes(values[2]));
            put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("result"), Bytes.toBytes(values[3]));
            put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("dm1Info"), Bytes.toBytes(values[4]));
            put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("dm1Code"), Bytes.toBytes(values[5]));
            put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("time_t"), Bytes.toBytes(values[7]));
            put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("stamp"), Bytes.toBytes(values[8]));
            // The map output key must be the row key (not the table name) so that
            // HFileOutputFormat2's partitioner and reducer can sort Puts into regions
            context.write(new ImmutableBytesWritable(Bytes.toBytes(rowKey)), put);
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }
}
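The mapper assumes each input line carries at least nine tab-separated fields, in the order type, value, typeTag, result, dm1Info, dm1Code, tick (the row key), time_t, stamp. A hypothetical record, with values invented purely for illustration:

type1	42	tagA	OK	info	D100	1439500000123	1439500000	0001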
2. Import the HFiles into HBase
HBaseBulkLoad.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

/**
 * HBase bulk load.
 * @author lijiang
 * @date 2015/7/29 10:31
 */
public class HBaseBulkLoad {

    public static void doBulkLoad(String outputPath, String tableName) {
        try {
            Configuration configuration = new Configuration();
            configuration.set("mapreduce.child.java.opts", "-Xmx1g");
            HBaseConfiguration.addHbaseResources(configuration);
            LoadIncrementalHFiles loadFiles = new LoadIncrementalHFiles(configuration);
            HTable hTable = new HTable(configuration, tableName);
            // Moves the generated HFiles under outputPath into the target table's regions
            loadFiles.doBulkLoad(new Path(outputPath), hTable);
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }
}
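Alternatively, this import step can be run from the shell with the LoadIncrementalHFiles tool that ships with HBase; a sketch, assuming the HFiles sit under /user/hadoop/can_hfiles and the target table is can_data (both hypothetical):

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /user/hadoop/can_hfiles can_data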
Maven dependencies:
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>0.99.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>0.99.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>0.99.0</version>
</dependency>
<dependency>
    <groupId>org.codehaus.jackson</groupId>
    <artifactId>jackson-mapper-asl</artifactId>
    <version>1.9.13</version>
</dependency>
<dependency>
    <groupId>org.codehaus.jackson</groupId>
    <artifactId>jackson-core-asl</artifactId>
    <version>1.9.13</version>
</dependency>
<dependency>
    <groupId>org.codehaus.jackson</groupId>
    <artifactId>jackson-jaxrs</artifactId>
    <version>1.9.13</version>
</dependency>
<dependency>
    <groupId>org.codehaus.jackson</groupId>
    <artifactId>jackson-xc</artifactId>
    <version>1.9.13</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-protocol</artifactId>
    <version>0.99.0</version>
</dependency>