您的位置:首页 > 其它

AmqpTemplate-发送-接收-消息

2014-11-17 12:40 274 查看

AmqpTemplate-简介

就像Spring Framework和其它一些项目提供了一些高度抽象,Spring AMQP提供的‘template’扮演者关键的角色。定义者主要操作的接口是AmqpTemplate。这些操作包含了发送和接收消息的一般操作。换种说法,它们不是某个实现所专有的,所以AMQP存在于名称里。这个接口的实现与AMQP协议的实现紧密关联。不像JMS,本身是一个接口级别的API,而AMQP是一个wire-level的协议。这个协议的实现必须提供他们自己的客户端类库,所以模板接口的每一个实现必须依赖一个特定的客户端类库。到目前为止,仅仅存在一个实现:RabbitTemplate。在下面的例子中,你将看到AmqpTemplate的使用,但是当你看到例子的配置,或者一些代码template的初始化或者设定的时候,你会看到实现类型RabbitTemplate。
就像前面介绍的那样,AmqpTemplate接口定义了发送和接收消息的基本操作。我们将在下面章节中探讨它们。

添加重试的功能

从1.3版本开始,你可以通过配置RabbitTemplate通过使用RetryTemplate来处理与消息代理之间的联通性问题。具体情况可以参照spring-retry项目。下面的这个例子使用的 指数退避算法,在默认的情况下,SimpleRetryPolice尝试三次,三次之后就向调用者抛出异常。
XMl空间的配置方式如下:

<rabbit:template id="template" connection-factory="connectionFactory" retrytemplate="retryTemplate"/>

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">

<property name="backOffPolicy">

<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">

<property name="initialInterval" value="500"/>

<property name="multiplier" value="10.0"/>

<property name="maxInterval" value="10000"/>

</bean>

</property>

</bean>

[/code]

使用注解 @Configuration配置:

@Bean

public AmqpTemplate rabbitTemplate();

RabbitTemplate template = new RabbitTemplate(connectionFactory());

RetryTemplate retryTemplate = new RetryTemplate();

ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();

backOffPolicy.setInitialInterval(500);

backOffPolicy.setMultiplier(10.0);

backOffPolicy.setMaxInterval(10000);

retryTemplate.setBackOffPolicy(backOffPolicy);

template.setRetryTemplate(retryTemplate);

return template;

}

[/code]

发布确认和返回

AmqpTemplate的实现RabbitTemplate支持发布确认和返回。
对于返回消息,模板的mandatory属性必须被设定为true,它同样要求CachingConnectionFactory的publisherReturns属性被设定为true。如果客户端通过调用setReturnCallback(ReturnCallback callback)注册了RabbitTemplate.ReturnCallback,那么返回将被发送到客户端。这个回调函数必须实现下列方法:

voidreturnedMessage(Message message, intreplyCode, String replyText,

String exchange, String routingKey);

[/code]

对于每一个RabbitTemplate只支持一个ReturnCallback。

对于发布确认,template要求CachingConnectionFactory的publisherConfirms属性设置为true。如果客户端通过setConfirmCallback(ConfirmCallback callback)注册了RabbitTemplate.ConfirmCallback,那么确认消息将被发送到客户端。这个回调函数必须实现以下方法:

voidconfirm(CorrelationData correlationData, booleanack);

[/code]

CorrelationData当客户端发送原始消息的时候提供。这将在后面详细描述。

同样一个RabbitTemplate只支持一个ConfirmCallback。
发送消息
当发送消息的时候,我们可以使用下面的一种方法:

void send(Message message) throwsAmqpException;

void send(String routingKey, Message message) throwsAmqpException;

void send(String exchange, String routingKey, Message message) throwsAmqpException;

[/code]

我们从最后一个方法开始讨论,最后一个方法最为详尽。它允许运行时提供AMQP Exchange和路由键。最后一个参数是消息实例。使用这个函数发送消息,大体类似下面代码:

amqpTemplate.send("marketData.topic", "quotes.nasdaq.FOO",

newMessage("12.34".getBytes(), someProperties))

[/code]

如果你在大多数的情况下都发送到同一个Exchange,你可以通过template来设置exchange属性。在这种情况下,使用上面列出的第二个方法发送消息。如下:

amqpTemplate.setExchange("marketData.topic");

amqpTemplate.send("quotes.nasdaq.FOO", newMessage("12.34".getBytes(), someProperties));

[/code]

同样,如果exchange和routingKey你都在template中设置了,你可以使用上面列出的第一个方法发送消息:

amqpTemplate.setExchange("marketData.topic");

amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");

amqpTemplate.send(newMessage("12.34".getBytes(), someProperties));

[/code]

详尽方法中设置的参数将会覆盖template中设置的默认值。实际上,即使你没有明确的设置template的这些属性,同样会有默认值存在。这种情况下,默认值是空字符串。过滤键不是必须的(例如Fanout Exchange)。并且,队列和Exchange也有可能与Exchange通过空字符串绑定。这两种场景都是合法的。对于AMQP规范中提供的default
Exchange就没有名称。所有的队列都自动与这个default Exchange绑定,使用的是他们的名称,所以通过默认Exchange使用上面的第二个方法来进行点对点的将消息路由到队列。仅仅需要提供队列的名称作为路由键:

RabbitTemplate template = newRabbitTemplate(); // using default no-name Exchange

template.send("queue.helloWorld", newMessage("Hello World".getBytes(), someProperties));

[/code]

如果你想排他的针对一个队列发送消息,推荐采取以下这种方式:

RabbitTemplate template = newRabbitTemplate(); // using default no-name Exchange

template.setRoutingKey("queue.helloWorld"); // but we'll always send to this Queue

template.send(newMessage("Hello World".getBytes(), someProperties));

[/code]

Message Builder API

从1.3版版开始,MessageBuilder和MessagePropertiesBuilder就提供了消息生成接口;他没提供了方便的流式方式来创建消息和消息属性:

Message message = MessageBuilder.withBody("foo".getBytes())

.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)

.setMessageId("123")

.setHeader("bar", "baz")

.build();

[/code]

MessageProperties props = MessagePropertiesBuilder.newInstance()

.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)

.setMessageId("123")

.setHeader("bar", "baz")

.build();

Message message = MessageBuilder.withBody("foo".getBytes())

.andProperties(props)

.build();

[/code]

MessageProperties中的每一个属性都可以设置,其他一些方法包括:setHeader(String key, String value), removeHeader(String key), removeHeaders(),和 copyProperties(MessageProperties properties)。每个属性设置方法,都有一个set*IfAbsent()的变体。为了防止默认值得存在,提供了set*IfAbsentDefault()方法。

提供了五个静态方法来初始化消息创建器:

public staticMessageBuilder withBody(byte[] body) ❶

public staticMessageBuilder withClonedBody(byte[] body) ❷

public staticMessageBuilder withBody(byte[] body, intfrom, intto) ❸

public staticMessageBuilder fromMessage(Message message) ❹

public staticMessageBuilder fromClonedMessage(Message message) ❺

[/code]


提供了消息体进行初始化,消息体直接引用这个参数。

❷ 提供消息体进行创建,消息体使用提供参数的拷贝。
❸ 提供了字节数组,消息体将拷贝这个字节数组的部分作为消息体。
❹ 从另外一个消息创建,消息体将引用例外一个消息体,属性将使用另外一个消息的属性拷贝。
❺ 从另外一个消息创建,消息体和属性都使用另外一个消息的拷贝。
提供了三个静态方法来初始化消息属性创建器:

public staticMessagePropertiesBuilder newInstance() ❶

public staticMessagePropertiesBuilder fromProperties(MessageProperties properties) ❷

public staticMessagePropertiesBuilder fromClonedProperties(MessageProperties

properties) ❸

[/code]

❶ 使用默认的消息属性来初始化消息属性创建。


使用build()方法,将会返回提供的消息对象。

参数中消息属性将被拷贝到新的消息属性中。

