您的位置:首页 > 大数据 > Hadoop

HBase建表高级属性,hbase应用案例看行键设计,HBase和mapreduce结合,从Hbase中读取数据、分析,写入hdfs,从hdfs中读取数据写入Hbase,协处理器和二级索引

2017-06-25 00:26 1101 查看

1. Hbase高级应用

1.1建表高级属性

下面几个shell 命令在hbase操作中可以起到很到的作用,且主要体现在建表的过程中,看下面几个create 属性

1、 BLOOMFILTER 默认是NONE 是否使用布隆过虑及使用何种方式

布隆过滤可以每列族单独启用。

使用 HColumnDescriptor.setBloomFilterType(NONE | ROW | ROWCOL) 对列族单独启用布隆。

 Default = ROW 对行进行布隆过滤。

 对 ROW,行键的哈希在每次插入行时将被添加到布隆。

 对 ROWCOL,行键 + 列族 + 列族修饰的哈希将在每次插入行时添加到布隆

使用方法: create ‘table’,{BLOOMFILTER =>’ROW’}

启用布隆过滤可以节省读磁盘过程,可以有助于降低读取延迟

2、 VERSIONS 默认是1 这个参数的意思是数据保留1个 版本,如果我们认为我们的数据没有这么大的必要保留这么多,随时都在更新,而老版本的数据对我们毫无价值,那将此参数设为1 能节约2/3的空间

使用方法: create ‘table’,{VERSIONS=>’2’}

附:MIN_VERSIONS => ‘0’是说在compact操作执行之后,至少要保留的版本

3、 COMPRESSION 默认值是NONE 即不使用压缩

这个参数意思是该列族是否采用压缩,采用什么压缩算法

使用方法: create ‘table’,{NAME=>’info’,COMPRESSION=>’SNAPPY’}

建议采用SNAPPY压缩算法

HBase中,在Snappy发布之前(Google 2011年对外发布Snappy),采用的LZO算法,目标是达到尽可能快的压缩和解压速度,同时减少对CPU的消耗;

在Snappy发布之后,建议采用Snappy算法(参考《HBase: The Definitive Guide》),具体可以根据实际情况对LZO和Snappy做过更详细的对比测试后再做选择。

Algorithm% remainingEncodingDecoding
GZIP13.4%21 MB/s118 MB/s
LZO20.5%135 MB/s410 MB/s
Zippy/Snappy22.2%172 MB/s409 MB/s
如果建表之初没有压缩,后来想要加入压缩算法,可以通过alter修改schema

4. alter

使用方法:

如 修改压缩算法

disable ‘table’

alter ‘table’,{NAME=>’info’,COMPRESSION=>’snappy’}

enable ‘table’

但是需要执行major_compact ‘table’ 命令之后 才会做实际的操作。

5. TTL

默认是 2147483647 即:Integer.MAX_VALUE 值大概是68年

这个参数是说明该列族数据的存活时间,单位是s

这个参数可以根据具体的需求对数据设定存活时间,超过存过时间的数据将在表中不在显示,待下次major compact的时候再彻底删除数据

注意的是TTL设定之后 MIN_VERSIONS=>’0’ 这样设置之后,TTL时间戳过期后,将全部彻底删除该family下所有的数据,如果MIN_VERSIONS 不等于0那将保留最新的MIN_VERSIONS个版本的数据,其它的全部删除,比如MIN_VERSIONS=>’1’ 届时将保留一个最新版本的数据,其它版本的数据将不再保存。

6. describe ‘table’ 这个命令查看了create table 的各项参数或者是默认值。

7. disable_all ‘toplist.*’ disable_all 支持正则表达式,并列出当前匹配的表的如下:

toplist_a_total_1001

toplist_a_total_1002

toplist_a_total_1008

toplist_a_total_1009

toplist_a_total_1019

toplist_a_total_1035



Disable the above 25 tables (y/n)? 并给出确认提示

8. drop_all 这个命令和disable_all的使用方式是一样的

9. hbase 表预分区—-手动分区

默认情况下,在创建HBase表的时候会自动创建一个region分区,当导入数据的时候,所有的HBase客户端都向这一个region写数据,直到这个region足够大了才进行切分。一种可以加快批量写入速度的方法是通过预先创建一些空的regions,这样当数据写入HBase时,会按照region分区情况,在集群内做数据的负载均衡。

命令方式:

create ‘t1’, ‘f1’, {NUMREGIONS => 15, SPLITALGO => ‘HexStringSplit’}

也可以使用api的方式:

bin/hbase org.apache.hadoop.hbase.util.RegionSplitter test_table HexStringSplit -c 10 -f info

参数:

test_table是表名

HexStringSplit 是split 方式

-c 是分10个region

-f 是family

可在UI上查看结果,如图:



这样就可以将表预先分为15个区,减少数据达到storefile 大小的时候自动分区的时间消耗,并且还有以一个优势,就是合理设计rowkey 能让各个region 的并发请求平均分配(趋于均匀) 使IO 效率达到最高,但是预分区需要将filesize 设置一个较大的值,设置哪个参数呢 hbase.hregion.max.filesize 这个值默认是10G 也就是说单个region 默认大小是10G

这个参数的默认值在0.90 到0.92到0.94.3各版本的变化:256M–1G–10G

但是如果MapReduce Input类型为TableInputFormat 使用hbase作为输入的时候,就要注意了,每个region一个map,如果数据小于10G 那只会启用一个map 造成很大的资源浪费,这时候可以考虑适当调小该参数的值,或者采用预分配region的方式,并将检测如果达到这个值,再手动分配region。


1.2 hbase应用案例看行键设计

表结构设计

1、列族数量的设定

以用户信息为例,可以将必须的基本信息存放在一个列族,而一些附加的额外信息可以放在另一列族;

2、行键的设计

语音详单:

13877889988-20150625

13877889988-20150625

13877889988-20150626

13877889988-20150626

13877889989

13877889989

13877889989

—-将需要批量查询的数据尽可能连续存放

CMS系统—-多条件查询

尽可能将查询条件关键词拼装到rowkey中,查询频率最高的条件尽量往前靠

20150230-zhangsan-category…

20150230-lisi-category…

(每一个条件的值长度不同,可以通过做定长映射来提高效率)

参考:《hbase 实战》—-详细讲述了facebook /GIS等系统的表结构设计

1.3 Hbase和mapreduce结合

为什么需要用mapreduce去访问hbase的数据?

——加快分析速度和扩展分析能力

Mapreduce访问hbase数据作分析一定是在离线分析的场景下应用



1.3.1 从Hbase中读取数据、分析,写入hdfs

/**
public abstract class TableMapper<KEYOUT, VALUEOUT>
extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {
}
* @author duanhaitao@itcast.cn
*
*/
public class HbaseReader {

public static String flow_fields_import = "flow_fields_import";
static class HdfsSinkMapper extends TableMapper<Text, NullWritable>{

@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {

byte[] bytes = key.copyBytes();
String phone = new String(bytes);
byte[] urlbytes = value.getValue("f1".getBytes(), "url".getBytes());
String url = new String(urlbytes);
context.write(new Text(phone + "\t" + url), NullWritable.get());

}

}

static class HdfsSinkReducer extends Reducer<Text, NullWritable, Text, NullWritable>{

@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

context.write(key, NullWritable.get());
}
}

public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "spark01");

Job job = Job.getInstance(conf);

job.setJarByClass(HbaseReader.class);

//      job.setMapperClass(HdfsSinkMapper.class);
Scan scan = new Scan();
TableMapReduceUtil.initTableMapperJob(flow_fields_import, scan, HdfsSinkMapper.class, Text.class, NullWritable.class, job);
job.setReducerClass(HdfsSinkReducer.class);

FileOutputFormat.setOutputPath(job, new Path("c:/hbasetest/output"));

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);

job.waitForCompletion(true);
}

}


1.3.2 从hdfs中读取数据写入Hbase

/**
public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
extends Reducer<KEYIN, VALUEIN, KEYOUT, Writable> {
}
* @author duanhaitao@itcast.cn
*
*/
public class HbaseSinker {

public static String flow_fields_import = "flow_fields_import";
static class HbaseSinkMrMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

String line = value.toString();
String[] fields = line.split("\t");
String phone = fields[0];
String url = fields[1];

FlowBean bean = new FlowBean(phone,url);

context.write(bean, NullWritable.get());
}
}

static class HbaseSinkMrReducer extends TableReducer<FlowBean, NullWritable, ImmutableBytesWritable>{

@Override
protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

Put put = new Put(key.getPhone().getBytes());
put.add("f1".getBytes(), "url".getBytes(), key.getUrl().getBytes());

context.write(new ImmutableBytesWritable(key.getPhone().getBytes()), put);

}

}

public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "spark01");

HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);

boolean tableExists = hBaseAdmin.tableExists(flow_fields_import);
if(tableExists){
hBaseAdmin.disableTable(flow_fields_import);
hBaseAdmin.deleteTable(flow_fields_import);
}
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(flow_fields_import));
HColumnDescriptor hColumnDescriptor = new HColumnDescriptor ("f1".getBytes());
desc.addFamily(hColumnDescriptor);

hBaseAdmin.createTable(desc);

Job job = Job.getInstance(conf);

job.setJarByClass(HbaseSinker.class);

job.setMapperClass(HbaseSinkMrMapper.class);
TableMapReduceUtil.initTableReducerJob(flow_fields_import, HbaseSinkMrReducer.class, job);

FileInputFormat.setInputPaths(job, new Path("c:/hbasetest/data"));

job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Mutation.class);

job.waitForCompletion(true);
}
}


1.3 hbase高级编程

1.3.1 协处理器—- Coprocessor

协处理器有两种:observer和endpoint

Observer允许集群在正常的客户端操作过程中可以有不同的行为表现

Endpoint允许扩展集群的能力,对客户端应用开放新的运算命令

 Observer协处理器

 正常put请求的流程:



 加入Observer协处理后的put流程:



1 客户端发出put请求

2 该请求被分派给合适的RegionServer和region

3 coprocessorHost拦截该请求,然后在该表上登记的每个RegionObserver上调用prePut()

4 如果没有被prePut()拦截,该请求继续送到region,然后进行处理

5 region产生的结果再次被CoprocessorHost拦截,调用postPut()

6 假如没有postPut()拦截该响应,最终结果被返回给客户端

 Observer的类型

1.RegionObs——这种Observer钩在数据访问和操作阶段,所有标准的数据操作命令都可以被pre-hooks和post-hooks拦截

**2.WALObserver——**WAL所支持的Observer;可用的钩子是pre-WAL和post-WAL

3.MasterObserver——钩住DDL事件,如表创建或模式修改

 Observer应用场景示例

见下节;

 Endpoint—参考《Hbase 权威指南》

1.3.2 二级索引

row key在HBase中是以B+ tree结构化有序存储的,所以scan起来会比较效率。单表以row key存储索引,column value存储id值或其他数据 ,这就是Hbase索引表的结构。

由于HBase本身没有二级索引(Secondary Index)机制,基于索引检索数据只能单纯地依靠RowKey,为了能支持多条件查询,开发者需要将所有可能作为查询条件的字段一一拼接到RowKey中,这是HBase开发中极为常见的做法

比如,现在有一张1亿的用户信息表,建有出生地和年龄两个索引,我想得到一个条件是在杭州出生,年龄为20岁的按用户id正序排列前10个的用户列表。

有一种方案是,系统先扫描出生地为杭州的索引,得到一个用户id结果集,这个集合的规模假设是10万。然后扫描年龄,规模是5万,最后merge这些用户id,去重,排序得到结果。

这明显有问题,如何改良?

保证出生地和年龄的结果是排过序的,可以减少merge的数据量?但Hbase是按row key排序,value是不能排序的。

变通一下——将用户id冗余到row key里?OK,这是一种解决方案了,这个方案的图示如下:



merge时提取交集就是所需要的列表,顺序是靠索引增加了_id,以字典序保证的。

2, 按索引查询种类建立组合索引。

在方案1的场景中,想象一下,如果单索引数量多达10个会怎么样?10个索引,就要merge 10次,性能可想而知。



解决这个问题需要参考RDBMS的组合索引实现。

比如出生地和年龄需要同时查询,此时如果建立一个出生地和年龄的组合索引,查询时效率会高出merge很多。

当然,这个索引也需要冗余用户id,目的是让结果自然有序。结构图示如下:



这个方案的优点是查询速度非常快,根据查询条件,只需要到一张表中检索即可得到结果list。缺点是如果有多个索引,就要建立多个与查询条件一一对应的组合索引

而索引表的维护如果交给应用客户端,则无疑增加了应用端开发的负担

通过协处理器可以将索引表维护的工作从应用端剥离

 利用Observer自动维护索引表示例

在社交类应用中,经常需要快速检索各用户的关注列表t_guanzhu,同时,又需要反向检索各种户的粉丝列表t_fensi,为了实现这个需求,最佳实践是建立两张互为反向的表:

 一个表为正向索引关注表 “t_guanzhu”:

Rowkey: A-B

f1:From

f1:To

 另一个表为反向索引粉丝表:“t_fensi”:

Rowkey: B—A

f1:From

f1:To

插入一条关注信息时,为了减轻应用端维护反向索引表的负担,可用Observer协处理器实现:



1、编写自定义RegionServer

public class InverIndexCoprocessor extends BaseRegionObserver {

@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
// set configuration
Configuration conf = HBaseConfiguration.create();
// need conf.set...

HTable table = new HTable(conf, "t_fensi");
Cell fromCell = put.get("f1".getBytes(), "From".getBytes()).get(0);
Cell toCell = put.get("f1".getBytes(), "To".getBytes()).get(0);
byte[] valueArray = fromCell.getValue();
String from = new String(valueArray);
valueArray = toCell.getValue();
String to = new String(valueArray);

Put putIndex = new Put((to+"-"+from).getBytes());
putIndex.add("f1".getBytes(), "From".getBytes(),from.getBytes());
putIndex.add("f1".getBytes(), "To".getBytes(),to.getBytes());

table.put(putIndex);
table.close();

}
}


2、打成jar包“fensiguanzhu.jar”上传hdfs

hadoop fs -put fensiguanzhu.jar /demo/

3、修改t_fensi的schema,注册协处理器

hbase(main):017:0> alter ' t_fensi ',METHOD => 'table_att','coprocessor'=>'hdfs://spark01:9000/demo/ fensiguanzhu.jar|cn.itcast.bigdata.hbasecoprocessor. InverIndexCoprocessor|1001|'
Updating all regions with the new schema...
0/1 regions updated.
1/1 regions updated.
Done.


4、检查是否注册成功

hbase(main):018:0> describe ‘ff’

DESCRIPTION ENABLED

‘ff’, {TABLE_ATTRIBUTES => {coprocessor$1 => ‘hdfs://spark01:9000/demo/fensiguanzhu.jar|cn.itcast.bi true

gdata.hbasecoprocessor.TestCoprocessor|1001|’}, {NAME => ‘f1’, DATA_BLOCK_ENCODING => ‘NONE’, BLOOMF

ILTER => ‘ROW’, REPLICATION_SCOPE => ‘0’, VERSIONS => ‘1’, COMPRESSION => ‘NONE’, MIN_VERSIONS => ‘0

‘, TTL => ‘2147483647’, KEEP_DELETED_CELLS => ‘false’, BLOCKSIZE => ‘65536’, IN_MEMORY => ‘false’, B

LOCKCACHE => ‘true’}, {NAME => ‘f2’, DATA_BLOCK_ENCODING => ‘NONE’, BLOOMFILTER => ‘ROW’, REPLICATIO

N_SCOPE => ‘0’, VERSIONS => ‘1’, COMPRESSION => ‘NONE’, MIN_VERSIONS => ‘0’, TTL => ‘2147483647’, KE

EP_DELETED_CELLS => ‘false’, BLOCKSIZE => ‘65536’, IN_MEMORY => ‘false’, BLOCKCACHE => ‘true’}

1 row(s) in 0.0250 seconds

5、向正向索引表中插入数据进行验证
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: