您的位置:首页 > 其它

rabbitmq AMQP协议

2016-12-30 15:39 309 查看

1. 协议说明

rabbitmq遵循 Advanced Message Queue Protocal(AMQP)协议。

其中amqp协议1.0可以参考我的资源:

http://download.csdn.net/detail/huyangyamin/9725613

2. spring 客户端

spring java客户端使用 com.rabbitmq.client.impl.FrameHandler处理底层二进制协议(binary protocal)的传输。

实际实现类为:

package com.rabbitmq.client.impl;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketException;

import com.rabbitmq.client.AMQP;

/**
* A socket-based frame handler.
*/

public class SocketFrameHandler implements FrameHandler {
/** The underlying socket */
private final Socket _socket;

/** Socket's inputstream - data from the broker - synchronized on */
private final DataInputStream _inputStream;

/** Socket's outputstream - data to the broker - synchronized on */
private final DataOutputStream _outputStream;

/** Time to linger before closing the socket forcefully. */
public static final int SOCKET_CLOSING_TIMEOUT = 1;

/**
* @param socket the socket to use
*/
public SocketFrameHandler(Socket socket) throws IOException {
_socket = socket;

_inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
_outputStream = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()));
}


可以看到SocketFrameHandler持有一个Socket连接,并打开了tcp的read和write IO输入输出流。

3. tcp连接心跳维持

为了维护tcp连接的状态,rabbitmq没有选择使用tcp默认机制的keepalive,而是自己实现了一套心跳机器(在iso 应用层)

rabbitmq客户端专门起了一个定时线程池(ScheduledThreadPoolExecutor)开发送心跳包:

//  The contents of this file are subject to the Mozilla Public License
//  Version 1.1 (the "License"); you may not use this file except in
//  compliance with the License. You may obtain a copy of the License
//  at http://www.mozilla.org/MPL/ //
//  Software distributed under the License is distributed on an "AS IS"
//  basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
//  the License for the specific language governing rights and
//  limitations under the License.
//
//  The Original Code is RabbitMQ.
//
//  The Initial Developer of the Original Code is GoPivotal, Inc.
//  Copyright (c) 2007-2015 Pivotal Software, Inc.  All rights reserved.
//

package com.rabbitmq.client.impl;

import com.rabbitmq.client.AMQP;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ScheduledFuture;
import java.io.IOException;

import static java.util.concurrent.TimeUnit.SECONDS;

/**
* Manages heartbeat sending for a {@link AMQConnection}.
* <p/>
* Heartbeats are sent in a dedicated thread that is separate
* from the main loop thread used for the connection.
*/
final class HeartbeatSender {

private final Object monitor = new Object();

private final FrameHandler frameHandler;
private final ThreadFactory threadFactory;

private ScheduledExecutorService executor;

private ScheduledFuture<?> future;

private boolean shutdown = false;

private volatile long lastActivityTime;

HeartbeatSender(FrameHandler frameHandler, ThreadFactory threadFactory) {
this.frameHandler = frameHandler;
this.threadFactory = threadFactory;
}

public void signalActivity() {
this.lastActivityTime = System.nanoTime();
}

/**
* Sets the heartbeat in seconds.
*/
public void setHeartbeat(int heartbeatSeconds) {
synchronized(this.monitor) {
if(this.shutdown) {
return;
}

// cancel any existing heartbeat task
if(this.future != null) {
this.future.cancel(true);
this.future = null;
}

if (heartbeatSeconds > 0) {
// wake every heartbeatSeconds / 2 to avoid the worst case
// where the last activity comes just after the last heartbeat
long interval = SECONDS.toNanos(heartbeatSeconds) / 2;
ScheduledExecutorService executor = createExecutorIfNecessary();
Runnable task = new HeartbeatRunnable(interval);
this.future = executor.scheduleAtFixedRate(
task, interval, interval, TimeUnit.NANOSECONDS);
}
}
}

private ScheduledExecutorService createExecutorIfNecessary() {
synchronized (this.monitor) {
if (this.executor == null) {
this.executor = Executors.newSingleThreadScheduledExecutor(threadFactory);
}
return this.executor;
}
}

/**
* Shutdown the heartbeat process, if any.
*/
public void shutdown() {
ExecutorService executorToShutdown = null;
synchronized (this.monitor) {
if (this.future != null) {
this.future.cancel(true);
this.future = null;
}

if (this.executor != null) {
// to be safe, we shouldn't call shutdown holding the
// monitor.
executorToShutdown = this.executor;

this.shutdown = true;
this.executor = null;
}
}
if(executorToShutdown != null) {
executorToShutdown.shutdown();
}
}

private final class HeartbeatRunnable implements Runnable {

private final long heartbeatNanos;

private HeartbeatRunnable(long heartbeatNanos) {
this.heartbeatNanos = heartbeatNanos;
}

public void run() {
try {
long now = System.nanoTime();

if (now > (lastActivityTime + this.heartbeatNanos)) {
frameHandler.writeFrame(new Frame(AMQP.FRAME_HEARTBEAT, 0));
frameHandler.flush();
}
} catch (IOException e) {
// ignore
}
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  rabbitmq