RabbitMQ实现RPC技术
2016-09-20 10:50
579 查看
RabbitMQ可以用于实现RPC技术,其原理框图如下:
这时候其实RabbitMQ服务器只是充当一个转发的中心节点,这里的Client和Server其实都充当了producer和consumer:
Server:
(1)建立与RabbitMQ服务器的连接
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
(2)在该连接上创建channel
ch, err := conn.Channel()
defer ch.Close()
(3)以consumer身份声明一个Queue
q, err := ch.QueueDeclare(
"rpc_queue", // name
false, // durable
false, // delete when usused
false, // exclusive
false, // no-wait
nil, // arguments
)
(4)以consumer身份consume消息
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
(5)处理从声明的队列发送过来的消息
(6)以producer身份publish消息(即处理完的结果)到指定的发送队列(这里的发送队列是由接收消息的时候,消息发送方Client所指定的,它将该发送队列的名字一并发送过来)。并且publish的时候使用默认Exchange,即“”,routing key使用Client发送过来的队列名字,即d.ReplyTo,d是循环消息的时候一个中间变量for d := range msgs
,在Publishing的时候指定CorrelationId,这个是用于识别双方的一个唯一的ID号,也是从Client端获得d.CorrelationId,由Client负责发送过来。
err = ch.Publish(
"", // exchange
d.ReplyTo, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
CorrelationId: d.CorrelationId,
Body: []byte(strconv.Itoa(response)),
})
Client:
(1)建立与RabbitMQ服务器的连接
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
(2)在该连接上创建channel
ch, err := conn.Channel()
defer ch.Close()
(3)以consumer身份声明一个Queue
其中,队列采用匿名队列,即名字设为“”,此时由RabbitMQ服务器为该匿名队列自动生成一个唯一的名字,exclusive设为true说明队列是私有的,即队列声明者才可以从队列中取出消息。
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when usused
true, // exclusive
false, // noWait
nil, // arguments
)
(4)以consumer身份consume消息
这里这么做是等待Server处理完消息后,将结果发送到Client所指定的接收队列,由Client来消费这个结果
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
(5)以producer身份publish消息
这里是向Server发送请求处理的消息,发送至Server所指定的队列,即“rpc_queue”这个是需要令Client所知道的,publishing的时候还要将CorrelationId、ReplyTo发送过去,CorrelationId是用于标识双方这一次通信的唯一ID号,ReplyTo是告知Server应该将结果发送至哪一个队列当中(由Client接收并消费)
err = ch.Publish(
"", // exchange
"rpc_queue", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
CorrelationId: corrId,
ReplyTo: q.Name,
Body: []byte(strconv.Itoa(n)),
})
(6)循环接收的消息,判断获得的ID号是否等于CorrelationId,是的话就进行结果处理,break出循环
for d := range msgs {
if corrId == d.CorrelationId {
res, err = strconv.Atoi(string(d.Body))
failOnError(err, "Failed to convert body to integer")
break
}
}
这时候其实RabbitMQ服务器只是充当一个转发的中心节点,这里的Client和Server其实都充当了producer和consumer:
Server:
(1)建立与RabbitMQ服务器的连接
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
(2)在该连接上创建channel
ch, err := conn.Channel()
defer ch.Close()
(3)以consumer身份声明一个Queue
q, err := ch.QueueDeclare(
"rpc_queue", // name
false, // durable
false, // delete when usused
false, // exclusive
false, // no-wait
nil, // arguments
)
(4)以consumer身份consume消息
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
(5)处理从声明的队列发送过来的消息
(6)以producer身份publish消息(即处理完的结果)到指定的发送队列(这里的发送队列是由接收消息的时候,消息发送方Client所指定的,它将该发送队列的名字一并发送过来)。并且publish的时候使用默认Exchange,即“”,routing key使用Client发送过来的队列名字,即d.ReplyTo,d是循环消息的时候一个中间变量for d := range msgs
,在Publishing的时候指定CorrelationId,这个是用于识别双方的一个唯一的ID号,也是从Client端获得d.CorrelationId,由Client负责发送过来。
err = ch.Publish(
"", // exchange
d.ReplyTo, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
CorrelationId: d.CorrelationId,
Body: []byte(strconv.Itoa(response)),
})
Client:
(1)建立与RabbitMQ服务器的连接
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
(2)在该连接上创建channel
ch, err := conn.Channel()
defer ch.Close()
(3)以consumer身份声明一个Queue
其中,队列采用匿名队列,即名字设为“”,此时由RabbitMQ服务器为该匿名队列自动生成一个唯一的名字,exclusive设为true说明队列是私有的,即队列声明者才可以从队列中取出消息。
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when usused
true, // exclusive
false, // noWait
nil, // arguments
)
(4)以consumer身份consume消息
这里这么做是等待Server处理完消息后,将结果发送到Client所指定的接收队列,由Client来消费这个结果
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
(5)以producer身份publish消息
这里是向Server发送请求处理的消息,发送至Server所指定的队列,即“rpc_queue”这个是需要令Client所知道的,publishing的时候还要将CorrelationId、ReplyTo发送过去,CorrelationId是用于标识双方这一次通信的唯一ID号,ReplyTo是告知Server应该将结果发送至哪一个队列当中(由Client接收并消费)
err = ch.Publish(
"", // exchange
"rpc_queue", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
CorrelationId: corrId,
ReplyTo: q.Name,
Body: []byte(strconv.Itoa(n)),
})
(6)循环接收的消息,判断获得的ID号是否等于CorrelationId,是的话就进行结果处理,break出循环
for d := range msgs {
if corrId == d.CorrelationId {
res, err = strconv.Atoi(string(d.Body))
failOnError(err, "Failed to convert body to integer")
break
}
}
相关文章推荐
- rabbitmq整合spring实现rpc技术调用
- springmvc 集成rabbitmq,实现RPC通信方式
- Python案例-网络编程-使用RabbitMQ实现RPC简易分享
- Vert.x 技术内幕 | 异步RPC实现原理
- 易用的C++ RPC服务框架 - pioneer - 4 - 技术实现:函数的序列化
- 中间件系列八 RabbitMQ之实现RPC模式
- 高并发处理技术老司机带你玩RabbitMq实现性能倍增
- hadoop 使用内部RPC技术实现通信
- RabbitMQ 实现RPC
- RabbitMQ实现异步及同步RPC
- RabbitMQ随手笔记(九)RabbitMQ实现RPC(netCore2.0)
- python rabbitMQ 实现RPC
- rabbitmq RPC java实现
- RabbitMQ 实现RPC
- Python RabbitMQ消息队列实现rpc
- 高并发处理技术老司机带你玩RabbitMq实现性能倍增
- WinForm实现Rabbitmq官网6个案例-RPC
- 利用RabbitMQ实现RPC(python)
- 高并发处理技术老司机带你玩RabbitMq实现性能倍增
- 易用的C++ RPC服务框架 - pioneer - 5 - 技术实现:函数的序列化