您的位置:首页 > 其它

rabbitmq学习6:RPC

2016-04-22 14:28 323 查看
   在《rabbitmq学习2:Work Queues 》中我们已经知道了在多个worker如何分配耗时的任务。如果我现在要在远程的机器上运行然后得到结果,那应当怎么做呢?那就要用到RPC(Remote Procedure Call or RPC )了!

   关于RPC的介绍请参考百度百科里的关于RPC的介绍:http://baike.baidu.com/view/32726.htm#sub32726

   现在来看看来看看Rabbitmq中RPC吧!RPC的工作示意图如下:



   上图中的C代表客户端,S表示服务器端;Rabbitmq中的RPC流程如下:

1、首先客户端发送一个reply_to和corrention_id的请求,发布到RPC队列中;

2、服务器端处理这个请求,并把处理结果发布到一个回调Queue,此Queue的名称应当与reply_to的名称一致

3、客户端从回调Queue中得到先前corrention_id设定的值的处理结果。如果碰到和先前不一样的corrention_id的值,将会忽略而不是抛出异常。

 

  对于上面所提到的回调Queue中的消费处理使用的是BasicProperties类;而消息 属性在AMQP的协议中规定有14个;而很多大部分我们没有用到。常用的几个属性有:

English代码  


Message properties  

The AMQP protocol predefine a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following:  

  

delivery_mode: Marks a message as persistent (with a value of 2) or transient (any other value). You may remember this property from the second tutorial.   

content_type: Used to describe the mime-type of the encoding. For example for the often used JSON encoding it is a good practice to set this property to: application/json.   

reply_to: Commonly used to name a callback queue.   

correlation_id: Useful to correlate RPC responses with requests.   

 

 delivery_mode : 标记消息是持久性消息还是瞬态信息。在前面的“Work Queue”中我们已经提到过;   

  content_type : 用来描述MIME的类型。如把其类型设定为JSON;

  reply_to : 用于命名一个回调Queue;

  correlation_id : 用于与相关联的请求的RPC响应.

  现在我们就开始RPC的程序吧!

client的代码如下:

Java代码  


package com.abin.rabbitmq;  

  

import java.util.UUID;  

  

import com.rabbitmq.client.AMQP.BasicProperties;  

import com.rabbitmq.client.Channel;  

import com.rabbitmq.client.Connection;  

import com.rabbitmq.client.ConnectionFactory;  

import com.rabbitmq.client.QueueingConsumer;  

  

public class RPCClient {  

    private Connection connection;  

    private Channel channel;  

    private String requestQueueName = "rpc_queue";  

    private String replyQueueName;  

    private QueueingConsumer consumer;  

  

    public RPCClient() throws Exception {  

        ConnectionFactory factory = new ConnectionFactory();  

        factory.setHost("localhost");  

        connection = factory.newConnection();  

        channel = connection.createChannel();  

  

        replyQueueName = channel.queueDeclare().getQueue();  

        consumer = new QueueingConsumer(channel);  

        channel.basicConsume(replyQueueName, true, consumer);  

    }  

  

    public String call(String message) throws Exception {  

        String response = null;  

        String corrId = UUID.randomUUID().toString();  

  

        BasicProperties props = new BasicProperties();  

        props.setReplyTo(replyQueueName);  

        props.setCorrelationId(corrId);  

  

        channel.basicPublish("", requestQueueName, props, message.getBytes());  

  

        while (true) {  

            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  

            if (delivery.getProperties().getCorrelationId().equals(corrId)) {  

                response = new String(delivery.getBody(), "UTF-8");  

                break;  

            }  

        }  

  

        return response;  

    }  

  

    public void close() throws Exception {  

        connection.close();  

    }  

  

    public static void main(String[] argv) {  

        RPCClient fibonacciRpc = null;  

        String response = null;  

        try {  

            fibonacciRpc = new RPCClient();  

  

            System.out.println(" [x] Requesting fib(30)");  

            response = fibonacciRpc.call("30");  

            System.out.println(" [.] Got '" + response + "'");  

            System.out.println(" [x] Requesting fib(-1)");  

            response = fibonacciRpc.call("-1");  

            System.out.println(" [.] Got '" + response + "'");  

            System.out.println(" [x] Requesting fib(a)");  

            response = fibonacciRpc.call("a");  

            System.out.println(" [.] Got '" + response + "'");  

        } catch (Exception e) {  

            e.printStackTrace();  

        } finally {  

            if (fibonacciRpc != null) {  

                try {  

                    fibonacciRpc.close();  

                } catch (Exception ignore) {  

                }  

            }  

        }  

    }  

}  

 

 server的代码如下:

Java代码  


package com.abin.rabbitmq;  

  

import com.rabbitmq.client.AMQP.BasicProperties;  

import com.rabbitmq.client.Channel;  

import com.rabbitmq.client.Connection;  

import com.rabbitmq.client.ConnectionFactory;  

import com.rabbitmq.client.QueueingConsumer;  

  

public class RPCServer {  

    private static final String RPC_QUEUE_NAME = "rpc_queue";  

  

    private static int fib(int n) {  

        if (n > 1)  

            return fib(n - 1) + fib(n - 2);  

        else  

            return n;  

    }  

  

    public static void main(String[] argv) {  

        Connection connection = null;  

        Channel channel = null;  

        try {  

            ConnectionFactory factory = new ConnectionFactory();  

            factory.setHost("localhost");  

  

            connection = factory.newConnection();  

            channel = connection.createChannel();  

  

            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);  

  

            channel.basicQos(1);  

  

            QueueingConsumer consumer = new QueueingConsumer(channel);  

            channel.basicConsume(RPC_QUEUE_NAME, false, consumer);  

  

            System.out.println(" [x] Awaiting RPC requests");  

  

            while (true) {  

                String response = null;  

  

                QueueingConsumer.Delivery delivery = consumer.nextDelivery();  

  

                BasicProperties props = delivery.getProperties();  

                BasicProperties replyProps = new BasicProperties();  

                replyProps.setCorrelationId(props.getCorrelationId());  

  

                try {  

                    String message = new String(delivery.getBody(), "UTF-8");  

                    int n = Integer.parseInt(message);  

  

                    System.out.println(" [.] fib(" + message + ")");  

                    response = "" + fib(n);  

                } catch (Exception e) {  

                    System.out.println(" [.] " + e.toString());  

                    response = "";  

                } finally {  

                    channel.basicPublish("", props.getReplyTo(), replyProps,  

                            response.getBytes("UTF-8"));  

  

                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),  

                            false);  

                }  

            }  

        } catch (Exception e) {  

            e.printStackTrace();  

        } finally {  

            if (connection != null) {  

                try {  

                    connection.close();  

                } catch (Exception ignore) {  

                }  

            }  

        }  

    }  

}  

 

先运行服务器端,运行结果如下:

Java代码  


[x] Awaiting RPC requests  

   再运行运行客户端,运行结果如下:

Java代码  


[x] Requesting fib(30)  

[.] Got '832040'  

[x] Requesting fib(-1)  

[.] Got '-1'  

[x] Requesting fib(a)  

[.] Got ''  

   在服务器还可以出现:

Java代码  


[.] fib(30)  

[.] fib(-1)  

[.] java.lang.NumberFormatException: For input string: "a"  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  rabbitmq