Springboot + rabbitMQ实现延迟消费以及spring与策略模式联合处理不同的业务(二)
2018-08-29 09:36
811 查看
由于太长了,所以分成两篇写,接上一篇讲解了消息的定义和发送,这里继续讲解消费者
由于可能每条消息所处理的逻辑可能不一样,例如:常规订单30分钟不支付则取消订单,团购订单一天拼团不成功则取消等等,为了避免在消费者监听类中使用大量if else,这里使用策略模式来处理(由于spring的bean的初始化的时候创建,如果用Java常规的反射获取类,则在具体策略类用注入别的bean的时候,会拿不到值,所以需要通过applicationContext获取类)
1.消息消费者类-MessageConsumer -- 使用注解的方式监听,这里包括消费确认
[code]@Component @RabbitListener(queues = QueueContent.MESSAGE_QUEUE_NAME) public class MessageConsumer { static Logger logger = LoggerFactory.getLogger(MessageConsumer.class); @RabbitHandler public void handler(String msg,Channel channel, Message message) throws IOException { if (!StringUtils.isEmpty(msg)) { MessagePojo messagePojo = JSONUtil.toBean(msg,MessagePojo.class); try { //这里使用策略模式和Spring的结合使用,通过applicationContext获取类 Strategy s = (Strategy)SpringContextUtil.getBean(messagePojo.getClassName()); s.doJob(messagePojo.getParams()); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); logger.info("[MessageConsumer延时消息消费时间]"+DateUtil.datetoString(new Date()) + JSON.toJSONString(messagePojo) + ",消息ID:" + messagePojo.getMessageId()); } catch (Exception e) { logger.error("确认消费异常",e); //记录下这条消息 //丢弃这条消息(会通知服务器把此队列删掉) //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); //消息重新回到队列(监听会不断消费此消息) // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } } }
2.定义策略接口-Strategy
[code]public interface Strategy { public void doJob(Map<String, Object> params) throws Exception; }
3.定义具体策略实现类---A B
[code]@Component("A") public class A implements Strategy { /** * * @param params 接口所需参数 */ @Override public void doJob(Map<String, Object> params) { System.out.println("用A方法处理"); } } @Component("B") public class B implements Strategy { /** * * @param params 接口所需参数 */ @Override public void doJob(Map<String, Object> params) { System.out.println("用B方法处理"); } }
4.获取上下文工具类,在spring容器中根据类名获取具体类---SpringContextUtil
[code]@Component public class SpringContextUtil implements ApplicationContextAware { // Spring应用上下文环境 private static ApplicationContext applicationContext; /** * 实现ApplicationContextAware接口的回调方法。设置上下文环境 * * @param applicationContext */ public void setApplicationContext(ApplicationContext applicationContext) { SpringContextUtil.applicationContext = applicationContext; } /** * @return ApplicationContext */ public static ApplicationContext getApplicationContext() { return applicationContext; } /** * 获取对象 * * @param name * @return Object * @throws BeansException */ public static Object getBean(String name) throws BeansException { return applicationContext.getBean(name); } }
5.写一个controller测试
[code]@RestController public class MessageController { @Autowired private MessageProvider provider; Logger logger = LoggerFactory.getLogger(MessageController.class); @RequestMapping(value="/send_message",produces = "text/json;charset=UTF-8") @ResponseBody public String send_message(MessagePojo pojo){ try { provider.sendMessage(pojo); return JSON.toJSONString(pojo); } catch (Exception e) { e.printStackTrace(); return null; } } }
测试结果:延迟消费和具体方法处理具体业务也实现了
阅读更多
相关文章推荐
- 详细介绍Spring Boot + RabbitMQ实现延迟队列
- Spring Boot RabbitMQ 延迟消息实现完整版示例
- Spring Boot + RabbitMQ 实现消息队列场景
- spring boot Rabbitmq集成,延时消息队列实现
- Springboot+rabbitmq如何实现高并发的rpc调用
- springboot RabbitMq的安装以及使用
- Spring嵌套事务机制以及不同策略的配置实现
- spring boot Rabbitmq集成,延时消息队列实现
- spring+rabbitmq的简单实现和延迟队列实现
- spring+activemq实战之配置监听多队列实现不同队列消息消费
- Spring-Rabbit消费多个mq中相同的队列
- Spring Boot 自动配置理解 以及实现自定义Starter
- 【Spring实战】----Security4.1.3实现根据请求跳转不同登录页以及登录后根据权限跳转到不同页配置
- RabbitMq + Spring 实现ACK机制
- 浅谈SpringBoot集成Redis实现缓存处理(Spring AOP实现)
- spring boot Activitymq topic与 queue 并存实现方案
- SpringBoot + RabbitMQ 使用Demo
- Spring boot+RabbitMQ环境
- 菜鸟日志--在MQ的消费端上如何去实现自己的业务
- 使用springboot+springsession实现分布式session以及源码解析