您的位置:首页 > 编程语言 > Java开发

随便谈谈RabbitMQ与springBoot进行集成。

2017-10-31 16:31 501 查看
先说说题外话,本来只想找到一个springBoot快速集成RabbitMQ的例子,用起来就行的。但是百度搜了一大通, 各有各的玩法,但是就是没找到一个自己心仪的方式。最终发现,稍微看看springBoot的jar包,顿时觉得清晰好多。

顺便说明一下,这个文章全是我自己总结的,目前是不同于网络上各种转载文的。旨在给有兴趣的朋友共同分享。

关于RabbitMQ就不做介绍了。几种消息传递模式官网自有介绍。这里只关注两个点:

1、如何主动发送及消费消息。

2、如何注册监听自动消费发过来的消息。

首先针对第一个问题:

springBoot已经有了集成rabbitMQ 的 starter

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>


加入这个引用后,就可以在springBoot的配置文件中直接配置rabbitMQ的属性,(以spring.rabbitmq开头)然后在自己的方法中可以注入 RabbitTemplate ,用这个template进行发消息 以及 收消息 的操作 。可以设置exchange ,routekey,queue什么的,然后一大堆的send receive方法可以调用。

是不是觉得有点太简单了?这用法比百度搜到的大部分示例都简单,甚至都不用上示例代码。那下面就一起来看看是怎么回事呗。

可以看到 引入了maven依赖后,springBoot的自动配置模块就已经引入进来了,正是在这里面封装了springBoot对RabbitMQ的配置。在spring-boot-autoconfigure模块中有RabbitMQ 的自定义配置加载类



这个类在META-INF下的spring.factories文件中已经配置成了在springBoot启动时就会自动进行配置。这是springBoot的自动配置机制。



然后在这个配置类的代码头部可以看到配置的要求



同时,这个配置类往spring容器里注入了几个Bean。





其中,这个RabbitTemplate就是可以用来操作RabbitMQ的一个重要的工具类。

这样,在自己的代码中,就可以直接用@Resource或者@Autowire引用这个类来发送及接受消息了。

而RabbitProperties就是rabbit的相关配置的处理类。



所以,只要在application.properties里配置spring.rabbitmq开头的几个属性,就会自动配置到这些类里。

至于spring.rabbitmq下可选的配置属性, 在spring-boot-autoconfigure包下的/META-INF/spring-configuration-metadata.json 文件中有详细的定义。用IDEA就会自动弹出所有可选的配置项。eclipse貌似不行。–其实这些注释都是通过RabbitProperties来注入的,看看这个类的属性也能知道要配些什么东西。



分析到这里,通过SpringBoot主动发送和接受消息的引入方式就已经知道了。

然后针对第二个问题:

继续看看怎么用springBoot的方式注册监听,自动消费发过来的消息。

还是在那个 自动注册类 RabbitAutoConfiguration里,可以看到头部还引用了一个配置类 RabbitAnnotationDrivenConfiguration 从名字就看出这是Rabbit的注释驱动配置类,一看就是springBoot的风格。还是看头部的声明



这个配置需要有EnableRabbit注释才会启动配置。然后,看看这个EnableRabbit 注释前面的注释内容就很一目了然了

org.springframework.amqp.rabbit.annotation.EnableRabbit

Enable Rabbit listener annotated endpoints that are created under the

cover by a RabbitListenerContainerFactory. To be used on

Configuration classes as follows: @Configuration @EnableRabbit

public class AppConfig {

@Bean

public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {

SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

factory.setConnectionFactory(connectionFactory());

factory.setMaxConcurrentConsumers(5);

return factory;

}

// other @Bean definitions } The RabbitListenerContainerFactory is responsible to create the listener container responsible for a

particular endpoint. Typical implementations, as the

SimpleRabbitListenerContainerFactory used in the sample above,

provides the necessary configuration options that are supported by

the underlying MessageListenerContainer.

@EnableRabbit enables detection of RabbitListener annotations on any

Spring-managed bean in the container. For example, given a class

MyService: package com.acme.foo;

public class MyService {

@RabbitListener(containerFactory=”myRabbitListenerContainerFactory”,

queues=”myQueue”)

public void process(String msg) {

// process incoming message

} } The container factory to use is identified by the containerFactory attribute defining the name of the

RabbitListenerContainerFactory bean to use. When none is set a

RabbitListenerContainerFactory bean with name

rabbitListenerContainerFactory is assumed to be present. the

following configuration would ensure that every time a

org.springframework.amqp.core.Message is received on the

org.springframework.amqp.core.Queue named “myQueue”,

MyService.process() is called with the content of the message:

@Configuration @EnableRabbit public class AppConfig {

@Bean

public MyService myService() {

return new MyService();

}

// Rabbit infrastructure setup  } Alternatively, if MyService were annotated with  @Component, the following configuration would


ensure that its @RabbitListener annotated method is invoked with a

matching incoming message: @Configuration @EnableRabbit

@ComponentScan(basePackages=”com.acme.foo”) public class AppConfig {

} Note that the created containers are not registered against the

application context but can be easily located for management purposes

using the RabbitListenerEndpointRegistry.

Annotated methods can use flexible signature; in particular, it is

possible to use the Message abstraction and related annotations, see

RabbitListener Javadoc for more details. For instance, the following

would inject the content of the message and a a custom “myCounter”

AMQP header: @RabbitListener(containerFactory =

“myRabbitListenerContainerFactory”, queues = “myQueue”) public void

process(String msg, @Header(“myCounter”) int counter) {

// process incoming message } These features are abstracted by the MessageHandlerMethodFactory that is responsible to build the

necessary invoker to process the annotated method. By default,

DefaultMessageHandlerMethodFactory is used. When more control is

desired, a @Configuration class may implement

RabbitListenerConfigurer. This allows access to the underlying

RabbitListenerEndpointRegistrar instance. The following example

demonstrates how to specify an explicit default

RabbitListenerContainerFactory @Configuration

@EnableRabbit public class AppConfig implements

RabbitListenerConfigurer {

@Override

public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {

registrar.setContainerFactory(myRabbitListenerContainerFactory());

}

@Bean
public RabbitListenerContainerFactory myRabbitListenerContainerFactory() {
// factory settings
}

@Bean
public MyService myService() {
return new MyService();
}  }  } For reference, the example above can be compared to the following Spring  XML configuration:   <beans>
<rabbit:annotation-driven container-factory="myRabbitListenerContainerFactory"/>

<bean id="myRabbitListenerContainerFactory"
class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
// factory settings
</bean>

<bean id="myService" class="com.acme.foo.MyService"/>  </beans> It is also possible to specify a custom RabbitListenerEndpointRegistry


in case you need more control on the way the containers are created

and managed. The example below also demonstrates how to customize the

RabbitHandlerMethodFactory to use with a custom Validator so that

payloads annotated with Validated are first validated against a

custom Validator. @Configuration @EnableRabbit public

class AppConfig implements RabbitListenerConfigurer {

@Override

public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {

registrar.setEndpointRegistry(myRabbitListenerEndpointRegistry());

registrar.setMessageHandlerMethodFactory(myMessageHandlerMethodFactory);

}

@Bean
public RabbitListenerEndpointRegistry myRabbitListenerEndpointRegistry() {
// registry configuration
}

@Bean
public RabbitHandlerMethodFactory myMessageHandlerMethodFactory() {
DefaultRabbitHandlerMethodFactory factory = new DefaultRabbitHandlerMethodFactory();
factory.setValidator(new MyValidator());
return factory;
}

@Bean
public MyService myService() {
return new MyService();
}  }  } For reference, the example above can be compared to the following Spring  XML configuration:   <beans>
<rabbit:annotation-driven registry="myRabbitListenerEndpointRegistry"
handler-method-factory="myRabbitHandlerMethodFactory"/>

<bean id="myRabbitListenerEndpointRegistry"
class="org.springframework.amqp.rabbit.config.RabbitListenerEndpointRegistry">
// registry configuration
</bean>

<bean id="myRabbitHandlerMethodFactory"
class="org.springframework.amqp.rabbit.config.DefaultRabbitHandlerMethodFactory">
<property name="validator" ref="myValidator"/>
</bean>

<bean id="myService" class="com.acme.foo.MyService"/>  </beans> Implementing RabbitListenerConfigurer  also allows for fine-grained


control over endpoints registration via the

RabbitListenerEndpointRegistrar. For example, the following

configures an extra endpoint: @Configuration @EnableRabbit public

class AppConfig implements RabbitListenerConfigurer {

@Override

public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {

SimpleRabbitListenerEndpoint myEndpoint = new SimpleRabbitListenerEndpoint();

// … configure the endpoint

registrar.registerEndpoint(endpoint, anotherRabbitListenerContainerFactory());

}

@Bean
public MyService myService() {
return new MyService();
}

@Bean
public RabbitListenerContainerFactory anotherRabbitListenerContainerFactory() {
// ...
}

// Rabbit infrastructure setup  }  } Note that all beans implementing RabbitListenerConfigurer  will be detected and invoked in


a similar fashion. The example above can be translated in a regular

bean definition registered in the context in case you use the XML

configuration.

Since:

1.4 Author: Stephane Nicoll See Also: RabbitListener RabbitListenerAnnotationBeanPostProcessor

org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar

org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry

里面很多详细的配置,可以逐步的设置默认的处理流程。有兴趣可以再继续深入跟踪下去。

(网上看到有一个教程,说对ConnectionFactory,因为没有默认设置来支持消息回调 connectionFactory.setPublisherConfirms(true);

而大费周章,摈弃springBoot的自动配置,自己从头注入。虽然没有动手试过,但是看着些说明,springBoot都是有很优雅的方式支持设置的。)

所以最后,针对第二个问题,结果也就出来了。

最简单的用法就是按注释里说的分三步

先定义一个SimpleRabbitListenerContainerFactory

@Configuration
@EnableRabbit
public class AppConfig {
@Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setMaxConcurrentConsumers(5);
return factory;
}
// other @Bean definitions
}


然后就可以定义自己的service监听这个连接发过来的消息了。

package com.acme.foo;

public class MyService {
@RabbitListener(containerFactory="myRabbitListenerContainerFactory", queues="myQueue")
public void process(String msg) {
// process incoming message
}
}


然后把这个MyService注入到spring容器里

@Configuration
@EnableRabbit
public class AppConfig {
@Bean
public MyService myService() {
return new MyService();
}

// Rabbit infrastructure setup
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  RabbitMQ springBoot