您的位置:首页 > 其它

延云YDB v1.0.3-beta版本发布

2015-12-09 15:40 375 查看
 延云YDB v1.0.3-beta版本发布

延云YDB v1.0.3-beta版本发布
http://ycloud.net.cn/newsitem/277188457
支持导出

支持文件与多表关联过滤

支持kafka数据源

支持自定义任意数据源

导出功能

使用后导出功能,order by、limit、group by均失效,只能对查询明细进行导出。
该过程的执行时间视导出数据的大小在几秒到十几分钟左右,如果导出数据量很大,导出时间过长的话,http链接有可能超时中断,需要您需要采用后面所接受的异步方式的导出。
 
1.        
ydb_site.xml配置每个task最大允许的导入条数
ydb.export.max.return.docset.size:1000000
2.        
sql中参数说明
export.path
导出路径
export.joinchar
导出时列与列之间的分隔符默认\t,复杂字符串请用urlencode进行编码
export.max.return.docset.size
用于覆盖配置文件中的参数,配置每个task最大允许的导入条数
3.        
Sql写法例子
导出结果到/data/ycloud/ydb/001目录

select indexnum,label from ydbexample where ydbpartion in (20151011)  and
ydbkv='export.path:/data/ycloud/ydb/001'
  limit 0,10

配置分隔符

select indexnum,label from ydbexample where ydbpartion in (20151011)  and ydbkv='export.path:/data/ycloud/ydb/001' and ydbkv='export.joinchar:%09'  limit 0,10

覆盖配置文件中的默认导出条数限制ydb.export.max.return.docset.size

select indexnum,label,content,contentcjk from ydbexample where ydbpartion in (20151011)  and ydbkv='export.path:/data/ycloud/ydb/001' and ydbkv='export.max.return.docset.size:30'  limit 0,10
4.        
导出完毕后,会在hdfs目录里看到如下的文件

 
文本文件的内容格式如下,默认按照\t分割

文件与多表关联过滤

用来实现类似如下的SQL
select indexnum from ydbexample where ydbp
4000
artion in (20151011)   and indexnum in (
         select indexnum from table where xxx=xxx
)
 
但ydb中要实现这样的功能需要三步
第一:将要关联表的数据导出到HDFS中,或者从本地上传文件到HDFS中。
第二;将HDFS中的文件与目标表进行预关联,创建关联索引。
第三:利用关联索引实现基于文件的 in
过滤。
 
1.        
将HDFS中的文件与目标表进行预关联,创建关联索引
参数说明:
join.pre.split:原始文件所使用的分隔符,必须填写
join.pre.src:原始文件的源路径
join.pre.index:生成好的关联索引的存储路径
 
示例如下
select indexnum,label from ydbexample where ydbpartion in (20151011) and  ydbkv='join.pre.split:%09' and ydbkv='join.pre.src:/data/ycloud/ydb/001' and ydbkv='join.pre.index:/data/ycloud/ydb/001index' limit 0,10
注,这里select两个维度,表示原始join.pre.src所指向的目录有两个维度,我们对这两个维度都进行创建预关联
2.        
利用关联索引实现基于文件的 in
过滤
实现in
的匹配,如下面的参数
join.in:indexnum@/data/ycloud/ydb/001index
下面参数由两部分组成,由@符号分隔,分别为要进行关联的列名,以及关联索引的存储位置,也可以再同一个join.in内,写多个关联条件,之间用;分隔,多个条件之间是or的关系,举例如下

select label,indexnum from ydbexample where ydbpartion in ('20151011')   and ydbkv='join.in:indexnum@/data/ycloud/ydb/001index;indexnum@/data/ycloud/ydb/002index'
 limit 0,1000
 
 
可以同时使用多个join.in参数对多个字段或多个关联索引一起关联,多个join.in之间只能是and关系,如果想要实现or的关系,请写在同一个join.in内。
 
