您的位置:首页 > 编程语言 > Java开发

Java并发编程-10-在锁中使用多条件-生产者消费者问题

2015-06-16 11:16 836 查看
一、在锁中使用条件

一个锁可能关联一个或多个条件,这些条件通过Condition接口声明

目的是允许线程获取锁并且查看等待的某一个条件是否满足,如果不满足就挂起直到某个线程唤醒它们

二、模拟实现

fileMock是一个文件模拟类

Buffer是一个数据缓冲区

一个或者多个生产者读取fileMock的所有数据行,并且使用insert()方法将读取的数据行插入到缓冲区

三、注意

1、与锁绑定的所有条件对象都是通过Lock接口的newCondiction()方法创建的。

2、在使用条件的时候,必须获取这个条件绑定的锁,所以带条件的代码必须在调用Lock对象的Lock()方法和unlock()方法之间

3、当线程调用条件await()方法时,它将自动释放这个条件绑定的锁,其它线程才可以获取这个锁并且执行相同的操作,或者执行这个锁保护的另一个临界区代码

4、当一个线程调用了对象的signal()或者signallAll()方法后,一个或多个在该条件上挂起的线程将被唤醒

四、condition接口



测试代码:

package com.concurrent.threadSynchronize;

import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
* 数据缓冲区类
*
* @author Nicholas
*
* 被生产者和消费者共享
*/
public class Buffer {

private LinkedList<String> buffer; // 存放共享数据
private int maxSize; // buffer的长度
private ReentrantLock reentrantLock; // 对修改buffer的代码块进行控制
private Condition lines; // 两个condition属性
private Condition space;
private boolean pendingLines; // 判断缓冲区是否还有数据

public Buffer(int maxSize) {
this.maxSize = maxSize;
buffer = new LinkedList<>();
reentrantLock = new ReentrantLock();
lines = reentrantLock.newCondition();
space = reentrantLock.newCondition();
pendingLines = true;
}

/**
* 参数是传入的字符串,将字符串写入到缓冲区
* @param line
*/
public void insert(String line) {

//首先要获取锁
reentrantLock.lock();

//然后判断缓冲区是否还有空位
try {
//缓冲区已满,调用条件space的await()方法等待,知道出现空位
while (buffer.size() == maxSize) {
space.await();
}
//将line插入offer的末尾
buffer.offer(line);
System.out.println(Thread.currentThread().getName()
+ "Inserted line " + buffer.size());
//调用条件lines的signalAll()方法唤醒所有的线程
lines.signalAll();

} catch (InterruptedException e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}

/**
* 获取缓冲区的第一个字符串
* @return
*/
public String get() {
String line = null;

//首先要获取锁
reentrantLock.lock();

//缓冲区是空的,那么调用条件lines的await()方法等待
try {
while ((buffer.size() == 0) && (hasPendingLines())) {
lines.await();
}

//缓冲区有数据,那么取出buffer的第一个字符串
if (hasPendingLines()) {
line = buffer.poll();
System.out.println(Thread.currentThread().getName()
+ "line read " + buffer.size());

//调用条件space的signalAll()方法唤醒所有等待的线程
space.signalAll();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}

//将拿到的字符串返回
return line;
}

public boolean hasPendingLines() {
return pendingLines || buffer.size() > 0;
}

public void setPendingLines(boolean pendingLines) {
this.pendingLines = pendingLines;
}
}

package com.concurrent.threadSynchronize;

public class FileMock {

private String[] context;// 存储文件内容
private int index;// 行号

//使用随机的字符串初始化
public FileMock(int size, int length) {
context = new String[size];
for (int i = 0; i < size; i++) {
StringBuilder stringBuilder = new StringBuilder(length);
for (int j = 0; j < length; j++) {
int indice = (int) (Math.random() * 255);
stringBuilder.append((char) indice);
}
context[i] = stringBuilder.toString();
}
index = 0;
}

/**
* 如果文件有可以处理的数据行,则返回true
* 负责,返回false
* @return
*/
public boolean hasMoreLines() {
return index < context.length;
}

/**
* 返回属性index指定的行内容,并将index自动增加1
* @return
*/
public String getLine() {
if (this.hasMoreLines()) {
System.out.println("Mock : " + (context.length - index));
return context[index++];
}
return null;
}
}

package com.concurrent.threadSynchronize;

import java.util.Random;

public class Consumer implements Runnable {

private Buffer buffer;

public Consumer(Buffer buffer) {
this.buffer = buffer;
}

@Override
public void run() {
while (buffer.hasPendingLines()) {
String line = buffer.get();
processLine(line);
}
}

public void processLine(String line) {
try {
Random random = new Random();
Thread.sleep(random.nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

package com.concurrent.threadSynchronize;

public class Producer implements Runnable {

private FileMock fileMock;
private Buffer buffer;

public Producer(FileMock fileMock, Buffer buffer) {
this.fileMock = fileMock;
this.buffer = buffer;
}

@Override
public void run() {
buffer.setPendingLines(true);
while (fileMock.hasMoreLines()) {
String line = fileMock.getLine();
buffer.insert(line);
}
buffer.setPendingLines(false);
}
}

package com.concurrent.threadSynchronize;

public class Main {

public static void main(String[] args) {
FileMock fileMock = new FileMock(100, 10);
Buffer buffer = new Buffer(20);

Producer producer = new Producer(fileMock, buffer);
Thread threadProducer = new Thread(producer, "Producer");

Consumer[] consumer = new Consumer[3];
Thread[] threadConsumer = new Thread[3];

for (int i = 0; i < 3; i++) {
consumer[i] = new Consumer(buffer);
threadConsumer[i] = new Thread(consumer[i], "Consumer " + i);
}

threadProducer.start();
for (int i = 0; i < 3; i++) {
threadConsumer[i].start();
}
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: