您的位置:首页 > 编程语言 > Java开发

Hbase总结(三)--使用spring-data-hadoop进行hbase的读写操作

2017-03-09 00:00 766 查看
摘要: spring-data-hadoop是spring-data项目的一部分。spring-data-hadoop项目中封装了对hbase的操作的API。本篇文章将展示一部分使用该框架对hbase进行读写操作的示例。

1.新建一个maven项目引入spring-data-hadoop框架

在pom文件中添加以下依赖

<!-- hbase 的主要依赖-->
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-hadoop</artifactId>
<version>2.2.0.RELEASE</version>
</dependency>
<!-- hbase pom start -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>1.2.0</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.2</version>
<scope>compile</scope>
</dependency>
<!-- hbase pom end -->

<!-- hadoop config start -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.2</version>
<scope>compile</scope>
</dependency>

2.配置hbase的文件

将hbase的配置放入到pom文件的profile中,注意hbase的url是使用主机名(host-name)进行访问的

<profiles>
<profile>
<id>develop_new</id>
<properties>
<!--hbase测试 -->
<hbase-hdfs-dir>hdfs://hdfs-hbase-master:9000/hbase</hbase-hdfs-dir>
<zookeeper-znode-parent-dir>/hbase</zookeeper-znode-parent-dir>
<hbase-zookper-list>192.168.10.52,192.168.10.53,192.168.10.54</hbase-zookper-list>
<hbase-hbase-zookper-port>2181</hbase-hbase-zookper-port>
<hadoop-tmp-dir>/tmp/hbase-root</hadoop-tmp-dir>
<config-file-name>hbase-site.xml</config-file-name>
<log4j2-log-level>debug</log4j2-log-level>

</properties>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
</profile>
</profiles>

3.配置hbase的spring文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:hdp="http://www.springframework.org/schema/hadoop"
xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd"> 
<context:property-placeholder location="hbase.properties"/>

<!--扫描spring-data框架中的依赖-->
<context:component-scan base-package="org.springframework.samples.hadoop.hbase"/>

<hdp:configuration id="hadoopConfiguration">
${hbase-hdfs-dir}
</hdp:configuration>
<!-- -->

<hdp:hbase-configuration configuration-ref="hadoopConfiguration" zk-quorum="${hbase.zk.host}" zk-port="${hbase.zk.port}">
</hdp:hbase-configuration>

<!-- 配置hbase config bean-->
<bean id="hbaseTemplate" class="org.springframework.data.hadoop.hbase.HbaseTemplate">
<property name="configuration" ref="hbaseConfiguration"/>
</bean>
<!-- hadoop的配置文件-->
<hdp:configuration resources="${config-file-name}"></hdp:configuration>
</beans>

4.实现数据查询方法

/**
* @Description: 实现Hbase部分查询
*/
/**
 * Repository that reads location records from HBase via {@link HbaseTemplate}.
 */
@Repository("nlocationInfoJDOImplNew")
public class LocationInfoJDOImplNew implements InitializingBean {
    private static final Logger log = LogManager.getLogger(LocationInfoJDOImplNew.class);

    @Autowired
    private HbaseTemplate hbaseTemplate;

    @Resource(name = "hbaseConfiguration")
    private Configuration config;

    private HBaseAdmin admin;

    /** Default column family of this repository's tables. */
    private static final String CF_NAME = "lf";

