您的位置:首页 > 其它

RabbitMQ 工作队列

2016-10-31 09:31 447 查看
  创建一个工作队列用来在工作者(consumer)间分发耗时任务。

  工作队列的主要任务是:避免立刻执行资源密集型任务,然后必须等待其完成。相反地,我们进行任务调度:我们把任务封装为消息发送给队列。工作进行在后台运行并不断的从队列中取出任务然后执行。当你运行了多个工作进程时,任务队列中的任务将会被工作进程共享执行。
这样的概念在web应用中极其有用,当在很短的HTTP请求间需要执行复杂的任务。

  1、 准备

  我们使用Thread.sleep来模拟耗时的任务。我们在发送到队列的消息的末尾添加一定数量的点,每个点代表在工作线程中需要耗时1秒,例如hello…将会需要等待3秒。

  发送端:

1 public class NewTask
2 {
3     //队列名称
4     private final static String QUEUE_NAME = "workqueue";
5
6     public static void main(String[] args) throws IOException
7     {
8         //创建连接和频道
9         ConnectionFactory factory = new ConnectionFactory();
10         factory.setHost("localhost");
11         Connection connection = factory.newConnection();
12         Channel channel = connection.createChannel();
13         //声明队列
14         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
15         //发送10条消息,依次在消息后面附加1-10个点
16         for (int i = 0; i < 10; i++)
17         {
18             String dots = "";
19             for (int j = 0; j <= i; j++)
20             {
21                 dots += ".";
22             }
23             String message = "helloworld" + dots+dots.length();
24             channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
25             System.out.println(" [x] Sent '" + message + "'");
26         }
27         //关闭频道和资源
28         channel.close();
29         connection.close();
30
31     }
32
33
34 }


  接收端:

1 public class Work
2 {
3     //队列名称
4     private final static String QUEUE_NAME = "workqueue";
5
6     public static void main(String[] argv) throws java.io.IOException,
7             java.lang.InterruptedException
8     {
9         //区分不同工作进程的输出
10         int hashCode = Work.class.hashCode();
11         //创建连接和频道
12         ConnectionFactory factory = new ConnectionFactory();
13         factory.setHost("localhost");
14         Connection connection = factory.newConnection();
15         Channel channel = connection.createChannel();
16         //声明队列
17         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
18         System.out.println(hashCode
19                 + " [*] Waiting for messages. To exit press CTRL+C");
20
21         QueueingConsumer consumer = new QueueingConsumer(channel);
22         // 指定消费队列
23         channel.basicConsume(QUEUE_NAME, true, consumer);
24         while (true)
25         {
26             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
27             String message = new String(delivery.getBody());
28
29             System.out.println(hashCode + " [x] Received '" + message + "'");
30             doWork(message);
31             System.out.println(hashCode + " [x] Done");
32
33         }
34
35     }
36
37     /**
38      * 每个点耗时1s
39      * @param task
40      * @throws InterruptedException
41      */
42     private static void doWork(String task) throws InterruptedException
43     {
44         for (char ch : task.toCharArray())
45         {
46             if (ch == '.')
47                 Thread.sleep(1000);
48         }
49     }
50 }


  Round-robin 转发
  使用任务队列的好处是能够很容易的并行工作。如果我们积压了很多工作,我们仅仅通过增加更多的工作者就可以解决问题,使系统的伸缩性更加容易。
下面我们先运行3个工作者(Work.java)实例,然后运行NewTask.java,3个工作者实例都会得到信息。  

1 [x] Sent 'helloworld......6'
2 [x] Sent 'helloworld.....5'
3 [x] Sent 'helloworld....4'
4 [x] Sent 'helloworld...3'
5 [x] Sent 'helloworld..2'
6 工作者1:
7 18019860 [*] Waiting for messages. To exit press CTRL+C
8 18019860 [x] Received 'helloworld......6'
9 18019860 [x] Done
10 18019860 [x] Received 'helloworld...3'
11 18019860 [x] Done
12 工作者2:
13 31054905 [*] Waiting for messages. To exit press CTRL+C
14 31054905 [x] Received 'helloworld.....5'
15 31054905 [x] Done
16 31054905 [x] Received 'helloworld....4'
17 31054905 [x] Done
18 31054905 [x] Received 'helloworld..2'
19 31054905 [x] Done


View Code
  可以看出此时并没有按照之前的Round-robin机制进行转发消息,而是当消费者不忙时进行转发。且这种模式下支持动态增加消费者,因为消息并没有发送出去,动态增加了消费者马上投入工作。而默认的转发机制会造成,即使动态增加了消费者,此时的消息已经分配完毕,无法立即加入工作,即使有很多未完成的任务。

  完整代码:

1 public class NewTask
2 {
3     // 队列名称
4     private final static String QUEUE_NAME = "workqueue_persistence";
5
6     public static void main(String[] args) throws IOException
7     {
8         // 创建连接和频道
9         ConnectionFactory factory = new ConnectionFactory();
10         factory.setHost("localhost");
11         Connection connection = factory.newConnection();
12         Channel channel = connection.createChannel();
13         // 声明队列
14         boolean durable = true;// 1、设置队列持久化
15         channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
16         // 发送10条消息,依次在消息后面附加1-10个点
17         for (int i = 5; i > 0; i--)
18         {
19             String dots = "";
20             for (int j = 0; j <= i; j++)
21             {
22                 dots += ".";
23             }
24             String message = "helloworld" + dots + dots.length();
25             // MessageProperties 2、设置消息持久化
26             channel.basicPublish("", QUEUE_NAME,
27                     MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
28             System.out.println(" [x] Sent '" + message + "'");
29         }
30         // 关闭频道和资源
31         channel.close();
32         connection.close();
33
34     }
35
36 }


1 public class Work
2 {
3     // 队列名称
4     private final static String QUEUE_NAME = "workqueue_persistence";
5
6     public static void main(String[] argv) throws java.io.IOException,
7             java.lang.InterruptedException
8     {
9         // 区分不同工作进程的输出
10         int hashCode = Work.class.hashCode();
11         // 创建连接和频道
12         ConnectionFactory factory = new ConnectionFactory();
13         factory.setHost("localhost");
14         Connection connection = factory.newConnection();
15         Channel channel = connection.createChannel();
16         // 声明队列
17         boolean durable = true;
18         channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
19         System.out.println(hashCode
20                 + " [*] Waiting for messages. To exit press CTRL+C");
21         //设置最大服务转发消息数量
22         int prefetchCount = 1;
23         channel.basicQos(prefetchCount);
24         QueueingConsumer consumer = new QueueingConsumer(channel);
25         // 指定消费队列
26         boolean ack = false; // 打开应答机制
27         channel.basicConsume(QUEUE_NAME, ack, consumer);
28         while (true)
29         {
30             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
31             String message = new String(delivery.getBody());
32
33             System.out.println(hashCode + " [x] Received '" + message + "'");
34             doWork(message);
35             System.out.println(hashCode + " [x] Done");
36             //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
37             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
38
39         }
40
41     }
42
43     /**
44      * 每个点耗时1s
45      *
46      * @param task
47      * @throws InterruptedException
48      */
49     private static void doWork(String task) throws InterruptedException
50     {
51         for (char ch : task.toCharArray())
52         {
53             if (ch == '.')
54                 Thread.sleep(1000);
55         }
56     }
57 }


参考博客:http://blog.csdn.net/lmj623565791/article/details/37620057
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: