您的位置:首页 > 其它

rabbitmq 学习-9- RpcClient发送消息和同步接收消息原理

2011-08-30 21:54 746 查看
本身使用RpcClient发送消息与同步接收消息的代码是很简单的,如下:

RpcClient client = new RpcClient(channel, exchange, routingKey);

String msg = "hello world!";

byte[] result = client.primitiveCall(msg.getBytes());

这里的primitiveCall调用后,当前线程会进行同步等待,等待消息接收端给自己的回复消息

一个完整的发送消息与接收回复消息的图例:





整个流程详解:

l 创建RpcClient实例
RpcClient client = new RpcClient(channel, exchange, routingKey);

创建RpcClient时会做两件事:

A:创建一个回复queue,接收当前RpcClient发送的消息的消息接收人会将回复消息发到这个replyQueue上供当前RpcClient去接收回复消息

_replyQueue = setupReplyQueue();

protected String setupReplyQueue() throws IOException {

return _channel.queueDeclare("", false, false, true, true, null).getQueue();

//这里实际上是由rabbitmq server去定义一个唯一的queue(因为queueName是空的,所以是由server去生成queueName),最后返回这个queueName,queueName是由server生成的,使用的是以下这个方法:

Queue.DeclareOk queueDeclare(String queueName, boolean passive, boolean durable, boolean exclusive, boolean autoDelete,

Map<String, Object> arguments)

}

B:创建一个接收回复消息的consumer

_consumer = setupConsumer();

protected DefaultConsumer setupConsumer() throws IOException {

//创建一个接收消息的DefaultConsumer实例

DefaultConsumer consumer = new DefaultConsumer(_channel) {

@Override //发生shutdown的时候回调

public void handleShutdownSignal(String consumerTag,

ShutdownSignalException signal) {

synchronized (_continuationMap) {

for (Entry<String, BlockingCell<Object>> entry : _continuationMap.entrySet()) {

entry.getValue().set(signal);

}

_consumer = null;

}

}

@Override //处理消息交付

public void handleDelivery(String consumerTag,

Envelope envelope,

AMQP.BasicProperties properties,

byte[] body)

throws IOException {

//这部分就是和下面的代码一起协作来实现将异步接收强制变成同步接收

synchronized (_continuationMap) {

String replyId = properties.getCorrelationId();

BlockingCell<Object> blocker = _continuationMap.get(replyId);

_continuationMap.remove(replyId);

blocker.set(body);

}

}

};

//让接收消息的consumer去replyQueue上去接收消息,这个过程对于主线程来说是异步进行的,只要replyQueue上有消息了,consumer就会去replyQueue上去接收消息,并回调它的handleDelivery方法

_channel.basicConsume(_replyQueue, true, consumer);

return consumer;

}

l 发送消息
byte[] result = rpcClient.primitiveCall(msg.getBytes());

使用rpcClient的primitiveCall发送消息,看看是怎么做的

public byte[] primitiveCall(byte[] message) throws IOException, ShutdownSignalException {

return primitiveCall(null, message);

}

继续跟踪,核心方法是这个

public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message) throws IOException, ShutdownSignalException{

//检查consumer是否为空,若为空,抛出异常

checkConsumer();

BlockingCell<Object> k = new BlockingCell<Object>();

synchronized (_continuationMap) {

_correlationId++;

String replyId = "" + _correlationId;

//如果props不为空,则将上一步骤创建的replyQueue设置到props上去,还有replyId

if (props != null) {

props.setCorrelationId(replyId);

props.setReplyTo(_replyQueue);

}

else {

//如果props为空,则创建一个,并将replyId和replyQueue都设置到props上

props = new AMQP.BasicProperties(null, null, null, null,

null, replyId,

_replyQueue, null, null, null,

null, null, null, null);

}

_continuationMap.put(replyId, k);

}

//使用上面的props发送消息,这样replyQueue和replyId就跟着传递到了接收消息的那一方去了,接收消息的client去props上去取到replyQueue,它就知道了它接收的消息的回复queue,然后它会将回复消息发送到replyQueue上去,而在上一步骤我们已经指定了一个consumer去replyQueue上去取消息,所以整个发送和接收消息的所有client是有条不紊的进行着

publish(props, message); //这行代码执行完后,只是将消息发送出去了,接收回复消息是异步的,由上一步骤的consumer去接收回复消息

//这里就是进行同步等待接收回复消息,将异步接收变成同步回复接收的核心就在这里

Object reply = k.uninterruptibleGet();

if (reply instanceof ShutdownSignalException) {

ShutdownSignalException sig = (ShutdownSignalException) reply;

ShutdownSignalException wrapper =

new ShutdownSignalException(sig.isHardError(),

sig.isInitiatedByApplication(),

sig.getReason(),

sig.getReference());

wrapper.initCause(sig);

throw wrapper;

} else {

return (byte[]) reply;

}

}

完整描述
创建RpcClient实例:
1,定义一个Map,用于存放每个消息的相关信息:
private final Map<String, BlockingCell<Object>> _continuationMap = new HashMap<String, BlockingCell<Object>>();
Key是一个correlationId,相当于当前rpcClient实例发送消息的一个计数器,初始化时是0,每发送一个消息时,加1
Value是一个com.rabbitmq.utility.BlockingCell对象,它是在发送消息前创建,并和当前的correlationId进行关联,放进来
_continuationMap.put(correlationId, blockingCell);
2,correlationId初始化为0
3,创建一个回复queue,replyQueue=channel.queueDeclare("", false, false, true, true, null).getQueue();
4,创建一个接收回复消息的consumer
5,指定consumer接收replyQueue上的消息,channel.basicConsume(replyQueue, true, consumer);

RpcClient发送消息:
1,创建一个BlockingCell<Object>对象blockingCell
1,correlationId++
2,创建BasicProperties对象,并将correlationId,replyQueue设置到它上面,发送消息时,它会被传递到接收方
3,以correlationId为Key,将blockingCell放入到_continuationMap中
4,发送消息:channel.basicPublish(exchange, routingKey, 上面 步骤得到的BasicProperties对象, message);
5,获取回复消息,Object reply = blockingCell.uninterruptibleGet();这里就是同步等待回复消息

RpcServer接收消息:
1,接收消息
2,从request中获取BasicProperties对象requestProperties,requestProperties=request.getProperties()
3,从requestProperties中得到correlationId,replyQueue
4,创建一个回复消息用的BasicProperties对象replyProperties,并将correlationId设置到它上面
4,发送回复消息:channel.basicPublish("", replyQueue, replyProperties, replyMessage);

RpcClient接收回复:
1,replyQueue一有消息,consumer就会接收到并回调consumer的handleDelivery方法
2,获取传递过来的BasicProperties获取correlationId
3,根据correlationId去continuationMap中取BlockingCell对象,BlockingCell<Object> blocker = continuationMap.get(correlationId);
4,从continuationMap中删除,continuationMap.remove(correlationId);
5,将回复消息设置到blocker对象里面,blocker.set(replyMessage);

同步等待回复消息:
1,【RpcClient发送消息】第4步主线程,发送消息后,第5步就去获取回复消息
2,【RpcClient发送消息】第5步主线程,blockingCell.uninterruptibleGet(),如果blockingCell没有被set(value)过,那么让当前主线程处于等待wait(),等待状态
3, 【RpcClient接收回复】第5步blocker.set(replyMessage);这里的blocker其实就是上面主线程创建的 blockingCell,因为它是根据correlationId去continuationMap中取 的,set(replyMessage),blocker会用一个属性将replyMessage保存起来,供get的时候去返回这个属性,然后调用 notify();唤醒处于等待的主线程(当前这步所在的线程和上一步主线程是在两个线程,所以主线程的等待是可以被这个线程唤醒的),主线程被唤醒 后,get()就会取到replyMessage,最终整个步骤实现了将异步接收强制转换为同步等待接收

BlockingCell类
public class BlockingCell<T> {

private boolean _filled = false;
private T _value;

private static final long NANOS_IN_MILLI = 1000 * 1000;
private static final long INFINITY = -1;

public BlockingCell() {
}

public synchronized T get() throws InterruptedException {
while (!_filled) { //如果value没有被设置过
wait(); //让当前线程处于等待,直到其它线程调用当前对象的notify()或notifyAll()为止
}
return _value;
}

//带超时的get
public synchronized T get(long timeout) throws InterruptedException, TimeoutException {
if (timeout < 0 && timeout != INFINITY)
throw new AssertionError("Timeout cannot be less than zero");
if (!_filled && timeout != 0) {
wait(timeout == INFINITY ? 0 : timeout);
}
if (!_filled)
throw new TimeoutException();
return _value;
}

//无限制的等待,直到取到值为止
public synchronized T uninterruptibleGet() {
while (true) {
try {
return get();
} catch (InterruptedException ex) {
}
}
}

public synchronized T uninterruptibleGet(int timeout) throws TimeoutException {
long now = System.nanoTime() / NANOS_IN_MILLI;
long runTime = now + timeout;
do {
try {
return get(runTime - now);
} catch (InterruptedException e) {
}
} while ((timeout == INFINITY) || ((now = System.nanoTime() / NANOS_IN_MILLI) < runTime));
throw new TimeoutException();
}

public synchronized void set(T newValue) {
if (_filled) {
throw new AssertionError("BlockingCell can only be set once");
}
_value = newValue;
_filled = true;
notify(); //唤醒当前线程(处于等待状态)
}

//保证只能被set(value)一次
public synchronized boolean setIfUnset(T newValue) {
if (_filled) {
return false;
}
set(newValue);
_filled = true;
return true;
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: