生产者/消费者模式(阻塞队列) 一个经典的并发模型
2017-09-18 23:06
423 查看
生产消费者模式也是关于线程阻塞的问题,生产消费者模式是通过观察者模式来实现的。之前在编写一个通讯软件的时候用到了这种模式,通过维护一个BlockingQueue来完成Socket的消息发送,后来读书时看到在服务器开发时三层模型中的Service层在调用Dao层的时候也是通过这种模式来调用的,具体怎么使用的还没有具体实践过,期待后面可以有机会练习这一块。
实际的软件开发过程中,经常会碰到如下场景:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。
单单抽象出生产者和消费者,还够不上是生产者/消费者模式。该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据。
解耦
假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。
支持并发(concurrency)
生产者直接调用消费者的某个方法,还有另一个弊端。由于函数调用是同步的(或者叫阻塞的),在消费者的方法没有返回之前,生产者只好一直等在那边。万一消费者处理数据很慢,生产者就会白白糟蹋大好时光。
使用了生产者/消费者模式之后,生产者和消费者可以是两个独立的并发主体(常见并发类型有进程和线程两种,后面的帖子会讲两种并发类型下的应用)。生产者把制造出来的数据往缓冲区一丢,就可以再去生产下一个数据。基本上不用依赖消费者的处理速度。其实当初这个模式,主要就是用来处理并发问题的。
支持忙闲不均
缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。等生产者的制造速度慢下来,消费者再慢慢处理掉。
用了两种方式实现了一下这个模式
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
生产者:
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
队列:
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
测试代码:
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
在JDK1.5以上使用了Lock锁来实现:
实际的软件开发过程中,经常会碰到如下场景:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。
单单抽象出生产者和消费者,还够不上是生产者/消费者模式。该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据。
解耦
假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。
支持并发(concurrency)
生产者直接调用消费者的某个方法,还有另一个弊端。由于函数调用是同步的(或者叫阻塞的),在消费者的方法没有返回之前,生产者只好一直等在那边。万一消费者处理数据很慢,生产者就会白白糟蹋大好时光。
使用了生产者/消费者模式之后,生产者和消费者可以是两个独立的并发主体(常见并发类型有进程和线程两种,后面的帖子会讲两种并发类型下的应用)。生产者把制造出来的数据往缓冲区一丢,就可以再去生产下一个数据。基本上不用依赖消费者的处理速度。其实当初这个模式,主要就是用来处理并发问题的。
支持忙闲不均
缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。等生产者的制造速度慢下来,消费者再慢慢处理掉。
用了两种方式实现了一下这个模式
1.方法一:
消费者:public class TestConsumer implements Runnable { TestQueue queue; public TestConsumer() { // TODO Auto-generated constructor stub } public TestConsumer(TestQueue obj) { this.queue = obj; } @Override public void run() { for (int i = 0; i < 10; i++) { try { queue.consumer(); } catch (Exception e) { e.printStackTrace(); } } } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
生产者:
public class TestProduct implements Runnable { TestQueue t; public TestProduct() { } public TestProduct(TestQueue t) { this.t = t; } public void run() { for (int i = 0; i < 10; i++) { try { t.product("test" + i); } catch (Exception e) { e.printStackTrace(); } } } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
队列:
public class TestQueue { public static Object signal = new Object(); boolean bFull = false; private List thingsList = new ArrayList(); /** * 生产 * * @param thing * @throws Exception */ public void product(String thing) throws Exception { synchronized (signal) { if (!bFull) { bFull = true; System.out.println("product"); thingsList.add(thing); signal.notify(); // 然后通知消费者 } } } /** * 消费 * * @return * @throws Exception */ public String consumer() { synchronized (signal) { if (!bFull) { // 队列为空。等待..... try { signal.wait(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } // 进入signal待召队列,等待生产者的通知 } bFull = false; // 读取buf 共享资源里面的东西 System.out.println("consume"); signal.notify(); // 然后通知生产者 } String result = ""; if (thingsList.size() > 0) { result = thingsList.get(thingsList.size() - 1).toString(); thingsList.remove(thingsList.size() - 1); } return result; } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
测试代码:
public class Application { @Test public void test(){ TestQueue queue = new TestQueue(); Thread customer = new Thread(new TestConsumer(queue)); Thread product = new Thread(new TestProduct(queue)); customer.start(); product.start(); } }1
2
3
4
5
6
7
8
9
10
2.方法二:
使用java.util.concurrent.BlockingQueue类来重写的队列那个类,使用这个方法比较简单。直接看JDK提供的democlass Producer implements Runnable { private final BlockingQueue queue; Producer(BlockingQueue q) { queue = q; } public void run() { try { while(true) { queue.put(produce()); } } catch (InterruptedException ex) { ... handle ...} } Object produce() { ... } } class Consumer implements Runnable { private final BlockingQueue queue; Consumer(BlockingQueue q) { queue = q; } public void run() { try { while(true) { consume(queue.take()); } } catch (InterruptedException ex) { ... handle ...} } void consume(Object x) { ... } } class Setup { void main() { BlockingQueue q = new SomeQueueImplementation(); Producer p = new Producer(q); Consumer c1 = new Consumer(q); Consumer c2 = new Consumer(q); new Thread(p).start(); new Thread(c1).start(); new Thread(c2).start(); } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
在JDK1.5以上使用了Lock锁来实现:
class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } } }
相关文章推荐
- 生产者/消费者模式(阻塞队列) 一个经典的并发模型
- 生产者/消费者模式(阻塞队列) 一个经典的并发模型
- 生产者/消费者模式(阻塞队列) 一个类似于监听者模式的并发模型
- Java并发编程笔记 使用阻塞队列实现生产者-消费者模型
- 【Java并发】阻塞队列BlockingQueue和生产者-消费者模型笔记
- 并发编程 03—— 阻塞队列和生产者-消费者模式
- 并发:阻塞队列与生产者消费者模型
- 高并发:阻塞队列 实现生产者-消费者模式
- java消费者生产者模式及JDK之阻塞队列LinkedBlockingQueue实现
- 生产者消费者问题 这是一个非常经典的多线程题目,题目大意如下:有一个生产者在生产产品,这些产品将提供给若干个消费者去消费,为了使生产者和消费者能并发执行,在两者之间设置一个有多个缓冲区的缓冲池,生产者
- 多线程+阻塞队列实现生产者-消费者模型获取队列数据问题
- 生产者/消费者模式(阻塞队列)
- 阻塞队列BlockingQueue实现生产者-消费者模式值桌面搜索
- 生产者消费者问题 这是一个非常经典的多线程题目,题目大意如下:有一个生产者在生产产品,这些产品将提供给若干个消费者去消费,为了使生产者和消费者能并发执行,在两者之间设置一个有多个缓冲区的缓冲池,生产者
- 生产者/消费者模式(阻塞队列)
- 11.python并发入门(part8 基于线程队列实现生产者消费者模型)
- 生产者/消费者模式(阻塞队列)
- 阻塞队列和生产者-消费者模式、DelayQueue
- 阻塞队列和生产者-消费者模式
- JAVA 阻塞队列实现 生产者和消费者 模式