您的位置:首页 > 编程语言 > Java开发

Spring-AMQP连接和资源管理

2014-11-16 11:23 876 查看

Spring-AMQP连接和资源管理

AMQP模型已经在前一模块进行了讨论,它们高度抽象,适用于所有的实现,当我们讨论资源管理的时候,要针对的是具体的代理实现。因此在这一部分,我们将聚焦于spring-rabbit模块,RabbitMQ是目前仅有的一个实现。

管理到RabbitMQ代理的连接的中心组件是ConnectionFactory接口。ConnectionFactory的实现职责是提供一个实例,这个实例是com.rabbitmq.client.Connection的包装类。目前为止,我们提供ConnectionFactory的唯一实现是CachingConnectionFactory,它在默认情况下,建立了一个可以被应用共享的连接代理。由于在AMQP中,工作单元是channel(类似于JMS中连接和Session之间的关系),所以共享一个连接是可行的。你可想象的到,connection实例提供了createChannel这一方法。CachingConnectionFactory的实现提支持缓存这些channel,它根据这些channel是否在事物中,分开维持这些channel缓存。当我们创建CachingConnectionFactory实例的时候,‘hostname’可以通过构造函数提供。‘username’和‘password’属性也因该被提供。如果你想配置channel缓存的大小你可以调用setChannelCacheSize()方法来完成。
从1.3版本开始CachingConnectionFactory可以向缓存channel那样来缓存connection。在这种情况下,每次调用createConnection()将创建一个连接或者从缓存中获取连接。关闭连接将把它返还到缓存。创建于这种连接上的channel同样可以缓存。创建分离的连接在一些场景中很有用处,例如从高可用的服务器集群上消费。
注意:
当缓存模式是CONNECTION的时候,队列的自动声明是不支持的。
同样,当写的时候,rabbitmq-client默认为每个连接创建了固定大小的线程池。当使用大量的连接时,你要为CachingConnectionFactory指定executor。这时,同一个executor被所有的连接使用,他的线程也被共享。这个executor线程池的大小应该无界或者针对应用进行调解。如果多个channel被创建于每个连接上,这个池的大小将影响并发性,所以一个可变的线程池最适合。

CachingConnectionFactory connectionFactory = newCachingConnectionFactory("somehost");

connectionFactory.setUsername("guest");

connectionFactory.setPassword("guest");

Connection connection = connectionFactory.createConnection();

[/code]

使用XML进行配置:

<bean id="connectionFactory"

class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">

<constructor-arg value="somehost"/>

<property name="username" value="guest"/>

<property name="password" value="guest"/>

</bean>

[/code]

可以通过继承AbstractConnectionFactory这个基类来实现自己的连接工厂。

CachingConnectionFactory的一个更为快捷配置方式是使用rabbit 命名空间:

<rabbit:connection-factory id="connectionFactory"/>

[/code]

在大多数情况下,框架将为你设置最合理的默认值。创建的工厂是CachingConnectionFactory。channels默认缓存的大小是1.如果你想缓存更大,你可以通过channelCacheSize这个属性来设置。XML如下:

<bean id="connectionFactory"

class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">

<constructor-arg value="somehost"/>

<property name="username" value="guest"/>

<property name="password" value="guest"/>

<property name="channelCacheSize" value="25"/>

</bean>

[/code]

如果有命名空间设置你可以直接设置channel-Cache-size属性:

<rabbit:connection-factory

id="connectionFactory" channel-cache-size="25"/>

[/code]

主机和端口也可以在命名空间中设置:

<rabbit:connection-factory

id="connectionFactory" host="somehost" port="5672"/>

[/code]

如果运行在集群环境中,你可以设置addresss属性:

<rabbit:connection-factory

id="connectionFactory" addresses="host1:5672,host2:5672"/

[/code]

配置底层的客户端连接工厂

CachingConnectionFactory使用Rabbit 客户端ConnectionFactory实例,一系列配置属性通过CachingConnectionFactory的相应属性设置。为了设置其他属性,可以定义一个rabbit
factory实例,然后通过CachingConnectionFactory的构造函数引用这个实例。当使用上面部分所描述的命名空间时,可以配置connection-factory属性。

<rabbit:connection-factory

id="connectionFactory" connection-factory="rabbitConnectionFactory"/>

[/code]

路由连接工厂

从1.3版本开始,AbstractRoutingConnectionFactory就被引入。它提供了一种机制来配置映射多个连接工厂最终通过运行时的lookupKey来决定目标连接工厂。典型的情况是通过检验线程域内的环境进行检测。为了方面,Spring AMQP提供了SimpleRoutingConnectionFactory,它会从SimpleResourceHolder中获得当前线程的lookupKey:

<bean id="connectionFactory"

class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory>

<property name="targetConnectionFactories">

<map>

<entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/>

<entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>

</map>

</property>

</bean>

<rabbit:template id="template" connection-factory="connectionFactory"/>

[/code]

public classMyService {

@Autowired

privateRabbitTemplate rabbitTemplate;

public voidservice(String vHost, String payload) {

SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);

rabbitTemplate.convertAndSend(payload);

SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());

}

}

[/code]

在使用资源之后要释放资源,这一点很重要。

发布确认和返回
确认和返回消息可以通过设置CachingConnectionFactory的publishConfirms和publishReturns属性为‘true’进行设置。当这些选项被设置时,工厂创建的Channels被包装在PublisherCallbackChannel,使得回调变得更为简单。当这样的一个Channel被获得之后,客户端可以使用这个Channel注册PublisherCallbackChannel.Listener监听。PublisherCallbackChannel实现包括路由确认和返回消息给适当的监听器。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: