您的位置:首页 > 其它

RabbitMQ官方文档翻译之Remote procedure call(六)

2017-04-23 19:20 357 查看


Remote procedure call (RPC)


(using the Java client)

Prerequisites

This tutorial assumes RabbitMQ is installed and running on localhost on
standard port (5672). In case you use a different host, port or credentials, connections settings
would require adjusting.

Where to get help

If you're having trouble going through this tutorial you can contact us through the mailing list.

In the second tutorial we learned how to use Work Queues to distribute time-consuming tasks among multiple workers.

在第二个教程中,我们学习了如何使用工作队列在多个工作人员之间分配耗时的任务。

But what if we need to run a function on a remote computer and wait for the result? Well, that's a different story. This pattern is commonly known as Remote Procedure Call or RPC.

但是如果我们需要在远程计算机上运行功能并等待结果怎么办? 此模式通常称为远程过程调用或RPC。

In this tutorial we're going to use RabbitMQ to build an RPC system: a client and a scalable RPC server. As we don't have any time-consuming tasks that are worth distributing, we're going to create a dummy RPC service that returns Fibonacci numbers.

在本教程中,我们将使用RabbitMQ构建一个RPC系统:它包含一个客户端和一个可扩展的RPC服务器。 由于我们没有任何值得分发的耗时任务,我们将创建一个返回斐波纳契数字的虚拟RPC服务来代替。


1.Client interface 客户端接口

To illustrate how an RPC service could be used we're going to create a simple client class. It's going to expose a method named call which
sends an RPC request and blocks until the answer is received:

为了说明如何使用RPC服务,我们将创建一个简单的客户端类。 它将公开一个名为call的方法,该方法发送RPC请求并阻塞,直到接收到响应:

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);


A note on RPC

虽然RPC是一个很常见的计算模式,但是它并不总是是一个很好的解决方式。当程序员不知道函数调用是本地完成还是通过缓慢的RPC时,可能会出现问题。 带来未知的混乱,还对于调试增加了的不必要的复杂性。
而没有达到不是简化软件的目的,所以滥用RPC可能导致代码不可维护。
铭记这一点,请考虑以下建议:
确保哪个函是在本地调用,还是远程调用是显而易见的。
Document your system。清除组件之间的依赖关系。

处理错误情况。 当RPC服务器长时间停机时,客户端应该如何反应?

当有疑问时你应该避免使用RPC。 如果可以的话,你应该使用异步管道 - 而不是类似RPC的阻塞形式。采用异步管道的方式结果将被异步推送到下一个计算阶段。


2.Callback queue 回调队列

In general doing RPC over RabbitMQ is easy. A client sends a request message and a server replies with a response message. In order to receive a response we need to send a 'callback' queue address with the request. We can use the default queue (which is exclusive
in the Java client). Let's try it:

一般来说,RPC在RabbitMQ上实现很容易。 客户端发送请求消息,服务器回复响应消息。 为了客户端能收到响应,我们发送请求时要包含一个‘callback’队列的地址。 我们可以使用默认队列(在Java客户端中是排他性的)。 我们来试试吧

callbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties
.Builder()
.replyTo(callbackQueueName)
.build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

// ... then code to read a response message from the callback_queue ...


Message properties

AMQP 0-9-1协议预先定义了一组属性(14个)。 大多数属性很少使用,除了以下内容:
deliveryMode: 将消息标记为持久性(值为2)或瞬态(任何其他值)。
您可能从 the
second tutorial.中得知过此属性。
contentType: 用于描述MIME类型的编码。
例如对于经常使用的JSON编码,将此属性设置为:application / json是一个很好的做法。
replyTo: 通常用来命名一个 callback
queue.
correlationId: 用于将RPC响应与请求相关联。

3.Correlation Id 关联id

In the method presented above we suggest creating a callback queue for every RPC request. That's pretty inefficient, but fortunately there is a better way - let's create a single callback queue per client.

在上面提出的方法中,我们建议为每个RPC请求创建一个回调队列。 这是非常低效的,但幸运的是有一个更好的方法 - 让我们为每个客户端创建一个回调队列。

That raises a new issue, having received a response in that queue it's not clear to which request the response belongs. That's when the correlationId property
is used. We're going to set it to a unique value for every request. Later, when we receive a message in the callback queue we'll look at this property, and based on that we'll be able to match a response with a request. If we see an unknown correlationId value,
we may safely discard the message - it doesn't belong to our requests.

这引发了一个新的问题,callback队列收到一个响应,但是不知道响应是属于他自己的?

因为所有的响应都推送到了同一个callback队列。上一小节的提到的correlation_id在这种情况下就派上用场了。 对于每个request,都设置唯一的标识值。 之后,当我们在回调队列中收到一条消息时,我们将查看此属性的值,这样我们将能够将响应与请求相匹配。 如果我们看到一个未知的correlationId值,我们可能会安全地丢弃该消息 -
因为它不属于我们的请求。

You may ask, why should we ignore unknown messages in the callback queue, rather than failing with an error? It's due to a possibility of a race condition on the server side. Although unlikely, it is possible that the RPC server will die just after sending
us the answer, but before sending an acknowledgment message for the request. If that happens, the restarted RPC server will process the request again. That's why on the client we must handle the duplicate responses gracefully, and the RPC should ideally be
idempotent.

您可能会问,为什么我们应该忽略回调队列中的未知消息,而不是抛出错误? 这可能是由于服务器端出现了问题。 RPC服务器可能会在发送给我们的响应后,还没有接收到发送响应请求的确认消息之前停止工作了。 如果发生这种情况,重新启动的RPC服务器将再次处理该请求。 这就是为什么在客户端上,我们必须优雅地处理这些重复的响应,而且RPC理应上是幂等的。


4.Summary 总结



 

当客户端启动时,它创建一个匿名的 anonymous 独占的exclusive的callback队列。

对于每个RPC请求,客户端发送一个具有两个属性的消息: 
            replyTo,callback队列的名字。                                                                                                                                                  
                    correlationId,标识request的唯一值,用来匹配后续从回调队列接收到的响应是对应哪个request的。

请求然后被发送到rpc_queue队列。

RPC worker(aka:server)等待队列上的请求。 一旦请求出现时,它将执行该作业,并使用replyTo字段中的标识的队列将结果发送回客户端。

客户端等待回调队列中的数据。 当响应消息出现时,它检查消息中的correlationId属性。 如果它与某个请求中的设置的值相匹配,则返回对应用程序的响应。


5.实现Putting it all together最终实现

我们先定义fibonacci函数。 用来模拟耗时任务

The Fibonacci task:
private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n-1) + fib(n-2);
}


We declare our fibonacci function. It assumes only valid positive integer input. (Don't expect this one to work for big numbers, and it's probably the slowest recursive implementation possible).

然后完成RPC服务端代码

The code for our RPC server RPCServer.java looks like this:
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RPCServer {

private static final String RPC_QUEUE_NAME = "rpc_queue";

public static void main(String[] argv) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

Connection connection = null;
try {
connection      = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

channel.basicQos(1);

System.out.println(" [x] Awaiting RPC requests");

Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(properties.getCorrelationId())
.build();

String response = "";

try {
String message = new String(body,"UTF-8");
int n = Integer.parseInt(message);

System.out.println(" [.] fib(" + message + ")");
response += fib(n);
}
catch (RuntimeException e){
System.out.println(" [.] " + e.toString());
}
finally {
channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));

channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};

channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

//...
}
}
}


The server code is rather straightforward:
As usual we start by establishing the connection, channel and declaring the queue.
We might want to run more than one server process. In order to spread the load equally over multiple servers we need to set the prefetchCount setting
in channel.basicQos.
We use basicConsume to access
the queue, where we provide a callback in the form of an object (DefaultConsumer) that will do the
work and send the response back.

The code for our RPC client RPCClient.java:
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

public class RPCClient {

private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;

public RPCClient() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

connection = factory.newConnection();
channel = connection.createChannel();

replyQueueName = channel.queueDeclare().getQueue();
}

public String call(String message) throws IOException, InterruptedException {
String corrId = UUID.randomUUID().toString();

AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();

channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);

channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
if (properties.getCorrelationId().equals(corrId)) {
response.offer(new String(body, "UTF-8"));
}
}
});

return response.take();
}

public void close() throws IOException {
connection.close();
}

//...
}


The client code is slightly more involved:
We establish a connection and channel and declare an exclusive 'callback' queue for replies.
We subscribe to the 'callback' queue, so that we can receive RPC responses.
Our call method makes the
actual RPC request.
Here, we first generate a unique correlationId number
and save it - our implementation of handleDelivery in DefaultConsumer will
use this value to catch the appropriate response.
Next, we publish the request message, with two properties: replyTo and correlationId.
At this point we can sit back and wait until the proper response arrives.
Since our consumer delivery handling is happening in a separate thread, we're going to need something to suspend main thread
before response arrives. Usage of BlockingQueue is one of possible solutions. Here we are creating ArrayBlockingQueue with
capacity set to 1 as we need to wait for only one response.
The handleDelivery method
is doing a very simple job, for every consumed response message it checks if the correlationId is
the one we're looking for. If so, it puts the response to BlockingQueue.
At the same time main thread
is waiting for response to take it from BlockingQueue.
Finally we return the response back to the user.

Making the Client request:
RPCClient fibonacciRpc = new RPCClient();

System.out.println(" [x] Requesting fib(30)");
String response = fibonacciRpc.call("30");
System.out.println(" [.] Got '" + response + "'");

fibonacciRpc.close();


Now is a good time to take a look at our full example source code (which includes basic exception handling) for RPCClient.java and RPCServer.java.

Compile and set up the classpath as usual (see tutorial one):
javac -cp $CP RPCClient.java RPCServer.java


Our RPC service is now ready. We can start the server:
java -cp $CP RPCServer
# => [x] Awaiting RPC requests


To request a fibonacci number run the client:
java -cp $CP RPCClient
# => [x] Requesting fib(30)


The design presented here is not the only possible implementation of a RPC service, but it has some important advantages:
If the RPC server is too slow, you can scale up by just running another one. Try running a second RPCServer in
a new console.
On the client side, the RPC requires sending and receiving only one message. No synchronous calls like queueDeclare are
required. As a result the RPC client needs only one network round trip for a single RPC request.

Our code is still pretty simplistic and doesn't try to solve more complex (but important) problems, like:
How should the client react if there are no servers running?
Should a client have some kind of timeout for the RPC?
If the server malfunctions and raises an exception, should it be forwarded to the client?
Protecting against invalid incoming messages (eg checking bounds, type) before processing.

If you want to experiment, you may find the management UI useful for viewing the queues.
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: