您的位置:首页 > 编程语言 > Java开发

rabbmitmq和spring整合

2016-10-11 00:00 148 查看
摘要: 目前我的maven依赖整合了springmvc+mysql+redis+mybatis+fastjson+quartz+poi,可以根据需求适当的去除一些依赖

1、添加maven依赖的pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <parent>
<artifactId>im_home</artifactId>
<groupId>com.autohome</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>im-statistics</artifactId>
<packaging>war</packaging>
<name>im-statistics Maven Webapp</name>
<url>http://maven.apache.org</url>
<properties>
<!-- spring版本号 -->
<spring.version>4.0.4.RELEASE</spring.version>
<!-- mybatis版本号 -->
<mybatis.version>3.2.6</mybatis.version>
<!-- log4j日志文件管理包版本 -->
<slf4j.version>1.7.7</slf4j.version>
<log4j.version>1.2.17</log4j.version>
</properties>
<dependencies>
<!-- spring核心包 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>${spring.version}</version>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>${spring.version}</version>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>${spring.version}</version>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${spring.version}</version>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- mybatis核心包 -->
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>${mybatis.version}</version>
</dependency>
<!-- mybatis/spring包 -->
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis-spring</artifactId>
<version>1.2.2</version>
</dependency>
<!-- 导入Mysql数据库链接jar包 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.30</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>

<!-- 格式化对象,方便输出日志 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.1.41</version>
</dependency>
<!--数据库连接池-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.0.18</version>
</dependency>
<!-- 导入java ee jar 包 -->
<dependency>
<groupId>javax</groupId>
<artifactId>javaee-api</artifactId>
<version>7.0</version>
</dependency>

<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>1.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.freemarker</groupId>
<artifactId>freemarker</artifactId>
<version>2.3.20</version>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.2.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.quartz-scheduler/quartz-jobs -->
<!--<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz-jobs</artifactId>
<version>2.2.3</version>
</dependency>-->

<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
<version>3.12</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.0</version>
</dependency>

</dependencies>
<build>
<finalName>im-statistics</finalName>
<!-- <plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>-->
<resources>
<resource>
<directory>src/main/profile/${prop.dir.name}</directory>
<targetPath>./</targetPath>
</resource>
<resource>
<directory>src/main/resources</directory>
<targetPath>./</targetPath>
</resource>
</resources>
</build>
<profiles>
<profile>
<id>dev</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<prop.dir.name>dev</prop.dir.name>
</properties>
</profile>
<profile>
<id>local</id>
<properties>
<prop.dir.name>local</prop.dir.name>
</properties>
</profile>
<profile>
<id>beta</id>
<properties>
<prop.dir.name>beta</prop.dir.name>
</properties>
</profile>
<profile>
<id>online</id>
<properties>
<prop.dir.name>online</prop.dir.name>
</properties>
</profile>
<profile>
<id>ds-production</id>
<properties>
<prop.dir.name>ds-production</prop.dir.name>
</properties>
</profile>
</profiles>
</project>

2、web.xml配置

<!DOCTYPE web-app PUBLIC
"-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
"http://java.sun.com/dtd/web-app_2_3.dtd" >

<web-app>
<!-- Spring和mybatis的配置文件 -->
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:spring-mybatis.xml</param-value>
</context-param>
<!-- 编码过滤器 -->
<filter>
<filter-name>encodingFilter</filter-name>
<filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
<init-param>
<param-name>encoding</param-name>
<param-value>UTF-8</param-value>
</init-param>
</filter>
<filter-mapping>
<filter-name>encodingFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>

<!-- Spring监听器 -->
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
<!-- 防止Spring内存溢出监听器 -->
<listener>
<listener-class>org.springframework.web.util.IntrospectorCleanupListener</listener-class>
</listener>

<!-- Spring MVC servlet -->
<servlet>
<servlet-name>SpringMVC</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:spring-mybatis.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<servlet-mapping>
<servlet-name>SpringMVC</servlet-name>
<!-- 此处可以可以配置成*.do,对应struts的后缀习惯 -->
<url-pattern>/</url-pattern>
</servlet-mapping>
</web-app>

3、spring-rabbmitmq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" >

<!-- 引入配置文件 -->
<bean id="propertyConfigurer"
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<list>
<value>classpath:rabbitmq.properties</value>
</list>
</property>
</bean>

<rabbit:connection-factory id="rabbitmqConnectionFactory"
username="${rmq.manager.user}"
password="${rmq.manager.password}"
host="${rmq.ip}"
virtual-host="${rmq.vhost}" />

<rabbit:admin connection-factory="rabbitmqConnectionFactory"/>

<rabbit:template connection-factory="rabbitmqConnectionFactory" message-converter="jsonMessageConverter"
exchange="testExchangeTopic" id="amqpTemplate"/>

