RabbitMQ Consumer获取消息的两种方式(poll,subscribe)解析
2015-12-06 12:57
411 查看
以下转自:http://blog.csdn.net/yangbutao/article/details/10395599
rabbitMQ中consumer通过建立到queue的连接,创建channel对象,通过channel通道获取message, Consumer可以声明式的以API轮询poll的方式主动从queue的获取消息,也可以通过订阅的方式被动的从Queue中消费消息, 最近翻阅了基于java的客户端的相关源码,简单做个分析。 编程模型伪代码如下: ConnectionFactory factory = new ConnectionFactory(); Connection conn = factory.newConnection(); Channel channel=conn.createChannel(); 创建Connection需要指定MQ的物理地址和端口,是socket tcp物理连接,而channel是一个逻辑的概念,支持在tcp连接上创建多个MQ channel 以下是基于channel上的两种消费方式。
1、Subscribe订阅方式 boolean autoAck = false; channel.basicConsume(queueName, autoAck, "myConsumerTag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.contentType; long deliveryTag = envelope.getDeliveryTag(); // (process the message components here ...) channel.basicAck(deliveryTag, false); } });
订阅方式其实是向queue注册consumer,通过rpc向queue server发送注册consumer的消息,rabbitMQ Server在收到消息后,根据消息的内容类型判断这是一个订阅消息, 这样当MQ 中queue有消息时,会自动把消息通过该socket(长连接)通道发送出去。 参见ChannelN中的方法 public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, final Consumer callback) throws IOException { ...... rpc((Method) new Basic.Consume.Builder() .queue(queue) .consumerTag(consumerTag) .noLocal(noLocal) .noAck(autoAck) .exclusive(exclusive) .arguments(arguments) .build(), k);
try { return k.getReply(); } catch(ShutdownSignalException ex) { throw wrap(ex); } }
Consumer接收消息的过程: 创建Connection后,会启动MainLoop后台线程,循环从socket(FrameHandler)中获取数据包(Frame),调用channel.handleFrame(Frame frame)处理消息, public void handleFrame(Frame frame) throws IOException { AMQCommand command = _command; if (command.handleFrame(frame)) { // 对消息进行协议assemble _command = new AMQCommand(); // prepare for the next one handleCompleteInboundCommand(command);//对消息消费处理 } } ChannelN.handleCompleteInboundCommand ---ChannelN.processAsync ----dispatcher.handleDelivery ---QueueingConsumer.handleDelivery ---this._queue.add(new Delivery(envelope, properties, body));//消息最终放到队列中 每个Consumer都有一个BlockQueue,用于缓存从socket中获取的消息。 接下来,Consumer对象就可以调用api来从客户端缓存的_queue中依次获取消息,进行消费,参见QueueingConsumer.nextDelivery()
对于这种长连接的方式,没看到心跳功能,以防止长连接的因网络等原因连接失效
2、poll API方式 ChannelN: GetResponse basicGet(String queue, boolean autoAck) 这种方式比较简单,直接通过RPC从MQ Server端获取队列中的消息
rabbitMQ中consumer通过建立到queue的连接,创建channel对象,通过channel通道获取message, Consumer可以声明式的以API轮询poll的方式主动从queue的获取消息,也可以通过订阅的方式被动的从Queue中消费消息, 最近翻阅了基于java的客户端的相关源码,简单做个分析。 编程模型伪代码如下: ConnectionFactory factory = new ConnectionFactory(); Connection conn = factory.newConnection(); Channel channel=conn.createChannel(); 创建Connection需要指定MQ的物理地址和端口,是socket tcp物理连接,而channel是一个逻辑的概念,支持在tcp连接上创建多个MQ channel 以下是基于channel上的两种消费方式。
1、Subscribe订阅方式 boolean autoAck = false; channel.basicConsume(queueName, autoAck, "myConsumerTag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.contentType; long deliveryTag = envelope.getDeliveryTag(); // (process the message components here ...) channel.basicAck(deliveryTag, false); } });
订阅方式其实是向queue注册consumer,通过rpc向queue server发送注册consumer的消息,rabbitMQ Server在收到消息后,根据消息的内容类型判断这是一个订阅消息, 这样当MQ 中queue有消息时,会自动把消息通过该socket(长连接)通道发送出去。 参见ChannelN中的方法 public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, final Consumer callback) throws IOException { ...... rpc((Method) new Basic.Consume.Builder() .queue(queue) .consumerTag(consumerTag) .noLocal(noLocal) .noAck(autoAck) .exclusive(exclusive) .arguments(arguments) .build(), k);
try { return k.getReply(); } catch(ShutdownSignalException ex) { throw wrap(ex); } }
Consumer接收消息的过程: 创建Connection后,会启动MainLoop后台线程,循环从socket(FrameHandler)中获取数据包(Frame),调用channel.handleFrame(Frame frame)处理消息, public void handleFrame(Frame frame) throws IOException { AMQCommand command = _command; if (command.handleFrame(frame)) { // 对消息进行协议assemble _command = new AMQCommand(); // prepare for the next one handleCompleteInboundCommand(command);//对消息消费处理 } } ChannelN.handleCompleteInboundCommand ---ChannelN.processAsync ----dispatcher.handleDelivery ---QueueingConsumer.handleDelivery ---this._queue.add(new Delivery(envelope, properties, body));//消息最终放到队列中 每个Consumer都有一个BlockQueue,用于缓存从socket中获取的消息。 接下来,Consumer对象就可以调用api来从客户端缓存的_queue中依次获取消息,进行消费,参见QueueingConsumer.nextDelivery()
对于这种长连接的方式,没看到心跳功能,以防止长连接的因网络等原因连接失效
2、poll API方式 ChannelN: GetResponse basicGet(String queue, boolean autoAck) 这种方式比较简单,直接通过RPC从MQ Server端获取队列中的消息
相关文章推荐
- Linux下C语言编程
- (NO.00004)iOS实现打砖块游戏(十六):导弹发射道具的实现(下)
- 一个编译静态库的make脚本
- (NO.00004)iOS实现打砖块游戏(十六):导弹发射道具的实现(下)
- (NO.00004)iOS实现打砖块游戏(十六):导弹发射道具的实现(下)
- 【数据拾遗(java描述)】--- 二叉树的基本操作
- WPF基础到企业应用系列7——深入剖析依赖属性(WPF/Silverlight核心)
- php遍历数组
- Android通过adb命令来获取手机当前页面的名称
- 特征脸(Eigenface)理论基础-PCA(主成分分析法)
- git操作??
- git 清除历史
- hdu 5533 Dancing Stars on Me【计算几何】
- jforum Action的基类
- FZU 2082 过路费(树链剖分+BIT)
- Golang安装
- 耐心题 广东oj新生赛“我是好人2”
- 人脸识别经典算法一:特征脸方法(Eigenface)
- Android下setLatestEventInfo警告、Handler警告、SimpleDateFormat警告http://www.piaoyi.org/mobile-app/Android-se
- centos7没有安装ifconfig命令的解决方法