您的位置:首页 > 编程语言 > Java开发

Java并发编程之(二)管程

2016-05-27 19:23 330 查看
本博客转载自《奔跑的猪的博客》

本文中的代码可以在github上找到,有需要的请自行下载

标题

定义

维基百科中定义管程为:在并发编程中,管程(monitor)为一个同步结构,具有线程互斥特性,以及能够根据某些条件来阻塞线程。根据定义,管程有三个要素:同步、互斥、条件。恰好在Java的Concurrent包中ReentrantLock具有上述所有特性,可以用来实现管程。管程是一个非常实用且常见的技术,可以用来实现很多常用的并发数据结构,例如阻塞队列。

阻塞队列

队列常用于生产者消费者模型,生产者发送消息并存储到队列,消费者从队列中取出消息。一般情况下会存在多个生产者和多个消费者,普通的队列不能保证并发的安全,因此需要用到线程安全的技术。线程安全的队列又可以分为两种:

阻塞队列(BlockingQueue)

非阻塞队列(NoBlockingQueue)

阻塞队列的特征为,当队列为空时会阻塞消费者,当队列满时阻塞生产者。这种机制能够平衡生产者和消费者的负载。

分析上述定义,阻塞队列具有线程安全,阻塞,条件的特性;线程安全和阻塞就意味着要实现同步以及互斥,因此,管程能够很好地满足阻塞队列的要求。

Java源码中的管程

如下代码所示是Concurrent包中ArrayBlockingQueue的实现,队列有两种实现方式,数组和链表。Array是数组的实现,而数组通常是有界的。

ArrayBlockingQueue结构将ReentrantLock(可重入锁)作为全局锁来实现线程安全,所谓全局锁就是整个数据结构中就这么一个锁,当一个线程在访问该对象并获得锁之后,其他线程要访问该对象都得阻塞。在后续的学习中,我们会发现全局锁有一定的弊端,因为他锁住了整个对象,使得整体的并发性不高。源代码中的newCondition()操作会生成一个条件对象,条件对象具有唤醒线程和挂起线程的能力,具体的操作如第二段代码所示。

public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull =  lock.newCondition();
}


第二段代码是ArrayBlockingQueue的put操作,put操作的第一步就是获得全局锁,然后判断队列当前元素是否已满,如果队列慢就会使用notFull条件变量将线程挂起,否则就会调用enqueue函数进行入队操作。入队操作同时会使用notEmpty条件变量来唤醒一个被notEmpty阻塞的的线程(take操作会调用notEmpty来阻塞线程)。

public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();//挂起线程,当其他线程调用notFull.signal()或者notFull.signlAll()时,该线程唤醒。
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();//唤醒一个被notEmpty阻塞的线程,因为此时队列已经非空了
}


实例

下面我为了熟悉管程的使用,会亲自造轮子体会一下。

package Monitors;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class BlockingQueue<E> {
private ReentrantLock lock;//全局锁
private Condition notFull;//条件变量,用来监控队列是否已满
private Condition notEmpty;//条件变量,用来监控队列是否为空
private final int capacity;//容量,这是一个定容的队列
private int head;//队列头部哨兵
private int tail;//队列尾部哨兵
private int count;//当然元素个数
private E[] data;//保存元素的数组
/*
初始化阻塞队列
*/
public BlockingQueue(int c){
this.lock=new ReentrantLock();
this.notEmpty=lock.newCondition();
this.notFull=lock.newCondition();
this.capacity=c;
this.count=0;
this.head=0;
this.tail=0;
data=(E[])new Object[c+1];
}
/*
向队列中添加一个元素
*/
public void put(E e){
lock.lock();
try{
while(this.count==this.capacity)//判断队列是否已满
notFull.await();//挂起线程
tail++;
count++;
if(tail==this.capacity+1)tail=0;
data[tail]=e;//插入元素
notEmpty.signal();//通知其他线程,队列不为空
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}finally{
lock.unlock();
}
}
/*
获取队列容量
*/
public int getCap(){
return this.capacity;
}
/*
判读队列是否为空
*/
public boolean isEmpty(){
lock.lock();
try{
return count==0?true:false;
}finally{
lock.unlock();
}
}
/*
判断队列是否已满
*/
public boolean isFull(){
lock.lock();
try{
return count==capacity?true:false;
}finally{
lock.unlock();
}
}
/*
从队列中取出一个元素
*/
public E take() throws InterruptedException{
lock.lock();
try{
while(this.count==0)//判断队列是否为空
notEmpty.await();//阻塞队列
head++;
count--;
if(head==this.capacity+1)head=0;
E x=data[head];//获得头部元素
notFull.signal();//通知其他线程,队列未满
return x;
}finally{
lock.unlock();
}
}
}


在上锁所有操作中,几乎每个操作都在如下的结构体中进行,为了防止代码异常退出而导致锁没有被释放,必须使用finally关键字确保锁的释放。该代码块的作用和synchronized同步块作用类似,在进入同步块之前都要先获得锁,然后进行同步操作;在退出同步块的时候都要释放锁并再次同步。

lock.lock();
try{
return count==capacity?true:false;
}finally{
lock.unlock();
}


参考文献

下面两本书我极力推荐,它们对并发编程解释得非常全面。

[1] Brian Goetz等著 童云兰译..机械工业出版社

[2][Maurice Herlihy, Nir Shavit著,<多处理器编程的艺术]
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java 并发