您的位置:首页 > 大数据

柯南君:看大数据时代下的IT架构(9)消息队列之RabbitMQ--案例(RPC起航)

2015-07-19 17:21 706 查看

二、Remote procedure call (RPC)(using the Java client)

三、Client interface(客户端接口)

为了展示一个RPC服务是如何使用的,我们将创建一段很简单的客户端class。 它将会向外提供名字为call的函数,这个call会发送RPC请求并且阻塞,直到收到RPC运算的结果。代码如下:


fibonacci_rpc = FibonacciRpcClient()

result = fibonacci_rpc.call(4)

print "fib(4) is %r" % (result,)

四、 总体来说,在RabbitMQ进行RPC远程调用是比较容易的。client发送请求的Message然后server返回响应结果。为了收到响应client在publish message时需要提供一个”callback“(回调)的queue地址。code如下:


[java] view plaincopyprint?





result = channel.queue_declare(exclusive=True)

callback_queue = result.method.queue

channel.basic_publish(exchange='',

routing_key='rpc_queue',

properties=pika.BasicProperties(

reply_to = callback_queue,

),

body=request)

Message properties

AMQP 预定义了14个属性。它们中的绝大多很少会用到。以下几个是平时用的比较多的:

delivery_mode: 持久化一个Message(通过设定值为2)。其他任意值都是非持久化。

content_type: 描述mime-type 的encoding。比如设置为JSON编码:设置该property为application/json。

reply_to: 一般用来指明用于回调的queue(Commonly used to name a callback queue)。

correlation_id: 在请求中关联处理RPC响应(correlate RPC responses with requests)。

四、Correlation Id  在上个小节里,实现方法是对每个RPC请求都会创建一个callback queue。这是不高效的。幸运的是,在这里有一个解决方法:为每个client创建唯一的callback queue。

这又有其他问题了:收到响应后它无法确定是否是它的,因为所有的响应都写到同一个queue了。上一小节的correlation_id在这种情况下就派上用场了:对于每个request,都设置唯一的一个值,在收到响应后,通过这个值就可以判断是否是自己的响应。如果不是自己的响应,就不去处理。

五、(总结)

工作流程:


当客户端启动时,它创建了匿名的exclusive callback queue.

客户端的RPC请求时将同时设置两个properties: reply_to设置为callback queue;correlation_id设置为每个request一个独一无二的值.

请求将被发送到an rpc_queue queue.

RPC端或者说server一直在等待那个queue的请求。当请求到达时,它将通过在reply_to指定的queue回复一个message给client。

client一直等待callback queue的数据。当message到达时,它将检查correlation_id的值,如果值和它request发送时的一致那么就将返回响应。

六、

Putting it all together






private static int fib(int n) throws Exception {

if (n == 0) return 0;

if (n == 1) return 1;

return fib(n-1) + fib(n-2);

}

 RPCServer.java :



private static final String RPC_QUEUE_NAME = "rpc_queue";

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

channel.basicQos(1);

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

System.out.println(" [x] Awaiting RPC requests");

while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();

BasicProperties props = delivery.getProperties();
BasicProperties replyProps = new BasicProperties
.Builder()
.correlationId(props.getCorrelationId())
.build();

String message = new String(delivery.getBody());
int n = Integer.parseInt(message);

System.out.println(" [.] fib(" + message + ")");
String response = "" + fib(n);

channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}



服务器代码相当简单:


像往常一样,我们首先建立连接、通道和声明队列。

我们可能想要运行多个服务器进程。为了分散负载同样在多个服务器,我们需要设置在channel.basicQos prefetchCount设置。

我们使用basicConsume访问队列。然后我们进入while循环,我们等待请求消息,并发送响应工作。

RPCClient.java:

private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;

public RPCClient() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();

replyQueueName = channel.queueDeclare().getQueue();
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true, consumer);
}

public String call(String message) throws Exception {
String response = null;
String corrId = java.util.UUID.randomUUID().toString();

BasicProperties props = new BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();

channel.basicPublish("", requestQueueName, props, message.getBytes());

while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response = new String(delivery.getBody());
break;
}
}

return response;
}

public void close() throws Exception {
connection.close();
}


客户端代码部分涉及到:


