Hbase总结(三)--使用spring-data-hadoop进行hbase的读写操作
2017-03-09 00:00
766 查看
摘要: spring-data-hadoop是spring-data项目的一部分。spring-data-hadoop项目中封装了对hbase的操作的API。本篇文章将展示一部分使用该框架对hbase进行读写操作的示例。
1.新建一个maven项目引入spring-data-hadoop框架
在pom文件中添加以下依赖
2.配置hbase的文件
将hbase的的配置放入到pom文件的profile中,注意hbase url是使用主机名host-name进行访问的
3.配置hbase的spring文件
4.使用实现数据查询方法
5.编写hbase的存储类
6.项目源代码地址
spring-hbase-demo
1.新建一个maven项目引入spring-data-hadoop框架
在pom文件中添加以下依赖
<!-- hbase 的主要依赖--> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-hadoop</artifactId> <version>2.2.0.RELEASE</version> </dependency> <!-- hbase pom start --> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase</artifactId> <version>1.2.0</version> <type>pom</type> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.2.2</version> <scope>compile</scope> </dependency> <!-- hbase pom end --> <!-- hadoop config start --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.7.2</version> <scope>compile</scope> </dependency>
2.配置hbase的文件
将hbase的的配置放入到pom文件的profile中,注意hbase url是使用主机名host-name进行访问的
<profile> <id>develop_new</id> <properties> <!--hbase测试 --> <hbase-hdfs-dir>hdfs://hdfs-hbase-master:9000/hbase</hbase-hdfs-dir> <zookeeper-znode-parent-dir>/hbase</zookeeper-znode-parent-dir> <hbase-zookper-list>192.168.10.52,192.168.10.53,192.168.10.54</hbase-zookper-list> <hbase-hbase-zookper-port>2181</hbase-hbase-zookper-port> <hadoop-tmp-dir>/tmp/hbase-root</hadoop-tmp-dir> <config-file-name>hbase-site.xml</config-file-name> <log4j2-log-level>debug</log4j2-log-level> </properties> <activation> <activeByDefault>true</activeByDefault> </activation> </profile> </profiles>
3.配置hbase的spring文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:hdp="http://www.springframework.org/schema/hadoop" xmlns:p="http://www.springframework.org/schema/p" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd"> <context:property-placeholder location="hbase.properties"/> <!--扫描spring-data框架中的依赖--> <context:component-scan base-package="org.springframework.samples.hadoop.hbase"/> <hdp:configuration id="hadoopConfiguration"> ${hbase-hdfs-dir} </hdp:configuration> <!-- --> <hdp:hbase-configuration configuration-ref="hadoopConfiguration" zk-quorum="${hbase.zk.host}" zk-port="${hbase.zk.port}"> </hdp:hbase-configuration> <!-- 配置hbase config bean--> <bean id="hbaseTemplate" class="org.springframework.data.hadoop.hbase.HbaseTemplate"> <property name="configuration" ref="hbaseConfiguration"/> </bean> <!-- hadoop的配置文件--> <hdp:configuration resources="${config-file-name}"></hdp:configuration> </beans>
4.使用实现数据查询方法
/** * @Description: 实现Hbase部分查询 */ @Repository("nlocationInfoJDOImplNew") public class LocationInfoJDOImplNew implements InitializingBean { private static final Logger log = LogManager.getLogger(LocationInfoJDOImplNew.class); @Autowired private HbaseTemplate hbaseTemplate; @Resource(name = "hbaseConfiguration") private Configuration config; private HBaseAdmin admin; private static final String cf_name = "lf"; /** *从hbase中获取查询结果 */ public List<Object> queryLocDataByTable(List<String> rowkeyPrefix, Date startTime, Date endTime) { String tableName = ""; String cf_name = "demo"; byte[] cf_bytes = Bytes.toBytes(cf_name); tableName = "lo_20170309"; Scan scan = new Scan(); scan.setCaching(50);//全表扫描设置cache scan.addFamily(cf_name.getBytes()); FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE); try { //时间戳,过滤器 scan.setTimeRange(startTime.getTime(), endTime.getTime()); } catch (IOException e) { e.printStackTrace(); } //批量增加过滤条件 ,使用行键的PrefixFilter进行过滤 rowkeyPrefix.forEach(s -> { filterList.addFilter(new PrefixFilter(Bytes.toBytes(s))); }); scan.setFilter(filterList); //java8 特性 , 可以通过该方法设定返回值来实现 List<Object> entities = hbaseTemplate.find(tableName, scan, results -> { List<Object> objectList = new ArrayList<>(); Iterator<Result> iterator = results.iterator(); //这个迭代器只能获取一次 while (iterator.hasNext()) { //迭代获取数据 // SimpleLocationEntity temp = new SimpleLocationEntity(); Result next = iterator.next(); next.getFamilyMap(cf_bytes).forEach((bytes, values) -> { byte[] row = next.getRow(); System.out.println("rowKey:" + Bytes.toString(next.getRow(), 0, 7) + Bytes.toIntUnsafe(row, 7)); }); objectList.add(next); log.debug("get one recode"); } return objectList; }); return entities; } @Override public void afterPropertiesSet() throws Exception { Configuration configuration = config; admin = new HBaseAdmin(configuration); } }
5.编写hbase的存储类
@Repository public class LocationRepository implements InitializingBean { private static final Logger log = LogManager.getLogger(LocationRepository.class); @Autowired private HbaseTemplate hbaseTemplate; @Resource(name = "hbaseConfiguration") private Configuration config; private HBaseAdmin admin; public static final String gps_table_basic = "lo_"; public static byte[] CF_INFO = Bytes.toBytes("lf"); @Override public void afterPropertiesSet() throws Exception { admin = new HBaseAdmin(config); } /** * 数据发送到 Hbase数据库中 * 1.自定义业务前缀+ (一天的总毫秒数- 当前的时间在一天中的毫秒数) //得到最新数据总是在最前面的hbase表 * 2.使用map的层级结构自动映射为hbase的列族和列名 */ public byte[] saveObjSimpleFun(final HashMap<String, LinkedHashMap<String, Object>> srcData, final String businessID, final String tableName) { LocalDateTime now = LocalDateTime.now(); byte[] execute = hbaseTemplate.execute(tableName, table -> { byte[] add = Bytes.add(Bytes.toBytes(businessID), Bytes.toBytes(Integer.MAX_VALUE - now.get(ChronoField.MILLI_OF_DAY))); Put putObj = new Put(add); srcData.forEach((cf_name, file_map) -> file_map.forEach((filed_key, filed_value) -> { byte[] family = CF_INFO; log.debug("convert filed code is {}", filed_key); if (filed_value !=null ){ byte[] value = toBytes(filed_value); if (value.length >0) { putObj.addColumn(family, Bytes.toBytes(filed_key), value); } } })); table.put(putObj); return add; }); log.debug("end of save data to hbase ....."); return execute; } public byte[] toBytes(Object obj){ byte[] result = new byte[]{}; if (obj!=null){ if (obj instanceof String) { result = Bytes.toBytes((String)obj); }else if (obj instanceof Integer){ result = Bytes.toBytes((Integer)obj); }else if(obj instanceof Date){ result = Bytes.toBytes(obj.toString()); }else if (obj instanceof Short){ result = Bytes.toBytes((Short)obj); }else if (obj instanceof Long){ result = Bytes.toBytes((Long)obj); }else if (obj instanceof Double){ result = Bytes.toBytes((Double) obj); }else if (obj instanceof Float){ result = Bytes.toBytes((Float) obj); }else if (obj instanceof Boolean){ result = Bytes.toBytes((Boolean) obj); }else if (obj instanceof BigDecimal){ result = Bytes.toBytes((BigDecimal)obj); }else if (obj instanceof ByteBuffer){ result = Bytes.toBytes((ByteBuffer) obj); }else if (obj instanceof ArrayList){ result = convertArrayList(obj); } else { System.out.println("invalid Data Type....." + obj.toString()); } } return result; } private byte[] convertArrayList(Object object){ ArrayList<?> arrayList = (ArrayList) object; final byte[] result = new byte[arrayList.size()]; for (int i = 0; i < arrayList.size(); i++) { Object o = arrayList.get(i); if (o instanceof Integer) { result[i] = (byte) (int) o; }else { System.out.println("error convertArrayList"); } } return result; } }
6.项目源代码地址
spring-hbase-demo
相关文章推荐
- 使用spring-data-redis进行对redis的操作,封装的一些操作方法
- 使用Spring-data进行Redis操作
- 使用spring-data-jpa的JpaRepository来进行类的db操作(配置)
- 使用spring-data-redis进行对redis的操作,封装的一些操作方法
- Spring中使用getSession()与通过HibernateTemplate进行数据操作的差别
- C#不使用DataSet操作XML,XmlDocument读写xml所有节点及读取xml节点的数据总结
- 使用C++(通过Thrift)访问/操作/读写Hbase
- 使用hadoop的datajoin包进行关系型join操作
- Fileatream表示文件流,它能够打开和关闭文件,并对文件进行单字节的读写操作。 StreamReader和StreamWriter以文本方式对流进行读写操作。建立一个文本文件,分别使用上面两种方
- 转载:SQL through PowerShell 使用Powershell访问SQL并对数据进行读写增加删除操作
- 个人hadoop学习总结:Hadoop集群+HBase集群+Zookeeper集群+chukwa监控(包括单机、伪分布、完全分布安装操作)
- 使用Spring BlazeDS Integration进行数据推送服务(push data)
- Hadoop总结:在java中使用FileSystem的api读写数据(能力工场)
- 使用C++(通过Thrift)访问/操作/读写Hbase
- 转载:使用Spring进行数据访问(Data Access With Spring)
- Hadoop编程学习(四):使用FileSystem类进行文件读写及查看文件信息
- 使用Spring进行数据访问(Data Access With Spring)
- Spring中使用getSession()与通过HibernateTemplate进行数据操作的差别
- 分别使用(字符流)和(字节流)对文件进行读写操作
- C#使用DOM进行XML文件的读写操作介绍