基于Redis实现的延迟消息队列
2018-02-11 08:54
1076 查看
1. 设计方案
2. 代码实现
2.1 技术说明
2.2 核心代码
2.2.1 Message对象
2.2.2 Route(消息路由器)
2.2.3 RedisMq(消息队列)
2.2.4 RedisMq消息队列配置:
2.2.5 消费者
4. 测试
3. 总结
4. 源码连接
本文以redis为数据结构基础,配合Spring管理机制,使用java实现了一个轻量级、可配置的消息队列。适合的项目特点:
Spring框架管理对象
有消息需求,但不想维护mq中间件
有使用redis
对消息持久化并没有很苛刻的要求
需要使用rabbitmq实现延迟消息请参考这里
将整个Redis当做消息池,以kv形式存储消息
使用ZSET做优先队列,按照score维持优先级
使用LIST结构,以先进先出的方式消费
zset和list存储消息地址(对应消息池的每个key)
自定义路由对象,存储zset和list名称,以点对点的方式将消息从zset路由到正确的list
使用定时器维持路由
根据TTL规则实现消息延迟
MqConfig.java文件
如果使用的是xml配置 请参考:
需要使用rabbitmq实现延迟消息请参考这里
https://github.com/Yampery/rdsmq.git
2. 代码实现
2.1 技术说明
2.2 核心代码
2.2.1 Message对象
2.2.2 Route(消息路由器)
2.2.3 RedisMq(消息队列)
2.2.4 RedisMq消息队列配置:
2.2.5 消费者
4. 测试
3. 总结
4. 源码连接
本文以redis为数据结构基础,配合Spring管理机制,使用java实现了一个轻量级、可配置的消息队列。适合的项目特点:
Spring框架管理对象
有消息需求,但不想维护mq中间件
有使用redis
对消息持久化并没有很苛刻的要求
需要使用rabbitmq实现延迟消息请参考这里
1. 设计方案
设计主要包含以下几点:将整个Redis当做消息池,以kv形式存储消息
使用ZSET做优先队列,按照score维持优先级
使用LIST结构,以先进先出的方式消费
zset和list存储消息地址(对应消息池的每个key)
自定义路由对象,存储zset和list名称,以点对点的方式将消息从zset路由到正确的list
使用定时器维持路由
根据TTL规则实现消息延迟
2. 代码实现
2.1 技术说明
示例使用Springboot,gradle,redis,jdk8。2.2 核心代码
核心代码主要包含消息对象Message,路由器Route和消息队列RedisMQ。2.2.1 Message对象
package git.yampery.msmq; /** * @decription Message * <p>封装消息元数据</p> * @author Yampery * @date 2017/11/2 15:50 */ public class Message { /** * 消息主题 */ private String topic; /** * 消息id */ private String id; /** * 消息延迟 */ private long delay; /** * 消息优先级 */ private int priority; /** * 消息存活时间 */ private int ttl; /** * 消息体,对应业务内容 */ private String body; /** * 创建时间,如果只有优先级没有延迟,可以设置创建时间为0 * 用来消除时间的影响 */ private long createTime; /** * 消息状态(延迟-0;待发送-1;已发送-2;发送失败-3) */ private int status; /** * getset略... */ }
2.2.2 Route(消息路由器)
package git.yampery.msmq; /** * @decription Route * <p>消息路由器,主要控制将消息从指定的队列路 4000 由到待消费的list<br> * 通过这种方式实现自定义延迟以及优先级发送</p> * @author Yampery * @date 2017/11/3 14:33 */ public class Route { /** * 存放消息的队列 */ private String queue; /** * 待消费的列表 */ private String list; public Route(String queue, String list) { this.queue = queue; this.list = list; } /** * getset略... */ }
2.2.3 RedisMq(消息队列)
package git.yampery.msmq; import git.yampery.utils.JedisUtils; import org.springframework.scheduling.annotation.Scheduled; import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; import java.util.Set; /** * @decription RedisMQ * <p>基于redis的消息队列</p> * <p>将整个redis作为消息池存储消息体,以ZSET为消息队列,LIST作为待消费列表<br> * 用Spring定时器作为监听器,每次监听ZSET中指定数量的消息<br> * 根据SCORE确定是否达到发送要求,如果达到,利用消息路由{@link Route}将消息路由到待消费list</p> * @author Yampery * @date 2017/11/2 15:49 */ public class RedisMQ { /** * 消息池前缀,以此前缀加上传递的消息id作为key,以消息{@link Message} * 的消息体body作为值存储 */ private static final String MSG_POOL = "Message:Pool:"; /** * 默认监听数量,对应监听zset队列前多少个元素 */ private static final int DEFAUT_MONITOR = 10; @Resource private JedisUtils jedisUtils; /** * 每次监听queue中元素的数量,可配置 */ private int monitorCount = DEFAUT_MONITOR; /** * 消息路由 */ private List<Route> routes; /** * 存入消息池 * @param message * @return */ public boolean addMsgPool(Message message) { if (null != message) { return jedisUtils.setex(MSG_POOL + message.getId(), message.getBody(), message.getTtl()); } return false; } /** * 从消息池中删除消息 * @param id * @return */ public boolean deMsgPool(String id) { return jedisUtils.del(MSG_POOL + id); } /** * 像队列中添加消息 * @param key * @param score 优先级 * @param val * @return 返回消息id */ public String enMessage(String key, long score, String val) { if (jedisUtils.zadd(key, score, val)) { return val; } return ""; } /** * 从队列删除消息 * @param id * @return */ public boolean deMessage(String key, String id) { return jedisUtils.zdel(key, id); } /** * 消费 * @return */ public List<String> consume(String key) { long count = jedisUtils.countList(key); if (0 < count) { // 可根据需求做限制 List<String> ids = jedisUtils.rangeList(key, 0, count - 1); if (ids != null) { List<String> result = new ArrayList<>(); ids.forEach(l -> result.add(jedisUtils.get(MSG_POOL + l, ""))); jedisUtils.removeListValue(key, ids); return result; } /// if end~ } return null; } /** * 消息队列监听器<br> * 监听所有路由器,将消息队列中的消息路由到待消费列表 */ @Scheduled(cron="*/5 * * * * *") public void monitor() { // 获取消息路由 int route_size; if (null == routes || 1 > (route_size = routes.size())) return; String queue, list; Set<String> set; for (int i = 0; i < route_size; i++) { queue = routes.get(i).getQueue(); list = routes.get(i).getList(); set = jedisUtils.getSoredSetByRange(queue, 0, monitorCount, true); if (null != set) { long current = System.currentTimeMillis(); long score; for (String id : set) { score = jedisUtils.getScore(queue, id).longValue(); if (current >= score) { // 添加到list if (jedisUtils.insertList(list, id)) { // 删除queue中的元素 deMessage(queue, id); } /// if end~ } /// if end~ } /// for end~ } /// if end~ } /// for end~ } public int getMonitorCount() { return monitorCount; } public void setMonitorCount(int monitorCount) { this.monitorCount = monitorCount; } public List<Route> getRoutes() { return routes; } public void setRoutes(List<Route> routes) { this.routes = routes; } }
2.2.4 RedisMq消息队列配置:
mq.properties文件# 队列的监听数量 mq.monitor.count =30 # 队列一 mq.queue.first =queue:1 # 队列二 mq.queue.second =queue:2 # 消费列表一 mq.consumer.first =list:1 # 消费列表二 mq.consumer.second =list:2
MqConfig.java文件
package git.yampery.config; import git.yampery.msmq.RedisMQ; import git.yampery.msmq.Route; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import java.util.ArrayList; import java.util.List; /** * @decription MqConfig * <p>消息队列配置</p> * @author Yampery * @date 2018/2/9 14:26 * * 根据不同的架构可选择使用XML配置 * --------------------------------------------------- * <bean id="redisMQ" class="git.yampery.msmq.RedisMQ"> <property name="monitorCount" value="15"/> <property name="routes"> <list> <bean class="git.yampery.msmq.Route"> <property name="queue" value="${mq.queue.first}"/> <property name="list" value="${mq.consumer.first}"/> </bean> <bean class="git.yampery.msmq.Route"> <property name="queue" value="${mq.queue.second}"/> <property name="list" value="${mq.consumer.second}"/> </bean> </list> </property> </bean> * ---------------------------------------------------- */ @Configuration public class MqConfig { @Bean(name = "redisMQ") @Primary public RedisMQ getRedisMq() { RedisMQ redisMQ = new RedisMQ(); // 配置监听队列元素数量 redisMQ.setMonitorCount(monitorCount); // 配置路由表 redisMQ.setRoutes(routeList()); return redisMQ; } /** * 返回路由表 * @return */ public List<Route> routeList() { List<Route> routeList = new ArrayList<>(); Route routeFirst = new Route(queueFirst, listFirst); Route routeSecond = new Route(queueSecond, listSecond); routeList.add(routeFirst); routeList.add(routeSecond); return routeList; } @Value("${mq.monitor.count}") private int monitorCount; @Value("${mq.queue.first}") private String queueFirst; @Value("${mq.queue.second}") private String queueSecond; @Value("${mq.consumer.first}") private String listFirst; @Value("${mq.consumer.second}") private String listSecond; }
如果使用的是xml配置 请参考:
<bean id="redisMQ" class="git.yampery.msmq.RedisMQ"> <property name="monitorCount" value="15"/> <property name="routes"> <list> <bean class="git.yampery.msmq.Route"> <property name="queue" value="${mq.queue.first}"/> <property name="list" value="${mq.consumer.first}"/> </bean> <bean class="git.yampery.msmq.Route"> <property name="queue" value="${mq.queue.second}"/> <property name="list" value="${mq.consumer.second}"/> </bean> </list> </property> </bean>
2.2.5 消费者
并没有内置消费者监听器来实现,可以直接使用定时器实现package git.yampery.task; import com.alibaba.fastjson.JSONObject; import git.yampery.msmq.RedisMQ; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import javax.annotation.Resource; import java.util.List; /** * @decription MsgTask * <p>发送消息</p> * @author Yampery * @date 2018/2/9 18:04 */ @Component public class MsgTask { @Resource private RedisMQ redisMQ; // @Value("${mq.list.first}") private String MQ_LIST_FIRST; @Scheduled(cron="*/5 * * * * *") public void sendMsg() { // 消费 List<String> msgs = redisMQ.consume(redisMQ.getRoutes().get(0).getList()); int len; if (null != msgs && 0 < (len = msgs.size())) { // 将每一条消息转为JSONObject JSONObject jObj; for (int i = 0; i < len; i++) { if (!StringUtils.isEmpty(msgs.get(i))) { jObj = JSONObject.parseObject(msgs.get(i)); // 取出消息 System.out.println(jObj.toJSONString()); } } } } }
4. 测试
测试设置20秒延迟,发布消息到queue:1,在list:1消费。package git.yampery.mq; import com.alibaba.fastjson.JSONObject; import git.yampery.msmq.Message; import git.yampery.msmq.RedisMQ; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import javax.annotation.Resource; import java.util.UUID; /** * @decription TestMQ * <p>测试</p> * @author Yampery * @date 2018/2/9 18:43 */ @RunWith(SpringRunner.class) @SpringBootTest public class TestMQ { @Resource private RedisMQ redisMQ; @Value("${mq.queue.first}") private String MQ_QUEUE_FIRST; @Test public void testMq() { JSONObject jObj = new JSONObject(); jObj.put("msg", "这是一条短信"); String seqId = UUID.randomUUID().toString(); // 将有效信息放入消息队列和消息池中 Message message = new Message(); message.setBody(jObj.toJSONString()); // 可以添加延迟配置 message.setDelay(20); message.setTopic("SMS"); message.setCreateTime(System.currentTimeMillis()); message.setId(seqId); // 设置消息池ttl,防止长期占用 message.setTtl(20 * 60); message.setStatus(0); message.setPriority(0); redisMQ.addMsgPool(message); redisMQ.enMessage(MQ_QUEUE_FIRST, message.getCreateTime() + message.getDelay() + message.getPriority(), message.getId()); } }
3. 总结
文章利用redis已有的数据存储结构,实现了轻量级的消息队列,并未真正实现消息持久化。示例是针对点对点的消息路由方式,当然,也可以扩展成广播和主题的方式,不过,这样就得不偿失了,如果需求比较复杂,可靠性要求较高,反而不如直接维护rabbitmq之类的消息队列。需要使用rabbitmq实现延迟消息请参考这里
4. 源码连接
文章并未贴出所有代码,gradle构建、jedisUtils以及一些配置等可以参考源码,源码只需要设置自己的redis配置即可。https://github.com/Yampery/rdsmq.git
相关文章推荐
- 基于Redis实现分布式消息队列(1)
- 基于Redis实现分布式消息队列(2)
- [转载] 基于Redis实现分布式消息队列
- 基于Redis实现分布式消息队列(2)
- SpringBoot 基于Redis快速实现消息队列
- 基于redis的延迟消息队列设计
- 基于Redis实现分布式消息队列(3)
- 基于Redis实现延迟队列
- 基于Redis实现的延迟队列
- 基于redis的延迟消息队列设计
- 基于Redis实现分布式消息队列(4)
- [置顶] Redis应用3-基于Redis消息队列实现的异步操作
- 基于Redis实现分布式消息队列
- 基于Redis实现分布式消息队列(汇总目录)
- PHP基于Redis消息队列实现发布微博的方法
- 基于Redis实现延迟队列
- 基于redis的延迟消息队列设计
- 基于redis的延迟消息队列设计
- 基于redis的延迟消息队列设计
- 基于redis的延迟消息队列设计