我们建立了一个"connecttion"(连接) 和 "channel"(通道)并且为replies(回复)声明一个独一无二的"callback"(回调);

我们订阅了"callback"(回调)队列,这样我们就可以收到RPC的回应了;

我们调用的方法是实际的RPC;

接下来我们publish(发布)请求消息,有两个属性,分别是:replyTo 和 correlationId;

在这点,我们可以坐下来,直到适当的响应到达;

while循环做了一件非常简单的工作,它会检查每一个消息响应,如果当前的最后,我们将响应给用户;

客户端请求:

RPCClient fibonacciRpc = new RPCClient();

System.out.println(" [x] Requesting fib(30)");
String response = fibonacciRpc.call("30");
System.out.println(" [.] Got '" + response + "'");

fibonacciRpc.close();



现在是时候,该看看我们的整体完整的示例源代码了:RPCClent.java(包括基本的异常处理)和RPCServer.java,像往常一样编译和设置路径(可以参考前面的教程)


RPCClient.java:


import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
import java.util.UUID;

public class RPCClient {

private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;

public RPCClient() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();

replyQueueName = channel.queueDeclare().getQueue();
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true, consumer);
}

public String call(String message) throws Exception {
String response = null;
String corrId = UUID.randomUUID().toString();

BasicProperties props = new BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();

channel.basicPublish("", requestQueueName, props, message.getBytes());

while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response = new String(delivery.getBody(),"UTF-8");
break;
}
}

return response;
}

public void close() throws Exception {
connection.close();
}

public static void main(String[] argv) {
RPCClient fibonacciRpc = null;
String response = null;
try {
fibonacciRpc = new RPCClient();

System.out.println(" [x] Requesting fib(30)");
response = fibonacciRpc.call("30");
System.out.println(" [.] Got '" + response + "'");
}
catch  (Exception e) {
e.printStackTrace();
}
finally {
if (fibonacciRpc!= null) {
try {
fibonacciRpc.close();
}
catch (Exception ignore) {}
}
}
}
}<strong>
</strong>



RPCServer.java:



import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;

public class RPCServer {

private static final String RPC_QUEUE_NAME = "rpc_queue";

private static int fib(int n) {
if (n ==0) return 0;
if (n == 1) return 1;
return fib(n-1) + fib(n-2);
}

public static void main(String[] argv) {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

connection = factory.newConnection();
channel = connection.createChannel();

channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

channel.basicQos(1);

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

System.out.println(" [x] Awaiting RPC requests");

while (true) {
String response = null;

QueueingConsumer.Delivery delivery = consumer.nextDelivery();

BasicProperties props = delivery.getProperties();
BasicProperties replyProps = new BasicProperties
.Builder()
.correlationId(props.getCorrelationId())
.build();

try {
String message = new String(delivery.getBody(),"UTF-8");
int n = Integer.parseInt(message);

System.out.println(" [.] fib(" + message + ")");
response = "" + fib(n);
}
catch (Exception e){
System.out.println(" [.] " + e.toString());
response = "";
}
finally {
channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes("UTF-8"));

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
catch  (Exception e) {
e.printStackTrace();
}
finally {
if (connection != null) {
try {
connection.close();
}
catch (Exception ignore) {}
}
}
}
}


$ javac -cp rabbitmq-client.jar RPCClient.java RPCServer.java

我们的RPC service现在准备好了,我们开始启动server:

$ java -cp $CP RPCServer
[x] Awaiting RPC requests

发布一个fibonacci 数字,运行在client(客户端):

$ java -cp $CP RPCClient
[x] Requesting fib(30)

本节提供的设计并不是唯一的RPC服务实现,但它还是有一定的优点的:

如果RPC server(服务器)太慢了,你仅仅需要运行另一个,就可以扩展;尝试在新的控制台,运行第二个吧;

在客户端,RPC需要发送和接收的消息只有一个,不需要像queueDeclare 同步调用,因为RPC客户端为了一个RPC请求,只需要一个网络往返;

我们的代码依然很简单,不试图去解决更加繁杂的问题,但是非常重要,像以下这样:


如果没有服务运行,客户端将怎么去做?

客户端应该有RPC超时么?

如果服务器出现故障,爆出一个异常,应该发给客户端么?

防止传入错误的消息(如范围检查、类型)前处理
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