您的位置:首页 > 运维架构

通过MapReduce程序导出Hbase到Hadoop

2017-06-15 16:34 477 查看
import java.io.IOException;  

import java.util.List;  

  

import org.apache.hadoop.conf.Configuration;  

import org.apache.hadoop.fs.Path;  

import org.apache.hadoop.hbase.Cell;  

import org.apache.hadoop.hbase.CellUtil;  

import org.apache.hadoop.hbase.HBaseConfiguration;  

import org.apache.hadoop.hbase.client.Result;  

import org.apache.hadoop.hbase.client.Scan;  

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  

import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;  

import org.apache.hadoop.hbase.mapreduce.TableMapper;  

import org.apache.hadoop.hbase.util.Bytes;  

import org.apache.hadoop.io.Text;  

import org.apache.hadoop.mapreduce.Job;  

import org.apache.hadoop.mapreduce.Mapper;  

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  

  

  

public class ExportToHdfs {  

  

    public static class NewMapper extends TableMapper<Text, Text>{  

        @Override  

  

        //key是hbase中的行键  

        //value是hbase中的所行键的所有数据  

        protected void map(ImmutableBytesWritable key, Result value,  

                Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)  

                        throws IOException, InterruptedException {  

            Text v=null;  

            String columns = "";  

            List<Cell> cs=value.listCells();  

            for(Cell cell:cs){  

                columns += new String(CellUtil.cloneValue(cell)) + "|";  

                System.out.println(new String(key.get()) + " -->> " + new String(CellUtil.cloneValue(cell)));  

            }  

            columns = columns.substring(0, columns.length()-1);  

            context.write(new Text(key.get()), new Text(columns));  

        }  

  

    }  

  

  

  

  

    public static void main(String[] args) throws Exception {  

        Configuration conf = HBaseConfiguration.create();   

        conf.set("hbase.zookeeper.quorum", "192.168.42.132,192.168.42.131,192.168.42.130");  

        Job job = Job.getInstance(conf, ExportToHdfs.class.getSimpleName());   

        job.setJarByClass(ExportToHdfs.class);//将此类打成jar包  

  

        job.setMapperClass(NewMapper.class);   

        job.setMapOutputKeyClass(Text.class);   

        job.setMapOutputValueClass(Text.class);  

        job.setOutputFormatClass(TextOutputFormat.class);   

        job.setNumReduceTasks(0);   

          

        Scan scan=new Scan();  

  

        TableMapReduceUtil.initTableMapperJob(args[0], scan, NewMapper.class, Text.class, Text.class, job);  

  

        FileOutputFormat.setOutputPath(job, new Path(args[1]));   

  

        System.exit(job.waitForCompletion(true) ? 0 : 1);  

    }  

}  

2、导出为jar包,并把Hbase的lib路径添加到HADOOP_CLASSPATH。

[plain] view
plain copy

 print?

export HADOOP_CLASSPATH=/home/takchi/Bigdata/hbase-1.2.4/lib/*:$HADOOP_CLASSPATH  

3、运行。

[plain] view
plain copy

 print?

bin/hadoop jar /home/takchi/Desktop/_export.jar chan.takchi.mr.ExportToHdfs  students /tmp/students_mr  

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: