rabbitmq AMQP协议
2016-12-30 15:39
309 查看
1. 协议说明
rabbitmq遵循 Advanced Message Queue Protocal(AMQP)协议。其中amqp协议1.0可以参考我的资源:
http://download.csdn.net/detail/huyangyamin/9725613
2. spring 客户端
spring java客户端使用 com.rabbitmq.client.impl.FrameHandler处理底层二进制协议(binary protocal)的传输。实际实现类为:
package com.rabbitmq.client.impl; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.net.InetAddress; import java.net.Socket; import java.net.SocketException; import com.rabbitmq.client.AMQP; /** * A socket-based frame handler. */ public class SocketFrameHandler implements FrameHandler { /** The underlying socket */ private final Socket _socket; /** Socket's inputstream - data from the broker - synchronized on */ private final DataInputStream _inputStream; /** Socket's outputstream - data to the broker - synchronized on */ private final DataOutputStream _outputStream; /** Time to linger before closing the socket forcefully. */ public static final int SOCKET_CLOSING_TIMEOUT = 1; /** * @param socket the socket to use */ public SocketFrameHandler(Socket socket) throws IOException { _socket = socket; _inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream())); _outputStream = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream())); }
可以看到SocketFrameHandler持有一个Socket连接,并打开了tcp的read和write IO输入输出流。
3. tcp连接心跳维持
为了维护tcp连接的状态,rabbitmq没有选择使用tcp默认机制的keepalive,而是自己实现了一套心跳机器(在iso 应用层)rabbitmq客户端专门起了一个定时线程池(ScheduledThreadPoolExecutor)开发送心跳包:
// The contents of this file are subject to the Mozilla Public License // Version 1.1 (the "License"); you may not use this file except in // compliance with the License. You may obtain a copy of the License // at http://www.mozilla.org/MPL/ // // Software distributed under the License is distributed on an "AS IS" // basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See // the License for the specific language governing rights and // limitations under the License. // // The Original Code is RabbitMQ. // // The Initial Developer of the Original Code is GoPivotal, Inc. // Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. // package com.rabbitmq.client.impl; import com.rabbitmq.client.AMQP; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.ScheduledFuture; import java.io.IOException; import static java.util.concurrent.TimeUnit.SECONDS; /** * Manages heartbeat sending for a {@link AMQConnection}. * <p/> * Heartbeats are sent in a dedicated thread that is separate * from the main loop thread used for the connection. */ final class HeartbeatSender { private final Object monitor = new Object(); private final FrameHandler frameHandler; private final ThreadFactory threadFactory; private ScheduledExecutorService executor; private ScheduledFuture<?> future; private boolean shutdown = false; private volatile long lastActivityTime; HeartbeatSender(FrameHandler frameHandler, ThreadFactory threadFactory) { this.frameHandler = frameHandler; this.threadFactory = threadFactory; } public void signalActivity() { this.lastActivityTime = System.nanoTime(); } /** * Sets the heartbeat in seconds. */ public void setHeartbeat(int heartbeatSeconds) { synchronized(this.monitor) { if(this.shutdown) { return; } // cancel any existing heartbeat task if(this.future != null) { this.future.cancel(true); this.future = null; } if (heartbeatSeconds > 0) { // wake every heartbeatSeconds / 2 to avoid the worst case // where the last activity comes just after the last heartbeat long interval = SECONDS.toNanos(heartbeatSeconds) / 2; ScheduledExecutorService executor = createExecutorIfNecessary(); Runnable task = new HeartbeatRunnable(interval); this.future = executor.scheduleAtFixedRate( task, interval, interval, TimeUnit.NANOSECONDS); } } } private ScheduledExecutorService createExecutorIfNecessary() { synchronized (this.monitor) { if (this.executor == null) { this.executor = Executors.newSingleThreadScheduledExecutor(threadFactory); } return this.executor; } } /** * Shutdown the heartbeat process, if any. */ public void shutdown() { ExecutorService executorToShutdown = null; synchronized (this.monitor) { if (this.future != null) { this.future.cancel(true); this.future = null; } if (this.executor != null) { // to be safe, we shouldn't call shutdown holding the // monitor. executorToShutdown = this.executor; this.shutdown = true; this.executor = null; } } if(executorToShutdown != null) { executorToShutdown.shutdown(); } } private final class HeartbeatRunnable implements Runnable { private final long heartbeatNanos; private HeartbeatRunnable(long heartbeatNanos) { this.heartbeatNanos = heartbeatNanos; } public void run() { try { long now = System.nanoTime(); if (now > (lastActivityTime + this.heartbeatNanos)) { frameHandler.writeFrame(new Frame(AMQP.FRAME_HEARTBEAT, 0)); frameHandler.flush(); } } catch (IOException e) { // ignore } } } }
相关文章推荐
- RabbitMQ是一个由erlang开发的基于AMQP(Advanced Message Queue )协议的开源实现。
- AMQP 0.9.1和1.0协议差别以及rabbitmq支持情况
- RabbitMQ源码解析前奏--AMQP协议
- RabbitMQ与AMQP协议详解
- rabbitMQ的简单实例——amqp协议带数据回写机制
- RabbitMQ与AMQP协议
- AMQP协议和rabbitmq理解
- RabbitMQ与AMQP协议详解
- RabbitMQ源码解析前奏--AMQP协议
- 消息队列基础 RabbitMQ与AMQP协议详解——超大规模高可用OpenStack核心技术深入解析系列(二)
- RabbitMQ(二)AMQP协议mandatory和immediate标志位区别
- RabbitMQ与AMQP协议详解
- amqp协议与php下的rabbitMQ
- RabbitMQ源码解析前奏--AMQP协议
- RabbitMQ源码解析前奏--AMQP协议
- 熟悉RabbitMQ与AMQP协议
- RabbitMQ源码解析前奏--AMQP协议
- RabbitMQ与AMQP协议详解
- 消息队列基础 RabbitMQ与AMQP协议详解——超大规模高可用OpenStack核心技术深入解析系列(二)
- RabbitMQ介绍2 - AMQP协议