您的位置:首页 > 编程语言 > Java开发

Springboot + rabbitMQ实现延迟消费以及spring与策略模式联合处理不同的业务(二)

2018-08-29 09:36 811 查看

由于太长了,所以分成两篇写,接上一篇讲解了消息的定义和发送,这里继续讲解消费者

由于可能每条消息所处理的逻辑可能不一样,例如:常规订单30分钟不支付则取消订单,团购订单一天拼团不成功则取消等等,为了避免在消费者监听类中使用大量if else,这里使用策略模式来处理(由于spring的bean的初始化的时候创建,如果用Java常规的反射获取类,则在具体策略类用注入别的bean的时候,会拿不到值,所以需要通过applicationContext获取类)

1.消息消费者类-MessageConsumer -- 使用注解的方式监听,这里包括消费确认

[code]@Component
@RabbitListener(queues = QueueContent.MESSAGE_QUEUE_NAME)
public class MessageConsumer {

static Logger logger = LoggerFactory.getLogger(MessageConsumer.class);

@RabbitHandler
public void handler(String msg,Channel channel, Message message) throws IOException {
if (!StringUtils.isEmpty(msg)) {
MessagePojo messagePojo = JSONUtil.toBean(msg,MessagePojo.class);
try {
//这里使用策略模式和Spring的结合使用,通过applicationContext获取类
Strategy s =  (Strategy)SpringContextUtil.getBean(messagePojo.getClassName());
s.doJob(messagePojo.getParams());
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
logger.info("[MessageConsumer延时消息消费时间]"+DateUtil.datetoString(new Date()) + JSON.toJSONString(messagePojo) + ",消息ID:" + messagePojo.getMessageId());
} catch (Exception e) {
logger.error("确认消费异常",e);
//记录下这条消息
//丢弃这条消息(会通知服务器把此队列删掉)
//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
//消息重新回到队列(监听会不断消费此消息)
//                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

}

}
}

}

 

2.定义策略接口-Strategy

[code]public interface Strategy {

public void doJob(Map<String, Object> params) throws Exception;

}

3.定义具体策略实现类---A  B 

[code]@Component("A")
public class A implements Strategy {

/**
*
* @param params 接口所需参数
*/
@Override
public void doJob(Map<String, Object> params) {
System.out.println("用A方法处理");
}
}

@Component("B")
public class B implements Strategy {

/**
*
* @param params 接口所需参数
*/
@Override
public void doJob(Map<String, Object> params) {
System.out.println("用B方法处理");
}
}

4.获取上下文工具类,在spring容器中根据类名获取具体类---SpringContextUtil

[code]@Component
public class SpringContextUtil implements ApplicationContextAware {

// Spring应用上下文环境
private static ApplicationContext applicationContext;

/**
* 实现ApplicationContextAware接口的回调方法。设置上下文环境
*
* @param applicationContext
*/
public void setApplicationContext(ApplicationContext applicationContext) {
SpringContextUtil.applicationContext = applicationContext;
}

/**
* @return ApplicationContext
*/
public static ApplicationContext getApplicationContext() {
return applicationContext;
}

/**
* 获取对象
*
* @param name
* @return Object
* @throws BeansException
*/
public static Object getBean(String name) throws BeansException {
return applicationContext.getBean(name);
}
}

5.写一个controller测试

[code]@RestController
public class MessageController {

@Autowired
private MessageProvider provider;

Logger logger = LoggerFactory.getLogger(MessageController.class);

@RequestMapping(value="/send_message",produces = "text/json;charset=UTF-8")
@ResponseBody
public String send_message(MessagePojo pojo){
try {
provider.sendMessage(pojo);
return JSON.toJSONString(pojo);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}

}

测试结果:延迟消费和具体方法处理具体业务也实现了

源码地址:https://gitee.com/JanXs/springboot-rabbitMQ

阅读更多
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: