您的位置:首页 > 其它

rabbitmq(六)RPC(Remote procedure call,远程过程调用)

2015-05-07 23:35 239 查看

rabbitmq RPC工作流程

客户端向远端发送请求时需要设置两个重要的属性:callback queue(回调队列)和correlation id(关联id)。远端响应客户端的请求时会将结果投递到该回调队列,并且也会附上客户端发送请求时设置的关联id,这样客户端就能通过远端响应的关联id和自身生成的关联id是否相等来判断是否是对该客户端的响应。设置好这两个请求属性后,就可以将消息投递到请求队列了。如图(来做rabbitmq官网)所示:



示例

客户端向远程请求计算fibonacci(斐波那契数列)前20项和,收到远端的响应后,将结果打印到控制台。

RPCClient.java(客户端)

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;

import java.util.UUID;

public class RPCClient {

private static final String REQUEST_QUEUE_NAME = "rpc_queue";

public static void main(String[] args) {
Connection connection = null;
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
Channel channel = connection.createChannel();

//创建一个回调队列
String callbackQueue = channel.queueDeclare().getQueue();

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(callbackQueue, true, consumer);

//生成correlation id
String corrId = UUID.randomUUID().toString();

//设置相关请求属性
BasicProperties requestProps = new BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(callbackQueue)
.build();

//向远程请求计算fibonacci(斐波那契数列)前20项和
String n = "20";
System.out.println("requesting fibonacci with n=" + n);
channel.basicPublish("", REQUEST_QUEUE_NAME, requestProps, n.getBytes());

String response;
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
//检查远程响应附带的correlation id是否和客户端生成的一致
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response = new String(delivery.getBody(), "UTF-8");
break;
}
}
System.out.println("the result is:" + response);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (Exception e) {
//ignore this
}
}
}
}

}


RPCServer.java(远端)

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;

public class RPCServer {

private static final String RPC_QUEUE_NAME = "rpc_queue";

public static void main(String[] args) {
Connection connection = null;
Channel channel;
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

connection = factory.newConnection();
channel = connection.createChannel();

channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

System.out.println("waiting for RPC requests...");

while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();

//获取客户端的请求属性
BasicProperties props = delivery.getProperties();

//设置响应属性
BasicProperties replyProps = new BasicProperties
.Builder()
.correlationId(props.getCorrelationId())//关联id
.build();

String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);

System.out.println("start to handler fib(" + message + ")");
String response = "" + fibonacci(n);

//响应客户端请求
channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8"));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (Exception e) {
//ignore this
}
}
}
}

/**
* 计算fibonacci(斐波那契数列)前n项和
*/
private static int fibonacci(int n) {
if (n == 1 || n == 2) {
return 1;
}
return fibonacci(n - 1) + fibonacci(n - 2);
}

}


参考资料:rabbitmq官方教程6
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: