您的位置:首页 > 其它

RabbitMQ Consumer获取消息的两种方式(poll,subscribe)解析

2018-01-10 10:55 519 查看
rabbitMQ中consumer通过建立到queue的连接,创建channel对象,通过channel通道获取message,

Consumer可以声明式的以API轮询poll的方式主动从queue的获取消息,也可以通过订阅的方式被动的从Queue中消费消息,

最近翻阅了基于java的客户端的相关源码,简单做个分析。

编程模型伪代码如下:

ConnectionFactory factory = new ConnectionFactory();

Connection conn = factory.newConnection();

Channel channel=conn.createChannel();

创建Connection需要指定MQ的物理地址和端口,是socket tcp物理连接,而channel是一个逻辑的概念,支持在tcp连接上创建多个MQ channel

以下是基于channel上的两种消费方式。

1、Subscribe订阅方式

boolean autoAck = false;

channel.basicConsume(queueName, autoAck, "myConsumerTag",

new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag,

Envelope envelope,

AMQP.BasicProperties properties,

byte[] body)

throws IOException

{

String routingKey = envelope.getRoutingKey();

String contentType = properties.contentType;

long deliveryTag = envelope.getDeliveryTag();

// (process the message components here ...)

channel.basicAck(deliveryTag, false);

}

});

订阅方式其实是向queue注册consumer,通过rpc向queue server发送注册consumer的消息,rabbitMQ Server在收到消息后,根据消息的内容类型判断这是一个订阅消息,

这样当MQ 中queue有消息时,会自动把消息通过该socket(长连接)通道发送出去。

参见ChannelN中的方法

public String basicConsume(String queue, boolean autoAck, String consumerTag,

boolean noLocal, boolean exclusive, Map<String, Object> arguments,

final Consumer callback)

throws IOException

{

......

rpc((Method)

new Basic.Consume.Builder()

.queue(queue)

.consumerTag(consumerTag)

.noLocal(noLocal)

.noAck(autoAck)

.exclusive(exclusive)

.arguments(arguments)

.build(),

k);

try {

return k.getReply();

} catch(ShutdownSignalException ex) {

throw wrap(ex);

}

}

Consumer接收消息的过程:

创建Connection后,会启动MainLoop后台线程,循环从socket(FrameHandler)中获取数据包(Frame),调用channel.handleFrame(Frame frame)处理消息,

public void handleFrame(Frame frame) throws IOException {

AMQCommand command = _command;

if (command.handleFrame(frame)) { // 对消息进行协议assemble

_command = new AMQCommand(); // prepare for the next one

handleCompleteInboundCommand(command);//对消息消费处理

}

}

ChannelN.handleCompleteInboundCommand

---ChannelN.processAsync

----dispatcher.handleDelivery

---QueueingConsumer.handleDelivery

---this._queue.add(new Delivery(envelope, properties, body));//消息最终放到队列中

每个Consumer都有一个BlockQueue,用于缓存从socket中获取的消息。

接下来,Consumer对象就可以调用api来从客户端缓存的_queue中依次获取消息,进行消费,参见QueueingConsumer.nextDelivery()

对于这种长连接的方式,没看到心跳功能,以防止长连接的因网络等原因连接失效

2、poll API方式

ChannelN:

GetResponse basicGet(String queue, boolean autoAck)

这种方式比较简单,直接通过RPC从MQ Server端获取队列中的消息
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: