队列实现异步--通过redis
2017-02-25 13:43
309 查看
假如有a,b,c,d四个事件,同步就是a,b,c,d四个事件全部做完才能返回;异步就是只做了a事件就可返回,b、c、d三个事件可以延后做,不关注具体做的时间。
如上图:左侧Biz是一个总的入口,通过这个入口把各种事件放到队列里,右侧EventConsumer把队列中的事件取出,然后去找到对应的handle去处理这个事件。EventHandler是一个接口,使程序有更好地扩展性,LoginHandler、LikeHandler是EventHandler的实现类。
代码示例:
(2)首先,创建JedisAdapt类,初始化Redis,并可以通过这个类把事件push到队列里,或者把事件从队列里取出。
(2)创建事件类型的枚举类,如登录,评论,点赞·····
(2)每个事件的模型
(3)创建EventProducer类,作为发送事件的入口,
(4)创建EventHandler接口,
(5)创建EventCustomer获取队列里的所有事件,并将事件分发给对应的handler处理
最后创建EventHandler接口的实现类LoginHandler
需要异步处理的事件通过创建EventModel类,并调用EventProducer的fireEvent方法,push到队列里,如:
如上图:左侧Biz是一个总的入口,通过这个入口把各种事件放到队列里,右侧EventConsumer把队列中的事件取出,然后去找到对应的handle去处理这个事件。EventHandler是一个接口,使程序有更好地扩展性,LoginHandler、LikeHandler是EventHandler的实现类。
代码示例:
//安装好[redis](https://github.com/MSOpenTech/redis)后引入jedis包 <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.0</version> <type>jar</type> <scope>compile</scope> </dependency>
(2)首先,创建JedisAdapt类,初始化Redis,并可以通过这个类把事件push到队列里,或者把事件从队列里取出。
//初始化JedisPool, public class JedisAdapt implements InitializingBean{ private JedisPool pool; private static final Logger logger = LoggerFactory.getLogger(JedisAdapt.class); @Override public void afterPropertiesSet() throws Exception { pool = new JedisPool("redis://localhost:6379/10"); } //使用redis实现队列 public long lpush(String key, String value){ Jedis jedis = null; try { jedis = pool.getResource(); return jedis.lpush(key,value); }catch (Exception e){ logger.error("发生异常"+e.getMessage()); }finally { if (jedis != null){ jedis.close(); } } return 0; } //获取队列中的元素 public List<String> brpop(int timeout, String key){ Jedis jedis = null; try { jedis = pool.getResource(); return jedis.brpop(timeout,key); }catch (Exception e){ logger.error("发生异常"+e.getMessage()); }finally { if (jedis != null){ jedis.close(); } } return null; } }
(2)创建事件类型的枚举类,如登录,评论,点赞·····
//事件的类型 public enum EventType { LIKE(0), COMMENT(1), LOGIN(2), MAIL(3); private int value; EventType(int value){ this.value = value; } public int getValue() { return value; } }
(2)每个事件的模型
//事件的模型 public class EventModel { private EventType type; //事件的类型 private int entityId; //触发的载体,如评论 private int entityType; private int entityOwnerId; private int actorId; //触发者 private Map<String,String> exts = new HashMap<String,String>(); //扩展的对象 public EventModel(){} public EventModel(EventType type){ this.type = type; } public EventType getType() { return type; } public EventModel setType(EventType type) { this.type = type; return this; } public int getEntityId() { return entityId; } public void setEntityId(int entityId) { this.entityId = entityId; } public int getEntityType() { return entityType; } public EventModel setEntityType(int entityType) { this.entityType = entityType; return this; } public int getEntityOwnerId() { return entityOwnerId; } public EventModel setEntityOwnerId(int entityOwnerId) { this.entityOwnerId = entityOwnerId; return this; } public int getActorId() { return actorId; } public EventModel setActorId(int actorId) { this.actorId = actorId; return this; } public String getExts(String key) { return exts.get(key); } public EventModel setExts(String key, String value) { exts.put(key,value);return this; } public Map<String, String> getExts() { return exts; } public void setExts(Map<String, String> exts) { this.exts = exts; } }
(3)创建EventProducer类,作为发送事件的入口,
public class EventProducer { private static Logger logger = LoggerFactory.getLogger(EventProducer.class); @Autowired JedisAdapt jedisAdapt; //把事件发送出去,即把事件保存到队列里 //另外一种方法实现队列,BlockingQueue<EventModel> public boolean fireEvent(EventModel eventModel){ try { String eventkey = RedisKeyUtil.getEventKey(); String json = JSONObject.toJSONString(eventModel); jedisAdapt.lpush(eventkey, json); return true; }catch (Exception e){ logger.error("加入队列失败"); return false; } } }
public class RedisKeyUtil { private static String BIZ_EVENT = "EVENT_QUEUE"; //获取队列key的工具类 public static String getEventKey(){ return BIZ_EVENT; } }
(4)创建EventHandler接口,
public interface EventHandler { void doHandle(EventModel eventModel); //处理事件 List<EventType> getSupportTypes(); //实现类关注的事件 }
(5)创建EventCustomer获取队列里的所有事件,并将事件分发给对应的handler处理
public class EventConsumer implements InitializingBean,ApplicationContextAware{ private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class); private ApplicationContext applicationContext; private Map<EventType,List<EventHandler>> config = new HashMap<EventType,List<EventHandler>>(); @Autowired JedisAdapt jedisAdapt; @Override public void afterPropertiesSet() throws Exception { Map<String,EventHandler> beans = applicationContext.getBeansOfType(EventHandler.class); if (beans != null){ for (Map.Entry<String,EventHandler> entry : beans.entrySet()){ List<EventType> eventTypes = entry.getValue().getSupportTypes(); for (EventType type : eventTypes){ if(!config.containsKey(type)){ config.put(type,new ArrayList<EventHandler>()); } config.get(type).add(entry.getValue()); } } } Thread thread = new Thread(new Runnable() { @Override public void run() { while (true) { String eventKey = RedisKeyUtil.getEventKey(); List<String> events = jedisAdapt.brpop(0, eventKey); for (String message : events){ if (message.equals(eventKey)){ continue; } EventModel eventModel = JSONObject.parseObject(message,EventModel.class); if(!config.containsKey(eventModel.getType())){ logger.error("不能识别的事件"); continue; } for (EventHandler handler : config.get(eventModel.getType())){ handler.doHandle(eventModel); } } } } }); thread.start(); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } }
最后创建EventHandler接口的实现类LoginHandler
public class LoginHandler implements EventHandler { private static Logger logger = LoggerFactory.getLogger(LoginHandler.class); @Override public void doHandle(EventModel eventModel) { logger.info("登录成功"); } @Override public List<EventType> getSupportTypes() { return Arrays.asList(EventType.LOGIN); } }
需要异步处理的事件通过创建EventModel类,并调用EventProducer的fireEvent方法,push到队列里,如:
EventModel model = new EventModel(EventType.LOGIN); eventProducer.fireEvent(model);
相关文章推荐
- [置顶] Redis应用3-基于Redis消息队列实现的异步操作
- JavaScript 通过队列实现异步流控制
- 异步redis队列实现 数据入库
- celery配合rabbitmq任务队列实现任务的异步调度执行[celery redis] 推荐
- [置顶] 【Excel_To_DB】SpringBoot+EasyPoi+Redis消息队列实现Excel批量异步导入数据库(三)
- 使用Redis实现异步消息队列
- [置顶] 【Excel_To_DB】SpringBoot+EasyPoi+Redis消息队列实现Excel批量异步导入数据库(一)
- redis实现异步队列
- 通过Socket实现进程间异步通讯(四)
- 利用 Remoting 实现异步队列机制
- 通过异步程序调用(APC)实现的定时功能
- 系出名门Android(10) - HTTP 通信, XML 解析, 通过 Hander 实现异步消息处理
- 系出名门Android(10) - HTTP 通信, XML 解析, 通过 Hander 实现异步消息处理
- 通过异步程序调用(APC)实现的定时功能 CreateWaitableTimer和SetWaitableTimer函数
- Silverlight 异步任务队列实现
- HTTP 通信, XML 解析, 通过 Hander 实现异步消息处理
- UNIX环境高级编程学习之第十五章进程间通信 - 通过消息队列实现进程间通信
- 通过Socket实现进程间异步通讯(三)
- 通过Socket实现进程间异步通讯(二)
- [原创]一个通过BackgroundWorker实现WinForm异步操作的例子