您的位置:首页 > 数据库 > Redis

基于Redis实现的延迟消息队列

2018-02-11 08:54 1076 查看
1. 设计方案

2. 代码实现
2.1 技术说明

2.2 核心代码
2.2.1 Message对象

2.2.2 Route(消息路由器)

2.2.3 RedisMq(消息队列)

2.2.4 RedisMq消息队列配置:

2.2.5 消费者

4. 测试

3. 总结

4. 源码连接

本文以redis为数据结构基础,配合Spring管理机制,使用java实现了一个轻量级、可配置的消息队列。适合的项目特点:

Spring框架管理对象

有消息需求,但不想维护mq中间件

有使用redis

对消息持久化并没有很苛刻的要求

需要使用rabbitmq实现延迟消息请参考这里

1. 设计方案

设计主要包含以下几点:

将整个Redis当做消息池,以kv形式存储消息

使用ZSET做优先队列,按照score维持优先级

使用LIST结构,以先进先出的方式消费

zset和list存储消息地址(对应消息池的每个key)

自定义路由对象,存储zset和list名称,以点对点的方式将消息从zset路由到正确的list

使用定时器维持路由

根据TTL规则实现消息延迟



2. 代码实现

2.1 技术说明

示例使用Springboot,gradle,redis,jdk8。

2.2 核心代码

核心代码主要包含消息对象Message,路由器Route和消息队列RedisMQ。

2.2.1 Message对象

package git.yampery.msmq;

/**
* @decription Message
* <p>封装消息元数据</p>
* @author Yampery
* @date 2017/11/2 15:50
*/
public class Message {

/**
* 消息主题
*/
private String topic;
/**
* 消息id
*/
private String id;
/**
* 消息延迟
*/
private long delay;
/**
* 消息优先级
*/
private int priority;
/**
* 消息存活时间
*/
private int ttl;
/**
* 消息体,对应业务内容
*/
private String body;
/**
* 创建时间,如果只有优先级没有延迟,可以设置创建时间为0
* 用来消除时间的影响
*/
private long createTime;
/**
* 消息状态(延迟-0;待发送-1;已发送-2;发送失败-3)
*/
private int status;

/**
* getset略...
*/
}


2.2.2 Route(消息路由器)

package git.yampery.msmq;

/**
* @decription Route
* <p>消息路由器,主要控制将消息从指定的队列路
4000
由到待消费的list<br>
* 通过这种方式实现自定义延迟以及优先级发送</p>
* @author Yampery
* @date 2017/11/3 14:33
*/
public class Route {

/**
* 存放消息的队列
*/
private String queue;

/**
* 待消费的列表
*/
private String list;

public Route(String queue, String list) {
this.queue = queue;
this.list = list;
}

/**
* getset略...
*/
}


2.2.3 RedisMq(消息队列)

package git.yampery.msmq;

import git.yampery.utils.JedisUtils;
import org.springframework.scheduling.annotation.Scheduled;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
* @decription RedisMQ
* <p>基于redis的消息队列</p>
* <p>将整个redis作为消息池存储消息体,以ZSET为消息队列,LIST作为待消费列表<br>
* 用Spring定时器作为监听器,每次监听ZSET中指定数量的消息<br>
* 根据SCORE确定是否达到发送要求,如果达到,利用消息路由{@link Route}将消息路由到待消费list</p>
* @author Yampery
* @date 2017/11/2 15:49
*/
public class RedisMQ {

/**
* 消息池前缀,以此前缀加上传递的消息id作为key,以消息{@link Message}
* 的消息体body作为值存储
*/
private static final String MSG_POOL = "Message:Pool:";
/**
* 默认监听数量,对应监听zset队列前多少个元素
*/
private static final int DEFAUT_MONITOR = 10;
@Resource private JedisUtils jedisUtils;

/**
* 每次监听queue中元素的数量,可配置
*/
private int monitorCount = DEFAUT_MONITOR;

/**
* 消息路由
*/
private List<Route> routes;

/**
* 存入消息池
* @param message
* @return
*/
public boolean addMsgPool(Message message) {

if (null != message) {
return jedisUtils.setex(MSG_POOL + message.getId(), message.getBody(), message.getTtl());
}
return false;
}

/**
* 从消息池中删除消息
* @param id
* @return
*/
public boolean deMsgPool(String id) {

return jedisUtils.del(MSG_POOL + id);
}

/**
* 像队列中添加消息
* @param key
* @param score 优先级
* @param val
* @return 返回消息id
*/
public String enMessage(String key, long score, String val) {

if (jedisUtils.zadd(key, score, val)) {
return val;
}
return "";
}

/**
* 从队列删除消息
* @param id
* @return
*/
public boolean deMessage(String key, String id) {

return jedisUtils.zdel(key, id);
}

/**
* 消费
* @return
*/
public List<String> consume(String key) {

long count = jedisUtils.countList(key);
if (0 < count) {
// 可根据需求做限制
List<String> ids = jedisUtils.rangeList(key, 0, count - 1);
if (ids != null) {
List<String> result = new ArrayList<>();
ids.forEach(l -> result.add(jedisUtils.get(MSG_POOL + l, "")));
jedisUtils.removeListValue(key, ids);
return result;
} /// if end~
}

return null;
}

/**
* 消息队列监听器<br>
* 监听所有路由器,将消息队列中的消息路由到待消费列表
*/
@Scheduled(cron="*/5 * * * * *")
public void monitor() {
// 获取消息路由
int route_size;
if (null == routes || 1 > (route_size = routes.size())) return;
String queue, list;
Set<String> set;
for (int i = 0; i < route_size; i++) {
queue = routes.get(i).getQueue();
list = routes.get(i).getList();
set = jedisUtils.getSoredSetByRange(queue, 0, monitorCount, true);
if (null != set) {
long current = System.currentTimeMillis();
long score;
for (String id : set) {
score = jedisUtils.getScore(queue, id).longValue();
if (current >= score) {
// 添加到list
if (jedisUtils.insertList(list, id)) {
// 删除queue中的元素
deMessage(queue, id);
} /// if end~
} /// if end~
} /// for end~
} /// if end~
} /// for end~
}

public int getMonitorCount() {
return monitorCount;
}

public void setMonitorCount(int monitorCount) {
this.monitorCount = monitorCount;
}

public List<Route> getRoutes() {
return routes;
}

public void setRoutes(List<Route> routes) {
this.routes = routes;
}
}


2.2.4 RedisMq消息队列配置:

mq.properties文件

# 队列的监听数量
mq.monitor.count                 =30
# 队列一
mq.queue.first                  =queue:1
# 队列二
mq.queue.second                 =queue:2
# 消费列表一
mq.consumer.first               =list:1
# 消费列表二
mq.consumer.second              =list:2


MqConfig.java文件

package git.yampery.config;

import git.yampery.msmq.RedisMQ;
import git.yampery.msmq.Route;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import java.util.ArrayList;
import java.util.List;

/**
* @decription MqConfig
* <p>消息队列配置</p>
* @author Yampery
* @date 2018/2/9 14:26
*
* 根据不同的架构可选择使用XML配置
* ---------------------------------------------------
*
<bean id="redisMQ" class="git.yampery.msmq.RedisMQ">
<property name="monitorCount" value="15"/>
<property name="routes">
<list>
<bean class="git.yampery.msmq.Route">
<property name="queue" value="${mq.queue.first}"/>
<property name="list" value="${mq.consumer.first}"/>
</bean>
<bean class="git.yampery.msmq.Route">
<property name="queue" value="${mq.queue.second}"/>
<property name="list" value="${mq.consumer.second}"/>
</bean>
</list>
</property>
</bean>

* ----------------------------------------------------
*/
@Configuration
public class MqConfig {

@Bean(name = "redisMQ")
@Primary
public RedisMQ getRedisMq() {
RedisMQ redisMQ = new RedisMQ();
// 配置监听队列元素数量
redisMQ.setMonitorCount(monitorCount);
// 配置路由表
redisMQ.setRoutes(routeList());
return redisMQ;
}

/**
* 返回路由表
* @return
*/
public List<Route> routeList() {
List<Route> routeList = new ArrayList<>();
Route routeFirst = new Route(queueFirst, listFirst);
Route routeSecond = new Route(queueSecond, listSecond);
routeList.add(routeFirst);
routeList.add(routeSecond);
return routeList;
}

@Value("${mq.monitor.count}")
private int monitorCount;
@Value("${mq.queue.first}")
private String queueFirst;
@Value("${mq.queue.second}")
private String queueSecond;
@Value("${mq.consumer.first}")
private String listFirst;
@Value("${mq.consumer.second}")
private String listSecond;
}


如果使用的是xml配置 请参考:

<bean id="redisMQ" class="git.yampery.msmq.RedisMQ">
<property name="monitorCount" value="15"/>
<property name="routes">
<list>
<bean class="git.yampery.msmq.Route">
<property name="queue" value="${mq.queue.first}"/>
<property name="list" value="${mq.consumer.first}"/>
</bean>
<bean class="git.yampery.msmq.Route">
<property name="queue" value="${mq.queue.second}"/>
<property name="list" value="${mq.consumer.second}"/>
</bean>
</list>
</property>
</bean>


2.2.5 消费者

并没有内置消费者监听器来实现,可以直接使用定时器实现

package git.yampery.task;

import com.alibaba.fastjson.JSONObject;
import git.yampery.msmq.RedisMQ;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import javax.annotation.Resource;
import java.util.List;

/**
* @decription MsgTask
* <p>发送消息</p>
* @author Yampery
* @date 2018/2/9 18:04
*/
@Component
public class MsgTask {

@Resource private RedisMQ redisMQ;
// @Value("${mq.list.first}") private String MQ_LIST_FIRST;

@Scheduled(cron="*/5 * * * * *")
public void sendMsg() {
// 消费
List<String> msgs = redisMQ.consume(redisMQ.getRoutes().get(0).getList());
int len;
if (null != msgs && 0 < (len = msgs.size())) {
// 将每一条消息转为JSONObject
JSONObject jObj;
for (int i = 0; i < len; i++) {
if (!StringUtils.isEmpty(msgs.get(i))) {
jObj = JSONObject.parseObject(msgs.get(i));
// 取出消息
System.out.println(jObj.toJSONString());
}
}
}
}
}


4. 测试

测试设置20秒延迟,发布消息到queue:1,在list:1消费。

package git.yampery.mq;

import com.alibaba.fastjson.JSONObject;
import git.yampery.msmq.Message;
import git.yampery.msmq.RedisMQ;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;
import java.util.UUID;

/**
* @decription TestMQ
* <p>测试</p>
* @author Yampery
* @date 2018/2/9 18:43
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestMQ {

@Resource
private RedisMQ redisMQ;
@Value("${mq.queue.first}")
private String MQ_QUEUE_FIRST;

@Test
public void testMq() {

JSONObject jObj = new JSONObject();
jObj.put("msg", "这是一条短信");

String seqId = UUID.randomUUID().toString();

// 将有效信息放入消息队列和消息池中
Message message = new Message();
message.setBody(jObj.toJSONString());
// 可以添加延迟配置
message.setDelay(20);
message.setTopic("SMS");
message.setCreateTime(System.currentTimeMillis());
message.setId(seqId);
// 设置消息池ttl,防止长期占用
message.setTtl(20 * 60);
message.setStatus(0);
message.setPriority(0);
redisMQ.addMsgPool(message);
redisMQ.enMessage(MQ_QUEUE_FIRST,
message.getCreateTime() + message.getDelay() + message.getPriority(), message.getId());
}
}


3. 总结

文章利用redis已有的数据存储结构,实现了轻量级的消息队列,并未真正实现消息持久化。示例是针对点对点的消息路由方式,当然,也可以扩展成广播和主题的方式,不过,这样就得不偿失了,如果需求比较复杂,可靠性要求较高,反而不如直接维护rabbitmq之类的消息队列。

需要使用rabbitmq实现延迟消息请参考这里

4. 源码连接

文章并未贴出所有代码,gradle构建、jedisUtils以及一些配置等可以参考源码,源码只需要设置自己的redis配置即可。

https://github.com/Yampery/rdsmq.git
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  redis 消息 spring 延迟