<rabbit:queue id="lanyangyang123" durable="true" auto-delete="false" exclusive="false" name="lanyangyang123"/>
<rabbit:queue id="lanyangyang234" durable="true" auto-delete="false" exclusive="false" name="lanyangyang234"/>

<!--  1 direct exchange  一条消息可以发送到多个queue,只要这些queue的routeKey相同即可-->
<rabbit:direct-exchange name="testExchange3" durable="true" id="testExchange3">
<rabbit:bindings>
<!--此key就是routekey-->
<rabbit:binding queue="lanyangyang123" key="lanyangyang_queue_key" />
<rabbit:binding queue="lanyangyang234" key="lanyangyang_queue_key" />
</rabbit:bindings>
</rabbit:direct-exchange>

<!--  2 topic exchange-->
<rabbit:queue id="lanyangyang.log.test1" durable="true" auto-delete="false" exclusive="false" name="lanyangyang.log.test1"/>
<rabbit:queue id="lanyangyang.log.test2" durable="true" auto-delete="false" exclusive="false" name="lanyangyang.log.test2"/>
<rabbit:topic-exchange name="testExchangeTopic" durable="true" auto-delete="false" id="testExchangeTopic">
<rabbit:bindings>
<rabbit:binding queue="lanyangyang.log.test1" pattern="#.log.#" />
<rabbit:binding queue="lanyangyang.log.test2" pattern="#.log.#" />
</rabbit:bindings>
</rabbit:topic-exchange>

<!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
<rabbit:listener-container connection-factory="rabbitmqConnectionFactory" acknowledge="auto">
<rabbit:listener queues="lanyangyang123" ref="rabbmitmqQueueListener"/>
</rabbit:listener-container>
<bean id="rabbmitmqQueueListener" class="com.autohome.statistics.mq.listener.RabbmitmqQueueListener" />
<!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于fastjson的速度快于jackson,这里替换为fastjson的一个实现 -->
<bean id="jsonMessageConverter"  class="com.autohome.statistics.mq.util.FastJsonMessageConverter" />

</beans>

4、rabbitmq.properties

rmq.ip=192.168.1.20
rmq.producer.num=20
rmq.manager.user=admin
rmq.manager.password=admin
rmq.vhost=/vhost1

5、FastJsonMessageConverter.java

public class FastJsonMessageConverter extends AbstractMessageConverter {
private static Log log = LogFactory.getLog(FastJsonMessageConverter.class);

public static final String DEFAULT_CHARSET = "UTF-8";

private volatile String defaultCharset = DEFAULT_CHARSET;

public FastJsonMessageConverter() {
super();
//init();
}

public void setDefaultCharset(String defaultCharset) {
this.defaultCharset = (defaultCharset != null) ? defaultCharset
: DEFAULT_CHARSET;
}

public Object fromMessage(Message message)
throws MessageConversionException {
return null;
}

public <T> T fromMessage(Message message,T t) {
String json = "";
try {
json = new String(message.getBody(),defaultCharset);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return (T) JSON.parseObject(json, t.getClass());
}

protected Message createMessage(Object objectToConvert,
MessageProperties messageProperties)
throws MessageConversionException {
byte[] bytes = null;
try {
String jsonString = JSON.toJSONString(objectToConvert);
bytes = jsonString.getBytes(this.defaultCharset);
} catch (UnsupportedEncodingException e) {
throw new MessageConversionException(
"Failed to convert Message content", e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
messageProperties.setContentEncoding(this.defaultCharset);
if (bytes != null) {
messageProperties.setContentLength(bytes.length);
}
return new Message(bytes, messageProperties);

}
}

6、RabbmitmqQueueListener.java #消费消息

public class RabbmitmqQueueListener implements MessageListener {

public static final Logger log = Logger.getLogger(RabbmitmqQueueListener.class);
public void onMessage(Message message) {
try {
String msg = new String(message.getBody(), "utf-8");
log.error(msg);
Thread.sleep(2000);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

7、RabbitmqController.java #生产消息

@Controller
@RequestMapping("rabbitmq")
public class RabbitmqController  {

public static final Logger log = Logger.getLogger(RabbitmqController.class);

@Autowired
private AmqpTemplate amqpTemplate;

@RequestMapping(value = "/testRabbitmq", produces = "application/json;charset=UTF-8")
@ResponseBody
public String testRabbitmq(HttpServletRequest request, HttpServletResponse response){
String key = request.getParameter("key");
Message msg = new Message(key.getBytes(), new org.springframework.amqp.core.MessageProperties());
//        amqpTemplate.send(msg);
//1.测试direct exchange
//        amqpTemplate.convertAndSend("lanyangyang_queue_key", msg);

//2.测试topic exchange
amqpTemplate.convertAndSend("#.log.#", msg);
log.info("消息发送成功");
return "success";

}
}

ps:可以通过访问http://192.168.1.20:15672,查看消息的产生和消费情况,以及queue和exchange的情况
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息