Integrating RabbitMQ with Spring (topic and fanout modes)
2017-06-18 09:37
All code samples in this article were written in a Spring-integrated environment and have been tested.
The pom file needs the Spring RabbitMQ integration dependency:
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.6.3.RELEASE</version>
</dependency>
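I. RabbitMQ topic mode:
Use case: the sender does not publish only with a fixed routing key. Messages are routed by matching the routing key against string patterns, and the receiving side binds its queues by pattern in the same way.
Sender-side Spring XML configuration: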
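The matching rule can be sketched in plain Java without a broker. This is only an illustration of how a topic exchange compares a routing key with a binding pattern (`*` stands for exactly one dot-separated word, `#` for zero or more words); the class `TopicMatcher` and its method names are hypothetical and are not part of spring-rabbit or of this article's project. The patterns and routing keys are the ones used in the configuration and test code of this section.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: mimics topic-exchange pattern matching in memory.
final class TopicMatcher {
    private TopicMatcher() {}

    /** Returns true if the routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words; try every possible split
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word, key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1); // '*' or a literal consumes one word
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList(
                "test321.hello.test123", "test321.hello.aaa", "ta1.hello.test123");
        for (String key : keys) {
            System.out.println(key
                    + " -> test123queue=" + matches("*.*.test123", key)
                    + ", test321queue=" + matches("test321.#", key));
        }
    }
}
```

Running `main` prints, for each of the three routing keys, whether the queues bound with `*.*.test123` and `test321.#` would receive the message.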
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:jee="http://www.springframework.org/schema/jee"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:task="http://www.springframework.org/schema/task"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
           http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.0.xsd
           http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.0.xsd
           http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- RabbitMQ connection settings -->
    <bean id="rabbitConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg value="192.168.1.11"/>
        <property name="username" value="root"/>
        <property name="password" value="lee13233"/>
        <property name="channelCacheSize" value="8"/>
        <property name="port" value="5672"/>
    </bean>
    <rabbit:admin connection-factory="rabbitConnectionFactory"/>

    <!-- autoDelete: whether the queue is deleted automatically; durable: whether the queue is persistent -->
    <rabbit:queue name="test123queue" durable="true"/>
    <rabbit:queue name="test321queue" durable="true"/>

    <!-- Topic exchange and its pattern bindings -->
    <rabbit:topic-exchange name="leo.pay.topic.exchange" xmlns="http://www.springframework.org/schema/rabbit" durable="true">
        <bindings>
            <binding queue="test123queue" pattern="*.*.test123"/>
            <binding queue="test321queue" pattern="test321.#"/>
        </bindings>
    </rabbit:topic-exchange>

    <!-- RabbitTemplate, the message template class used for sending -->
    <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
        <constructor-arg ref="rabbitConnectionFactory"/>
    </bean>
</beans>
Sender-side Java code:
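import org.junit.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

// Method of a JUnit test class. LeoContext is a project-specific helper (not shown in this article).
@Test
public void testRabbitMq() throws Exception {
    RabbitTemplate rabbitTemplate =
            (RabbitTemplate) LeoContext.getContext().getApplication().getBean("rabbitTemplate");
    // The second argument is the routing key.
    // test321.hello.test123: both queues receive the message.
    // test321.hello.aaa:     only the queue bound with test321.# receives it.
    // ta1.hello.test123:     only the queue bound with *.*.test123 receives it.
    for (int i = 1; i <= 10; i++) {
        String str = "hello" + i;
        rabbitTemplate.send("leo.pay.topic.exchange", "test321.hello.test123",
                new Message(str.getBytes(), new MessageProperties()));
    }
}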
Receiver-side Spring XML configuration (my receiver is in a different project from the sender):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:jee="http://www.springframework.org/schema/jee"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:task="http://www.springframework.org/schema/task"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
           http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.0.xsd
           http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.0.xsd
           http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd"
       default-autowire="byName">

    <!-- RabbitMQ connection settings -->
    <bean id="rabbitConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg value="192.168.1.11"/>
        <property name="username" value="leo"/>
        <property name="password" value="lee31211"/>
        <property name="channelCacheSize" value="8"/>
        <property name="port" value="5672"/>
    </bean>
    <rabbit:admin connection-factory="rabbitConnectionFactory"/>

    <rabbit:queue name="test123queue" durable="true"/>
    <rabbit:queue name="test321queue" durable="true"/>

    <!-- The block below binds the exchange leo.pay.topic.exchange to the two queues. The binding can also be
         made by hand in the RabbitMQ management console, in which case this block can be omitted. The sender
         has in fact already declared the bindings, so there is no need to bind again here. -->
    <!-- <rabbit:topic-exchange name="leo.pay.topic.exchange" xmlns="http://www.springframework.org/schema/rabbit" durable="true"> -->
    <!--     <bindings> -->
    <!--         <binding queue="test123queue" pattern="test123.*" /> -->
    <!--         <binding queue="test321queue" pattern="test321.*" /> -->
    <!--     </bindings> -->
    <!-- </rabbit:topic-exchange> -->

    <!-- Listeners (consumers) for the two queues -->
    <bean id="detailQueueConsumer" class="com.leo.website.cousumer.DetailQueueConsumer"/>
    <bean id="testQueueConsumer" class="com.leo.website.cousumer.TestQueueConsumer"/>

    <!-- Register both queues with the listener container; each queue has its own listener -->
    <rabbit:listener-container connection-factory="rabbitConnectionFactory" concurrency="8">
        <rabbit:listener queues="test123queue" ref="detailQueueConsumer" method="onMessage"/>
        <rabbit:listener queues="test321queue" ref="testQueueConsumer" method="onMessage"/>
    </rabbit:listener-container>
</beans>
Receiver-side Java code (only one listener is shown; the other is similar):
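import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class DetailQueueConsumer implements MessageListener {

    @Override
    public void onMessage(Message message) {
        // Print the raw message body as text
        System.out.println("DetailQueueConsumer: " + new String(message.getBody()));
    }
}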
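One detail worth keeping in mind: the sender calls `str.getBytes()` and the listener calls `new String(message.getBody())`, and both rely on the platform default charset. Since the sender and receiver here live in different projects, the defaults could differ between the two JVMs. A minimal sketch of an explicit UTF-8 round trip follows; the class `BodyCodec` and its methods are hypothetical names introduced only for this illustration, and choosing UTF-8 is an assumption rather than something the original code specifies.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: encodes and decodes message bodies with a fixed charset.
final class BodyCodec {
    private BodyCodec() {}

    /** Sender side: turn text into the bytes that go into the message body. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Receiver side: turn the message body back into text. */
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String original = "hello1 \u4f60\u597d"; // ASCII plus two Chinese characters
        byte[] body = encode(original);
        System.out.println("round trip ok: " + original.equals(decode(body)));
    }
}
```

In the code above, the sender would pass `BodyCodec.encode(str)` where it now passes `str.getBytes()`, and the listener would call `BodyCodec.decode(message.getBody())`.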
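II. RabbitMQ fanout mode (publish/subscribe):
Use case: publish/subscribe. The sender broadcasts a message and multiple receivers each get a copy.
Sender-side Spring XML configuration: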
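The broadcast behaviour can be illustrated with a small in-memory model. This is a sketch of the idea only, not of RabbitMQ or spring-rabbit internals: the class `FanoutSketch` and its methods are hypothetical, and the queue names are the ones used in this article. The point it shows is that a fanout exchange ignores the routing key and copies every message to every bound queue.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a fanout exchange.
final class FanoutSketch {
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    /** Bind a queue to the exchange; binding the same queue twice has no extra effect. */
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    /** Publish a message; the routing key is accepted but deliberately ignored. */
    void publish(String routingKey, String body) {
        for (List<String> queue : queues.values()) {
            queue.add(body);
        }
    }

    /** Messages delivered so far to the given queue (empty if the queue was never bound). */
    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, new ArrayList<>());
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("test123queue");
        exchange.bind("test321queue");
        for (int i = 1; i <= 3; i++) {
            exchange.publish("", "hello" + i);
        }
        System.out.println("test123queue: " + exchange.messagesIn("test123queue"));
        System.out.println("test321queue: " + exchange.messagesIn("test321queue"));
    }
}
```

A queue that is never bound receives nothing, which mirrors the point made in this section that each consumer is responsible for binding its own queue to the exchange.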
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:jee="http://www.springframework.org/schema/jee"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:task="http://www.springframework.org/schema/task"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
           http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.0.xsd
           http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.0.xsd
           http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- RabbitMQ connection settings -->
    <bean id="rabbitConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg value="192.168.1.12"/>
        <property name="username" value="root"/>
        <property name="password" value="lee323"/>
        <property name="channelCacheSize" value="8"/>
        <property name="port" value="5672"/>
    </bean>
    <rabbit:admin connection-factory="rabbitConnectionFactory"/>

    <rabbit:queue name="test123queue" durable="true"/>
    <rabbit:queue name="test321queue" durable="true"/>

    <rabbit:fanout-exchange name="leo.pay.fanout.exchange" xmlns="http://www.springframework.org/schema/rabbit" durable="true">
    </rabbit:fanout-exchange>

    <!-- RabbitTemplate, the message template class used for sending -->
    <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
        <constructor-arg ref="rabbitConnectionFactory"/>
    </bean>
</beans>
Sender-side Java code:
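import org.junit.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

// Method of a JUnit test class. LeoContext is a project-specific helper (not shown in this article).
@Test
public void testRabbitMq() throws Exception {
    RabbitTemplate rabbitTemplate =
            (RabbitTemplate) LeoContext.getContext().getApplication().getBean("rabbitTemplate");
    // Send data to the exchange named leo.pay.fanout.exchange. Every queue that a client has bound to
    // this exchange receives the message. This works like a broadcast: the sender does not care which
    // queues exist; whoever needs the data binds its own processing queue on the client side.
    for (int i = 1; i <= 10; i++) {
        String str = "hello" + i;
        rabbitTemplate.send("leo.pay.fanout.exchange", "",
                new Message(str.getBytes(), new MessageProperties()));
    }
}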
Client-side Spring XML configuration (in my example the client is in a different project from the sender):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:jee="http://www.springframework.org/schema/jee"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:task="http://www.springframework.org/schema/task"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
           http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.0.xsd
           http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.0.xsd
           http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd"
       default-autowire="byName">

    <!-- RabbitMQ connection settings -->
    <bean id="rabbitConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg value="192.168.1.13"/>
        <property name="username" value="root"/>
        <property name="password" value="lee2342"/>
        <property name="channelCacheSize" value="8"/>
        <property name="port" value="5672"/>
    </bean>
    <rabbit:admin connection-factory="rabbitConnectionFactory"/>

    <rabbit:queue name="test123queue" durable="true"/>
    <rabbit:queue name="test321queue" durable="true"/>

    <!-- Bind the queues that need the data to the exchange; not needed if the binding is made by hand in the console -->
    <rabbit:fanout-exchange name="leo.pay.fanout.exchange" xmlns="http://www.springframework.org/schema/rabbit" durable="true">
        <rabbit:bindings>
            <rabbit:binding queue="test123queue"/>
            <rabbit:binding queue="test321queue"/>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <bean id="detailQueueConsumer" class="com.leo.website.cousumer.DetailQueueConsumer"/>
    <bean id="testQueueConsumer" class="com.leo.website.cousumer.TestQueueConsumer"/>

    <rabbit:listener-container connection-factory="rabbitConnectionFactory" concurrency="8">
        <rabbit:listener queues="test123queue" ref="detailQueueConsumer" method="onMessage"/>
        <rabbit:listener queues="test321queue" ref="testQueueConsumer" method="onMessage"/>
    </rabbit:listener-container>
</beans>
Related articles on the underlying theory:
1. http://www.2cto.com/kf/201612/575219.html
2. http://www.cnblogs.com/luxiaoxun/p/3918054.html
3. http://hwcrazy.com/34195c9068c811e38a44000d601c5586/be62fc2668c811e3adba000d601c5586/
4. http://blog.csdn.net/lmj623565791/article/details/37706355