从hdfs读取数据写入hbase
2014-11-16 11:31
609 查看
首先介绍几个建立hbase连接必须的API
hbase
-------------------------------------------------------------------------------
Configuration
Client端通过该接口获取HBase的配置环境,如存储地址,zookeeper等信息。建立:
Configuration config=HBaseConfiguration.create();
HBaseAdmin
HBaseAdmin用于创建数据库表格,并管理表格的元数据信息.建立:
HBaseAdmin admin=new HBaseAdmin(config);
HConnection
hbase客户端到hbase集群的连接。建立:
Hconnection hcnnection = HconnectionManager.createConnection(config);
其中,HconnectionManager是一个不可实例化的类,该类为连接池的关键,专门用于创建HConnection
hdfs
------------------------------------------------------------------------------
Configuration
hadoop有独立的配置文件管理系统,并提供自己的API,即使用org.apache.hadoop.conf.Configuration处理配置信息。使用Configuration类的一般过程是:构造Configuration对象,并通过类的addResource()方法添加需要加载的资源;然后就可以使用get*方法和set*方法访问/设置配置项,资源会在第一次使用的时候自动加载到对象中。
FileSystem
FileSystem是用户操作hdfs的核心类,用于获得URI对应的hdfs文件系统。建立:
FileSystem fs = null;
fs = FileSystem.get(URI.create(filePath), hadoopConf);
---------------------------------------------------------------------------------------------
示例
public static void bulkInsert(String filePath, String tableName) {
Configuration conf = null;
HConnection conn = null;
HBaseAdmin hAdmin = null;
HTableInterface htableInterface = null;
FileSystem fs = null;
BufferedReader bufferedReader = null;
try {
conf = HBaseConfiguration.create();
conf.addResource("hbase-site.xml");
conn = HConnectionManager.createConnection(conf);
hAdmin = new HBaseAdmin(conn);
TableName table = TableName.valueOf(tableName);
String columnFamily = "cf";//列族
// 若表不存在则新建表
if (!hAdmin.tableExists(table)) {
HTableDescriptor htableDescriptor = new HTableDescriptor(table);
htableDescriptor.addFamily(new HColumnDescriptor(columnFamily));
hAdmin.createTable(htableDescriptor);
hAdmin.close();
}
htableInterface = conn.getTable(tableName);
htableInterface.setAutoFlush(false, false);
String line = null;
//得到hdfs配置
Configuration hadoopConf = new Configuration();
hadoopConf.addResource("core-site.xml");
hadoopConf.addResource("hdfs-site.xml");
hadoopConf.set("fs.hdfs.impl",
org.apache.hadoop.hdfs.DistributedFileSystem.class
.getName());
hadoopConf.set("fs.file.impl",
org.apache.hadoop.fs.LocalFileSystem.class.getName());
fs = FileSystem.get(URI.create(filePath), hadoopConf);//操作hdfs文件系统
InputStream hdfsInStream = null;
hdfsInStream = fs.open(filepath);
bufferedReader = new BufferedReader(new InputStreamReader(
hdfsInStream));
while ((line = bufferedReader.readLine()) != null) {
Put put = new Put(
Bytes.toBytes(“”);//Rowkey
put.add(Bytes.toBytes(columnFamily),Bytes.toBytes("列名“), Bytes.toBytes(值));//可以加入多个列
htableInterface.put(put);
}
bufferedReader.close();
hdfsInStream.close();
}
}
htableInterface.close();
conn.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (bufferedReader != null) {
try {
bufferedReader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (fs != null) {
try {
fs.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (htableInterface != null) {
try {
htableInterface.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (conn != null) {
try {
conn.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
hbase
-------------------------------------------------------------------------------
Configuration
Client端通过该接口获取HBase的配置环境,如存储地址,zookeeper等信息。建立:
Configuration config=HBaseConfiguration.create();
HBaseAdmin
HBaseAdmin用于创建数据库表格,并管理表格的元数据信息.建立:
HBaseAdmin admin=new HBaseAdmin(config);
HConnection
hbase客户端到hbase集群的连接。建立:
Hconnection hcnnection = HconnectionManager.createConnection(config);
其中,HconnectionManager是一个不可实例化的类,该类为连接池的关键,专门用于创建HConnection
hdfs
------------------------------------------------------------------------------
Configuration
hadoop有独立的配置文件管理系统,并提供自己的API,即使用org.apache.hadoop.conf.Configuration处理配置信息。使用Configuration类的一般过程是:构造Configuration对象,并通过类的addResource()方法添加需要加载的资源;然后就可以使用get*方法和set*方法访问/设置配置项,资源会在第一次使用的时候自动加载到对象中。
FileSystem
FileSystem是用户操作hdfs的核心类,用于获得URI对应的hdfs文件系统。建立:
FileSystem fs = null;
fs = FileSystem.get(URI.create(filePath), hadoopConf);
---------------------------------------------------------------------------------------------
示例
public static void bulkInsert(String filePath, String tableName) {
Configuration conf = null;
HConnection conn = null;
HBaseAdmin hAdmin = null;
HTableInterface htableInterface = null;
FileSystem fs = null;
BufferedReader bufferedReader = null;
try {
conf = HBaseConfiguration.create();
conf.addResource("hbase-site.xml");
conn = HConnectionManager.createConnection(conf);
hAdmin = new HBaseAdmin(conn);
TableName table = TableName.valueOf(tableName);
String columnFamily = "cf";//列族
// 若表不存在则新建表
if (!hAdmin.tableExists(table)) {
HTableDescriptor htableDescriptor = new HTableDescriptor(table);
htableDescriptor.addFamily(new HColumnDescriptor(columnFamily));
hAdmin.createTable(htableDescriptor);
hAdmin.close();
}
htableInterface = conn.getTable(tableName);
htableInterface.setAutoFlush(false, false);
String line = null;
//得到hdfs配置
Configuration hadoopConf = new Configuration();
hadoopConf.addResource("core-site.xml");
hadoopConf.addResource("hdfs-site.xml");
hadoopConf.set("fs.hdfs.impl",
org.apache.hadoop.hdfs.DistributedFileSystem.class
.getName());
hadoopConf.set("fs.file.impl",
org.apache.hadoop.fs.LocalFileSystem.class.getName());
fs = FileSystem.get(URI.create(filePath), hadoopConf);//操作hdfs文件系统
InputStream hdfsInStream = null;
hdfsInStream = fs.open(filepath);
bufferedReader = new BufferedReader(new InputStreamReader(
hdfsInStream));
while ((line = bufferedReader.readLine()) != null) {
Put put = new Put(
Bytes.toBytes(“”);//Rowkey
put.add(Bytes.toBytes(columnFamily),Bytes.toBytes("列名“), Bytes.toBytes(值));//可以加入多个列
htableInterface.put(put);
}
bufferedReader.close();
hdfsInStream.close();
}
}
htableInterface.close();
conn.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (bufferedReader != null) {
try {
bufferedReader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (fs != null) {
try {
fs.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (htableInterface != null) {
try {
htableInterface.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (conn != null) {
try {
conn.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
相关文章推荐
- HBase建表高级属性,hbase应用案例看行键设计,HBase和mapreduce结合,从Hbase中读取数据、分析,写入hdfs,从hdfs中读取数据写入Hbase,协处理器和二级索引
- MapReduce中,从HDFS读取数据计算后写入HBase
- 【HBase基础教程】7、HBase之读取HBase数据写入HDFS
- 读取HDFS文件中的数据写入到HBase的表中
- Mapreduce读取和写入Hbase(从A表读取数据,统计结果放入B表,非常详细,附有代码说明以及流程)
- spark将数据写入hbase以及从hbase读取数据
- 多线程从MongoDB读取数据,并以固定大小写入HDFS
- HDFS如何读取文件以及写入文件-加米谷大数据
- HDFS的存储结构以及写入、读取hdfs数据操作流程简单总结
- 使用MapReduce从HBase中读取数据存入HDFS路径问题
- Spark将数据写入Hbase以及从Hbase读取数据
- Hadoop学习笔记——1.java读取Oracle中表的数据,创建新文件写入Hdfs
- 用MapReduce把hdfs数据写入HBase中
- hadoop MR从hbase中读取数据写入到hbase中
- park将数据写入hbase以及从hbase读取数据
- hbase写入性能测试(从hdfs向hbase写入数据)
- 笔记:HDFS读取和写入数据流
- 从hbase表1中读取数据,最终结果写入到hbase表2 ,如何通过MapReduce实现 ?
- 通过mapreduce程序读取hdfs文件写入hbase
- 读取HDFS写入HBase