rabbitmq(六)RPC(Remote procedure call,远程过程调用)
2015-05-07 23:35
239 查看
rabbitmq RPC工作流程
客户端向远端发送请求时需要设置两个重要的属性:callback queue(回调队列)和correlation id(关联id)。远端响应客户端的请求时会将结果投递到该回调队列,并且也会附上客户端发送请求时设置的关联id,这样客户端就能通过远端响应的关联id和自身生成的关联id是否相等来判断是否是对该客户端的响应。设置好这两个请求属性后,就可以将消息投递到请求队列了。如图(来做rabbitmq官网)所示:示例
客户端向远程请求计算fibonacci(斐波那契数列)前20项和,收到远端的响应后,将结果打印到控制台。RPCClient.java(客户端)
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.AMQP.BasicProperties; import java.util.UUID; public class RPCClient { private static final String REQUEST_QUEUE_NAME = "rpc_queue"; public static void main(String[] args) { Connection connection = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); Channel channel = connection.createChannel(); //创建一个回调队列 String callbackQueue = channel.queueDeclare().getQueue(); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(callbackQueue, true, consumer); //生成correlation id String corrId = UUID.randomUUID().toString(); //设置相关请求属性 BasicProperties requestProps = new BasicProperties .Builder() .correlationId(corrId) .replyTo(callbackQueue) .build(); //向远程请求计算fibonacci(斐波那契数列)前20项和 String n = "20"; System.out.println("requesting fibonacci with n=" + n); channel.basicPublish("", REQUEST_QUEUE_NAME, requestProps, n.getBytes()); String response; while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //检查远程响应附带的correlation id是否和客户端生成的一致 if (delivery.getProperties().getCorrelationId().equals(corrId)) { response = new String(delivery.getBody(), "UTF-8"); break; } } System.out.println("the result is:" + response); } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (Exception e) { //ignore this } } } } }
RPCServer.java(远端)
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.AMQP.BasicProperties; public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; public static void main(String[] args) { Connection connection = null; Channel channel; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RPC_QUEUE_NAME, false, consumer); System.out.println("waiting for RPC requests..."); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //获取客户端的请求属性 BasicProperties props = delivery.getProperties(); //设置响应属性 BasicProperties replyProps = new BasicProperties .Builder() .correlationId(props.getCorrelationId())//关联id .build(); String message = new String(delivery.getBody(), "UTF-8"); int n = Integer.parseInt(message); System.out.println("start to handler fib(" + message + ")"); String response = "" + fibonacci(n); //响应客户端请求 channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (Exception e) { //ignore this } } } } /** * 计算fibonacci(斐波那契数列)前n项和 */ private static int fibonacci(int n) { if (n == 1 || n == 2) { return 1; } return fibonacci(n - 1) + fibonacci(n - 2); } }
参考资料:rabbitmq官方教程6
相关文章推荐
- Rabbitmq教程翻译(六)Remote procedure call (RPC)远程过程调用
- RabbitMQ入门教程 For Java【6】 - Remote procedure call (RPC)
- 远程过程调用(Remote Procedure Call,RPC)
- RabbitMQ 原文译06--Remote procedure call(RPC)
- Java 的RPC:remote procedure call 远程过程调用
- RabbitMQ--Remote procedure call (RPC)(六)
- RabbitMQ入门教程 For Java【6】 - Remote procedure call (RPC)
- 从核心概念和技术层面着眼,系统化认识RPC 2017-09-26 张旭 InfoQ 作者|张旭 编辑|田光 RPC(Remote Procedure Call),即远程过程调用,是一个分布式系统间
- rabbitmq 教程 四 Remote procedure call (RPC)
- Remote Procedure Call(RPC)远程过程调用(第八讲)
- rabbitmq之RPC(remote procedure call)
- RPC(Remote Procedure Call 远程过程调用)
- RPC(RemoteProcedureCallProtocol)
- Remote Procedure Call(RPC)
- Will Goldengate use Remote Procedure Call (RPC)?
- PNP : Remote Procedure Call (RPC)
- Will Goldengate use Remote Procedure Call (RPC)?
- Java RPC(Remote Procedure Call)远程通讯可选技术及原理
- RPC(Remote Procedure Call Protocol)——远程过程调用协议