您的位置:首页 > 其它

es续写性能提高5-10倍(1w->10w qps)

2019-06-17 20:09 344 查看

https://blog.csdn.net/weixin_39031707/article/details/91571210
如上的文章介绍了如何提高写入的qps,但是这种提高智能是针对原始的空白写入index提高性能会比较大,在生产中,发现如果es的某个index写入了10亿数据以后,在给这个index增量写入的时候,速度就会很慢,qps大概只有1-2w 最多也就3w左右,为此深表头疼
在实践中发现,es单个index随着数据量的增加,在做增量导入的时候,速度会线性下降,比如10亿条数据的index,可能增量插入的qps为3w/s, 30亿条数据的增量插入的qps可能就只有1w/s每秒了。
正是因为es的这个特性,我在生产中,就想能不能把30亿条数据打散了,然后把一个index分成8个index,每个index通过hash打散分配到不通的index中去,这样在做增量插入的时候,一个index中可能就只有3亿+条数据,这样的速度,应该就会有所提升
原理是这样的我插入的时候指定mobile-id为_id,那么插入的时候,就利用hash函数对mobile_id进行打散,将其平均分到8个index中
代码如下:
注册udf函数:

ss.udf.register("getRouteCode",(routeKey:String) =>  {
val routeCode: Int =routeKey match  {
case null|""=> 99
case _=> {
(routeKey.hashCode() & Integer.MAX_VALUE) % 8
}
}
routeCode
})

利用udf函数将hive中的数据进行打散写入到不通的index中
代码如下:

for(routeCode <- 0 to 7){
val df = ss.sql(s"select appuid,demographics,financial,interests,consumption,geo,device from dmp.t_dmp_user_tags lateral view explode(appuids) num as appuid where appuid is not null and appuid<>'' and $routeCode=getRouteCode(appuid) and updateday='$day'")
df.saveToEs(s"t_dmp_user_tags_$routeCode/_doc",esOptions)
df.unpersist(true)
}

经过优化以后,相比原来的30亿条数据,写入的qps可以达到5-10w/s qps 速度足足提升了5-10倍,如果mobile_id是新的多那么,速度就快,如果大量都是老的mobile_id速度就会慢一些

es的建索引语句如下:

PUT /t_dmp_user_tags_0
{
"settings":{
"number_of_shards":20,
"number_of_replicas":0
},
"mappings":{
"properties":{
"demographics":{"type":"keyword"},
"xx":{"type":"keyword"},
"xx":{"type":"keyword"},
"xxx":{"type":"keyword"},
"geo":{"type":"keyword"},
"device":{"type":"keyword"}
}

}
}

优化语句:

PUT /t_dmp_user_tags_0/_settings
{ "refresh_interval":  "60s",
"index.translog.durability": "async",
"translog.sync_interval":"60s",
"index.translog.flush_threshold_size": "1024mb"
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: