rabbitmq(5)订阅模式
2018-03-25 16:27
330 查看
1、模型
一个生产者绑定一个交换机,每个消费者绑定一个队列。生产者将消息通过交换器分发给所有在线的消费者。交换机没有消息存储的能力,只能向当前在线的消费者发送消息。因此未接收到消息的消费者,即使重新连接rabbitmq也无法获取到已发送的消息
订阅模式与普通队列的区别:
1. 普通队列中生产者发送的消息只能有一个消费者接受到;订阅模式中生产者发送的消息会被所有在线的消费者接收到;
2. 普通队列中生产者消息会被储存,即使消费者没有成功接受到消息,重新上线后也可以重新收到该消息;订阅模式中生产者消息不会被储存;
2、代码
2.1、生产者
private final static String EXCHANGE_NAME = "test_exchange"; @Test public void testSend() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.1.7"); factory.setUsername("root"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /* * 申明交换机, fanout:分发 * 交换机没有消息存储的能力,只能向当前在线的消费者发送消息 * 未接收到消息的消费者,即使重新连接rabbitmq也无法获取到已发送的消息 */ channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = "Hello world"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [1] Sent '" + message + "'"); channel.close(); connection.close(); }
2.2、消费者1
private final static String EXCHANGE_NAME = "test_exchange"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.1.7"); factory.setUsername("root"); factory.setPassword("123456"); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); // 绑定队列到交换机转发器 channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [1] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [1] Received '" + message + "'"); channel.basicAck(envelope.getDeliveryTag(), false); } }; boolean autoAck = false; channel.basicConsume(queueName, autoAck, consumer); }
2.2、消费者2
private final static String EXCHANGE_NAME = "test_exchange"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.1.7"); factory.setUsername("root"); factory.setPassword("123456"); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); // 绑定队列到交换机转发器 channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [2] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [2] Received '" + message + "'"); channel.basicAck(envelope.getDeliveryTag(), false); } }; boolean autoAck = false; channel.basicConsume(queueName, autoAck, consumer); }
相关文章推荐
- RabbitMQ 一二事(3) - 订阅模式(微信公众号模式)的应用
- RabbitMq六种使用模式(3)_订阅发布模式
- 【RabbitMQ】三种Exchange模式——订阅、路由、通配符模式
- RabbitMQ下的生产消费者模式与订阅发布模式
- RabbitMQ下的生产消费者模式与订阅发布模式
- .Net下RabbitMQ发布订阅模式实践
- RabbitMQ下的生产消费者模式与订阅发布模式
- (java)简单实现原生RabbitMQ中的广播订阅(fanout)模式
- 7_rabbitmq订阅模式 PublishSubscribe
- RabbitMQ消息分发模式----"Publish/Subscribe"发布/订阅模式
- RabbitMQ下的生产消费者模式与订阅发布模式
- RabbitMQ消息分发模式----"Publish/Subscribe"发布/订阅模式
- RabbitMQ入门-发布订阅模式
- RabbitMQ3.7.2入门到进阶之订阅模式Publish/Subscribe
- RabbitMQ五种消息队列学习(四)--发布订阅模式
- RabbitMQ入门-消息订阅模式
- RabbitMQ 使用 | 第三篇:发布/订阅模式
- RabbitMQ-订阅模式publish/subscribe
- 发布——订阅模式
- 设计模式在软件系统中的应用(一)之订阅模式