    /**
     * Fetches rows whose row key starts with ANY of the given prefixes and whose
     * cell timestamps fall inside [startTime, endTime).
     *
     * @param rowkeyPrefix row-key prefixes; a row matching any prefix passes
     *                     (FilterList.Operator.MUST_PASS_ONE, i.e. logical OR)
     * @param startTime    inclusive lower bound of the cell-timestamp window
     * @param endTime      exclusive upper bound of the cell-timestamp window
     * @return the matching {@link Result} objects; never {@code null}
     */
    public List<Object> queryLocDataByTable(List<String> rowkeyPrefix, Date startTime, Date endTime) {
        // NOTE(review): the original shadowed the class-level family "lf" with a
        // local variable also named cf_name holding "demo". The "demo" value is
        // kept to preserve behavior, under a non-shadowing name — confirm which
        // family is actually intended.
        final String familyName = "demo";
        final byte[] familyBytes = Bytes.toBytes(familyName);

        String tableName = "lo_20170309";
        Scan scan = new Scan();
        scan.setCaching(50); // batch size per RPC for the full-table scan
        // Was cf_name.getBytes(): platform-default charset; Bytes.toBytes is
        // charset-independent and matches the bytes used in getFamilyMap below.
        scan.addFamily(familyBytes);
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
        try {
            // Restrict returned cells to the requested timestamp window.
            scan.setTimeRange(startTime.getTime(), endTime.getTime());
        } catch (IOException e) {
            // Was e.printStackTrace(): route through the configured logger so the
            // failure is visible in the application log. The scan proceeds
            // unbounded in time, matching the original fallback behavior.
            log.error("failed to set scan time range [{}, {})", startTime, endTime, e);
        }

        // One PrefixFilter per row-key prefix; MUST_PASS_ONE makes them an OR.
        rowkeyPrefix.forEach(prefix -> filterList.addFilter(new PrefixFilter(Bytes.toBytes(prefix))));
        scan.setFilter(filterList);

        // HbaseTemplate.find supplies a ResultScanner; it is single-pass, so all
        // consumption happens inside this one callback.
        return hbaseTemplate.find(tableName, scan, results -> {
            List<Object> objectList = new ArrayList<>();
            for (Result next : results) {
                next.getFamilyMap(familyBytes).forEach((qualifier, value) -> {
                    byte[] row = next.getRow();
                    // Row key appears to be a 7-byte string prefix followed by a
                    // 4-byte int — TODO confirm this layout against the writer.
                    // Bytes.toInt replaces the original Bytes.toIntUnsafe (same
                    // result, bounds-checked); System.out replaced by the logger.
                    log.debug("rowKey:{}{}", Bytes.toString(row, 0, 7), Bytes.toInt(row, 7));
                });
                objectList.add(next);
                log.debug("got one record");
            }
            return objectList;
        });
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // Build the admin handle once the injected configuration is available.
        // NOTE(review): admin is never used or closed in this class — confirm it
        // is needed at all.
        admin = new HBaseAdmin(config);
    }
}

5.编写hbase的存储类

/**
 * Repository that writes location records to HBase.
 */
@Repository
public class LocationRepository implements InitializingBean {
    private static final Logger log = LogManager.getLogger(LocationRepository.class);

    @Autowired
    private HbaseTemplate hbaseTemplate;

    @Resource(name = "hbaseConfiguration")
    private Configuration config;

    private HBaseAdmin admin;

    /** Table-name prefix for location tables, e.g. "lo_20170309". */
    public static final String gps_table_basic = "lo_";
    /**
     * Column family every cell is written to.
     * NOTE(review): public and non-final in the original; left as-is so external
     * assignments (if any) keep compiling, but it should ideally be final.
     */
    public static byte[] CF_INFO = Bytes.toBytes("lf");

    @Override
    public void afterPropertiesSet() throws Exception {
        // NOTE(review): admin is created but never used or closed here — confirm
        // it is needed at all.
        admin = new HBaseAdmin(config);
    }

    /**
     * Writes one row to HBase.
     *
     * <p>Row key = businessID bytes + (Integer.MAX_VALUE - millis-of-day), so a
     * row written later in the day gets a smaller suffix and sorts earlier —
     * newest-first within a day. NOTE(review): the original comment said
     * "total millis of a day minus current millis", but the code uses
     * Integer.MAX_VALUE — confirm the intended key layout.
     *
     * @param srcData    outer key looks like a column-family name, inner map is
     *                   qualifier -&gt; value. NOTE(review): the outer key is
     *                   currently IGNORED — every cell is written to CF_INFO;
     *                   confirm whether per-family routing was intended.
     * @param businessID business prefix prepended to the row key
     * @param tableName  target HBase table name
     * @return the generated row key
     */
    public byte[] saveObjSimpleFun(final HashMap<String, LinkedHashMap<String, Object>> srcData,
                                   final String businessID, final String tableName) {
        final LocalDateTime now = LocalDateTime.now();
        byte[] rowKey = hbaseTemplate.execute(tableName, table -> {
            byte[] key = Bytes.add(Bytes.toBytes(businessID),
                    Bytes.toBytes(Integer.MAX_VALUE - now.get(ChronoField.MILLI_OF_DAY)));
            Put putObj = new Put(key);
            srcData.forEach((familyName, fieldMap) -> fieldMap.forEach((fieldKey, fieldValue) -> {
                log.debug("convert field code is {}", fieldKey);
                if (fieldValue != null) {
                    byte[] value = toBytes(fieldValue);
                    // Empty array means "null or unsupported type" — skip the cell.
                    if (value.length > 0) {
                        putObj.addColumn(CF_INFO, Bytes.toBytes(fieldKey), value);
                    }
                }
            }));
            table.put(putObj);
            return key;
        });
        log.debug("end of save data to hbase .....");
        return rowKey;
    }

    /**
     * Serializes a supported value to HBase bytes.
     *
     * @param obj the value; may be {@code null}
     * @return the encoded bytes, or an empty array for {@code null} or an
     *         unsupported type (callers treat empty as "skip this cell")
     */
    public byte[] toBytes(Object obj) {
        byte[] result = new byte[]{};
        if (obj != null) {
            if (obj instanceof String) {
                result = Bytes.toBytes((String) obj);
            } else if (obj instanceof Integer) {
                result = Bytes.toBytes((Integer) obj);
            } else if (obj instanceof Date) {
                // Dates are stored as their toString() text, not as epoch millis.
                result = Bytes.toBytes(obj.toString());
            } else if (obj instanceof Short) {
                result = Bytes.toBytes((Short) obj);
            } else if (obj instanceof Long) {
                result = Bytes.toBytes((Long) obj);
            } else if (obj instanceof Double) {
                result = Bytes.toBytes((Double) obj);
            } else if (obj instanceof Float) {
                result = Bytes.toBytes((Float) obj);
            } else if (obj instanceof Boolean) {
                result = Bytes.toBytes((Boolean) obj);
            } else if (obj instanceof BigDecimal) {
                result = Bytes.toBytes((BigDecimal) obj);
            } else if (obj instanceof ByteBuffer) {
                result = Bytes.toBytes((ByteBuffer) obj);
            } else if (obj instanceof ArrayList) {
                result = convertArrayList(obj);
            } else {
                // Was System.out.println: warn through the logger instead.
                log.warn("invalid data type: {}", obj);
            }
        }
        return result;
    }

    /**
     * Packs a list into a byte array, one byte per element, by narrowing each
     * Integer element to a byte.
     *
     * NOTE(review): values outside [-128, 127] are silently truncated, and a
     * non-Integer element leaves a zero byte at its position — confirm this is
     * the intended encoding.
     */
    private byte[] convertArrayList(Object object) {
        ArrayList<?> list = (ArrayList<?>) object;
        final byte[] result = new byte[list.size()];
        for (int i = 0; i < list.size(); i++) {
            Object element = list.get(i);
            if (element instanceof Integer) {
                result[i] = (byte) (int) element;
            } else {
                // Was System.out.println: warn through the logger instead.
                log.warn("convertArrayList: unsupported element type at index {}: {}", i, element);
            }
        }
        return result;
    }
}

6.项目源代码地址
spring-hbase-demo
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  hba Hadoop java
相关文章推荐