发布确认

AmqpTemplate的实现RabbitTemplate,它的每一个send()方法都有一个重载版本,这个重载版本多了一个CorrelationData对象。当发布确认被启用时,这个对象将被传输到先前描述的回调函数中,这使得发送者对发送的消息进行确认。
发布返回

发布返回与发布确认类似。
接收消息

消息接收要比消息发送稍微复杂一些,原因是接收消息有两种方式,在简单的方式是使用同步的阻塞的方法call获取一个消息。复杂一点但是更为通用的方式是注册监听器,异步的按需接收消息。我们将在以后章节中使用实例描述这两种方式。

同步消费者

AmqpTemplate本身可以作为同步接受者。他有两个receive函数。如同在发送端那样,可以在template上直接设置队列属性,队列函数也可以在接收的时候进行设置:

Message receive() throwsAmqpException;

Message receive(String queueName) throwsAmqpException;

[/code]

就像发送消息那样,AmqpTemplate提供了一些方便的方法使得可以直接接收POJOS而不是消息实体,它是通过创建消息所需要的MessageConverter来实现的。

Object receiveAndConvert() throwsAmqpException;

Object receiveAndConvert(String queueName) throwsAmqpException;

[/code]

从1.3版本开始,AmqpTemplate不仅提供了sendAndReceive方法,而且为同步接收提供了receiveAndReply方法,接收处理和回复消息:

<R, S> booleanreceiveAndReply(ReceiveAndReplyCallback<R, S> callback)

throwsAmqpException;

<R, S> booleanreceiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback)

throwsAmqpException;

<R, S> booleanreceiveAndReply(ReceiveAndReplyCallback<R, S> callback,

String replyExchange, String replyRoutingKey) throwsAmqpException;

<R, S> booleanreceiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,

String replyExchange, String replyRoutingKey) throwsAmqpException;

<R, S> booleanreceiveAndReply(ReceiveAndReplyCallback<R, S> callback,

ReplyToAddressCallback<S> replyToAddressCallback) throwsAmqpException;

<R, S> booleanreceiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,

ReplyToAddressCallback<S> replyToAddressCallback) throwsAmqpException;

[/code]

AmqpTemplate在实现‘receive’和‘reply’时很精细。在大多数的情况下,你只需要提供ReceiveAndReplyCallback的实现来完成接受消息的业务逻辑,如果需要的话,再创建答复实体或者消息。注意,ReceiveAndReplyCallback可以返回null。在这种情况下,没有答复被发送,receiveAndReply方法如同receive方法。这样队列就适用于混合模式。

如果回调不是ReceiveAndReplyMessageCallback的实例(它将提供原始消息交换规约),自动消息规约将被应用。
当需要一定的逻辑来决定相应地址时,ReplyToAddressCallback能够派上用处。在默认的情况下,在请求消息中的replyTo信息来路由响应。
以下是基于POJO的接收和响应

booleanreceived =

this.template.receiveAndReply(ROUTE, newReceiveAndReplyCallback<Order, Invoice>()

{

publicInvoice handle(Order order) {

returnprocessOrder(order);

}

});

if(received) {

log.info("We received an order!");

}

[/code]

异步消费者

对于异步消息接收,一个专属的组件将被提及。这个组件是消息消费回调的容器。我们稍后将看到这个容器和它的属性。但是首先我们先看一下回调,毕竟在那里你的应用代码和消息系统集成。这个回调有很多选项。最简单的方式是实现MessageListener接口:

public interfaceMessageListener {

voidonMessage(Message message);

}

[/code]

如果你的回调逻辑由于某种原因依赖于AMQP Channel实例,你可以使用ChannelAwareMessageListener。它多了一个参数:

public interfaceChannelAwareMessageListener {

voidonMessage(Message message, Channel channel) throwsException;

}

[/code]

如果你想在应用逻辑和messaging API之间保持严格的分界,你可以使用框架 提供的适配器实现。它通常被叫做‘Message-driven POJO’支持。当使用这个适配器的时候,你仅仅需要提供适配器需要调用的实体的引用。

MessageListener listener = newMessageListenerAdapter(somePojo);

[/code]

这里我们已经看到了消息监听回调的不同版本,我们现在来看看先前提到的那个容器。事实上,容器处于担当责任的活跃角色,所以回调才可以保持活跃。容器是生命周期组件的一个典范。它提供了开始和停止方法。当配置了这样一个容器,你实质上是架起了AMQP队列和MessageListener实例之间的桥梁。你必须提供ConnectionFactory的引用,队列名称或者队列实例,这样监听才知道从哪里消费消息。这里使用默认实现SimpleMessageListenerContainer:

SimpleMessageListenerContainer container = newSimpleMessageListenerContainer();

container.setConnectionFactory(rabbitConnectionFactory);

container.setQueueNames("some.queue");

container.setMessageListener(newMessageListenerAdapter(somePojo));

[/code]

作为一个活跃的组件,可以通过XML进行配置:

<rabbit:listener-container connection-factory="rabbitConnectionFactory">

<rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>

</rabbit:listener-container>

[/code]

你也可以通过@Configuration进行配置:

@Configuration

public classExampleAmqpConfiguration {

@Bean

publicSimpleMessageListenerContainer messageListenerContainer() {

SimpleMessageListenerContainer container = newSimpleMessageListenerContainer();

container.setConnectionFactory(rabbitConnectionFactory());

container.setQueueName("some.queue");

container.setMessageListener(exampleListener());

returncontainer;

}

@Bean

publicConnectionFactory rabbitConnectionFactory() {

CachingConnectionFactory connectionFactory =

newCachingConnectionFactory("localhost");

connectionFactory.setUsername("guest");

connectionFactory.setPassword("guest");

returnconnectionFactory;

}

@Bean

publicMessageListener exampleListener() {

return newMessageListener() {

public voidonMessage(Message message) {

System.out.println("received: "+ message);

}

};

[/code]

从RabbitMQ的3.2版本开始,这个消息代理支持消费者优先级。这个可以通过设置消费者的x-priority进行配置。SimpleMessageListenerContainer支持设定消费者参数:

container.setConsumerArguments(Collections. <String, Object> singletonMap("x-priority",

Integer.valueOf(10)));

[/code]

为了方便,命名空间在listener元素上提供了priority属性:

<rabbit:listener-container connection-factory="rabbitConnectionFactory">

<rabbit:listener queues="some.queue" ref="somePojo" method="handle" priority="10"/>

</rabbit:listener-container>

[/code]

从1.3版本开始容器监听的队列可以动态的修改。

自动删除队列

当容器配置来就爱你听自动删除的队列时,这个队列将被消息代理移除,如果容器停止了。在1.3版本之前容器不能再重新启动,因为队列已经丢失;在 当连接断掉或者打开的时候,RabbitAdmin试着自动的重新声明队列,这在容器停止启动的时候不会发生。
从1.3版本开始,容器在重新启动的时候会利用RabbitAdmin来重新声明丢失的队列。
你还可以通过条件声明来延迟来延迟队列的声明。

<rabbit:queue id="otherAnon" declared-by="containerAdmin"/>

<rabbit:direct-exchange name="otherExchange" auto-delete="true" declaredby="containerAdmin">

<rabbit:bindings>

<rabbit:binding queue="otherAnon" key="otherAnon"/>

</rabbit:bindings>

</rabbit:direct-exchange>

<rabbit:listener-container id="container2" auto-startup="false">

<rabbit:listener id="listener2" ref="foo" queues="otherAnon" admin="containerAdmin"/>

</rabbit:listener-container>

<rabbit:admin id="containerAdmin" connection-factory="rabbitConnectionFactory"

auto-startup="false"/>

[/code]

在这种情况下,Queue和Exchange在上下文初始化的时候不会进行声明这些元素,因为containerAdmin的auto-startup=‘false’。同样,容器也不会立马启动,当后来容器启动的时候,它将使用它所引用的containerAdmin来声明这些元素。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: