Java并发编程之(二)管程
2016-05-27 19:23
330 查看
本博客转载自《奔跑的猪的博客》
本文中的代码可以在github上找到,有需要的请自行下载
阻塞队列(BlockingQueue)
非阻塞队列(NoBlockingQueue)
阻塞队列的特征为,当队列为空时会阻塞消费者,当队列满时阻塞生产者。这种机制能够平衡生产者和消费者的负载。
分析上述定义,阻塞队列具有线程安全,阻塞,条件的特性;线程安全和阻塞就意味着要实现同步以及互斥,因此,管程能够很好地满足阻塞队列的要求。
ArrayBlockingQueue结构将ReentrantLock(可重入锁)作为全局锁来实现线程安全,所谓全局锁就是整个数据结构中就这么一个锁,当一个线程在访问该对象并获得锁之后,其他线程要访问该对象都得阻塞。在后续的学习中,我们会发现全局锁有一定的弊端,因为他锁住了整个对象,使得整体的并发性不高。源代码中的newCondition()操作会生成一个条件对象,条件对象具有唤醒线程和挂起线程的能力,具体的操作如第二段代码所示。
第二段代码是ArrayBlockingQueue的put操作,put操作的第一步就是获得全局锁,然后判断队列当前元素是否已满,如果队列慢就会使用notFull条件变量将线程挂起,否则就会调用enqueue函数进行入队操作。入队操作同时会使用notEmpty条件变量来唤醒一个被notEmpty阻塞的的线程(take操作会调用notEmpty来阻塞线程)。
在上锁所有操作中,几乎每个操作都在如下的结构体中进行,为了防止代码异常退出而导致锁没有被释放,必须使用finally关键字确保锁的释放。该代码块的作用和synchronized同步块作用类似,在进入同步块之前都要先获得锁,然后进行同步操作;在退出同步块的时候都要释放锁并再次同步。
参考文献
下面两本书我极力推荐,它们对并发编程解释得非常全面。
[1] Brian Goetz等著 童云兰译..机械工业出版社
[2][Maurice Herlihy, Nir Shavit著,<多处理器编程的艺术]
本文中的代码可以在github上找到,有需要的请自行下载
标题
定义
维基百科中定义管程为:在并发编程中,管程(monitor)为一个同步结构,具有线程互斥特性,以及能够根据某些条件来阻塞线程。根据定义,管程有三个要素:同步、互斥、条件。恰好在Java的Concurrent包中ReentrantLock具有上述所有特性,可以用来实现管程。管程是一个非常实用且常见的技术,可以用来实现很多常用的并发数据结构,例如阻塞队列。阻塞队列
队列常用于生产者消费者模型,生产者发送消息并存储到队列,消费者从队列中取出消息。一般情况下会存在多个生产者和多个消费者,普通的队列不能保证并发的安全,因此需要用到线程安全的技术。线程安全的队列又可以分为两种:阻塞队列(BlockingQueue)
非阻塞队列(NoBlockingQueue)
阻塞队列的特征为,当队列为空时会阻塞消费者,当队列满时阻塞生产者。这种机制能够平衡生产者和消费者的负载。
分析上述定义,阻塞队列具有线程安全,阻塞,条件的特性;线程安全和阻塞就意味着要实现同步以及互斥,因此,管程能够很好地满足阻塞队列的要求。
Java源码中的管程
如下代码所示是Concurrent包中ArrayBlockingQueue的实现,队列有两种实现方式,数组和链表。Array是数组的实现,而数组通常是有界的。ArrayBlockingQueue结构将ReentrantLock(可重入锁)作为全局锁来实现线程安全,所谓全局锁就是整个数据结构中就这么一个锁,当一个线程在访问该对象并获得锁之后,其他线程要访问该对象都得阻塞。在后续的学习中,我们会发现全局锁有一定的弊端,因为他锁住了整个对象,使得整体的并发性不高。源代码中的newCondition()操作会生成一个条件对象,条件对象具有唤醒线程和挂起线程的能力,具体的操作如第二段代码所示。
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
第二段代码是ArrayBlockingQueue的put操作,put操作的第一步就是获得全局锁,然后判断队列当前元素是否已满,如果队列慢就会使用notFull条件变量将线程挂起,否则就会调用enqueue函数进行入队操作。入队操作同时会使用notEmpty条件变量来唤醒一个被notEmpty阻塞的的线程(take操作会调用notEmpty来阻塞线程)。
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await();//挂起线程,当其他线程调用notFull.signal()或者notFull.signlAll()时,该线程唤醒。 enqueue(e); } finally { lock.unlock(); } } private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal();//唤醒一个被notEmpty阻塞的线程,因为此时队列已经非空了 }
实例
下面我为了熟悉管程的使用,会亲自造轮子体会一下。package Monitors; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class BlockingQueue<E> { private ReentrantLock lock;//全局锁 private Condition notFull;//条件变量,用来监控队列是否已满 private Condition notEmpty;//条件变量,用来监控队列是否为空 private final int capacity;//容量,这是一个定容的队列 private int head;//队列头部哨兵 private int tail;//队列尾部哨兵 private int count;//当然元素个数 private E[] data;//保存元素的数组 /* 初始化阻塞队列 */ public BlockingQueue(int c){ this.lock=new ReentrantLock(); this.notEmpty=lock.newCondition(); this.notFull=lock.newCondition(); this.capacity=c; this.count=0; this.head=0; this.tail=0; data=(E[])new Object[c+1]; } /* 向队列中添加一个元素 */ public void put(E e){ lock.lock(); try{ while(this.count==this.capacity)//判断队列是否已满 notFull.await();//挂起线程 tail++; count++; if(tail==this.capacity+1)tail=0; data[tail]=e;//插入元素 notEmpty.signal();//通知其他线程,队列不为空 } catch (InterruptedException e1) { // TODO Auto-generated catch block e1.printStackTrace(); }finally{ lock.unlock(); } } /* 获取队列容量 */ public int getCap(){ return this.capacity; } /* 判读队列是否为空 */ public boolean isEmpty(){ lock.lock(); try{ return count==0?true:false; }finally{ lock.unlock(); } } /* 判断队列是否已满 */ public boolean isFull(){ lock.lock(); try{ return count==capacity?true:false; }finally{ lock.unlock(); } } /* 从队列中取出一个元素 */ public E take() throws InterruptedException{ lock.lock(); try{ while(this.count==0)//判断队列是否为空 notEmpty.await();//阻塞队列 head++; count--; if(head==this.capacity+1)head=0; E x=data[head];//获得头部元素 notFull.signal();//通知其他线程,队列未满 return x; }finally{ lock.unlock(); } } }
在上锁所有操作中,几乎每个操作都在如下的结构体中进行,为了防止代码异常退出而导致锁没有被释放,必须使用finally关键字确保锁的释放。该代码块的作用和synchronized同步块作用类似,在进入同步块之前都要先获得锁,然后进行同步操作;在退出同步块的时候都要释放锁并再次同步。
lock.lock(); try{ return count==capacity?true:false; }finally{ lock.unlock(); }
参考文献
下面两本书我极力推荐,它们对并发编程解释得非常全面。
[1] Brian Goetz等著 童云兰译..机械工业出版社
[2][Maurice Herlihy, Nir Shavit著,<多处理器编程的艺术]
相关文章推荐
- java对世界各个时区(TimeZone)的通用转换处理方法(转载)
- java-注解annotation
- java-模拟tomcat服务器
- java-用HttpURLConnection发送Http请求.
- java-WEB中的监听器Lisener
- Android IPC进程间通讯机制
- Android Native 绘图方法
- Android java 与 javascript互访(相互调用)的方法例子
- 介绍一款信息管理系统的开源框架---jeecg
- 聚类算法之kmeans算法java版本
- java实现 PageRank算法
- PropertyChangeListener简单理解
- c++11 + SDL2 + ffmpeg +OpenAL + java = Android播放器
- 插入排序
- 冒泡排序
- 堆排序
- 快速排序
- 二叉查找树