您的位置:首页 > 编程语言 > Java开发

Java并发编程的4个同步辅助类(CountDownLatch、CyclicBarrier、Semphore、Phaser)

2018-10-16 14:41 579 查看

我在《jdk1.5引入的concurrent包》中,曾经介绍过CountDownLatch、CyclicBarrier两个类,还给出了CountDownLatch的演示案例。这里再系统总结下Java并发编程中的4个类CountDownLatch、CyclicBarrier、Semphore、Phaser。

  1.CountDownLatch

  CountDownLatch可以理解为一个计数器在初始化时设置初始值,当一个线程需要等待某些操作先完成时,需要调用await()方法。这个方法让线程进入休眠状态直到等待的所有线程都执行完成。每调用一次countDown()方法,内部计数器减1,直到计数器为0时唤醒。这个可以理解为特殊的CyclicBarrier。

   案例场景:视频会议室里等与会人员到齐了会议才能开始。

   代码执行结果如下:

   需要注意的是,CountDownLatch是一个线程计数器。等计数器为0时,那些先前因调用await()方法休眠的线程被唤醒。CountDownLatch能够控制的线程是哪些呢?是那些调用了CountDownLatch的await()方法的线程。案例中,先运行await()方法的线程是视频会议的线程,然后执行与会者 线程,这里的处理是每到一位(每创建一个线程并运行run()方法时就使计数器减1)就让计数器减1,等计数器减为0时唤醒因调用await()方法进入休眠的线程。这里的与会者线程就是视频会议线程要等待的线程。

  2.CyclicBarrier

  CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
  当一个线程到达集合点时,它将调用await()方法等待其它的线程。线程调用await()方法后,CyclicBarrier将阻塞这个线程,并将它置入休眠状态等待其它线程的到来。等最后一个线程调用await()方法时,CyclicBarrier将唤醒所有等待的线程,然后这些线程将继续执行。CyclicBarrier可以传入另一个Runnable对象作为初始化参数。当所有的线程都到达集合点后,CyclicBarrier类将Runnable对象作为线程执行。

   案例场景:有4个游戏玩家玩游戏,游戏有三个关卡,每个关卡必须要所有玩家都到达后才能允许通过。其实这个场景里的玩家中如果有玩家A先到了关卡1,他必须等到其他所有玩家都到达关卡1时才能通过,也就是说线程之间需要相互等待。这和CountDownLatch的应用场景有区别,CountDownLatch里的线程是到了运行的目标后继续干自己的其他事情,而这里的线程需要等待其他线程后才能继续完成后面的工作。   

   代码执行结果如下:

   3.semaphore
  信号量就是可以声明多把锁(包括一把锁,此时为互斥信号量)。
  举个例子:一个房间如果只能容纳5个人,多出来的人必须在门外面等着。如何去做呢?一个解决办法就是:房间外面挂着五把钥匙,每进去一个人就取走一把钥匙,没有钥匙的不能进入该房间,而是在外面等待。每出来一个人就把钥匙放回原处以方便别人再次进入。   

   可以说,Semaphore是一种在多线程环境下使用的设施,该设施负责协调各个线程,以保证它们能够正确、合理的使用公共资源的设施,也是操作系统中用于控制进程同步互斥的量。Semaphore是一种计数信号量,用于管理一组资源,内部是基于AQS的共享模式。它相当于给线程规定一个量从而控制允许活动的线程数。

   所有java.util.concurrent包中的同步器类都声明了一个私有的继承了

AbstractQueuedSynchronizer
的内部类,并且把所有同步方法都委托给这个内部类。这样各个同步器类的公开方法就可以使用适合自己的名称。子类只需定义状态的检查与更新相关的方法,这些方法控制着acquire和 release操作。

  AQS维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。这里volatile是核心关键词。state的访问方式有三种:

  • getState()
  • setState()
  • compareAndSetState()

  AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

  不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:  

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,成功则返回true,失败则返回false。

  以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

  再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。

  一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

  同步器背后的基本思想非常简单。acquire操作如下:

   release操作如下:  

   为了实现上述操作,需要下面三个基本组件的相互协作:

  • 同步状态的原子性管理;
  • 线程的阻塞与解除阻塞;
  • 队列的管理;

  同步器框架的核心决策是为这三个组件选择一个具体实现,同时在使用方式上又有大量选项可用。这里有意地限制了其适用范围,但是提供了足够的效率,使得实际上没有理由在合适的情况下不用这个框架而去重新建造一个。

  到此,我们再继续看Semaphore同步器。为了简单起见,我们以一个停车场的运作为例。假设停车场只有三个车位,一开始三个车位都是空的。这时,如果同时来了五辆车,看门人允许其中三辆不受阻碍地进入,然后放下车拦,剩下的车则必须在停车场外的入口处等待,此后来的车也都不得不在入口处等待。这时,有一辆车离开停车场,看门人得知后,升起车拦,放入一辆,如果又离开两辆,则又可以放入两辆,如此往复。在这个场景中,每辆车就好比一个线程,看门人就好比一个信号量,看门人限制了可以活动的线程。假如里面依然是三个车位,但是看门人改变了规则,要求每次只能停两辆车,那么停车场在进入两辆车后,其后的车辆就要等到有车离开才能获准许进入。对于Semaphore类而言,就如同一个看门人,限制了可活动的线程数。

  Semaphore的主要方法有:  

   接下来写一个案例,有7个人,各自获取信号量的许可后,再释放许可。

   上述代码执行结果如下:

   我们上面用的是非公平信号量,改为公平信号量:

   这时运行结果如下:

   Semaphore信号量的实现和ReetrantLock类似,都是通过内部类Sync,Sync是一个继承于AQS的抽象类;    Semaphore信号量和ReentrantLock互斥锁的实现区别在于,ReentrantLock互斥锁的state如果为0则表示锁未被占用,如果为0之外的数值表示锁被重入的次数;Semaphore信号量的state表示许可的数目;    Sync包括两个子类:公平信号量FairSync和非公平信号量NonfailrSync,默认是非公平信号量NonfairSync。其中,公平信号量是指如果线程不在同步队列头部则排队等候;非公平信号量是指无论当前线程是否在同步队列头部,都会尝试获取信号量。

  信号量如果要实现单例模式,可以这样修改:

   再执行代码,结果则如下:

   可见,Semaphore将给定许可数设置为1,就如同一个单例模式,即单个停车位,只有一辆车进,然后这辆车出来后,下一辆车才能进。

  另外,我们在上面的案例中用到了线程池:

   其中,ThreadPoolExecutor的构造方法体系有:

   对于构造方法的参数说明如下:

   线程池的线程执行规则跟任务队列有很大的关系。其中:

  在ThreadPoolExecutor中用到了BlockingQueue阻塞队列的接口。请参考我的另一篇博文《Java中的BlockingQueue》。  

   4.Phaser

  Phaser是一个更加复杂和强大的同步辅助类,它允许并发执行多阶段任务。当我们有并发任务并且需要分解成几步执行时,(CyclicBarrier是分成两步),就可以选择使用Phaser。Phaser类机制是在每一步结束的位置对线程进行同步,当所有的线程都完成了这一步,才允许执行下一步。

  可以说,Phaser允许并发多阶段任务。Phaser类机制是在每一步结束的位置对线程进行同步,当所有的线程都完成了这一步,才允许执行下一步。
  跟其他同步工具一样,必须对Phaser类中参与同步操作的任务数进行初始化,不同的是,可以动态的增加或者减少任务数。  

  一个Phaser对象有两种状态:

  • 活跃态(Active):当存在参与同步的线程的时候,Phaser就是活跃的,并且在每个阶段结束的时候进行同步。
  • 终止态(Termination):当所有参与同步的线程都取消注册的时候,Phaser就处于终止态,在终止状态下,Phaser没有任何参与者。当Phaser对象onAdvance()方法返回True时,Phaser对象就处于终止态。当Phaser处于终止态时,同步方法arriveAndAwaitAdvance()会立即返回,而且不会做任何同步操作。

   案例场景:Phaser将同步三个并发任务。这三个任务将在三个不同的文件夹及其子文件夹中查找过去24小时内改过扩展名为.txt的文件。这个任务分解为三个步骤:①在指定文件夹及其子文件夹中获得扩展名为.txt的文件;②对第一步的结果过滤,删除修改时间超过24小时的文件;③将结果打印数据到控制台。

  控制台打印如下:

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: