RabbitMQ学习笔记4-五种模式之二(simple、work)
2019-06-29 15:44
260 查看
五种模式之二(simple、work)
simple queue模式
模型
描述
是一个一对一的简单模式,即一个生产者和一个消费者。
代码
生产者
package com.lin.rabbit.simple; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.lin.rabbit.utils.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class SimpleProvider { public static void main(String[] args) throws IOException, TimeoutException { //获取连接 Connection con = ConnectionUtils.getConnection(); //创建通道 Channel channel = con.createChannel(); //声明队列 channel.queueDeclare(SimpleConstant.SIMPLE_QUEUE_NAME, false, false, false, null); //发送消息 String msg = "你好"; channel.basicPublish("", SimpleConstant.SIMPLE_QUEUE_NAME, null, msg.getBytes("UTF-8")); channel.close(); con.close(); } }
消费者
package com.lin.rabbit.simple; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.lin.rabbit.utils.ConnectionUtils; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class SimpleConsumer { public static void main(String[] args) throws IOException, TimeoutException { //获取连接 Connection con = ConnectionUtils.getConnection(); //创建通道 Channel channel = con.createChannel(); //声明队列 channel.queueDeclare(SimpleConstant.SIMPLE_QUEUE_NAME, false, false, false, null); //消息获取 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"UTF-8"); System.out.println("receiver msg:"+msg); } }; channel.basicConsume(SimpleConstant.SIMPLE_QUEUE_NAME, true, defaultConsumer); } }
辅助类
package com.lin.rabbit.simple; public class SimpleConstant { public static String SIMPLE_QUEUE_NAME = "simple-queue"; }
演示效果
1) 启动消费者
2) 启动生产者
3) 查看消费者控制台
4) 查看控制台queue
work queue模式
模型
描述
一个生产者,对应多个消费者。队列是一个,可以通过轮询模式、公平模式分发到不同客户端。轮询模式与公平模式区别在于,轮询模式是按照消费者一人一条信息发送,而公平模式是谁完成消息后,就再给予分配消息。
轮询模式与公平模式在代码中要设置2个地方:
1) 设置发送频率,每次一条:channel.basicQos(1),消费者和生产者都需设置。
2) 消费者关闭自动应答,改为手动应答模式。
本例子代码中演示一个公平模式下的分发。红色加粗为公平模式与轮询模式不一样的地方。
代码
生产者
package com.lin.rabbit.workfair; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.lin.rabbit.utils.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class WorkFairProvider { public static void main(String[] args) throws IOException, TimeoutException { //获取连接 Connection con = ConnectionUtils.getConnection(); //创建通道 Channel channel = con.createChannel(); //声明队列 channel.queueDeclare(WorkFairConstant.WORK_FAIR_QUEUE_NAME, false, false, false, null); //设置每次发送一条 channel.basicQos(1); //发送消息 for(int i=0;i<50;i++) { String msg = "你好"+i; channel.basicPublish("", WorkFairConstant.WORK_FAIR_QUEUE_NAME, null, msg.getBytes("UTF-8")); System.out.println("send msg:"+msg); } //关闭通道和连接 channel.close(); con.close(); } }
消费者1
package com.lin.rabbit.workfair; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.lin.rabbit.utils.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; public class WorkFairConsumer1 { public static void main(String[] args) throws IOException, TimeoutException { //获取连接 Connection con = ConnectionUtils.getConnection(); //创建通道 final Channel channel = con.createChannel(); //声明队列 channel.queueDeclare(WorkFairConstant.WORK_FAIR_QUEUE_NAME, false, false, false, null); //设置每次获取一条 channel.basicQos(1); //获取消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope 7ff7 , BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"UTF-8"); System.out.println(WorkFairConsumer1.class.getName()+" receiver work queue msg:"+msg); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }finally { channel.basicAck(envelope.getDeliveryTag(), false); } } }; channel.basicConsume(WorkFairConstant.WORK_FAIR_QUEUE_NAME, false, defaultConsumer); } }
消费者2
package com.lin.rabbit.workfair; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.lin.rabbit.utils.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; public class WorkFairConsumer2 { public static void main(String[] args) throws IOException, TimeoutException { //获取连接 Connection con = ConnectionUtils.getConnection(); //创建通道 final Channel channel = con.createChannel(); //声明队列 channel.queueDeclare(WorkFairConstant.WORK_FAIR_QUEUE_NAME, false, false, false, null); //设置每次获取一条 channel.basicQos(1); //获取消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"UTF-8"); System.out.println(WorkFairConsumer2.class.getName()+" receiver work queue msg:"+msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { channel.basicAck(envelope.getDeliveryTag(), false); } } }; channel.basicConsume(WorkFairConstant.WORK_FAIR_QUEUE_NAME, false, defaultConsumer); } }
辅助类
package com.lin.rabbit.workfair; public class WorkFairConstant { public static String WORK_FAIR_QUEUE_NAME = "work-fair-queue"; }
演示效果
公平模式会导致消费者1和消费者2处理消息数量不一致,因为2者的处理速度不一致导致的。
1) 启动2个消费者
2) 启动生产者
3) 查看消费者1
4) 查看消费者2
5) 查看控制台queue
相关文章推荐
- RabbitMQ五种消息队列学习(三)--Work模式
- RabbitMQ五种消息队列学习(六)--通配符模式(路由类型:Topic)
- 设计模式学习笔记--简单工厂模式(Simple Factory Pattern)【创建型模式】
- [设计模式学习笔记][之二]面象对象单挑结构化设计
- rabbitMQ学习笔记(3):Work Queues
- PHP面向对象学习笔记之二 生成对象的设计模式
- 设计模式C++学习笔记之二(Proxy代理模式)
- 阿Sam的设计模式学习笔记----Factory模式(之Simple Factory)
- 设计模式C++学习笔记之二(Proxy代理模式)
- 设计模式C++学习笔记之二(Proxy代理模式)
- ListView学习笔记之二------SimpleAdapter, BaseAdapter构造自己的Adapter
- PHP面向对象学习笔记之二 生成对象的设计模式
- 设计模式学习笔记三:简单工厂(Simple Factory)
- RabbitMQ 学习笔记(二):work queues
- 设计模式学习笔记——简单工厂(Simple Factory)
- 设计模式C++学习笔记之二(Proxy代理模式)
- 设计模式学习笔记之二---单例模式
- 设计模式学习笔记三:简单工厂(Simple Factory)
- 模式识别学习笔记之二:模式识别的重要环节——学习