您的位置:首页 > 其它

RabbitMQ实现RPC技术

2016-09-20 10:50 579 查看
RabbitMQ可以用于实现RPC技术,其原理框图如下:



这时候其实RabbitMQ服务器只是充当一个转发的中心节点,这里的Client和Server其实都充当了producer和consumer:

Server:

(1)建立与RabbitMQ服务器的连接

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")

        defer conn.Close()

(2)在该连接上创建channel

ch, err := conn.Channel()

    defer ch.Close()

(3)以consumer身份声明一个Queue

q, err := ch.QueueDeclare(

        "rpc_queue", // name

        false,       // durable

        false,       // delete when usused

        false,       // exclusive

        false,       // no-wait

        nil,         // arguments

    )

(4)以consumer身份consume消息

msgs, err := ch.Consume(

        q.Name, // queue

        "",     // consumer

        false,  // auto-ack

        false,  // exclusive

        false,  // no-local

        false,  // no-wait

        nil,    // args

    )

(5)处理从声明的队列发送过来的消息

(6)以producer身份publish消息(即处理完的结果)到指定的发送队列(这里的发送队列是由接收消息的时候,消息发送方Client所指定的,它将该发送队列的名字一并发送过来)。并且publish的时候使用默认Exchange,即“”,routing key使用Client发送过来的队列名字,即d.ReplyTo,d是循环消息的时候一个中间变量for d := range msgs
,在Publishing的时候指定CorrelationId,这个是用于识别双方的一个唯一的ID号,也是从Client端获得d.CorrelationId,由Client负责发送过来。

err = ch.Publish(

                "",        // exchange

                d.ReplyTo, // routing key

                false,     // mandatory

                false,     // immediate

                amqp.Publishing{

                    ContentType:   "text/plain",

                    CorrelationId: d.CorrelationId,

                    Body:          []byte(strconv.Itoa(response)),

                })

Client:

(1)建立与RabbitMQ服务器的连接

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")

        defer conn.Close()

(2)在该连接上创建channel

ch, err := conn.Channel()

    defer ch.Close()

(3)以consumer身份声明一个Queue

其中,队列采用匿名队列,即名字设为“”,此时由RabbitMQ服务器为该匿名队列自动生成一个唯一的名字,exclusive设为true说明队列是私有的,即队列声明者才可以从队列中取出消息。

q, err := ch.QueueDeclare(

        "",    // name

        false, // durable

        false, // delete when usused

        true,  // exclusive

        false, // noWait

        nil,   // arguments

    )

(4)以consumer身份consume消息

这里这么做是等待Server处理完消息后,将结果发送到Client所指定的接收队列,由Client来消费这个结果

msgs, err := ch.Consume(

        q.Name, // queue

        "",     // consumer

        true,   // auto-ack

        false,  // exclusive

        false,  // no-local

        false,  // no-wait

        nil,    // args

    )

(5)以producer身份publish消息

这里是向Server发送请求处理的消息,发送至Server所指定的队列,即“rpc_queue”这个是需要令Client所知道的,publishing的时候还要将CorrelationId、ReplyTo发送过去,CorrelationId是用于标识双方这一次通信的唯一ID号,ReplyTo是告知Server应该将结果发送至哪一个队列当中(由Client接收并消费)

err = ch.Publish(

        "",          // exchange

        "rpc_queue", // routing key

        false,       // mandatory

        false,       // immediate

        amqp.Publishing{

            ContentType:   "text/plain",

            CorrelationId: corrId,

            ReplyTo:       q.Name,

            Body:          []byte(strconv.Itoa(n)),

        })

(6)循环接收的消息,判断获得的ID号是否等于CorrelationId,是的话就进行结果处理,break出循环

for d := range msgs {

        if corrId == d.CorrelationId {

            res, err = strconv.Atoi(string(d.Body))

            failOnError(err, "Failed to convert body to integer")

            break

        }

    }
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: