Java并发编程-10-在锁中使用多条件-生产者消费者问题
2015-06-16 11:16
836 查看
一、在锁中使用条件
一个锁可能关联一个或多个条件,这些条件通过Condition接口声明
目的是允许线程获取锁并且查看等待的某一个条件是否满足,如果不满足就挂起直到某个线程唤醒它们
二、模拟实现
fileMock是一个文件模拟类
Buffer是一个数据缓冲区
一个或者多个生产者读取fileMock的所有数据行,并且使用insert()方法将读取的数据行插入到缓冲区
三、注意
1、与锁绑定的所有条件对象都是通过Lock接口的newCondiction()方法创建的。
2、在使用条件的时候,必须获取这个条件绑定的锁,所以带条件的代码必须在调用Lock对象的Lock()方法和unlock()方法之间
3、当线程调用条件await()方法时,它将自动释放这个条件绑定的锁,其它线程才可以获取这个锁并且执行相同的操作,或者执行这个锁保护的另一个临界区代码
4、当一个线程调用了对象的signal()或者signallAll()方法后,一个或多个在该条件上挂起的线程将被唤醒
四、condition接口
测试代码:
package com.concurrent.threadSynchronize;
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* 数据缓冲区类
*
* @author Nicholas
*
* 被生产者和消费者共享
*/
public class Buffer {
private LinkedList<String> buffer; // 存放共享数据
private int maxSize; // buffer的长度
private ReentrantLock reentrantLock; // 对修改buffer的代码块进行控制
private Condition lines; // 两个condition属性
private Condition space;
private boolean pendingLines; // 判断缓冲区是否还有数据
public Buffer(int maxSize) {
this.maxSize = maxSize;
buffer = new LinkedList<>();
reentrantLock = new ReentrantLock();
lines = reentrantLock.newCondition();
space = reentrantLock.newCondition();
pendingLines = true;
}
/**
* 参数是传入的字符串,将字符串写入到缓冲区
* @param line
*/
public void insert(String line) {
//首先要获取锁
reentrantLock.lock();
//然后判断缓冲区是否还有空位
try {
//缓冲区已满,调用条件space的await()方法等待,知道出现空位
while (buffer.size() == maxSize) {
space.await();
}
//将line插入offer的末尾
buffer.offer(line);
System.out.println(Thread.currentThread().getName()
+ "Inserted line " + buffer.size());
//调用条件lines的signalAll()方法唤醒所有的线程
lines.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}
/**
* 获取缓冲区的第一个字符串
* @return
*/
public String get() {
String line = null;
//首先要获取锁
reentrantLock.lock();
//缓冲区是空的,那么调用条件lines的await()方法等待
try {
while ((buffer.size() == 0) && (hasPendingLines())) {
lines.await();
}
//缓冲区有数据,那么取出buffer的第一个字符串
if (hasPendingLines()) {
line = buffer.poll();
System.out.println(Thread.currentThread().getName()
+ "line read " + buffer.size());
//调用条件space的signalAll()方法唤醒所有等待的线程
space.signalAll();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
//将拿到的字符串返回
return line;
}
public boolean hasPendingLines() {
return pendingLines || buffer.size() > 0;
}
public void setPendingLines(boolean pendingLines) {
this.pendingLines = pendingLines;
}
}
一个锁可能关联一个或多个条件,这些条件通过Condition接口声明
目的是允许线程获取锁并且查看等待的某一个条件是否满足,如果不满足就挂起直到某个线程唤醒它们
二、模拟实现
fileMock是一个文件模拟类
Buffer是一个数据缓冲区
一个或者多个生产者读取fileMock的所有数据行,并且使用insert()方法将读取的数据行插入到缓冲区
三、注意
1、与锁绑定的所有条件对象都是通过Lock接口的newCondiction()方法创建的。
2、在使用条件的时候,必须获取这个条件绑定的锁,所以带条件的代码必须在调用Lock对象的Lock()方法和unlock()方法之间
3、当线程调用条件await()方法时,它将自动释放这个条件绑定的锁,其它线程才可以获取这个锁并且执行相同的操作,或者执行这个锁保护的另一个临界区代码
4、当一个线程调用了对象的signal()或者signallAll()方法后,一个或多个在该条件上挂起的线程将被唤醒
四、condition接口
测试代码:
package com.concurrent.threadSynchronize;
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* 数据缓冲区类
*
* @author Nicholas
*
* 被生产者和消费者共享
*/
public class Buffer {
private LinkedList<String> buffer; // 存放共享数据
private int maxSize; // buffer的长度
private ReentrantLock reentrantLock; // 对修改buffer的代码块进行控制
private Condition lines; // 两个condition属性
private Condition space;
private boolean pendingLines; // 判断缓冲区是否还有数据
public Buffer(int maxSize) {
this.maxSize = maxSize;
buffer = new LinkedList<>();
reentrantLock = new ReentrantLock();
lines = reentrantLock.newCondition();
space = reentrantLock.newCondition();
pendingLines = true;
}
/**
* 参数是传入的字符串,将字符串写入到缓冲区
* @param line
*/
public void insert(String line) {
//首先要获取锁
reentrantLock.lock();
//然后判断缓冲区是否还有空位
try {
//缓冲区已满,调用条件space的await()方法等待,知道出现空位
while (buffer.size() == maxSize) {
space.await();
}
//将line插入offer的末尾
buffer.offer(line);
System.out.println(Thread.currentThread().getName()
+ "Inserted line " + buffer.size());
//调用条件lines的signalAll()方法唤醒所有的线程
lines.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}
/**
* 获取缓冲区的第一个字符串
* @return
*/
public String get() {
String line = null;
//首先要获取锁
reentrantLock.lock();
//缓冲区是空的,那么调用条件lines的await()方法等待
try {
while ((buffer.size() == 0) && (hasPendingLines())) {
lines.await();
}
//缓冲区有数据,那么取出buffer的第一个字符串
if (hasPendingLines()) {
line = buffer.poll();
System.out.println(Thread.currentThread().getName()
+ "line read " + buffer.size());
//调用条件space的signalAll()方法唤醒所有等待的线程
space.signalAll();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
//将拿到的字符串返回
return line;
}
public boolean hasPendingLines() {
return pendingLines || buffer.size() > 0;
}
public void setPendingLines(boolean pendingLines) {
this.pendingLines = pendingLines;
}
}
package com.concurrent.threadSynchronize; public class FileMock { private String[] context;// 存储文件内容 private int index;// 行号 //使用随机的字符串初始化 public FileMock(int size, int length) { context = new String[size]; for (int i = 0; i < size; i++) { StringBuilder stringBuilder = new StringBuilder(length); for (int j = 0; j < length; j++) { int indice = (int) (Math.random() * 255); stringBuilder.append((char) indice); } context[i] = stringBuilder.toString(); } index = 0; } /** * 如果文件有可以处理的数据行,则返回true * 负责,返回false * @return */ public boolean hasMoreLines() { return index < context.length; } /** * 返回属性index指定的行内容,并将index自动增加1 * @return */ public String getLine() { if (this.hasMoreLines()) { System.out.println("Mock : " + (context.length - index)); return context[index++]; } return null; } }
package com.concurrent.threadSynchronize; import java.util.Random; public class Consumer implements Runnable { private Buffer buffer; public Consumer(Buffer buffer) { this.buffer = buffer; } @Override public void run() { while (buffer.hasPendingLines()) { String line = buffer.get(); processLine(line); } } public void processLine(String line) { try { Random random = new Random(); Thread.sleep(random.nextInt(100)); } catch (InterruptedException e) { e.printStackTrace(); } } }
package com.concurrent.threadSynchronize; public class Producer implements Runnable { private FileMock fileMock; private Buffer buffer; public Producer(FileMock fileMock, Buffer buffer) { this.fileMock = fileMock; this.buffer = buffer; } @Override public void run() { buffer.setPendingLines(true); while (fileMock.hasMoreLines()) { String line = fileMock.getLine(); buffer.insert(line); } buffer.setPendingLines(false); } }
package com.concurrent.threadSynchronize; public class Main { public static void main(String[] args) { FileMock fileMock = new FileMock(100, 10); Buffer buffer = new Buffer(20); Producer producer = new Producer(fileMock, buffer); Thread threadProducer = new Thread(producer, "Producer"); Consumer[] consumer = new Consumer[3]; Thread[] threadConsumer = new Thread[3]; for (int i = 0; i < 3; i++) { consumer[i] = new Consumer(buffer); threadConsumer[i] = new Thread(consumer[i], "Consumer " + i); } threadProducer.start(); for (int i = 0; i < 3; i++) { threadConsumer[i].start(); } } }
相关文章推荐
- java进行时间的方法格式化和时间的差值
- 【第十一章】 SSH集成开发积分商城 之 11.1 概述 ——跟我学spring3
- 【第十章】集成其它Web框架 之 10.4 集成JSF ——跟我学spring3
- 【第十章】集成其它Web框架 之 10.3 集成Struts2.x ——跟我学spring3
- 【第十章】集成其它Web框架 之 10.2 集成Struts1.x ——跟我学spring3
- 【第十章】集成其它Web框架 之 10.1 概述 ——跟我学spring3
- 【第九章】 Spring的事务 之 9.4 声明式事务 ——跟我学spring3
- 【第九章】 Spring的事务 之 9.3 编程式事务 ——跟我学spring3
- Java基础 集合Map
- Java线程:并发协作-死锁
- 【第九章】 Spring的事务 之 9.2 事务管理器 ——跟我学spring3
- eclipse 通过反编译插件查看源码
- 【第九章】 Spring的事务 之 9.1 数据库事务概述 ——跟我学spring3
- Java线程:线程的同步-同步方法
- 【第八章】 对ORM的支持 之 8.4 集成JPA ——跟我学spring3
- 【第八章】 对ORM的支持 之 8.3 集成iBATIS ——跟我学spring3
- Java 继承详解
- 【第八章】 对ORM的支持 之 8.2 集成Hibernate3 ——跟我学spring3
- 入门级菜鸟需要的Eclipse环境
- 【第八章】 对ORM的支持 之 8.1 概述 ——跟我学spring3