您的位置:首页 > 数据库 > Redis

队列实现异步--通过redis

2017-02-25 13:43 309 查看
假如有a,b,c,d四个事件,同步就是a,b,c,d四个事件全部做完才能返回;异步就是只做了a事件就可返回,b、c、d三个事件可以延后做,不关注具体做的时间。



如上图:左侧Biz是一个总的入口,通过这个入口把各种事件放到队列里,右侧EventConsumer把队列中的事件取出,然后去找到对应的handle去处理这个事件。EventHandler是一个接口,使程序有更好地扩展性,LoginHandler、LikeHandler是EventHandler的实现类。

代码示例:

//安装好[redis](https://github.com/MSOpenTech/redis)后引入jedis包
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
<type>jar</type>
<scope>compile</scope>
</dependency>


(2)首先,创建JedisAdapt类,初始化Redis,并可以通过这个类把事件push到队列里,或者把事件从队列里取出。

//初始化JedisPool,
public class JedisAdapt implements InitializingBean{
private JedisPool pool;
private static final Logger logger = LoggerFactory.getLogger(JedisAdapt.class);
@Override
public void afterPropertiesSet() throws Exception {
pool = new JedisPool("redis://localhost:6379/10");
}
//使用redis实现队列
public long lpush(String key, String value){
Jedis jedis = null;
try {
jedis = pool.getResource();
return jedis.lpush(key,value);
}catch (Exception e){
logger.error("发生异常"+e.getMessage());
}finally {
if (jedis != null){
jedis.close();
}
}
return 0;
}
//获取队列中的元素
public List<String> brpop(int timeout, String key){
Jedis jedis = null;
try {
jedis = pool.getResource();
return jedis.brpop(timeout,key);
}catch (Exception e){
logger.error("发生异常"+e.getMessage());
}finally {
if (jedis != null){
jedis.close();
}
}
return null;
}
}


(2)创建事件类型的枚举类,如登录,评论,点赞·····

//事件的类型
public enum EventType {
LIKE(0),
COMMENT(1),
LOGIN(2),
MAIL(3);

private int value;
EventType(int value){
this.value = value;
}

public int getValue() {
return value;
}
}


(2)每个事件的模型

//事件的模型
public class EventModel {

private EventType type; //事件的类型
private int entityId;   //触发的载体,如评论
private int entityType;
private int entityOwnerId;
private int actorId; //触发者

private Map<String,String> exts = new HashMap<String,String>(); //扩展的对象

public EventModel(){}

public EventModel(EventType type){
this.type = type;
}

public EventType getType() {
return type;
}

public EventModel setType(EventType type) {
this.type = type;
return this;
}

public int getEntityId() {
return entityId;
}

public void setEntityId(int entityId) {
this.entityId = entityId;
}

public int getEntityType() {
return entityType;
}

public EventModel setEntityType(int entityType) {
this.entityType = entityType;
return this;
}

public int getEntityOwnerId() {
return entityOwnerId;
}

public EventModel setEntityOwnerId(int entityOwnerId) {
this.entityOwnerId = entityOwnerId;
return this;
}

public int getActorId() {
return actorId;
}

public EventModel setActorId(int actorId) {
this.actorId = actorId;
return this;
}

public String getExts(String key) {
return exts.get(key);
}

public EventModel setExts(String key, String value) {
exts.put(key,value);return this;
}

public Map<String, String> getExts() {
return exts;
}

public void setExts(Map<String, String> exts) {
this.exts = exts;
}
}


(3)创建EventProducer类,作为发送事件的入口,

public class EventProducer {

private static Logger logger = LoggerFactory.getLogger(EventProducer.class);
@Autowired
JedisAdapt jedisAdapt;
//把事件发送出去,即把事件保存到队列里
//另外一种方法实现队列,BlockingQueue<EventModel>
public boolean fireEvent(EventModel eventModel){
try {
String eventkey = RedisKeyUtil.getEventKey();
String json = JSONObject.toJSONString(eventModel);
jedisAdapt.lpush(eventkey, json);
return true;
}catch (Exception e){
logger.error("加入队列失败");
return false;
}
}
}


public class RedisKeyUtil {
private static String BIZ_EVENT = "EVENT_QUEUE";
//获取队列key的工具类
public static String getEventKey(){
return BIZ_EVENT;
}
}


(4)创建EventHandler接口,

public interface EventHandler {

void doHandle(EventModel eventModel); //处理事件

List<EventType> getSupportTypes(); //实现类关注的事件
}


(5)创建EventCustomer获取队列里的所有事件,并将事件分发给对应的handler处理

public class EventConsumer implements InitializingBean,ApplicationContextAware{
private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);
private ApplicationContext applicationContext;
private Map<EventType,List<EventHandler>> config = new HashMap<EventType,List<EventHandler>>();
@Autowired
JedisAdapt jedisAdapt;

@Override
public void afterPropertiesSet() throws Exception {
Map<String,EventHandler> beans = applicationContext.getBeansOfType(EventHandler.class);

if (beans != null){
for (Map.Entry<String,EventHandler> entry : beans.entrySet()){
List<EventType> eventTypes = entry.getValue().getSupportTypes();
for (EventType type : eventTypes){
if(!config.containsKey(type)){
config.put(type,new ArrayList<EventHandler>());
}
config.get(type).add(entry.getValue());
}
}
}

Thread thread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
String eventKey = RedisKeyUtil.getEventKey();
List<String> events = jedisAdapt.brpop(0, eventKey);
for (String message : events){
if (message.equals(eventKey)){
continue;
}
EventModel eventModel = JSONObject.parseObject(message,EventModel.class);
if(!config.containsKey(eventModel.getType())){
logger.error("不能识别的事件");
continue;
}
for (EventHandler handler : config.get(eventModel.getType())){
handler.doHandle(eventModel);
}
}
}
}
});
thread.start();

}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}


最后创建EventHandler接口的实现类LoginHandler

public class LoginHandler implements EventHandler {
private static Logger logger = LoggerFactory.getLogger(LoginHandler.class);

@Override
public void doHandle(EventModel eventModel) {
logger.info("登录成功");
}

@Override
public List<EventType> getSupportTypes() {
return Arrays.asList(EventType.LOGIN);
}
}


需要异步处理的事件通过创建EventModel类,并调用EventProducer的fireEvent方法,push到队列里,如:

EventModel model = new EventModel(EventType.LOGIN);
eventProducer.fireEvent(model);
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: