您的位置:首页 > 其它

RabbitMQ学习笔记4-五种模式之二(simple、work)

2019-06-29 15:44 260 查看

五种模式之二(simple、work)

  • work queue模式
  • simple queue模式

    模型

    描述

    是一个一对一的简单模式,即一个生产者和一个消费者。

    代码

    生产者

    package com.lin.rabbit.simple;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.lin.rabbit.utils.ConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    public class SimpleProvider {
    
    public static void main(String[] args) throws IOException, TimeoutException {
    //获取连接
    Connection con = ConnectionUtils.getConnection();
    //创建通道
    Channel channel = con.createChannel();
    //声明队列
    channel.queueDeclare(SimpleConstant.SIMPLE_QUEUE_NAME, false, false, false, null);
    //发送消息
    String msg = "你好";
    channel.basicPublish("", SimpleConstant.SIMPLE_QUEUE_NAME, null, msg.getBytes("UTF-8"));
    channel.close();
    con.close();
    }
    }

    消费者

    package com.lin.rabbit.simple;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.lin.rabbit.utils.ConnectionUtils;
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    public class SimpleConsumer {
    
    public static void main(String[] args) throws IOException, TimeoutException {
    //获取连接
    Connection con = ConnectionUtils.getConnection();
    //创建通道
    Channel channel = con.createChannel();
    //声明队列
    channel.queueDeclare(SimpleConstant.SIMPLE_QUEUE_NAME, false, false, false, null);
    //消息获取
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
    throws IOException {
    String msg = new String(body,"UTF-8");
    System.out.println("receiver msg:"+msg);
    }
    
    };
    channel.basicConsume(SimpleConstant.SIMPLE_QUEUE_NAME, true, defaultConsumer);
    }
    }

    辅助类

    package com.lin.rabbit.simple;
    public class SimpleConstant {
    
    public static String SIMPLE_QUEUE_NAME = "simple-queue";
    }

    演示效果

    1) 启动消费者
    2) 启动生产者

    3) 查看消费者控制台

    4) 查看控制台queue

    work queue模式

    模型

    描述

    一个生产者,对应多个消费者。队列是一个,可以通过轮询模式、公平模式分发到不同客户端。轮询模式与公平模式区别在于,轮询模式是按照消费者一人一条信息发送,而公平模式是谁完成消息后,就再给予分配消息。
    轮询模式与公平模式在代码中要设置2个地方:
    1) 设置发送频率,每次一条:channel.basicQos(1),消费者和生产者都需设置。
    2) 消费者关闭自动应答,改为手动应答模式。
    本例子代码中演示一个公平模式下的分发。红色加粗为公平模式与轮询模式不一样的地方。

    代码

    生产者

    package com.lin.rabbit.workfair;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.lin.rabbit.utils.ConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    public class WorkFairProvider {
    public static void main(String[] args) throws IOException, TimeoutException {
    //获取连接
    Connection con = ConnectionUtils.getConnection();
    //创建通道
    Channel channel = con.createChannel();
    //声明队列
    channel.queueDeclare(WorkFairConstant.WORK_FAIR_QUEUE_NAME, false, false, false, null);
    //设置每次发送一条
    channel.basicQos(1);
    //发送消息
    for(int i=0;i<50;i++) {
    String msg = "你好"+i;
    channel.basicPublish("", WorkFairConstant.WORK_FAIR_QUEUE_NAME, null, msg.getBytes("UTF-8"));
    System.out.println("send msg:"+msg);
    }
    //关闭通道和连接
    channel.close();
    con.close();
    }
    }

    消费者1

    package com.lin.rabbit.workfair;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.lin.rabbit.utils.ConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    public class WorkFairConsumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
    //获取连接
    Connection con = ConnectionUtils.getConnection();
    //创建通道
    final Channel channel = con.createChannel();
    //声明队列
    channel.queueDeclare(WorkFairConstant.WORK_FAIR_QUEUE_NAME, false, false, false, null);
    //设置每次获取一条
    channel.basicQos(1);
    //获取消息
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope
    7ff7
    , BasicProperties properties, byte[] body)
    throws IOException {
    String msg = new String(body,"UTF-8");
    System.out.println(WorkFairConsumer1.class.getName()+" receiver work queue msg:"+msg);
    try {
    Thread.sleep(500);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }finally {
    channel.basicAck(envelope.getDeliveryTag(), false);
    }
    }
    
    };
    channel.basicConsume(WorkFairConstant.WORK_FAIR_QUEUE_NAME, false, defaultConsumer);
    }
    }

    消费者2

    package com.lin.rabbit.workfair;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.lin.rabbit.utils.ConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    public class WorkFairConsumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
    //获取连接
    Connection con = ConnectionUtils.getConnection();
    //创建通道
    final Channel channel = con.createChannel();
    //声明队列
    channel.queueDeclare(WorkFairConstant.WORK_FAIR_QUEUE_NAME, false, false, false, null);
    //设置每次获取一条
    channel.basicQos(1);
    //获取消息
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
    throws IOException {
    String msg = new String(body,"UTF-8");
    System.out.println(WorkFairConsumer2.class.getName()+" receiver work queue msg:"+msg);
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }finally {
    channel.basicAck(envelope.getDeliveryTag(), false);
    }
    }
    
    };
    channel.basicConsume(WorkFairConstant.WORK_FAIR_QUEUE_NAME, false, defaultConsumer);
    }
    }

    辅助类

    package com.lin.rabbit.workfair;
    public class WorkFairConstant {
    public static String WORK_FAIR_QUEUE_NAME = "work-fair-queue";
    }

    演示效果

    公平模式会导致消费者1和消费者2处理消息数量不一致,因为2者的处理速度不一致导致的。
    1) 启动2个消费者
    2) 启动生产者

    3) 查看消费者1

    4) 查看消费者2

    5) 查看控制台queue

    内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: