您的位置:首页 > 大数据 > Hadoop

从hdfs读取数据写入hbase

2014-11-16 11:31 609 查看
首先介绍几个建立hbase连接必须的API

hbase

-------------------------------------------------------------------------------

Configuration

Client端通过该接口获取HBase的配置环境,如存储地址,zookeeper等信息。建立:

Configuration config=HBaseConfiguration.create();
HBaseAdmin
HBaseAdmin用于创建数据库表格,并管理表格的元数据信息.建立:
HBaseAdmin admin=new HBaseAdmin(config);

HConnection

hbase客户端到hbase集群的连接。建立:
Hconnection hcnnection = HconnectionManager.createConnection(config);
其中,HconnectionManager是一个不可实例化的类,该类为连接池的关键,专门用于创建HConnection

hdfs

------------------------------------------------------------------------------

Configuration

hadoop有独立的配置文件管理系统,并提供自己的API,即使用org.apache.hadoop.conf.Configuration处理配置信息。使用Configuration类的一般过程是:构造Configuration对象,并通过类的addResource()方法添加需要加载的资源;然后就可以使用get*方法和set*方法访问/设置配置项,资源会在第一次使用的时候自动加载到对象中。

FileSystem

FileSystem是用户操作hdfs的核心类,用于获得URI对应的hdfs文件系统。建立:

FileSystem fs = null;

fs = FileSystem.get(URI.create(filePath), hadoopConf);

---------------------------------------------------------------------------------------------

示例

public static void bulkInsert(String filePath, String tableName) {

Configuration conf = null;

HConnection conn = null;

HBaseAdmin hAdmin = null;

HTableInterface htableInterface = null;

FileSystem fs = null;

BufferedReader bufferedReader = null;

try {

conf = HBaseConfiguration.create();

conf.addResource("hbase-site.xml");

conn = HConnectionManager.createConnection(conf);

hAdmin = new HBaseAdmin(conn);

TableName table = TableName.valueOf(tableName);

String columnFamily = "cf";//列族

// 若表不存在则新建表

if (!hAdmin.tableExists(table)) {

HTableDescriptor htableDescriptor = new HTableDescriptor(table);

htableDescriptor.addFamily(new HColumnDescriptor(columnFamily));

hAdmin.createTable(htableDescriptor);

hAdmin.close();

}

htableInterface = conn.getTable(tableName);

htableInterface.setAutoFlush(false, false);

String line = null;

//得到hdfs配置

Configuration hadoopConf = new Configuration();

hadoopConf.addResource("core-site.xml");

hadoopConf.addResource("hdfs-site.xml");

hadoopConf.set("fs.hdfs.impl",

org.apache.hadoop.hdfs.DistributedFileSystem.class

.getName());

hadoopConf.set("fs.file.impl",

org.apache.hadoop.fs.LocalFileSystem.class.getName());

fs = FileSystem.get(URI.create(filePath), hadoopConf);//操作hdfs文件系统

InputStream hdfsInStream = null;

hdfsInStream = fs.open(filepath);

bufferedReader = new BufferedReader(new InputStreamReader(

hdfsInStream));

while ((line = bufferedReader.readLine()) != null) {

Put put = new Put(

Bytes.toBytes(“”);//Rowkey

put.add(Bytes.toBytes(columnFamily),Bytes.toBytes("列名“), Bytes.toBytes(值));//可以加入多个列

htableInterface.put(put);

}

bufferedReader.close();

hdfsInStream.close();

}

}

htableInterface.close();

conn.close();

} catch (Exception e) {

e.printStackTrace();

} finally {

if (bufferedReader != null) {

try {

bufferedReader.close();

} catch (IOException e) {

e.printStackTrace();

}

}

if (fs != null) {

try {

fs.close();

} catch (IOException e) {

e.printStackTrace();

}

}

if (htableInterface != null) {

try {

htableInterface.close();

} catch (IOException e) {

e.printStackTrace();

}

}

if (conn != null) {

try {

conn.close();

} catch (IOException e) {

e.printStackTrace();

}

}

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: