读取HDFS文件中的数据写入到HBase的表中
2018-04-01 18:00
866 查看
核心:H2H_Reducer继承TableReducer。
准备工作:
将数据文件上传到HDFS :hadoop fs -mkdir -p /student/input
hadoop fs -put /student.txt /student/input
在HBase中创建相关的表(可以通过命令也可以通过代码):create 'users', 'info'
MR代码:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class HDFS2HBASEMR extends Configured implements Tool{
private static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum";
private static final String ZK_CONNECT_VALUE = "hadoop2:2181,hadoop3:2181,hadoop4:2181";
public static void main(String[] args) throws Exception {
int isDone = ToolRunner.run(new HDFS2HBASEMR(), args);
System.exit(isDone);
}
@Override
public int run(String[] arg0) throws Exception {
Configuration conf = new Configuration();
conf.set(ZK_CONNECT_KEY, ZK_CONNECT_VALUE);
conf.set("fs.defaultFS", "hdfs://myha1");
conf.addResource("config/core-site.xml");
conf.addResource("config/hdfs-site.xml");
System.setProperty("HADOOP_USER_NAME", "hadoop");
Job job = Job.getInstance(conf, "hdfs2hbase");
job.setJarByClass(HDFS2HBASEMR.class);
/**
* mapper
*/
job.setMapperClass(H2H_Mapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
//指定输入格式 可以不用指明 默认的是这个
job.setInputFormatClass(TextInputFormat.class);
/**
* reducer
*/
//因为要往表里面的插入数据 所以使用:initTableReducerJob
TableMapReduceUtil.initTableReducerJob("users", H2H_Reducer.class, job, null, null, null, null, false);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Put.class);
FileInputFormat.addInputPath(job, new Path("/student/input"));
boolean isDone = job.waitForCompletion(true);
return isDone ? 0 : 1;
}
/*
* mapper端直接输出到reduce端进行处理
*/
public static class H2H_Mapper extends Mapper<LongWritable, Text, Text, NullWritable>{
@Override
protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {
context.write(value, NullWritable.get());
}
}
public static class H2H_Reducer extends TableReducer<Text, NullWritable, NullWritable>{
@Override
protected void reduce(Text key, Iterable<NullWritable> values ,Context context) throws IOException, InterruptedException {
/**
* key === 95011,包小柏,男,18,MA
*
* 95001: rowkey
* 包小柏 : name
* 18 : age
* 男 : sex
* MA : department
*
* column family : cf
*/
String[] lines = key.toString().split(",");
Put put = new Put(lines[0].g
4000
etBytes());
put.addColumn("info".getBytes(), "name".getBytes(), lines[1].getBytes());
put.addColumn("info".getBytes(), "gender".getBytes(), lines[2].getBytes());
put.addColumn("info".getBytes(), "age".getBytes(), lines[3].getBytes());
put.addColumn("info".getBytes(), "department".getBytes(), lines[4].getBytes());
context.write(NullWritable.get(), put);
}
}
}数据:student.txt
准备工作:
将数据文件上传到HDFS :hadoop fs -mkdir -p /student/input
hadoop fs -put /student.txt /student/input
在HBase中创建相关的表(可以通过命令也可以通过代码):create 'users', 'info'
MR代码:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class HDFS2HBASEMR extends Configured implements Tool{
private static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum";
private static final String ZK_CONNECT_VALUE = "hadoop2:2181,hadoop3:2181,hadoop4:2181";
public static void main(String[] args) throws Exception {
int isDone = ToolRunner.run(new HDFS2HBASEMR(), args);
System.exit(isDone);
}
@Override
public int run(String[] arg0) throws Exception {
Configuration conf = new Configuration();
conf.set(ZK_CONNECT_KEY, ZK_CONNECT_VALUE);
conf.set("fs.defaultFS", "hdfs://myha1");
conf.addResource("config/core-site.xml");
conf.addResource("config/hdfs-site.xml");
System.setProperty("HADOOP_USER_NAME", "hadoop");
Job job = Job.getInstance(conf, "hdfs2hbase");
job.setJarByClass(HDFS2HBASEMR.class);
/**
* mapper
*/
job.setMapperClass(H2H_Mapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
//指定输入格式 可以不用指明 默认的是这个
job.setInputFormatClass(TextInputFormat.class);
/**
* reducer
*/
//因为要往表里面的插入数据 所以使用:initTableReducerJob
TableMapReduceUtil.initTableReducerJob("users", H2H_Reducer.class, job, null, null, null, null, false);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Put.class);
FileInputFormat.addInputPath(job, new Path("/student/input"));
boolean isDone = job.waitForCompletion(true);
return isDone ? 0 : 1;
}
/*
* mapper端直接输出到reduce端进行处理
*/
public static class H2H_Mapper extends Mapper<LongWritable, Text, Text, NullWritable>{
@Override
protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {
context.write(value, NullWritable.get());
}
}
public static class H2H_Reducer extends TableReducer<Text, NullWritable, NullWritable>{
@Override
protected void reduce(Text key, Iterable<NullWritable> values ,Context context) throws IOException, InterruptedException {
/**
* key === 95011,包小柏,男,18,MA
*
* 95001: rowkey
* 包小柏 : name
* 18 : age
* 男 : sex
* MA : department
*
* column family : cf
*/
String[] lines = key.toString().split(",");
Put put = new Put(lines[0].g
4000
etBytes());
put.addColumn("info".getBytes(), "name".getBytes(), lines[1].getBytes());
put.addColumn("info".getBytes(), "gender".getBytes(), lines[2].getBytes());
put.addColumn("info".getBytes(), "age".getBytes(), lines[3].getBytes());
put.addColumn("info".getBytes(), "department".getBytes(), lines[4].getBytes());
context.write(NullWritable.get(), put);
}
}
}数据:student.txt
95002,刘晨,女,19,IS
95017,王风娟,女,18,IS
95018,王一,女,19,IS
95013,冯伟,男,21,CS
95014,王小丽,女,19,CS
95019,邢小丽,女,19,IS
95020,赵钱,男,21,IS
95003,王敏,女,22,MA
95004,张立,男,19,IS
95012,孙花,女,20,CS
95010,孔小涛,男,19,CS
95005,刘刚,男,18,MA
95006,孙庆,男,23,CS
95007,易思玲,女,19,MA
95008,李娜,女,18,CS
95021,周二,男,17,MA
95022,郑明,男,20,MA
95001,李勇,男,20,CS
95011,包小柏,男,18,MA
95009,梦圆圆,女,18,MA
95015,王君,男,18,MA
相关文章推荐
- 从hdfs读取数据写入hbase
- MapReduce中,从HDFS读取数据计算后写入HBase
- spark读取hdfs上的文件和写入数据到hdfs上面
- Hadoop学习笔记——1.java读取Oracle中表的数据,创建新文件写入Hdfs
- 通过mapreduce程序读取hdfs文件写入hbase
- HDFS如何读取文件以及写入文件-加米谷大数据
- HBase建表高级属性,hbase应用案例看行键设计,HBase和mapreduce结合,从Hbase中读取数据、分析,写入hdfs,从hdfs中读取数据写入Hbase,协处理器和二级索引
- 【HBase基础教程】7、HBase之读取HBase数据写入HDFS
- 用java读取一个文件往hbase里插入数据(List<PUT>)
- python Pandas 读取数据,写入文件
- 从摄像头中读取数据并写入文件
- 笔记:HDFS读取和写入数据流
- C/C++文件——数据写入、读取
- XCode数据类型转换代码 文件读取,写入,XY坐标获取,ASCII转换等
- 如何通过JDBC向数据库写入/读取大数据文件?
- ean13码的生成,python读取csv中数据并处理返回并写入到另一个csv文件中
- C#读取EXCEL 文件同时向文件中写入数据和Excel的Range对象
- c语言创建写入和读取TXT文件数据
- HDFS文件读取和写入
- ios开发系列之文件的写入读取,NSUserDefaults存储数据,NSFileManager操作文件和文件夹