Producer Consumer problem - mutex and semaphore
2015-05-12 12:16
411 查看
很典型的一道多线程题目。如果使用BlockingQueue,就十分简单。但是要注意blockingqueue的语法,用put/take
下面是自己实现的两个线程,使用mutex 方法 wait() and notify()
输出:
几个需要注意的地方:
1. 因为两个Thread都对queue进行操作,所有读写操作都需要用synchronizd关键字保证mutex
2. 在Producer里面我们有一个地方是while(queue.size()>1)。这么做是因为我想生产者消费者轮流放入和取出。如果没有这个while循环,生产者会任意生产10个数,而消费者取出的顺序也会随机。
比如:
用信号量实现同样的功能就简单了许多,不再使用synchronized关键字,不需要while循环。
为什么呢? 因为第一种方法实现的是monitor,使用busy wait,他会不断的check;而信号量就像红绿灯一样,一个thread通知下一个thread,下一个thread才可以执行,所以两个thread都不需要busy wait或者不可能有两个thread同时操作一个变量的情况。
package concurrent; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class ProducerConsumerBlockingQueue { static BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(); public static void main(String[] args) { new Thread(new Producer()).start(); new Thread(new Consumer()).start(); } private static class Producer implements Runnable { @Override public void run() { for (int i = 0; i < 5; i++) { try { queue.put(i); Thread.sleep(100); } catch (InterruptedException e) { } System.out.println("queue offer " + i); } } } private static class Consumer implements Runnable { @Override public void run() { try { System.out.println("queue.take " + queue.take()); System.out.println("queue.take " + queue.take()); System.out.println("queue.take " + queue.take()); System.out.println("queue.take " + queue.take()); System.out.println("queue.take " + queue.take()); } catch (InterruptedException e) { } } } }
下面是自己实现的两个线程,使用mutex 方法 wait() and notify()
package concurrent; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.Semaphore; public class ProducerConsumer { static Queue<Integer> queue = new LinkedList<Integer>(); public static void main(String[] args) { new Thread(new Producer()).start(); new Thread(new Consumer()).start(); } private static class Producer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { while (queue.size() > 1) { try { queue.wait(); } catch (Exception e) { } } synchronized (queue) { queue.offer(i); System.out.println("queue offer " + i); queue.notify(); } } } } private static class Consumer implements Runnable { @Override public void run() { while (true) { synchronized (queue) { while (queue.isEmpty()) { try { queue.wait(); } catch (InterruptedException e) { } } int i = queue.poll(); System.out.println("queue poll " + i); queue.notify(); } } } } }
输出:
queue offer 0 queue offer 1 queue poll 0 queue poll 1 queue offer 2 queue offer 3 queue poll 2 queue poll 3 queue offer 4 queue offer 5 queue poll 4 queue poll 5 queue offer 6 queue offer 7 queue poll 6 queue poll 7 queue offer 8 queue offer 9 queue poll 8 queue poll 9
几个需要注意的地方:
1. 因为两个Thread都对queue进行操作,所有读写操作都需要用synchronizd关键字保证mutex
2. 在Producer里面我们有一个地方是while(queue.size()>1)。这么做是因为我想生产者消费者轮流放入和取出。如果没有这个while循环,生产者会任意生产10个数,而消费者取出的顺序也会随机。
比如:
queue offer 0 queue poll 0 queue offer 1 queue poll 1 queue offer 2 queue poll 2 queue offer 3 queue poll 3 queue offer 4 queue offer 5 queue offer 6 queue offer 7 queue offer 8 queue offer 9 queue poll 4 queue poll 5 queue poll 6 queue poll 7 queue poll 8 queue poll 9
用信号量实现同样的功能就简单了许多,不再使用synchronized关键字,不需要while循环。
为什么呢? 因为第一种方法实现的是monitor,使用busy wait,他会不断的check;而信号量就像红绿灯一样,一个thread通知下一个thread,下一个thread才可以执行,所以两个thread都不需要busy wait或者不可能有两个thread同时操作一个变量的情况。
package concurrent; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.Semaphore; public class ProducerConsumerSemaphore { static Queue<Integer> queue = new LinkedList<Integer>(); static Semaphore semaphore_consume = new Semaphore(0); static Semaphore semaphore_produce = new Semaphore(1); public static void main(String[] args) { new Thread(new Producer()).start(); new Thread(new Consumer()).start(); } private static class Producer implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { semaphore_produce.acquire(); } catch (InterruptedException e) { } queue.offer(i); System.out.println("queue offer " + i); semaphore_consume.release(); } } } private static class Consumer implements Runnable { @Override public void run() { while (true) { try { semaphore_consume.acquire(); } catch (InterruptedException e) { } int i = queue.poll(); System.out.println("queue poll " + i); semaphore_produce.release(); } } } }
相关文章推荐
- Internet高级编程作业:Producer-consumer problem implemented by semaphore
- golang for thread channel routine consumer and producer
- MULTITHREADING - PRODUCER AND CONSUMER WITH QUEUE
- Semaphore and Mutex usages and differences
- Producer/Consumer problem
- Difference between mutex and semaphore
- Fast synchronization between a single producer and single consumer
- classic problem: producer and consumer
- C# Thread 多线程 Monitor 锁 Producer And Consumer 生产者和消费者 经典模型
- Solution to the Producer-Consumer problem using Semaphores
- producer and consumer concept ( I )
- 生产者-消费者问题(producer-consumer-problem)
- Apache Kafka - KIP-42: Add Producer and Consumer Interceptors
- producer and consumer 算法
- difference between semaphore and mutex
- Java Lock and Condition Example using Producer Consumer Solution
- java 线程 ProducerAndConsumer
- Java里的生产者-消费者模型(Producer and Consumer Pattern in Java)
- Python KafkaProducer and KafkaConsumer的开发模块
- Python MultiProducer and MultiConsumer