RabbitMQ 工作队列
2016-10-31 09:31
447 查看
创建一个工作队列用来在工作者(consumer)间分发耗时任务。
工作队列的主要任务是:避免立刻执行资源密集型任务,然后必须等待其完成。相反地,我们进行任务调度:我们把任务封装为消息发送给队列。工作进行在后台运行并不断的从队列中取出任务然后执行。当你运行了多个工作进程时,任务队列中的任务将会被工作进程共享执行。
这样的概念在web应用中极其有用,当在很短的HTTP请求间需要执行复杂的任务。
1、 准备
我们使用Thread.sleep来模拟耗时的任务。我们在发送到队列的消息的末尾添加一定数量的点,每个点代表在工作线程中需要耗时1秒,例如hello…将会需要等待3秒。
发送端:
接收端:
Round-robin 转发
使用任务队列的好处是能够很容易的并行工作。如果我们积压了很多工作,我们仅仅通过增加更多的工作者就可以解决问题,使系统的伸缩性更加容易。
下面我们先运行3个工作者(Work.java)实例,然后运行NewTask.java,3个工作者实例都会得到信息。
View Code
可以看出此时并没有按照之前的Round-robin机制进行转发消息,而是当消费者不忙时进行转发。且这种模式下支持动态增加消费者,因为消息并没有发送出去,动态增加了消费者马上投入工作。而默认的转发机制会造成,即使动态增加了消费者,此时的消息已经分配完毕,无法立即加入工作,即使有很多未完成的任务。
完整代码:
参考博客:http://blog.csdn.net/lmj623565791/article/details/37620057
工作队列的主要任务是:避免立刻执行资源密集型任务,然后必须等待其完成。相反地,我们进行任务调度:我们把任务封装为消息发送给队列。工作进行在后台运行并不断的从队列中取出任务然后执行。当你运行了多个工作进程时,任务队列中的任务将会被工作进程共享执行。
这样的概念在web应用中极其有用,当在很短的HTTP请求间需要执行复杂的任务。
1、 准备
我们使用Thread.sleep来模拟耗时的任务。我们在发送到队列的消息的末尾添加一定数量的点,每个点代表在工作线程中需要耗时1秒,例如hello…将会需要等待3秒。
发送端:
1 public class NewTask 2 { 3 //队列名称 4 private final static String QUEUE_NAME = "workqueue"; 5 6 public static void main(String[] args) throws IOException 7 { 8 //创建连接和频道 9 ConnectionFactory factory = new ConnectionFactory(); 10 factory.setHost("localhost"); 11 Connection connection = factory.newConnection(); 12 Channel channel = connection.createChannel(); 13 //声明队列 14 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 15 //发送10条消息,依次在消息后面附加1-10个点 16 for (int i = 0; i < 10; i++) 17 { 18 String dots = ""; 19 for (int j = 0; j <= i; j++) 20 { 21 dots += "."; 22 } 23 String message = "helloworld" + dots+dots.length(); 24 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 25 System.out.println(" [x] Sent '" + message + "'"); 26 } 27 //关闭频道和资源 28 channel.close(); 29 connection.close(); 30 31 } 32 33 34 }
接收端:
1 public class Work 2 { 3 //队列名称 4 private final static String QUEUE_NAME = "workqueue"; 5 6 public static void main(String[] argv) throws java.io.IOException, 7 java.lang.InterruptedException 8 { 9 //区分不同工作进程的输出 10 int hashCode = Work.class.hashCode(); 11 //创建连接和频道 12 ConnectionFactory factory = new ConnectionFactory(); 13 factory.setHost("localhost"); 14 Connection connection = factory.newConnection(); 15 Channel channel = connection.createChannel(); 16 //声明队列 17 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 18 System.out.println(hashCode 19 + " [*] Waiting for messages. To exit press CTRL+C"); 20 21 QueueingConsumer consumer = new QueueingConsumer(channel); 22 // 指定消费队列 23 channel.basicConsume(QUEUE_NAME, true, consumer); 24 while (true) 25 { 26 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 27 String message = new String(delivery.getBody()); 28 29 System.out.println(hashCode + " [x] Received '" + message + "'"); 30 doWork(message); 31 System.out.println(hashCode + " [x] Done"); 32 33 } 34 35 } 36 37 /** 38 * 每个点耗时1s 39 * @param task 40 * @throws InterruptedException 41 */ 42 private static void doWork(String task) throws InterruptedException 43 { 44 for (char ch : task.toCharArray()) 45 { 46 if (ch == '.') 47 Thread.sleep(1000); 48 } 49 } 50 }
Round-robin 转发
使用任务队列的好处是能够很容易的并行工作。如果我们积压了很多工作,我们仅仅通过增加更多的工作者就可以解决问题,使系统的伸缩性更加容易。
下面我们先运行3个工作者(Work.java)实例,然后运行NewTask.java,3个工作者实例都会得到信息。
1 [x] Sent 'helloworld......6' 2 [x] Sent 'helloworld.....5' 3 [x] Sent 'helloworld....4' 4 [x] Sent 'helloworld...3' 5 [x] Sent 'helloworld..2' 6 工作者1: 7 18019860 [*] Waiting for messages. To exit press CTRL+C 8 18019860 [x] Received 'helloworld......6' 9 18019860 [x] Done 10 18019860 [x] Received 'helloworld...3' 11 18019860 [x] Done 12 工作者2: 13 31054905 [*] Waiting for messages. To exit press CTRL+C 14 31054905 [x] Received 'helloworld.....5' 15 31054905 [x] Done 16 31054905 [x] Received 'helloworld....4' 17 31054905 [x] Done 18 31054905 [x] Received 'helloworld..2' 19 31054905 [x] Done
View Code
可以看出此时并没有按照之前的Round-robin机制进行转发消息,而是当消费者不忙时进行转发。且这种模式下支持动态增加消费者,因为消息并没有发送出去,动态增加了消费者马上投入工作。而默认的转发机制会造成,即使动态增加了消费者,此时的消息已经分配完毕,无法立即加入工作,即使有很多未完成的任务。
完整代码:
1 public class NewTask 2 { 3 // 队列名称 4 private final static String QUEUE_NAME = "workqueue_persistence"; 5 6 public static void main(String[] args) throws IOException 7 { 8 // 创建连接和频道 9 ConnectionFactory factory = new ConnectionFactory(); 10 factory.setHost("localhost"); 11 Connection connection = factory.newConnection(); 12 Channel channel = connection.createChannel(); 13 // 声明队列 14 boolean durable = true;// 1、设置队列持久化 15 channel.queueDeclare(QUEUE_NAME, durable, false, false, null); 16 // 发送10条消息,依次在消息后面附加1-10个点 17 for (int i = 5; i > 0; i--) 18 { 19 String dots = ""; 20 for (int j = 0; j <= i; j++) 21 { 22 dots += "."; 23 } 24 String message = "helloworld" + dots + dots.length(); 25 // MessageProperties 2、设置消息持久化 26 channel.basicPublish("", QUEUE_NAME, 27 MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); 28 System.out.println(" [x] Sent '" + message + "'"); 29 } 30 // 关闭频道和资源 31 channel.close(); 32 connection.close(); 33 34 } 35 36 }
1 public class Work 2 { 3 // 队列名称 4 private final static String QUEUE_NAME = "workqueue_persistence"; 5 6 public static void main(String[] argv) throws java.io.IOException, 7 java.lang.InterruptedException 8 { 9 // 区分不同工作进程的输出 10 int hashCode = Work.class.hashCode(); 11 // 创建连接和频道 12 ConnectionFactory factory = new ConnectionFactory(); 13 factory.setHost("localhost"); 14 Connection connection = factory.newConnection(); 15 Channel channel = connection.createChannel(); 16 // 声明队列 17 boolean durable = true; 18 channel.queueDeclare(QUEUE_NAME, durable, false, false, null); 19 System.out.println(hashCode 20 + " [*] Waiting for messages. To exit press CTRL+C"); 21 //设置最大服务转发消息数量 22 int prefetchCount = 1; 23 channel.basicQos(prefetchCount); 24 QueueingConsumer consumer = new QueueingConsumer(channel); 25 // 指定消费队列 26 boolean ack = false; // 打开应答机制 27 channel.basicConsume(QUEUE_NAME, ack, consumer); 28 while (true) 29 { 30 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 31 String message = new String(delivery.getBody()); 32 33 System.out.println(hashCode + " [x] Received '" + message + "'"); 34 doWork(message); 35 System.out.println(hashCode + " [x] Done"); 36 //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 37 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 38 39 } 40 41 } 42 43 /** 44 * 每个点耗时1s 45 * 46 * @param task 47 * @throws InterruptedException 48 */ 49 private static void doWork(String task) throws InterruptedException 50 { 51 for (char ch : task.toCharArray()) 52 { 53 if (ch == '.') 54 Thread.sleep(1000); 55 } 56 } 57 }
参考博客:http://blog.csdn.net/lmj623565791/article/details/37620057
相关文章推荐
- RabbitMQ详解(二) 工作队列
- RabbitMQ (二)工作队列
- RabbitMQ学习之工作队列(java)
- RabbitMQ --- Work Queues(工作队列)
- RabbitMQ之工作队列
- python使用rabbitmq实例二,工作队列
- RabbitMQ-(二)工作队列
- RabbitMQ入门(2)——工作队列
- RabbitMQ-work queue工作队列 和 fair dispatch公平分发
- RabbitMQ 原文译03--工作队列
- rabbitmq工作队列实现高性能任务的负载分发[python实例] 推荐
- 使用rabbitmq工作队列实现任务的负载分发
- python使用rabbitmq实例二,工作队列 (2)
- RabbitMQ系列教程之二:工作队列(Work Queues)
- RabbitMQ 官方NET教程(二)【工作队列】
- RabbitMQ学习心得——工作队列
- 转载RabbitMQ入门(2)--工作队列
- python使用rabbitmq实例二,工作队列
- RabbitMQ (二)工作队列
- RabbitMQ 教程2 工作队列