Exporting HBase to Hadoop (HDFS) with a MapReduce Program
2017-06-15 16:34
1. Write the MapReduce program. It scans an HBase table and writes each row to HDFS as one line of text.
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class ExportToHdfs {

    public static class NewMapper extends TableMapper<Text, Text> {

        // key:   the HBase row key
        // value: all cells of that row returned by the scan
        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
                throws IOException, InterruptedException {
            List<Cell> cells = value.listCells();
            if (cells == null || cells.isEmpty()) {
                return; // guard against empty rows; an unchecked substring call would throw here
            }
            StringBuilder columns = new StringBuilder();
            for (Cell cell : cells) {
                if (columns.length() > 0) {
                    columns.append('|');
                }
                columns.append(new String(CellUtil.cloneValue(cell)));
                // Debug output; it ends up in the map task's stdout log.
                System.out.println(new String(key.get()) + " -->> " + new String(CellUtil.cloneValue(cell)));
            }
            context.write(new Text(key.get()), new Text(columns.toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.42.132,192.168.42.131,192.168.42.130");
        Job job = Job.getInstance(conf, ExportToHdfs.class.getSimpleName());
        job.setJarByClass(ExportToHdfs.class); // lets Hadoop locate the jar that contains this class
        job.setMapperClass(NewMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setNumReduceTasks(0); // map-only job: mapper output goes straight to HDFS
        Scan scan = new Scan();
        scan.setCaching(500);        // fetch rows in batches instead of one RPC per row
        scan.setCacheBlocks(false);  // do not pollute the block cache with a full-table scan
        // args[0] is the HBase table to scan; args[1] is the HDFS output directory.
        TableMapReduceUtil.initTableMapperJob(args[0], scan, NewMapper.class, Text.class, Text.class, job);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
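With zero reduce tasks and TextOutputFormat, each mapper emits one line per row: the row key, a tab, then the cell values joined by |. For a hypothetical row in the students table, an output line might look like the following (row key and values are made-up placeholders; the real content depends on your table):

0001	ZhangSan|20|CS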
2. Package the program as a jar, and add the HBase lib directory to HADOOP_CLASSPATH.
export HADOOP_CLASSPATH=/home/takchi/Bigdata/hbase-1.2.4/lib/*:$HADOOP_CLASSPATH
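If your HBase release ships the mapredcp command (HBase 1.x does), it prints just the jars a MapReduce job needs, which is a leaner alternative to putting all of lib/* on the classpath; a sketch using the same install path as above:

export HADOOP_CLASSPATH=$(/home/takchi/Bigdata/hbase-1.2.4/bin/hbase mapredcp):$HADOOP_CLASSPATH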
3. Run the job. The first argument is the HBase table to export; the second is the HDFS output directory, which must not already exist.
bin/hadoop jar /home/takchi/Desktop/_export.jar chan.takchi.mr.ExportToHdfs students /tmp/students_mr
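After the job completes, the exported files appear in the output directory; a map-only job names them part-m-NNNNN:

bin/hadoop fs -ls /tmp/students_mr
bin/hadoop fs -cat /tmp/students_mr/part-m-00000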