
Importing a TXT File into HBase

1. Load the file into HDFS

Create a txt test file (columns separated by ","):

vi test.txt
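
For example, test.txt might contain hypothetical rows like the following (a millisecond timestamp, a key such as a phone number, and a value; the MapReduce code below expects at least three columns, the first of which must parse as a long):

1482134400000,13800001111,100
1482134460000,13900002222,200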

Upload the file to HDFS:
hadoop fs -put test.txt /user/training

List the files in the target directory:
hadoop fs -ls /user/training
View the file contents:
hadoop fs -cat /user/training/test.txt

2. Create a table in HBase

hbase(main):004:0> create 'dbtest','cf'
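
Optionally, verify that the table and its column family were created:

hbase(main):005:0> describe 'dbtest'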

3. Run the following code in Eclipse

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class BatchImport {

    static Configuration conf = null;

    static {
        conf = HBaseConfiguration.create();
    }

    // Mapper: parses each comma-separated line, builds a row key from the
    // second field plus the formatted timestamp in the first field, and
    // emits "rowKey,originalLine" for the reducer.
    static class BatchImportMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

        SimpleDateFormat dateformat1 = new SimpleDateFormat("yyyyMMddHHmmss");
        Text v2 = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            final String[] splited = value.toString().split(",");
            try {
                // The first column must be a millisecond timestamp.
                final Date date = new Date(Long.parseLong(splited[0].trim()));
                final String dateFormat = dateformat1.format(date);
                String rowKey = splited[1] + ":" + dateFormat;
                v2.set(rowKey + "," + value.toString());
                context.write(key, v2);
            } catch (NumberFormatException e) {
                // Count malformed lines instead of failing the whole job.
                final Counter counter = context.getCounter("BatchImport", "ErrorFormat");
                counter.increment(1L);
                System.out.println("error" + splited[0] + " " + e.getMessage());
            }
        }
    }

    // Reducer: turns each "rowKey,field0,field1,field2" value into a Put
    // against column family 'cf'.
    static class BatchImportReducer extends TableReducer<LongWritable, Text, NullWritable> {

        @Override
        protected void reduce(LongWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text text : values) {
                final String[] splited = text.toString().split(",");
                for (int i = 0; i < splited.length; i++) {
                    System.out.println(splited[i]);
                }
                final Put put = new Put(Bytes.toBytes(splited[0]));
                // Note: put.add() is deprecated in newer HBase releases in
                // favor of put.addColumn() with the same arguments.
                put.add(Bytes.toBytes("cf"), Bytes.toBytes("col1"), Bytes.toBytes(splited[1]));
                put.add(Bytes.toBytes("cf"), Bytes.toBytes("col2"), Bytes.toBytes(splited[2]));
                put.add(Bytes.toBytes("cf"), Bytes.toBytes("col3"), Bytes.toBytes(splited[3]));
                context.write(NullWritable.get(), put);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set(TableOutputFormat.OUTPUT_TABLE, "dbtest");
        conf.set("dfs.socket.timeout", "180000");

        // new Job(conf, name) is deprecated in Hadoop 2;
        // Job.getInstance(conf, name) is the modern equivalent.
        final Job job = new Job(conf, "HBaseBatchImport");

        job.setMapperClass(BatchImportMapper.class);
        job.setReducerClass(BatchImportReducer.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        // Output goes to the HBase table via TableOutputFormat, not to HDFS.
        job.setOutputFormatClass(TableOutputFormat.class);
        FileInputFormat.setInputPaths(job, "hdfs://localhost:8020/user/training/test.txt");
        job.waitForCompletion(true);
    }
}
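
To run the job outside Eclipse instead, one common approach (the jar name here is hypothetical) is to package the class into a jar and submit it with the HBase jars on the classpath:

export HADOOP_CLASSPATH=$(hbase classpath)
hadoop jar BatchImport.jar BatchImport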

4. View the data in HBase

hbase(main):004:0> scan 'dbtest'
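
Each row key should have the form <second column>:<yyyyMMddHHmmss>, with the original fields stored under cf:col1, cf:col2, and cf:col3. To check how many rows were imported:

hbase(main):005:0> count 'dbtest'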