基于redis的延迟消息队列设计
2017-08-21 00:00
603 查看
摘要:基于redis的延迟消息队列
订单完成一个小时之后通知用户对上门服务进行评价
业务执行失败之后隔10分钟重试一次
类似的场景比较多简单的处理方式就是使用定时任务假如数据比较多的时候有的数据可能延迟比较严重,而且越来越多的定时业务导致任务调度很繁琐不好管理。
rabbitmq来满足需求但是不打算使用,因为目前太多的业务使用了另外的MQ中间件。
开发前需要考虑的问题?
及时性消费端能按时收到
同一时间消息的消费权重
可靠性消息不能出现没有被消费掉的情况
可恢复假如有其他情况导致消息系统不可用了至少能保证数据可以恢复
可撤回因为是延迟消息没有到执行时间的消息支持可以取消消费
高可用多实例这里指HA/主备模式并不是多实例同时一起工作
消费端如何消费
当然初步选用redis作为数据缓存的主要原因是因为redis自身支持zset的数据结构(score延迟时间毫秒)这样就少了排序的烦恼而且性能还很高,正好我们的需求就是按时间维度去判定执行的顺序同时也支持maplist数据结构。
简单定义一个消息数据结构
运行原理:
用Map来存储元数据。id作为key,整个消息结构序列化(json/…)之后作为value,放入元消息池中。
将id放入其中(有N个)一个zset有序列表中,以createTime+delay+priority作为score。修改状态为正在延迟中
使用timer实时监控zset有序列表中top10的数据。如果数据score<=当前时间毫秒就取出来,根据topic重新放入一个新的可消费列表(list)中,在zset中删除已经取出来的数据,并修改状态为待消费
客户端获取数据只需要从可消费队列中获取就可以了。并且状态必须为待消费运行时间需要<=当前时间的如果不满足重新放入zset列表中,修改状态为正在延迟。如果满足修改状态为已消费。或者直接删除元数据。
http访问方式。
添加延时消息添加成功之后返回消费唯一IDPOST/push{…..消息体}
删除延时消息需要传递消息IDGET/delete?id=
恢复延时消息GET/reStore?expire=true|falseexpire是否恢复已过期未执行的消息。
恢复单个延时消息需要传递消息IDGET/reStore/id
获取消息需要长连接GET/get/topic
用nginx暴露服务,配置为轮询在添加延迟消息的时候就可以流量平均分配。
目前系统中客户端并没有采用HTTP长连接的方式来消费消息,而是采用MQ的方式来消费数据这样客户端就可以不用关心延迟消息队列。只需要在发送MQ的时候拦截一下如果是延迟消息就用延迟消息系统处理。
mysql的binlog等。
这里我们直接采用mysql数据库作为记录日志。
目前打算创建以下2张表:
消息表字段包括整个消息体
消息流转表字段包括消息ID、变更状态、变更时间、zset扫描线程Name、host/ip
定义zset扫描线程Name是为了更清楚的看到消息被分发到具体哪个zset中。前提是zset的key和监控zset的线程名称要有点关系这里也可以是zsetkey。
举个栗子
假如redis服务器宕机了,重启之后发现数据也没有了。所以这个恢复是很有必要的,只需要从表1也就是消息表中把消息状态不等于已消费的数据全部重新分发到延迟队列中去,然后同步一下状态就可以了。
当然恢复单个任务也可以这么干。
zookeeper吧。
如果有多个实例最多同时只能有1个实例工作这样就避免了分布式竞争锁带来的坏处,当然如果业务需要多个实例同时工作也是支持的,也就是一个消息最多只能有1个实例处理,可以选用zookeeper或者redis就能实现分布式锁了。
最终做了一下测试多实例同时运行,可能因为会涉及到锁的问题性能有所下降,反而单机效果很好。所以比较推荐基于docker的主备部署模式。
zset队列个数可配置避免大数据带来高延迟的问题。
目前存在日志和redis元数据有可能不一致的问题如mysql挂了,写日志不会成功。
设计图:
更多信息请关注公众号techxxx
需求背景
用户下订单成功之后隔20分钟给用户发送上门服务通知短信订单完成一个小时之后通知用户对上门服务进行评价
业务执行失败之后隔10分钟重试一次
类似的场景比较多简单的处理方式就是使用定时任务假如数据比较多的时候有的数据可能延迟比较严重,而且越来越多的定时业务导致任务调度很繁琐不好管理。
队列设计
目前可以考虑使用开发前需要考虑的问题?
及时性消费端能按时收到
同一时间消息的消费权重
可靠性消息不能出现没有被消费掉的情况
可恢复假如有其他情况导致消息系统不可用了至少能保证数据可以恢复
可撤回因为是延迟消息没有到执行时间的消息支持可以取消消费
高可用多实例这里指HA/主备模式并不是多实例同时一起工作
消费端如何消费
当然初步选用
简单定义一个消息数据结构
privateStringid;/***自动生成全局惟一snowflake**/ privateStringbizKey; privatelongdelay;/***延时毫秒数**/ privateintpriority;//优先级 privatelongttl;/**消费端消费的ttl**/ privateStringbody;/***消息体**/ privatelongcreateTime=System.currentTimeMillis(); privateintstatus=Status.WaitPut.ordinal(); |
用
将
使用timer实时监控
客户端获取数据只需要从可消费队列中获取就可以了。并且状态必须为待消费运行时间需要<=当前时间的如果不满足重新放入
客户端
因为涉及到不同程序语言的问题,所以当前默认支持添加延时消息添加成功之后返回消费唯一IDPOST/push{…..消息体}
删除延时消息需要传递消息IDGET/delete?id=
恢复延时消息GET/reStore?expire=true|falseexpire是否恢复已过期未执行的消息。
恢复单个延时消息需要传递消息IDGET/reStore/id
获取消息需要长连接GET/get/topic
用nginx暴露服务,配置为轮询在添加延迟消息的时候就可以流量平均分配。
目前系统中客户端并没有采用HTTP长连接的方式来消费消息,而是采用MQ的方式来消费数据这样客户端就可以不用关心延迟消息队列。只需要在发送MQ的时候拦截一下如果是延迟消息就用延迟消息系统处理。
消息可恢复
实现恢复的原理正常情况下一般都是记录日志,比如这里我们直接采用
目前打算创建以下2张表:
消息表字段包括整个消息体
消息流转表字段包括消息ID、变更状态、变更时间、
定义
举个栗子
假如
当然恢复单个任务也可以这么干。
关于高可用
分布式协调还是选用如果有多个实例最多同时只能有1个实例工作这样就避免了分布式竞争锁带来的坏处,当然如果业务需要多个实例同时工作也是支持的,也就是一个消息最多只能有1个实例处理,可以选用
最终做了一下测试多实例同时运行,可能因为会涉及到锁的问题性能有所下降,反而单机效果很好。所以比较推荐基于docker的主备部署模式。
扩展
支持目前存在日志和
设计图:
更多信息请关注公众号techxxx
相关文章推荐
- 基于redis的延迟消息队列设计
- 基于redis的延迟消息队列设计
- 基于redis的延迟消息队列设计
- 基于redis的延迟消息队列设计
- 基于redis的延迟消息队列设计
- 基于Redis实现的延迟消息队列
- 基于Redis的消息队列封装和测试
- SpringBoot 基于Redis快速实现消息队列
- 基于netty的消息队列设计(一)
- 基于Redis的消息队列php-resque
- 基于Redis实现延迟队列
- 基于Redis实现分布式消息队列(1)
- 基于Redis实现分布式消息队列
- KMQueue 基于Redis的分布式消息队列
- 基于Redis实现延迟队列
- 基于Redis的简单消息队列模块(Node.js)
- 基于Redis实现分布式消息队列(2)
- [转载] 基于Redis实现分布式消息队列
- 基于Redis实现分布式消息队列(3)
- 分布式延迟消息队列实现分析与设计