activemq与spring整合,自动消费队列的配置与代码
2015-08-05 00:00
567 查看
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd"> <!-- 连接池 --> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> </property> </bean> <!-- 连接工厂 --> <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> <!-- 配置消息目标 --> <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0" value="com.netease.mobile.device" /> </bean> <!-- 消息模板 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="activeMQConnectionFactory" /> <property name="defaultDestination" ref="destination" /> <property name="messageConverter"> <bean class="org.springframework.jms.support.converter.SimpleMessageConverter" /> </property> </bean> <!-- 消息监听器 --> <bean id="consumerMessageListener" class="com.netease.mobileMq.task.deviceCacheFlushTask"/> <!-- 可以获取session的MessageListener --> <!-- <bean id="consumerSessionAwareMessageListener" class="com.netease.mobileMq.task.deviceCacheFlushTask"> <property name="destination" ref="destination"/> </bean> --> <!-- 消息监听容器 --> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="activeMQConnectionFactory" /> <property name="destination" ref="destination" /> <property name="messageListener" ref="consumerMessageListener" /> </bean> <!-- <bean id="sessionAwareListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="activeMQConnectionFactory" /> <property name="destination" ref="destination" /> <property name="messageListener" ref="consumerSessionAwareMessageListener" /> </bean> --> </beans>
package com.netease.mobileMq.task; import java.util.Date; import java.util.List; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import com.alibaba.fastjson.JSON; import com.netease.commonBean.FlushDeviceCacheBean; import com.netease.device.constant.EquipmentConstants; import com.netease.device.dao.EquipmentMapper; import com.netease.device.dao.FingerUserMapper; import com.netease.device.entity.EquipmentInfo; import com.netease.device.entity.FingerUserInfo; import com.netease.mobile.common.RedisUtil; /** * @author 作者 E-mail:ruanjianlxm@sina.com * @version 创建时间:2015年8月4日 下午4:44:39 * 类说明 */ public class deviceCacheFlushTask implements MessageListener{ private static Logger logger = LoggerFactory.getLogger("equipmentErrorLog"); @Autowired FingerUserMapper fingerUserMapper; @Autowired EquipmentMapper equipmentMapper; @Autowired JmsTemplate jmsTemplate; public void onMessage(Message message){ TextMessage textMsg = (TextMessage) message; String receiveMsg =null; try { receiveMsg = textMsg.getText(); } catch (JMSException e1) { // TODO Auto-generated catch block e1.printStackTrace(); return ; } System.out.println("receiveMsg:"+receiveMsg); if (StringUtils.isBlank(receiveMsg)) { logger.error("deviceCacheFlushTask receiveMsg is null Time is " + new Date()); return ; } else { logger.info("deviceCacheFlushTask receiveMsg " + receiveMsg);//日志中记录每个刷新的数据 } FlushDeviceCacheBean flushBean = JSON.parseObject(receiveMsg, FlushDeviceCacheBean.class);//将传过来的刷新对象进行格式化。 String mainssn = flushBean.getMainssn(); String[] refIds = flushBean.getUserIds(); /*---------------先更新用户缓存----------*/ if (mainssn != null) {//有主账号就更新主账号信息 List<FingerUserInfo> fingerUserInfos = null; try { fingerUserInfos = fingerUserMapper.getAllEqUserInfoByName(mainssn); } catch (Exception e) { // TODO: handle exception logger.error("EquipmentServiceImpl flushCache error", e); return ; } if (fingerUserInfos == null || fingerUserInfos.size() == 0) {// 如果有一个都没有 RedisUtil.delete(EquipmentConstants.EQUIPMENT_FINGER_USER_PRE + mainssn); } else { RedisUtil.set(EquipmentConstants.EQUIPMENT_FINGER_USER_PRE + mainssn, JSON.toJSONString(fingerUserInfos));// 新数据直接替换到缓存中 } } /*---------------再更新设备缓存----------*/ List<EquipmentInfo> equipmentInfos = null; if (refIds!=null&&refIds.length!=0) {//有ID就刷新ID信息 for (String refId : refIds) { try { equipmentInfos = equipmentMapper.getAllEquipmentInfoById(refId); } catch (Exception e) { // TODO: handle exception logger.error("EquipmentServiceImpl flushCache error", e); return ; } if (equipmentInfos == null || equipmentInfos.size() == 0) {// 如果有一个没有,说明查询数据出错失败了 RedisUtil.delete(EquipmentConstants.EQUIPMENT_EQUINFO_PRE + refId); } else { RedisUtil.set(EquipmentConstants.EQUIPMENT_EQUINFO_PRE + refId, JSON.toJSONString(equipmentInfos));// 新数据直接替换到缓存中 } } } } }
相关文章推荐
- activemq与spring整合,生产者端配置
- spring-amqp 整合rabbitmq生产者配置与代码
- spring-amqp整合rabbitmq消费者配置和代码
- web.xml文件中的Listener元素的作用
- Java基础-Date类
- spring读取配置文件
- eclipse 常用设置,常用快捷键修改
- java内部类学习笔记
- spring rabbitmq 动态绑定exchange,routingkey,queue
- Java接口学习总结
- 阻塞队列自定义实现 in Java
- 【JAVA】final修饰Field
- 一个java老菜鸟的而立之感(一)
- Spring 中设置依赖注入
- 阻塞队列自定义实现 in Java
- JAVA中的重载和重写
- Mac里安装配置Jdk
- Java心得6
- 写 Java 也得了解 CPU 缓存
- Eclipse添加默认的JRE