hadoop学习笔记之mapreduce中使用hbase
2014-06-06 17:25
423 查看
import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Random; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.metrics.spi.NullContext; import org.apache.hadoop.util.GenericOptionsParser; //mapreduce中使用hbase public class InsertDataToHBase { //create 'table1','field1','field2','field3' //create 'table2','field1','field2','field3' //create 'table3','field1','field2','field3' public static class InsertDataToHBaseMapper extends Mapper<Object, Text, NullContext, NullWritable> { public static String table1[] = { "field1", "field2", "field3" }; public static String table2[] = { "field1", "field2", "field3" }; public static String table3[] = { "field1", "field2", "field3" }; public static HTable table = null; protected void setup(Context context) throws IOException, InterruptedException { HBaseConfiguration conf = new HBaseConfiguration(); String table_name = context.getConfiguration().get("tabel_name"); if (table == null) { table = new HTable(conf, table_name); } } public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String arr_value[] = value.toString().split("\t"); String table_name = context.getConfiguration().get("tabel_name"); String temp_arr[] = table1; int temp_value_length = 0; if (table_name.trim().equals("table1")) { temp_arr = table1; temp_value_length = 3; } else if (table_name.trim().equals("table2")) { temp_arr = 
table2; temp_value_length = 3; } else if (table_name.trim().equals("table3")) { temp_arr = table3; temp_value_length = 3; } List<Put> list = new ArrayList<Put>(); if (arr_value.length == temp_value_length) { String rowname = System.currentTimeMillis() / 1000 + "" +new Random().nextInt()*100; Put p = new Put(Bytes.toBytes(rowname)); for (int i = 0; i < temp_arr.length; i++) { p.add(temp_arr[i].getBytes(), "".getBytes(), arr_value[i].getBytes()); } list.add(p); } table.put(list); table.flushCommits(); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); //data.txt out table3 if (otherArgs.length != 3) { System.err .println("Usage: InsertDataToHBase <inpath> <outpath> <tablename>"); System.exit(2); } conf.set("tabel_name", otherArgs[2]); Job job = new Job(conf, "InsertDataToHBase"); job.setNumReduceTasks(0); job.setJarByClass(InsertDataToHBase.class); job.setMapperClass(InsertDataToHBaseMapper.class); Path inputPath=new Path(otherArgs[0]); Path outputPath=new Path(otherArgs[1]); FileInputFormat.addInputPath(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); FileSystem hdfs=FileSystem.get(conf); if(hdfs.exists(outputPath)) { hdfs.delete(outputPath); } // job.submit(); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
相关文章推荐
- hadoop学习笔记之mapreduce 基于hbase日志数据的最频繁访问ip统计
- Hadoop学习--HBase与MapReduce的使用
- Hadoop学习--HBase与MapReduce的使用
- Hadoop学习笔记(一)HBase脚本分析(三)hbase
- 文件数据云计算学习笔记---Hadoop HDFS和MapReduce 架构浅析
- 【hadoop】Hadoop学习笔记(七):使用distcp并行拷贝大数据文件
- Hadoop学习笔记(四):HBase
- 开始hadoop前的准备:ubuntu学习笔记-基本环境的搭建(ssh的安装,SecureCRT连接,vim的安装及使用、jdk的安装)
- HBase/Hadoop学习笔记
- Hadoop学习笔记(一):MapReduce的输入格式
- 【hadoop学习笔记】How MapReduce Works
- hadoop 1.2.1 Eclipse mapreduce hello world 学习笔记(二)
- Hadoop学习笔记(一)HBase脚本分析(一)start-hbase.sh
- Hadoop学习笔记之四:运行MapReduce作业做集成测试
- Hadoop学习笔记(二):MapReduce的特性-计数器、排序
- Hadoop学习笔记一:MapReduce的工作机制
- Hadoop学习笔记(一)HBase脚本分析(二)hbase-daemon.sh
- Hadoop学习笔记(七):使用distcp并行拷贝大数据文件
- hadoop学习笔记之-hbase完全分布模式安装
- Hbase 学习(八) 使用MapReduce