示例一,对同一个关联索引的使用不同的字段进行关联
select label,indexnum from ydbexample where ydbpartion in ('20151011')   and ydbkv='join.in:indexnum@/data/ycloud/ydb/001index'  and ydbkv='join.in:label@/data/ycloud/ydb/001index' limit 0,1000
 
示例二,使用同一个字段,不同的关联索引进行关联
select userage,label,indexnum from ydbexample where ydbpartion in ('20151011')   and ydbkv='join.in:indexnum@/data/ycloud/ydb/004_index'  and ydbkv='join.in:indexnum@/data/ycloud/ydb/003_index'  limit 0,1000
 
3.        
生成的关联索引,副本个数的配置
如果集群规模较大,生成的副本个数太少的话,有可能多个分片同时的并发查询,支撑不了太大的并发,故如果想增加副本个数,可以通过dfs.replication来配置。
示例如下:
select indexnum,label from ydbexample where ydbpartion in (20151011) and  ydbkv='join.pre.split:%09' and ydbkv='join.pre.src:/data/ycloud/ydb/001' and ydbkv='join.pre.index:/data/ycloud/ydb/001index'  and  ydbkv='dfs.replication:2'
limit 0,10
 
4.        
关联后的数据,可以继续导出

select indexnum,label from ydbexample where ydbpartion='20151209' and content='joinyannian' and ydbkv='export.path:/data/ycloud/ydb/001' and ydbkv='export.joinchar:%09'  and ydbkv='join.in:indexnum@/data/ycloud/ydb/0010index'    limit 0,10

异步的方式响应

导出数据,以及生成关联索引,如果涉及的数据量很大,处理时间太长,http连接有可能会断开,故针对导出数据以及生成关联索引这两个功能,YDB提供异步处理的方式,http可以立即返回,但是通过接口,可以查看某个任务导出是否完成。

 

异步的任务会放到独立的线程池里来处理,

yarn_site.yaml中可以通过ydb.async.threads这个配置项来调整线程池的大小。

 

1.        
参数的配置
如果要使用异步响应的方式,需要配置async.run:true参数即可。
异步执行返回后会返回requestid作为当前异步任务的ID。
如下图所示:

 
注:该参数目前只能作用于
导出数据以及生成关联索引,请勿在其他功能上使用。
 
2.        
异步导出数据示例

select indexnum,label from ydbexample where ydbpartion in (20151011)  and ydbkv='export.path:/data/ycloud/ydb/001' and ydbkv='async.run:true' limit 0,10
3.        
异步的方式生成关联索引
select indexnum,label from ydbexample where ydbpartion in (20151011) and  ydbkv='join.pre.split:%09' and ydbkv='join.pre.src:/data/ycloud/ydb/001' and ydbkv='join.pre.index:/data/ycloud/ydb/001index' and ydbkv='async.run:true'
limit 0,10
 
 
4.        
获取任务执行的状态
异步导出数据后,可以通过调用下面的接口,来判断异步任务是否执行完毕
async.get中数据任务的ID
select ydbsz_lyn from ydbexample where ydbpartion in (20151011)  and ydbkv='async.get:224374199'  limit 0,10
如果async.get输入的任务ID为0,则会输出全部正在运行的异步任务的状态。一个任务完成后,任务状态会保留约1个小时,之后会清除掉。
 
获取全部异步任务的执行状态
select ydbsz_lyn from ydbexample where ydbpartion in (20151011)  and ydbkv='async.get:0'  limit 0,10
 
 

接入Kafka以及其他自定义数据源的接入方法

接入kafka

ydb默认使用kafka的Consume group
的方式来消费数据,请参考官方wiki

该方式在使用的时候,为了确保消费的数据均衡,记得调整kafka的num.partitions的个数,要超过ydb.topoplgy.task.count的个数,不然会存在有的分区消费不到数据的情况

(注:有的时候,在kafka的配置项改动了num.partitions有可能也没有生效,需要将zookeeper里对应的kafka已经持久化的topic相关信息也删除掉)

 

1.        
在 ydb_site.yaml中新增数据源
#该配置用于拓展reader,在实际项目中ydb中的数据,会来自多个不同的数据源,默认数据源default为上传json格式的数据到hdfs里去,kafka1是我们新增的一个数据源,下面给出的配置都是针对kafka1的配置,注意所有的配置项也都是以.kafka1为结尾
ydb.reader.list: "default,kafka1"
2.        
在ydb_site.yaml配置kafka1的reader

#配置reader为kafka的reader

 ydb.reader.read.class.kafka1: "cn.net.ycloud.ydb.server.reader.kafka.KafkaDataReader"

#配置缓冲队列的长度,该项配置为了过载保护而设计,用来解决ydb消费速度,不及kafka的读取速度的时候,缓存队列积累#越来越多导致的内存被占满的问题,如果队列中积累的消息的长度超过该配置,系统会暂停从kafka里消费数据

kafka.queue.size.kafka1: 128

 

#每个task使用多少个线程来消费kafka的数据

 kafka.threads.kafka1: 1

#订阅kafka的topic

 kafka.topic.kafka1: "ydbtest"

#当前kafka所使用的group

 kafka.group.kafka1: "ydbtest_g1"

#kafka所在的zk,多个地址之间用英文逗号分隔

 kafka.zookeeper.kafka1: "192.168.112.129:2181"

#首次读取kafka的数据的offset的配置,默认是从最小的offset开始读取,也可以配置为largest

 kafka.auto.offset.reset.kafka1: "smallest"

#是否将从kafka里消费到的原始数据备份到hdfs中,备份会多一份IO,但是如果您数据的生命周期很长,建议备份原始数据。

 kafka.backup.rawdata.kafka1: "true"

 

3.        
在ydb_site.yaml配置
kafka1的parser

如果从kafka里消费到的数据,还是ydb标准的json格式,那么使用json格式的parser即可。
         ydb.reader.parser.class.kafka1: "cn.net.ycloud.ydb.server.reader.JsonParser"

l  
但是我们有很多情况是,我们的数据时按照一定分隔符分隔的数据,针对这种情况,我们可以使用SplitParser,具体用法如下
###分隔符方式的split使用example#####
 ydb.reader.parser.class.kafka1: "cn.net.ycloud.ydb.server.reader.SplitParser"
 
#配置分隔符,切记分隔后的数据的顺序要跟创建表时候的字段顺序一致
 splitparser.split.kafka1: ","
#配置数据应该被导入到具体那张表里面取
 splitparser.tablename.kafka1: "ydbexample"
#数据如何进行分区,可以是一个列的名称,也可以按照当前系统时间进行自动分区,如果是按照系统时间自动分区,可以填写如下三个值, ydbparse_yyyyMM,ydbparse_yyyy,ydbparse_yyyyMMdd
#分区的个数是有限制的,当填写的是一个列的名称的时候,切记该列的个数不能是太多的。#
 splitparser.partion.field.kafka1: "ydbparse_yyyyMMdd"

 

自定义数据源,自定义格式

ydb支持自定义任意的数据源接入,以及解析任意格式的数据,只需要您实现reader与parser的接口

1.        
reader需要实现的接口如下
public
interface
 
RawDataReader
 {
   
public
void
init(String
prefix,Map
config,
int
readerIndex,
int
readerCount)
           
throws IOException;
   
public List<Object> read()
throws IOException;
   
public
void
close()
throws IOException;
}
 
2.        
Parser需要实现的接口如下
public
interface
Parser  {
   
public 
abstract
void
init(String
prefix,Map
config,

            
int
readerIndex,
int
readerCount)
  
throws IOException;
   
public YdbParse parse(Object
raw)
throws Throwable;
}
3.        
现有的reader与parser的源码,我们随软件本身,放在了reader_example目录下,大家可以参考、改和使用。
4.        
reader与Parser实现完成后,将其以jar包的方式放到lib目录下
5.        
更改ydb_site.xml变更parser与reader所指向的类名
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: