
Reading data from an HDFS file and writing it into an HBase table

Key point: the custom reducer (H2H_Reducer below) extends TableReducer.
Preparation:

Upload the data file to HDFS:
hadoop fs -mkdir -p /student/input
hadoop fs -put /student.txt /student/input

Create the target table in HBase (either from the HBase shell or programmatically; a Java Admin API sketch is shown after the MR code below):
create 'users', 'info'

MR code:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class HDFS2HBASEMR extends Configured implements Tool{
private static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum";
private static final String ZK_CONNECT_VALUE = "hadoop2:2181,hadoop3:2181,hadoop4:2181";

public static void main(String[] args) throws Exception {
int isDone = ToolRunner.run(new HDFS2HBASEMR(), args);
System.exit(isDone);
}

@Override
public int run(String[] arg0) throws Exception {
Configuration conf = new Configuration();
conf.set(ZK_CONNECT_KEY, ZK_CONNECT_VALUE);
conf.set("fs.defaultFS", "hdfs://myha1");
conf.addResource("config/core-site.xml");
conf.addResource("config/hdfs-site.xml");
System.setProperty("HADOOP_USER_NAME", "hadoop");

Job job = Job.getInstance(conf, "hdfs2hbase");
job.setJarByClass(HDFS2HBASEMR.class);

/**
* mapper
*/
job.setMapperClass(H2H_Mapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);

// Specify the input format; optional, since TextInputFormat is the default
job.setInputFormatClass(TextInputFormat.class);

/**
* reducer
*/
// Because the reducer inserts data into an HBase table, use initTableReducerJob to configure the table output
TableMapReduceUtil.initTableReducerJob("users", H2H_Reducer.class, job, null, null, null, null, false);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Put.class);

FileInputFormat.addInputPath(job, new Path("/student/input"));
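// Note: no FileOutputFormat output path is needed here; initTableReducerJob has already
// configured TableOutputFormat, so the reducer's Puts go straight into the 'users' table.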

boolean isDone = job.waitForCompletion(true);

return isDone ? 0 : 1;
}

/*
* The mapper does no parsing; it simply forwards each input line to the reducer.
*/
public static class H2H_Mapper extends Mapper<LongWritable, Text, Text, NullWritable>{
@Override
protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {
context.write(value, NullWritable.get());
}
}

public static class H2H_Reducer extends TableReducer<Text, NullWritable, NullWritable>{
@Override
protected void reduce(Text key, Iterable<NullWritable> values ,Context context) throws IOException, InterruptedException {
/**
* key === 95011,包小柏,男,18,MA
*
* 95011  : rowkey
* 包小柏 : name
* 男     : gender
* 18     : age
* MA     : department
*
* column family : info
*/
String[] lines = key.toString().split(",");
Put put = new Put(lines[0].getBytes());

put.addColumn("info".getBytes(), "name".getBytes(), lines[1].getBytes());
put.addColumn("info".getBytes(), "gender".getBytes(), lines[2].getBytes());
put.addColumn("info".getBytes(), "age".getBytes(), lines[3].getBytes());
put.addColumn("info".getBytes(), "department".getBytes(), lines[4].getBytes());

context.write(NullWritable.get(), put);
}

}

}
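As noted in the preparation step, the 'users' table can also be created in code instead of from the HBase shell. The following is a minimal sketch of that against the HBase 1.x client API (in HBase 2.x, HTableDescriptor/HColumnDescriptor are replaced by TableDescriptorBuilder); the class name is only for illustration, and the ZooKeeper quorum, table name and column family are the ones used above.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateUsersTable {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        // Same ZooKeeper quorum as in the MR driver above
        conf.set("hbase.zookeeper.quorum", "hadoop2:2181,hadoop3:2181,hadoop4:2181");

        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            TableName tableName = TableName.valueOf("users");
            if (!admin.tableExists(tableName)) {
                // One column family 'info', matching: create 'users', 'info' in the shell
                HTableDescriptor desc = new HTableDescriptor(tableName);
                desc.addFamily(new HColumnDescriptor("info"));
                admin.createTable(desc);
            }
        }
    }
}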
Data file: student.txt

95002,刘晨,女,19,IS
95017,王风娟,女,18,IS
95018,王一,女,19,IS
95013,冯伟,男,21,CS
95014,王小丽,女,19,CS
95019,邢小丽,女,19,IS
95020,赵钱,男,21,IS
95003,王敏,女,22,MA
95004,张立,男,19,IS
95012,孙花,女,20,CS
95010,孔小涛,男,19,CS
95005,刘刚,男,18,MA
95006,孙庆,男,23,CS
95007,易思玲,女,19,MA
95008,李娜,女,18,CS
95021,周二,男,17,MA
95022,郑明,男,20,MA
95001,李勇,男,20,CS
95011,包小柏,男,18,MA
95009,梦圆圆,女,18,MA
95015,王君,男,18,MA
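Once the job has finished, the load can be checked either with scan 'users' in the HBase shell or with a small read-back program. The sketch below assumes the same HBase 1.x client API and cluster settings as above; rowkey 95001 is simply one of the ids in student.txt.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ReadUsersTable {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop2:2181,hadoop3:2181,hadoop4:2181");

        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("users"))) {
            // 95001 is the row "95001,李勇,男,20,CS" from student.txt
            Result result = table.get(new Get(Bytes.toBytes("95001")));
            String name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
            String dept = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("department")));
            System.out.println(name + " / " + dept); // expected: 李勇 / CS
        }
    }
}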