rabbitmq学习6:RPC
2016-04-22 14:28
323 查看
在《rabbitmq学习2:Work Queues 》中我们已经知道了在多个worker如何分配耗时的任务。如果我现在要在远程的机器上运行然后得到结果,那应当怎么做呢?那就要用到RPC(Remote Procedure Call or RPC )了!
关于RPC的介绍请参考百度百科里的关于RPC的介绍:http://baike.baidu.com/view/32726.htm#sub32726
现在来看看来看看Rabbitmq中RPC吧!RPC的工作示意图如下:
上图中的C代表客户端,S表示服务器端;Rabbitmq中的RPC流程如下:
1、首先客户端发送一个reply_to和corrention_id的请求,发布到RPC队列中;
2、服务器端处理这个请求,并把处理结果发布到一个回调Queue,此Queue的名称应当与reply_to的名称一致
3、客户端从回调Queue中得到先前corrention_id设定的值的处理结果。如果碰到和先前不一样的corrention_id的值,将会忽略而不是抛出异常。
对于上面所提到的回调Queue中的消费处理使用的是BasicProperties类;而消息 属性在AMQP的协议中规定有14个;而很多大部分我们没有用到。常用的几个属性有:
English代码
Message properties
The AMQP protocol predefine a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following:
delivery_mode: Marks a message as persistent (with a value of 2) or transient (any other value). You may remember this property from the second tutorial.
content_type: Used to describe the mime-type of the encoding. For example for the often used JSON encoding it is a good practice to set this property to: application/json.
reply_to: Commonly used to name a callback queue.
correlation_id: Useful to correlate RPC responses with requests.
delivery_mode : 标记消息是持久性消息还是瞬态信息。在前面的“Work Queue”中我们已经提到过;
content_type : 用来描述MIME的类型。如把其类型设定为JSON;
reply_to : 用于命名一个回调Queue;
correlation_id : 用于与相关联的请求的RPC响应.
现在我们就开始RPC的程序吧!
client的代码如下:
Java代码
package com.abin.rabbitmq;
import java.util.UUID;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class RPCClient {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;
public RPCClient() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
replyQueueName = channel.queueDeclare().getQueue();
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true, consumer);
}
public String call(String message) throws Exception {
String response = null;
String corrId = UUID.randomUUID().toString();
BasicProperties props = new BasicProperties();
props.setReplyTo(replyQueueName);
props.setCorrelationId(corrId);
channel.basicPublish("", requestQueueName, props, message.getBytes());
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response = new String(delivery.getBody(), "UTF-8");
break;
}
}
return response;
}
public void close() throws Exception {
connection.close();
}
public static void main(String[] argv) {
RPCClient fibonacciRpc = null;
String response = null;
try {
fibonacciRpc = new RPCClient();
System.out.println(" [x] Requesting fib(30)");
response = fibonacciRpc.call("30");
System.out.println(" [.] Got '" + response + "'");
System.out.println(" [x] Requesting fib(-1)");
response = fibonacciRpc.call("-1");
System.out.println(" [.] Got '" + response + "'");
System.out.println(" [x] Requesting fib(a)");
response = fibonacciRpc.call("a");
System.out.println(" [.] Got '" + response + "'");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (fibonacciRpc != null) {
try {
fibonacciRpc.close();
} catch (Exception ignore) {
}
}
}
}
}
server的代码如下:
Java代码
package com.abin.rabbitmq;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
private static int fib(int n) {
if (n > 1)
return fib(n - 1) + fib(n - 2);
else
return n;
}
public static void main(String[] argv) {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
System.out.println(" [x] Awaiting RPC requests");
while (true) {
String response = null;
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
BasicProperties props = delivery.getProperties();
BasicProperties replyProps = new BasicProperties();
replyProps.setCorrelationId(props.getCorrelationId());
try {
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
response = "" + fib(n);
} catch (Exception e) {
System.out.println(" [.] " + e.toString());
response = "";
} finally {
channel.basicPublish("", props.getReplyTo(), replyProps,
response.getBytes("UTF-8"));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
false);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (Exception ignore) {
}
}
}
}
}
先运行服务器端,运行结果如下:
Java代码
[x] Awaiting RPC requests
再运行运行客户端,运行结果如下:
Java代码
[x] Requesting fib(30)
[.] Got '832040'
[x] Requesting fib(-1)
[.] Got '-1'
[x] Requesting fib(a)
[.] Got ''
在服务器还可以出现:
Java代码
[.] fib(30)
[.] fib(-1)
[.] java.lang.NumberFormatException: For input string: "a"
关于RPC的介绍请参考百度百科里的关于RPC的介绍:http://baike.baidu.com/view/32726.htm#sub32726
现在来看看来看看Rabbitmq中RPC吧!RPC的工作示意图如下:
上图中的C代表客户端,S表示服务器端;Rabbitmq中的RPC流程如下:
1、首先客户端发送一个reply_to和corrention_id的请求,发布到RPC队列中;
2、服务器端处理这个请求,并把处理结果发布到一个回调Queue,此Queue的名称应当与reply_to的名称一致
3、客户端从回调Queue中得到先前corrention_id设定的值的处理结果。如果碰到和先前不一样的corrention_id的值,将会忽略而不是抛出异常。
对于上面所提到的回调Queue中的消费处理使用的是BasicProperties类;而消息 属性在AMQP的协议中规定有14个;而很多大部分我们没有用到。常用的几个属性有:
English代码
Message properties
The AMQP protocol predefine a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following:
delivery_mode: Marks a message as persistent (with a value of 2) or transient (any other value). You may remember this property from the second tutorial.
content_type: Used to describe the mime-type of the encoding. For example for the often used JSON encoding it is a good practice to set this property to: application/json.
reply_to: Commonly used to name a callback queue.
correlation_id: Useful to correlate RPC responses with requests.
delivery_mode : 标记消息是持久性消息还是瞬态信息。在前面的“Work Queue”中我们已经提到过;
content_type : 用来描述MIME的类型。如把其类型设定为JSON;
reply_to : 用于命名一个回调Queue;
correlation_id : 用于与相关联的请求的RPC响应.
现在我们就开始RPC的程序吧!
client的代码如下:
Java代码
package com.abin.rabbitmq;
import java.util.UUID;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class RPCClient {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;
public RPCClient() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
replyQueueName = channel.queueDeclare().getQueue();
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true, consumer);
}
public String call(String message) throws Exception {
String response = null;
String corrId = UUID.randomUUID().toString();
BasicProperties props = new BasicProperties();
props.setReplyTo(replyQueueName);
props.setCorrelationId(corrId);
channel.basicPublish("", requestQueueName, props, message.getBytes());
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response = new String(delivery.getBody(), "UTF-8");
break;
}
}
return response;
}
public void close() throws Exception {
connection.close();
}
public static void main(String[] argv) {
RPCClient fibonacciRpc = null;
String response = null;
try {
fibonacciRpc = new RPCClient();
System.out.println(" [x] Requesting fib(30)");
response = fibonacciRpc.call("30");
System.out.println(" [.] Got '" + response + "'");
System.out.println(" [x] Requesting fib(-1)");
response = fibonacciRpc.call("-1");
System.out.println(" [.] Got '" + response + "'");
System.out.println(" [x] Requesting fib(a)");
response = fibonacciRpc.call("a");
System.out.println(" [.] Got '" + response + "'");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (fibonacciRpc != null) {
try {
fibonacciRpc.close();
} catch (Exception ignore) {
}
}
}
}
}
server的代码如下:
Java代码
package com.abin.rabbitmq;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
private static int fib(int n) {
if (n > 1)
return fib(n - 1) + fib(n - 2);
else
return n;
}
public static void main(String[] argv) {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
System.out.println(" [x] Awaiting RPC requests");
while (true) {
String response = null;
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
BasicProperties props = delivery.getProperties();
BasicProperties replyProps = new BasicProperties();
replyProps.setCorrelationId(props.getCorrelationId());
try {
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
response = "" + fib(n);
} catch (Exception e) {
System.out.println(" [.] " + e.toString());
response = "";
} finally {
channel.basicPublish("", props.getReplyTo(), replyProps,
response.getBytes("UTF-8"));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
false);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (Exception ignore) {
}
}
}
}
}
先运行服务器端,运行结果如下:
Java代码
[x] Awaiting RPC requests
再运行运行客户端,运行结果如下:
Java代码
[x] Requesting fib(30)
[.] Got '832040'
[x] Requesting fib(-1)
[.] Got '-1'
[x] Requesting fib(a)
[.] Got ''
在服务器还可以出现:
Java代码
[.] fib(30)
[.] fib(-1)
[.] java.lang.NumberFormatException: For input string: "a"
相关文章推荐
- Rabbitmq集群搭建笔记
- 利用Python学习RabbitMQ消息队列
- python使用rabbitmq实现网络爬虫示例
- Linux下PHP扩展amqp安装
- rabbitmq学习
- CentOS6.5 安装rabbitmq
- 非常不错的rabbitmq集群高可用部署
- Rabbitmq 安装与配置
- rabbitmq可靠性保证(原文加上部分标记)
- rabbitmq流控制(原文标记)
- RabbitMQ 流控制学习
- 自研轻量级分布式实时计算框架light_drtc
- win7安装RabbitMQ
- RabbitMQ liunx-centos 安装过程记录
- rabbitmq集群安装实践
- AMQP协议
- RabbitMQ 系列——1.初始
- rabbitMQ安装
- rabbitmq——用户管理
- rabbitmq 消息传送与监听