您的位置:首页 > 其它

RabbitMQ几种工作模式的发送与消费数据

2020-08-22 18:00 246 查看

一.简单模式

特点:无交换机转发数据(使用默认交换机名""),生产者直接将数据发送至队列,单个消费者消费指定队列的消息。

开发步骤

1.创建连接MQ的连接工厂对象

ConnectionFactory connectionFactory = new ConnectionFactory();

2.设置部分连接参数

connectionFactory.setHost("localhost"); //设置rabbitmq所在的ip地址 默认值为localhost
connectionFactory.setPort(5672);  //设置rabbitmq的端口号 默认值为5672
connectionFactory.setUsername("guest"); //设置操作的用户名 默认为guest
connectionFactory.setPassword("guest");//设置用户名的密码 默认也为guest

3.创建connection对象以及channel对象
4.利用channel对象创建队列
5.发送数据,简单工作模式情况下 routeKey要与队列名一致
6.关闭连接释放资源

//3.获取连接connection,以及channel,利用try..catch自动释放资源
try(Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();) {
//4.创建队列
channel.queueDeclare("hello",true,false,false,null);
//5.设置要发送的消息并发送
String message="rabbitmq";
channel.basicPublish("","hello",null,message.getBytes());
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}

创建队列方法

queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)

que: 队列名
durable::该队列是否进行持久化存储
exclusive: 是否只有一个消费者监听该队列,connection关闭时队列是否删除
autoDelete: 当没有消费者时是否自动删除该队列
arguements: 参数信息

发送消息方法

basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)

exchange: 交换机名称简单模式下,使用默认交换机,名空字符串“”
routingKey: 简单模式下,使用默认交换机 ,该值与队列名保持一致
props: 配置信息
body: 发送数据的字节码数组形式

在消费者端接收数据

//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();//2.设置连接参数
connectionFactory.setHost("localhost"); //设置rabbitmq所在的ip地址 默认值为localhost
connectionFactory.setPort(5672);  //设置rabbitmq的端口号 默认值为5672
connectionFactory.setUsername("guest"); //设置操作的用户名 默认为guest
connectionFactory.setPassword("guest");//设置用户名的密码 默认也为guest
//3.获取连接connection,以及channel
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4.创建Consumer对象设置回调方法当接收到消息后如何处理 多态 实现类对象
Consumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consmerTag"+consumerTag);
System.out.println("exchange名称"+envelope.getExchange()); //交换机的名称
System.out.println("routingkey名称"+envelope.getRoutingKey()); //路由名称
System.out.println(properties);
System.out.println("收到的消息:"+new String(body)); //接收到的数据
}
};

//5.接收数据
channel.basicConsume("hello",true,consumer);
}

接收数据方法

basicConsume(String queue, boolean autoAck, Consumer callback)

queue: 队列名从该队列获取数据
autoAck: 在收到消息后是否自动反馈给MQ
callback: 回调对象,创建Consumer对象,并以内部类的方式重新handleDelivery方法,对收到的数据进行对应的操作

二.工作队列模式(work queues)

特点:无交换机转发数据(使用默认交换机名""),多个消费者以轮循的方式竞争同一个队列中的消息,消费者1读取一条,消费者2读取一条,一般用于任务过重或任务较多的情况

开发步骤

1.创建连接工厂对象
2.设置参数
3.创建connection与channel对象
4.创建队列
5.发送数据
6.关闭资源

消费时创建多个消费者类获取同一个队列数据即可

三.订阅模式(Pub/Sub)

特点:使用交换机,利用交换机将数据转发到与该交换机绑定的所有队列中,每个队列都可以获得相同的数据

开发步骤

1.创建连接工厂对象
2.设置参数
3.创建connection与channel对象
4.创建交换机

try(Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()
) {
//4.创建交换机
/*
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
exchange:创建交换机的名称
type:交换机的类型
durable:是否进行持久化存储
autoDelete:是否自动删除
internal:是否内部使用 一般都设置为false
arguements:参数
*/
String exchangeName="test_fanout";//自定义要创建的交换机名称
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);

创建交换机方法

exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)

exchange: 创建交换机的名称
type: 交换机的类型 订阅模式下要指定交换机的类型为fanout
durable: 是否进行持久化存储
autoDelete: 是否自动删除
internal: 是否内部使用 一般都设置为false
arguements: 参数

5.创建多个队列

//5.创建多个队列
String queueName1="fanout_queue1"; //队列1名称
String queueName2="fanout_queue2"; //队列2名称
channel.queueDeclare(queueName1,true,false,false,null);
channel.queueDeclare(queueName2,true,false,false,null);

6.将队列与交换机进行绑定

channel.queueBind(queueName1,exchangeName,"");
channel.queueBind(queueName2,exchangeName,"");

绑定方法

queueBind(String queue, String exchange, String routingKey)

queue: 要绑定的队列名称
exchange: 绑定的交换机名称
routingKey: 路由规则 订阅模式下路由规则为空字符串" ",或其他任意字符串,只要在发送数据时路由规则与绑定的相同即可
7.发送数据
8.关闭资源

消费者端只需要从指定队列获取数据即可

四.路由模式(Routing)

特点:使用交换机,根据不同的routeKey(路由规则)将数据发送到不同的队列中

开发步骤

1.创建连接工厂对象
2.设置参数
3.创建connection与channel对象
4.创建交换机 路由模式下交换机类型为direct
5.创建多个队列
6.将队列与交换机进行绑定,指定routingKey

channel.queueBind(queueName1,exchangeName,"info");//队列1 绑定规则info
channel.queueBind(queueName1,exchangeName,"warning");//队列1 绑定规则warning
channel.queueBind(queueName1,exchangeName,"error");//队列1 绑定规则error
channel.queueBind(queueName2,exchangeName,"error"); //队列2 绑定规则error

单个队列可以进行多次绑定,指定多个路由规则
7.发送数据,指定路由规则

//将warningMessage发送给 路由规则为waring的队列
channel.basicPublish(exchangeName,"warning",null,warningMessage.getBytes());
//将errorMessage 发送给路由规则为error的队列
channel.basicPublish(exchangeName,"error",null,errorMessage.getBytes());

此种情况下队列1可以接收到warningMessage也可以接收到errorMessage,队列2只能接收到errorMessage
8.关闭资源

消费者端只需要从指定队列获取数据即可

通配符模式(topic)

特点:使用交换机转发数据,并将数据转发给符合通配符形式的routingKey的队列
通配符:* 代表一个单词
通配符#: 代表0个或多个单词

开发步骤
1.创建连接工厂对象
2.设置参数
3.创建connection与channel对象
4.创建交换机
5.创建多个队列
6.将队列与交换机进行绑定,此时绑定的路由规则不再是确定的了 而是通配符的方式

channel.queueBind(queueName1,exchangeName,"*.apple"); //队列1绑定 路由规则*.apple
channel.queueBind(queueName2,exchangeName,"*.*"); //队列2绑定 路由规则 *.*
channel.queueBind(queueName3,exchangeName,"#.apple"); //队列3绑定路由规则 #.apple

7.发送数据

channel.basicPublish(exchangeName,"a.apple",null,message1.getBytes());
channel.basicPublish(exchangeName,"apple",null,message2.getBytes());
channel.basicPublish(exchangeName,"a.bb",null,message3.getBytes());

此时队列1只能接收到message1,队列2可以接收到message1,message3,队列3可以接收的message1,message2
8.关闭资源

消费者端只需要从指定队列获取数据即可
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: