RabbitMQ 6种工作模式
面试:你懂什么是分布式系统吗?Redis分布式锁都不会?>>>
1、Work queues
2、Publish/Subscribe
3、Routing
4、Topics
5、Header
6、RPC
1. Work queues 工作队列
work queues与入门程序相比,多了一个消费端,两个消费端共同消费同一个队列中的消息。
应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
测试:
1、使用入门程序,启动多个消费者。
2、生产者发送多个消息。
结果:
1、一条消息只会被一个消费者接收;
2、rabbit采用轮询的方式将消息是平均发送给消费者的;
3、消费者在处理完某条消息后,才会收到下一条消息。
2. Publish/subscribe 发布/订阅
2.1 工作模式
发布订阅模式:
1、每个消费者监听自己的队列;
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
2.2 代码
案例:
用户通知,当用户充值成功或转账完成系统通知用户,通知方式有短信、邮件多种方法 。
1、生产者
声明Exchange_fanout_inform交换机。
声明两个队列并且绑定到此交换机,绑定时不需要指定routingkey
发送消息时不需要指定routingkey。
package com.xuecheng.rabbitemq.test; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.junit.Test; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @ClassName: producer01 * @description: * @author: edison_Kwok * @Date: create in 2019/4/17 11:31 * @Version: 1.0 */ public class producer02_publish { private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform"; @Test public void send01() { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务 connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //创建与RabbitMQ服务的TCP连接 connection = connectionFactory.newConnection(); //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务 channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT); /** * 声明队列,如果Rabbit中没有此队列将自动创建 * param1:队列名称 * param2:是否持久化 * param3:队列是否独占此连接 * param4:队列不再使用时是否自动删除此队列 * param5:队列参数 */ channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); //将交换机和队列绑定 channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,""); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,""); /** * 消息发布方法 * param1:Exchange的名称,如果没有指定,则使用Default Exchange * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列 * param3:消息包含的属性 * param4:消息体 */ for (int i = 0; i < 5; i++) { String message = "send message to user"+i; channel.basicPublish(EXCHANGE_FANOUT_INFORM,"",null,message.getBytes("UTF-8")); System.out.println("发布:" + message); } } catch (Exception e) { e.printStackTrace(); }finally { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }
2.邮件发送消费者
package com.xuecheng.rabbitmq.test; import com.rabbitmq.client.*; import java.io.IOException; /** * @ClassName: Consumer01 * @description: * @author: edison_Kwok * @Date: create in 2019/4/17 11:43 * @Version: 1.0 */ public class Consumer02_subscribe_email { private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform"; public static void main(String[] args) { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务 connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //创建与RabbitMQ服务的TCP连接 connection = connectionFactory.newConnection(); //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务 channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM,BuiltinExchangeType.FANOUT); channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,""); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交换机 String exchange = envelope.getExchange(); //路由key String routingKey = envelope.getRoutingKey(); //消息id long deliveryTag = envelope.getDeliveryTag(); //消息内容 String msg = new String(body,"utf-8"); System.out.println("订阅:" + msg); } }; channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer); } catch (Exception e) { e.printStackTrace(); } } }
按照上边的代码,编写邮件通知的消费代码。
3、短信发送消费者
参考上边的邮件发送消费者代码编写。
2.3 思考
1、publish/subscribe与work queues有什么区别。区别:
1)work queues不用定义交换机,而publish/subscribe需要定义交换机。
2)publish/subscribe的生产方是面向交换机发送消息,work queues的生产方是面向队列发送消息(底层使用默认交换机)。
3)publish/subscribe需要设置队列和交换机的绑定,work queues不需要设置,实质上work queues会将队列绑定到默认的交换机。
相同点: 所以两者实现的发布/订阅的效果是一样的,多个消费端监听同一个队列不会重复消费消息。
2、实质工作用什么 publish/subscribe还是work queues。
建议使用 publish/subscribe,发布订阅模式比工作队列模式更强大,并且发布订阅模式可以指定自己专用的交换机。
3. Routing
3.1 工作模式
路由模式:
1、每个消费者监听自己的队列,并且设置routingkey。
2、生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列。
3.2 代码
1、生产者
声明exchange_routing_inform交换机;
声明两个队列并且绑定到此交换机,绑定时需要指定routingkey发送消息时需要指定routingkey。
package com.xuecheng.rabbitemq.test; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.junit.Test; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @ClassName: producer01 * @description: * @author: edison_Kwok * @Date: create in 2019/4/17 11:31 * @Version: 1.0 */ public class producer02_routing { private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; private static final String EXCHANGE_DIRECT_INFORM="exchange_direct_inform"; private static final String ROUTINGKEY_EMAIL="routingkey_email"; private static final String ROUTINGKEY_SMS="routingkey_sms"; @Test public void send01() { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务 connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //创建与RabbitMQ服务的TCP连接 connection = connectionFactory.newConnection(); //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务 channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE_DIRECT_INFORM, BuiltinExchangeType.DIRECT); /** * 声明队列,如果Rabbit中没有此队列将自动创建 * param1:队列名称 * param2:是否持久化 * param3:队列是否独占此连接 * param4:队列不再使用时是否自动删除此队列 * param5:队列参数 */ channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); //将交换机和队列绑定 channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_DIRECT_INFORM,ROUTINGKEY_EMAIL); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_DIRECT_INFORM,ROUTINGKEY_SMS); channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_DIRECT_INFORM,"both"); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_DIRECT_INFORM,"both"); /** * 消息发布方法 * param1:Exchange的名称,如果没有指定,则使用Default Exchange * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列 * param3:消息包含的属性 * param4:消息体 */ for (int i = 0; i < 5; i++) { String message = "send message to user"+i; channel.basicPublish(EXCHANGE_DIRECT_INFORM,"both",null,message.getBytes("UTF-8")); System.out.println("发布:" + message); } } catch (Exception e) { e.printStackTrace(); }finally { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }
2、邮件发送消费者
package com.xuecheng.rabbitmq.test; import com.rabbitmq.client.*; import java.io.IOException; /** * @ClassName: Consumer01 * @description: * @author: edison_Kwok * @Date: create in 2019/4/17 11:43 * @Version: 1.0 */ public class Consumer02_routing_email { private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; private static final String EXCHANGE_DIRECT_INFORM="exchange_direct_inform"; private static final String ROUTINGKEY_EMAIL="routingkey_email"; public static void main(String[] args) { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1 1bb8c "); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务 connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //创建与RabbitMQ服务的TCP连接 connection = connectionFactory.newConnection(); //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务 channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_DIRECT_INFORM,BuiltinExchangeType.DIRECT); channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_DIRECT_INFORM,ROUTINGKEY_EMAIL); channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_DIRECT_INFORM,"both"); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交换机 String exchange = envelope.getExchange(); //路由key String routingKey = envelope.getRoutingKey(); //消息id long deliveryTag = envelope.getDeliveryTag(); //消息内容 String msg = new String(body,"utf-8"); System.out.println("订阅:" + msg); } }; channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer); } catch (Exception e) { e.printStackTrace(); } } }
3、短信发送消费者
参考邮件发送消费者的代码流程,编写短信通知的代码。
3.3 思考
1、Routing模式和Publish/subscibe有啥区别?
Routing模式要求队列在绑定交换机时要指定routingkey,消息会转发到符合routingkey的队列。
4. Topics
4.1 工作模式
路由模式:
1、每个消费者监听自己的队列,并且设置带统配符的routingkey。
2、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。
4.2 代码
案例:
根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种通知类型都接收的则两种通知都有效。
1、生产者
声明交换机,指定topic类型:
package com.xuecheng.rabbitemq.test; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.junit.Test; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @ClassName: producer01 * @description: * @author: edison_Kwok * @Date: create in 2019/4/17 11:31 * @Version: 1.0 */ public class producer02_topics { private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; private static final String EXCHANGE_TOPIC_INFORM="exchange_topic_inform"; private static final String ROUTINGKEY_EMAIL="inform.#.email.#"; private static final String ROUTINGKEY_SMS="inform.#.sms.#"; @Test public void send01() { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务 connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //创建与RabbitMQ服务的TCP连接 connection = connectionFactory.newConnection(); //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务 channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE_TOPIC_INFORM, BuiltinExchangeType.TOPIC); /** * 声明队列,如果Rabbit中没有此队列将自动创建 * param1:队列名称 * param2:是否持久化 * param3:队列是否独占此连接 * param4:队列不再使用时是否自动删除此队列 * param5:队列参数 */ channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); //将交换机和队列绑定 channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPIC_INFORM,ROUTINGKEY_EMAIL); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPIC_INFORM,ROUTINGKEY_SMS); /** * 消息发布方法 * param1:Exchange的名称,如果没有指定,则使用Default Exchange * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列 * param3:消息包含的属性 * param4:消息体 */ for (int i = 0; i < 5; i++) { String message = "send message to user"+i; channel.basicPublish(EXCHANGE_TOPIC_INFORM,"inform.sms.email",null,message.getBytes("UTF-8")); System.out.println("发布:" + message); } } catch (Exception e) { e.printStackTrace(); }finally { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }
2、消费端
队列绑定交换机指定通配符:
统配符规则:
中间以“.”分隔。
符号#可以匹配多个词,符号*可以匹配一个词语。
package com.xuecheng.rabbitmq.test; import com.rabbitmq.client.*; import java.io.IOException; /** * @ClassName: Consumer01 * @description: * @author: edison_Kwok * @Date: create in 2019/4/17 11:43 * @Version: 1.0 */ public class Consumer02_topics_email { private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; private static final String EXCHANGE_TOPIC_INFORM="exchange_topic_inform"; private static final String ROUTINGKEY_EMAIL="inform.#.email.#"; public static void main(String[] args) { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务 connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //创建与RabbitMQ服务的TCP连接 connection = connectionFactory.newConnection(); //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务 channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_TOPIC_INFORM,BuiltinExchangeType.TOPIC); channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPIC_INFORM,ROUTINGKEY_EMAIL); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交换机 String exchange = envelope.getExchange(); //路由key String routingKey = envelope.getRoutingKey(); //消息id long deliveryTag = envelope.getDeliveryTag(); //消息内容 String msg = new String(body,"utf-8"); System.out.println("订阅:" + msg); } }; channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer); } catch (Exception e) { e.printStackTrace(); } } }
4.3 思考
1、本案例的需求使用Routing工作模式能否实现?
使用Routing模式也可以实现本案例,共设置三个 routingkey,分别是email、sms、all,email队列绑定email和all,sms队列绑定sms和all,这样就可以实现上边案例的功能,实现过程比topics复杂。
Topic模式更多加强大,它可以实现Routing、publish/subscirbe模式的功能。
5. Header模式
header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配队列。
案例:
根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种通知类型都接收的则两种通知都有效。
代码:
1)生产者
队列与交换机绑定的代码与之前不同,如下:
package com.xuecheng.rabbitemq.test; import com.rabbitmq.client.*; import org.junit.Test; import java.io.IOException; import java.util.Hashtable; import java.util.Map; import java.util.concurrent.TimeoutException; /** * @ClassName: producer01 * @description: * @author: edison_Kwok * @Date: create in 2019/4/17 11:31 * @Version: 1.0 */ public class producer02_header { private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; private static final String EXCHANGE_HEADER_INFORM = "exchange_header_inform"; private static final Map<String, Object> headers_email = new Hashtable<>(); private static final Map<String, Object> headers_sms = new Hashtable<>(); static { headers_email.put("inform_type", "email"); headers_sms.put("inform_type", "sms"); } @Test public void send01() { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务 connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //创建与RabbitMQ服务的TCP连接 connection = connectionFactory.newConnection(); //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务 channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE_HEADER_INFORM, BuiltinExchangeType.HEADERS); /** * 声明队列,如果Rabbit中没有此队列将自动创建 * param1:队列名称 * param2:是否持久化 * param3:队列是否独占此连接 * param4:队列不再使用时是否自动删除此队列 * param5:队列参数 */ channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); //将交换机和队列绑定 channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_HEADER_INFORM, "", headers_email); channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_HEADER_INFORM, "", headers_sms); Map<String, Object> map = new Hashtable<>(); map.put("inform_type", "email"); map.put("inform_type", "sms"); AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder(); properties.headers(map); /** * 消息发布方法 * param1:Exchange的名称,如果没有指定,则使用Default Exchange * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列 * param3:消息包含的属性 * param4:消息体 */ for (int i = 0; i < 5; i++) { String message = "send message to user" + i; channel.basicPublish(EXCHANGE_HEADER_INFORM, "", properties.build(), message.getBytes("UTF-8")); System.out.println("发布:" + message); } } catch (Exception e) { e.printStackTrace(); } finally { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }
2)发送邮件消费者
package com.xuecheng.rabbitmq.test; import com.rabbitmq.client.*; import java.io.IOException; import java.util.Hashtable; import java.util.Map; /** * @ClassName: Consumer01 * @description: * @author: edison_Kwok * @Date: create in 2019/4/17 11:43 * @Version: 1.0 */ public class Consumer02_headers_email { private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; private static final String EXCHANGE_HEADER_INFORM = "exchange_header_inform"; private static final Map<String, Object> headers_email = new Hashtable<>(); public static void main(String[] args) { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务 connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //创建与RabbitMQ服务的TCP连接 connection = connectionFactory.newConnection(); //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务 channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_HEADER_INFORM,BuiltinExchangeType.HEADERS); channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADER_INFORM,"",headers_email); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交换机 String exchange = envelope.getExchange(); //路由key String routingKey = envelope.getRoutingKey(); //消息id long deliveryTag = envelope.getDeliveryTag(); //消息内容 String msg = new String(body,"utf-8"); System.out.println("订阅:" + msg); } }; channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer); } catch (Exception e) { e.printStackTrace(); } } }
6. RPC
RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:
1、客户端即是生产者就是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列;
2、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果;
3、服务端将RPC方法 的结果发送到RPC响应队列;
4、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。
- RabbitMQ的工作模式
- RabbitMQ工作模式详解
- RabbitMQ的几种工作模式
- rabbitmq 消息队列工作模式
- rabbitmq的五种工作模式
- python使用rabbitMQ介绍二(工作队列模式)
- RabbitMQ Java 工作模式
- 被动模式下FTP的详细工作过程
- git的工作模式和Pull Request用法详解
- java全屏工作模式
- 第四章 apache的工作模式
- 使用RabbitMQ做的一些工作及经验教训
- RabbitMQ三种Exchange模式(fanout,direct,topic)的性能比较
- 一步一步学FRDM-KE02Z(一):IAR调试平台搭建以及OpenSDA两种工作模式设置
- Linux 编辑器工作模式切换
- Ubuntu下 hadoop2.5.1 (伪分布模式) 配置工作
- FTP工作模式分析
- Apache两种工作模式区别及配置切换
- 轻松搞定RabbitMQ(二)——工作队列之消息分发机制
- VMware中Bridged,NAT,Host-Only三种工作模式介绍