您的位置:首页 > 其它

RabbitMq05——订阅模式

2018-07-08 13:29 344 查看

    有时候我们希望的是当生产者发送一条消息的时候,与它相关的消费者都能接收到该消息,而不是每一条消息只能一个消费者消费。即相当于我们关注的公众号,公众号发送一条消息,但是所有关注它的用户都可以收到该条消息。

    要实现这种模式,需要加入交换机来作为中间转换。即生产者发送消息不再直接发到队列,而是发送到交换机,再由交换机发送到对应的队列之中。但是每一个消费者都有一个自己的队列,并与之绑定,同时再将队列都绑定到交换机上,这样就可以实现一条消息有多个消费者来消费,也就是订阅模式

生产者

[code]package com.mmr.rabbitmq.fanout;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

private final static String EXCHANGE_NAME = "fanout_exchange";

public static void main(String[] args) throws IOException, TimeoutException {
//获取MQ连接
Connection connection = ConnectionUtils.getConnection();
//获取信道
Channel channel = connection.createChannel();

//声明交换机,设置交换机类型为fanout
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

String message = "i am coming------>";

for (int i = 0; i < 20; i++) {
//向交换机中发送消息
channel.basicPublish(EXCHANGE_NAME, "", null, (message + i).getBytes());
System.out.println(message + i);

}
channel.close();
connection.close();
}
}

消费者1

[code]package com.mmr.rabbitmq.fanout;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Rev1 {

private final static String EXCHANGE_NAME = "fanout_exchange";
private final static String QUEUE_NAME = "fanout_queue01";

public static void main(String[] args) throws IOException, TimeoutException {

Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//声明每次向消费者发送一条消息
channel.basicQos(1);
//将队列与交换机绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
//创建一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel){

@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body, "utf-8");

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
//一般用于异常处理,告知MQ消息处理失败,
//第二个参数为true,则该条消息被重新放回队列,为false则放弃该条消息
//					channel.basicReject(envelope.getDeliveryTag(), false);
e.printStackTrace();
}finally {
System.out.println("Recv1---->" + msg + "---->Done");
//向MQ发送确认,告诉MQ该条消息已经消费,可以重新发送了
//第二个参数为false表示只确认当前这一条消息,
//如果为 true,则额外将比第一个参数指定的 delivery tag 小的消息一并确认(批量确认针对的是整个信道)
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}

消费者2

    消费者2与消费者1类似

[code]package com.mmr.rabbitmq.fanout;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Rev2 {

private final static String EXCHANGE_NAME = "fanout_exchange";
private final static String QUEUE_NAME = "fanout_queue02";

public static void main(String[] args) throws IOException, TimeoutException {

Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer consumer = new DefaultConsumer(channel){

@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body, "utf-8");

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("Recv1---->" + msg + "---->Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}

运行两个消费者,然后运行生产者,从控制台我们可以看到消费者1和消费者2同时都收到了生产者所发送的消息。

长按关注公众号“魔性JAVA”,会不定时免费推送JAVA相关的学习资料和知识

 

 

 

 

阅读更多
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: