AbstractQueuedSynchronizer在工具类ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier中的应用
2017-02-08 17:29
736 查看
在Java.util.concurrent包中,AbstractQueuedSynchronizer的应用非常广泛,而不局限于在ReentrantLock中的实现,本文简要介绍下AbstractQueuedSynchronizer在Semaphore、CountDownLatch、ReentrantReadWriteLock等类中的应用。
0. 回顾
上文在介绍AQS的时候,介绍了AQS和ReentrantLock类中的Sync子类互相配合完成可重入锁的实现,在这其中AQS所提供的是一套灵活和完整的队列处理机制。由于在AQS中已经提供了完整的队列处理机制,通常是不需要扩展的子类Override的。同时,AQS又提供了state属性和tryAcquire()/tryRelease()等方法,而这些正是需要子类根据具体的需求逻辑灵活实现的扩展点。从ReentrantLock、ReentrantReadWriteLock、Semaphore和CountDownLatch的实现来看,通常是在这些工具类的中封装实现自己独有的Sync内部类,而Sync就是对AQS的扩展实现。
1. Semaphore
学习操作系统理论课的时候,教材上应该都会讲过信号量这种概念,java.util.concurrent.Semaphore类就是Java中这个概念的实现。比如资源R有5个实体,如果每个线程执行的过程中需要用到1个,那么允许5个线程并发执行,第6个会等待其他线程释放资源后继续执行。
先看一个应用Semaphore的例子:
其中最主要的方法就是acquire()和release(),更详细的可以参看Oracle的API文档。
[java] view plain copy print?
ExecutorService exec = Executors.newCachedThreadPool();
finalSemaphore sem = newSemaphore(5);
for(inti = 1; i < 100; i++) {
finalint tid = i;
Runnable semTask = newRunnable() {
publicvoid run() {
try{
sem.acquire();
System.out.println(”running thread with id: ” + tid);
Thread.sleep((long) (Math.random() * 3000));
System.out.println(”completing with id: ” + tid);
sem.release();
}catch(InterruptedException e) {
e.printStackTrace();
}
}
};
exec.execute(semTask);
}
exec.shutdown();
下面看看Semaphore的实现。如果熟悉ReentrantLock的实现,那么Semaphore其实很好理解,简单来讲,Semaphore实际上就是把锁的限制从1变为N。
state存储的是表示剩余可用资源的值
Node采用的是SHARED模式
获得锁之后会尝试传播,释放更多锁
2. CountDownLatch
这个类主要是为了解决多线程中的状态依赖问题。java.util.concurrent.CountDownLatch这个类从名字就可以看出,是以一个递减计数器为基础,多个线程共享这样一个对象,开启一个计数值,某些线程可以等待这个计数器值为0的时候继续任务,调用await(),而那些改变状态的线程需要做的就是使计数器递减,调用countDown()方法。
[java] view plain copy print?
ExecutorService exec = Executors.newCachedThreadPool();
finalCountDownLatch cdl = newCountDownLatch(3);
Runnable watingTasks = newRunnable() {
publicvoidrun() {
try{
System.out.println(”there’re 3 tasks here. if all tasks are finished, i will go home.”);
System.out.println(”working…”);
cdl.await();
System.out.println(”ok, i will go home now!”);
}catch(InterruptedException e) {
e.printStackTrace();
}
}
};
exec.execute(watingTasks);
for(inti = 0; i < 3; i++) {
finalinttid = i;
Runnable semTask = newRunnable() {
publicvoidrun() {
try{
System.out.println(”starting task ” + tid + “…”);
Thread.sleep((long) (Math.random() * 5000));
System.out.println(”task ” + tid + “ finished”);
cdl.countDown();
}catch(InterruptedException e) {
e.printStackTrace();
}
}
};
exec.execute(semTask);
}
exec.shutdown();
和Semaphore类似,在使用AQS的实现上,主要有以下几点。
state存储的是count计数变量
Node采用的是SHARED模式
countDown()调用tryReleaseShared使得计数减1
await是调用tryAcquire,实际上就是判断state是否为0
3. ReentrantReadWriteLock
在高并发场景下,为了让任务执行更有效率,将读和写场景分离是有必要的。这是因为读和写在线程安全方面特点的不同,读不改变状态,多个线程是可以同时进行而没有问题的,而写与写、写与读之间都是需要互斥的。在java.util.concurrent中,ReentrantReadWriteLock这个类就是做这个事情的。
具体的使用就不举例了,直接分析下其实现。从类名上看ReentrantLock和ReentrantReadWriteLock就很相似,实现上也有相似的部分。ReentrantReadWriteLock中封装了ReadLock和WriteLock内部类,而ReentrantReadWriteLock、ReentrantReadWriteLock.ReadLock、ReentrantReadWriteLock.WriteLock中都有自己的Sync类属性,使用的是ReentrantReadWriteLock.Sync实现,而且对象关系上,ReadLock和WriteLock中的sync都是指向ReentrantReadWriteLock对象中的sync引用,即使用了同一个AQS同一套队列,只是将方法分离开来处理。
其中的要点:
state同时存储r和w的个数
WriteLock的锁操作类似ReentrantLock使用互斥节点
而readlock使用共享节点
读锁写锁的逻辑在各自的tryLock中,最终实现在ReadWriteLock的tryReadLock和tryWriteLock中
4. CyclicBarrier
在这里说java.util.concurrent.CyclicBarrier,并非因为其使用了AQS,而是因为它的用法和CountDownLatch有类似之处。CyclicBarrier和CountDownLatch都是处理状态依赖的问题的,而不同之处是使用CyclicBarrier的线程互相依赖,即互相等待,直到达到某一特定状态,这些线程同时继续执行。
CyclicBarrier是基于ReetrantLock和ConditionObject的,await()的时候对计数器递减,并检查是否为0,如果为0则执行CyclicBarrier类对象的barrierCommand(Runnable类对象属性)并signalAll()通知所有等待线程开始下一轮,否则阻塞当前线程。
0. 回顾
上文在介绍AQS的时候,介绍了AQS和ReentrantLock类中的Sync子类互相配合完成可重入锁的实现,在这其中AQS所提供的是一套灵活和完整的队列处理机制。由于在AQS中已经提供了完整的队列处理机制,通常是不需要扩展的子类Override的。同时,AQS又提供了state属性和tryAcquire()/tryRelease()等方法,而这些正是需要子类根据具体的需求逻辑灵活实现的扩展点。从ReentrantLock、ReentrantReadWriteLock、Semaphore和CountDownLatch的实现来看,通常是在这些工具类的中封装实现自己独有的Sync内部类,而Sync就是对AQS的扩展实现。
1. Semaphore
学习操作系统理论课的时候,教材上应该都会讲过信号量这种概念,java.util.concurrent.Semaphore类就是Java中这个概念的实现。比如资源R有5个实体,如果每个线程执行的过程中需要用到1个,那么允许5个线程并发执行,第6个会等待其他线程释放资源后继续执行。
先看一个应用Semaphore的例子:
其中最主要的方法就是acquire()和release(),更详细的可以参看Oracle的API文档。
[java] view plain copy print?
ExecutorService exec = Executors.newCachedThreadPool();
finalSemaphore sem = newSemaphore(5);
for(inti = 1; i < 100; i++) {
finalint tid = i;
Runnable semTask = newRunnable() {
publicvoid run() {
try{
sem.acquire();
System.out.println(”running thread with id: ” + tid);
Thread.sleep((long) (Math.random() * 3000));
System.out.println(”completing with id: ” + tid);
sem.release();
}catch(InterruptedException e) {
e.printStackTrace();
}
}
};
exec.execute(semTask);
}
exec.shutdown();
ExecutorService exec = Executors.newCachedThreadPool(); finalSemaphore sem = newSemaphore(5); for(inti = 1; i < 100; i++) { finalint tid = i; Runnable semTask = newRunnable() { publicvoid run() { try{ sem.acquire(); System.out.println("running thread with id: " + tid); Thread.sleep((long) (Math.random() * 3000)); System.out.println("completing with id: " + tid); sem.release(); }catch(InterruptedException e) { e.printStackTrace(); } } }; exec.execute(semTask); } exec.shutdown();
下面看看Semaphore的实现。如果熟悉ReentrantLock的实现,那么Semaphore其实很好理解,简单来讲,Semaphore实际上就是把锁的限制从1变为N。
state存储的是表示剩余可用资源的值
Node采用的是SHARED模式
获得锁之后会尝试传播,释放更多锁
2. CountDownLatch
这个类主要是为了解决多线程中的状态依赖问题。java.util.concurrent.CountDownLatch这个类从名字就可以看出,是以一个递减计数器为基础,多个线程共享这样一个对象,开启一个计数值,某些线程可以等待这个计数器值为0的时候继续任务,调用await(),而那些改变状态的线程需要做的就是使计数器递减,调用countDown()方法。
[java] view plain copy print?
ExecutorService exec = Executors.newCachedThreadPool();
finalCountDownLatch cdl = newCountDownLatch(3);
Runnable watingTasks = newRunnable() {
publicvoidrun() {
try{
System.out.println(”there’re 3 tasks here. if all tasks are finished, i will go home.”);
System.out.println(”working…”);
cdl.await();
System.out.println(”ok, i will go home now!”);
}catch(InterruptedException e) {
e.printStackTrace();
}
}
};
exec.execute(watingTasks);
for(inti = 0; i < 3; i++) {
finalinttid = i;
Runnable semTask = newRunnable() {
publicvoidrun() {
try{
System.out.println(”starting task ” + tid + “…”);
Thread.sleep((long) (Math.random() * 5000));
System.out.println(”task ” + tid + “ finished”);
cdl.countDown();
}catch(InterruptedException e) {
e.printStackTrace();
}
}
};
exec.execute(semTask);
}
exec.shutdown();
ExecutorService exec = Executors.newCachedThreadPool(); finalCountDownLatch cdl = newCountDownLatch(3); Runnable watingTasks = newRunnable() { publicvoidrun() { try{ System.out.println("there're 3 tasks here. if all tasks are finished, i will go home."); System.out.println("working..."); cdl.await(); System.out.println("ok, i will go home now!"); }catch(InterruptedException e) { e.printStackTrace(); } } }; exec.execute(watingTasks); for(inti = 0; i < 3; i++) { finalinttid = i; Runnable semTask = newRunnable() { publicvoidrun() { try{ System.out.println("starting task " + tid + "..."); Thread.sleep((long) (Math.random() * 5000)); System.out.println("task " + tid + " finished"); cdl.countDown(); }catch(InterruptedException e) { e.printStackTrace(); } } }; exec.execute(semTask); } exec.shutdown();
和Semaphore类似,在使用AQS的实现上,主要有以下几点。
state存储的是count计数变量
Node采用的是SHARED模式
countDown()调用tryReleaseShared使得计数减1
await是调用tryAcquire,实际上就是判断state是否为0
3. ReentrantReadWriteLock
在高并发场景下,为了让任务执行更有效率,将读和写场景分离是有必要的。这是因为读和写在线程安全方面特点的不同,读不改变状态,多个线程是可以同时进行而没有问题的,而写与写、写与读之间都是需要互斥的。在java.util.concurrent中,ReentrantReadWriteLock这个类就是做这个事情的。
具体的使用就不举例了,直接分析下其实现。从类名上看ReentrantLock和ReentrantReadWriteLock就很相似,实现上也有相似的部分。ReentrantReadWriteLock中封装了ReadLock和WriteLock内部类,而ReentrantReadWriteLock、ReentrantReadWriteLock.ReadLock、ReentrantReadWriteLock.WriteLock中都有自己的Sync类属性,使用的是ReentrantReadWriteLock.Sync实现,而且对象关系上,ReadLock和WriteLock中的sync都是指向ReentrantReadWriteLock对象中的sync引用,即使用了同一个AQS同一套队列,只是将方法分离开来处理。
其中的要点:
state同时存储r和w的个数
WriteLock的锁操作类似ReentrantLock使用互斥节点
而readlock使用共享节点
读锁写锁的逻辑在各自的tryLock中,最终实现在ReadWriteLock的tryReadLock和tryWriteLock中
4. CyclicBarrier
在这里说java.util.concurrent.CyclicBarrier,并非因为其使用了AQS,而是因为它的用法和CountDownLatch有类似之处。CyclicBarrier和CountDownLatch都是处理状态依赖的问题的,而不同之处是使用CyclicBarrier的线程互相依赖,即互相等待,直到达到某一特定状态,这些线程同时继续执行。
CyclicBarrier是基于ReetrantLock和ConditionObject的,await()的时候对计数器递减,并检查是否为0,如果为0则执行CyclicBarrier类对象的barrierCommand(Runnable类对象属性)并signalAll()通知所有等待线程开始下一轮,否则阻塞当前线程。
相关文章推荐
- AbstractQueuedSynchronizer在工具类Semaphore、CountDownLatch、ReentrantLock中的应用和CyclicBarrier
- AbstractQueuedSynchronizer在工具类ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier中的应用
- java 并发类semaphore countdownlatch cyclicbarrier reentrantlock condition reentrantreadwritelock
- 关于高并发的几个基本锁的学习ReentrantLock,CountDownLatch ,CyclicBarrier,Semaphore,reentrantReadWriteLock
- 同步工具类:CountDownLatch、CyclicBarrier和Semaphore
- Java中的并发工具类:CountDownLatch、CyclicBarrier和Semaphore
- Java线程(CountDownLatch、CyclicBarrier、Semaphore)并发控制工具类
- 23-同步工具类之CountDownLatch、CyclicBarrier和Semaphore
- 【Java多线程】JUC包下的工具类CountDownLatch、CyclicBarrier和Semaphore
- Java并发编程之2——同步工具类的使用(CountDownLatch,CyclicBarrier,BlockungQueue,Semaphore)
- Java各种Synchronizer: CountDownLatch, CyclicBarrier,Semaphore
- 四个并发工具类CountDownLatch,CyclicBarrier,Semaphore,Exchanger
- 并发工具类:CountDownLatch、CyclicBarrier、Semaphore
- Synchronizer 闭锁(CountDownLatch,FutureTask ) 信号量(Semaphore) 关卡(CyclicBarrier) 知识点总结(java并发编程实践读书笔记三)
- Java多线程之同步工具类(CountDownLatch、CyclicBarrier、Semaphore)
- AbstractQueuedSynchronizer(十)——应用之ReentrantLock
- Java并发:同步工具类详解(CountDownLatch、CyclicBarrier、Semaphore)
- java高并发程序设计总结五:jdk并发包其他同步控制工具类:ReadWriteLock/CountDownLatch/CyclicBarrier/LockSupport
- 第8章 Java中的并发工具类(CountDownLatch CyclicBarrier Semaphore Exchanger)
- JAVA多线程系列--并发工具类(CountDownLatch, CyclicBarrier, Semaphore,Exchanger)