redis list实现消息队列以及事件模块
2013-01-11 10:21
525 查看
消息队列的产品很多,什么RabbitMQ、ActiveMQ、收钱的Websphere MQ等等。因为项目的缓存使用的是redis,所以公司的高手直接使用redis list实现了消息队列,网上也有文章写了。这里顺便把整个事件服务框架都整理下。
事件服务本身就是一个thrift服务,服务端使用的语言是java。先看看消息队列的总体结构:
topic是事件的标题如:用户注册,event就是事件触发时提交的内容,我们这边使用的都是json格式的对象。
有事件发布就有事件处理。
当一个topic被触发时,事件的生产者发布这个topic的event(
存放在redis的list里面),事件处理采用用定时轮询策略,定时取topic下的event有的话就顺序调用
eventHandle处理这个event,因为这是个异步处理的过程,事件处理失败的话需要记录下来,以便后面处理。
首先看event发布的代码:
redis使用的是spring提供的RedisTemplate,这里就是向对应的事件topic list里面push一个新的event。
取注册在这个topic下的事件处理程序,然后顺序执行。
看这段:
redisPoolPeriod默认为30秒。rawKeys 为byte[][]类型存放所有topic。然后取topic和event,异步调用eventHandle执行。
进入process里面,处理事件时设置了超时事件,防止阻塞。
事件服务本身就是一个thrift服务,服务端使用的语言是java。先看看消息队列的总体结构:
topic1(事件topic) | event1、event2... |
topic2 | event1、event2... |
topic3 | event1、event2... |
有事件发布就有事件处理。
topic | eventHandle1,eventHandle2... |
存放在redis的list里面),事件处理采用用定时轮询策略,定时取topic下的event有的话就顺序调用
eventHandle处理这个event,因为这是个异步处理的过程,事件处理失败的话需要记录下来,以便后面处理。
首先看event发布的代码:
public void send(String topic, Object event) { Topic t = eventAdminService.getTopic(topic); if (t != null && t.isEnabled()) { redis.opsForList().leftPush(topic, event); } }
redis使用的是spring提供的RedisTemplate,这里就是向对应的事件topic list里面push一个新的event。
取注册在这个topic下的事件处理程序,然后顺序执行。
private void poolRedisEvent() { while (poolRunning) { try { redis.execute(new RedisCallback() { @Override public Object doInRedis(RedisConnection con) throws DataAccessException { if (ArrayUtils.isEmpty(rawKeys)) { return null; } List<byte[]> list = conn.getNativeConnection().blpop(redisPoolPeriod, rawKeys); if (CollectionUtils.isEmpty(list)) { return null; } final String topic = redis.getKeySerializer().deserialize(list.get(0)); final Object event = redis.getValueSerializer().deserialize(list.get(1)); executor.execute(new Runnable() { @Override public void run() { process(topic, null, event); } }); return null; } }); } catch (Throwable e) { LOG.warn("Event queue pool error," + e.getMessage()); try { Thread.sleep(5000); } catch (InterruptedException ignored) { } } } }
看这段:
conn.getNativeConnection().blpop(redisPoolPeriod, rawKeys);
redisPoolPeriod默认为30秒。rawKeys 为byte[][]类型存放所有topic。然后取topic和event,异步调用eventHandle执行。
final String topic = redis.getKeySerializer().deserialize(list.get(0)); final Object event = redis.getValueSerializer().deserialize(list.get(1)); executor.execute(new Runnable() { @Override public void run() { process(topic, null, event); } });
进入process里面,处理事件时设置了超时事件,防止阻塞。
executor.submit(new Runnable() { @Override public void run() { process(topicId, handleId, context); } }).get(timeout, TimeUnit.SECONDS); // timeout默认是10秒现在看看真正的处理代码:
private void process(String topicId, String handleId, ContextImpl context) { Map<String, EventHandle> hMap = topicHandles.get(topicId);//获取所有注册的eventHandle ...... if (handleId != null) {//指定了处理程序id,就单个调用 processHandle(handleId, hMap.get(handleId), lastTopics.getByKey(topicId).getHandleConfig(handleId), context); } else { for (Map.Entry<String, EventHandle> entry : hMap.entrySet()) {//顺序执行 handleId = entry.getKey(); HandleConfig config = lastTopics.getByKey(topicId).getHandleConfig(handleId); processHandle(handleId, entry.getValue(), config, context); } } }
相关文章推荐
- redis中list和messageListern实现消息队列的区别
- PHP消息队列实现及应用:流量削峰案列(Redis的List类型实现秒杀)
- Java Jedis操作Redis示例(二)——list 生产者/消费者模式实现消息队列
- 利用redis实现消息队列之topic模式
- Java利用Redis实现消息队列
- PHP中利用redis实现消息队列处理高并发请求--简洁代码实现效果
- 微服务框架Spring Cloud之使用事件和消息队列实现分布式事务
- java redis使用之利用jedis实现redis消息队列
- Redis PHP Set集合实现消息队列
- 用redis实现支持优先级的消息队列
- webRTC base模块MessageQueue消息队列的实现
- 用redis实现消息队列
- Redis和RabbitMQ实现消息队列
- Redis实现消息队列
- php实现redis消息队列将数据保存到mysql
- php-共享内存以及利用共享内存实现消息队列
- SpringBoot非官方教程 | 第十四篇:在springboot中用redis实现消息队列
- 转: Redis消息队列的若干实现方式
- Spring和ActiveMQ集成实现队列消息以及PUB/SUB模型