您的位置:首页 > 编程语言 > Java开发

java并发之同步辅助类(Semphore、CountDownLatch、CyclicBarrier、Phaser、Exchanger)

2018-02-09 17:23 645 查看
1、Semphore
字面翻译的意思是(信号量)。正常的锁来自(concurrent.lock或内建的synchronized锁),在任何时刻都只允许一个任务访问同一资源,而技术信号量允许n个任务同时访问这个资源。你还可以就信号量看成是在向外分发资源的"许可证",尽管实际上没有用到人任何的许可证对象。
信号量就是允许声明多把锁(含一把锁,此时为互斥信号量)。
实现原理:Semaphore是信号量,用于管理一组资源。其内部是基于AQS的共享模式,AQS的状态表示许可证的数量,在许可证数量不够时,线程将会被挂起;而一旦有一个线程释放一个资源,那么就有可能重新唤醒等待队列中的线程继续执行。

举例:比如参观中山陵,人比较多每次只能放一批人进去,没有准许进去的只能在外面等,等有一批人出来了再放进去一批。
package com.soecode.lyf.web.test;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;

public class SemaphoreTest {
public static void main(String[] args) {

//打印Semaphore的所有的方法
//acquire():获取信号量,信号量内部计数器减1
//release():释放信号量,信号量内部计数器加1
//tryAcquire():这个方法试图获取信号量,如果能够获取返回true,否则返回false
Method[] method = Semaphore.class.getDeclaredMethods();
System.out.println("打印Semaphore所有的方法:");
for(Method m : method){
System.out.println(m.getName());
}

//每次只能容纳4人进入
Semaphore s=new Semaphore(4);
//初始化中山陵风景区
VisitorSunMausoleum vm = new VisitorSunMausoleum(s);
//定义一群游客
List<VisitorStart> list = new ArrayList<VisitorStart>();
for(int i=0;i<100;i++){
list.add(new VisitorStart(vm));
}
//开始组团参观
int i=0;
for(VisitorStart v : list){
System.out.println(i++);
v.run();
}

}

}
//参观中山陵风景区
class VisitorSunMausoleum{

private Semaphore semaphore;

public VisitorSunMausoleum(Semaphore semaphore){
this.semaphore = semaphore;
}

public void visitStart() {
try {
System.out.println("前");
//一旦超过访问限制就会在此处拦截
semaphore.acquire();
/*boolean b = semaphore.tryAcquire();
System.out.println(b);*/
System.out.println("后");
System.out.println("参观开始=====");
} catch (Exception e) {
e.printStackTrace();
}
}
public void visitEnd() {
try {
semaphore.release();
System.out.println("参观结束*****");

} catch (Exception e) {
e.printStackTrace();
}
}

}

//游客
class VisitorStart implements Runnable{

private VisitorSunMausoleum sun;

public VisitorStart(VisitorSunMausoleum sun){
this.sun=sun;
}

@Override
public void run() {
try {
sun.visitStart();
Thread.sleep(1000);
//sun.visitEnd();
} catch (Exception e) {
e.printStackTrace();
}
}

}

打印的日志:
打印Semaphore所有的方法:
acquire
acquire
tryAcquire
tryAcquire
tryAcquire
tryAcquire
toString
getQueueLength
getQueuedThreads
hasQueuedThreads
isFair
release
release
acquireUninterruptibly
acquireUninterruptibly
availablePermits
drainPermits
reducePermits
0


参观开始=====
1


参观开始=====
2


参观开始=====
3


参观开始=====
4

当第四个打印出"后",便不再继续执行了。将//visitEnd();这一行释放掉可以看到日志如下:打印Semaphore所有的方法:
toString
getQueueLength
getQueuedThreads
hasQueuedThreads
isFair
release
release
acquire
acquire
tryAcquire
tryAcquire
tryAcquire
tryAcquire
acquireUninterruptibly
acquireUninterruptibly
availablePermits
drainPermits
reducePermits
0


参观开始=====
参观结束*****
1


参观开始=====
参观结束*****
2


参观开始=====
参观结束*****
3


参观开始=====
参观结束*****
4


参观开始=====
参观结束*****
5


参观开始=====
参观结束*****
6


参观开始=====
参观结束*****

......

 2、CountDownLatch
关键词(Latch 门闩),它被用来同步一个或多个任务,强制他们等待由其他任务执行的一组操作完成。可以向CountDownLatch对象设置一个初始计数值,任何在这个对象上调用wait的方法都将阻塞,直至这个计数值到达0。其他任务在结束其他工作时,可以在该对象上调用countDown来减小这个计数值。countDownLatch,被设计为只触发一次,计数值不能被重置如果需要重置计数值的版本,可以使用CyclicBarrier。调用countDown的任务在产生这个调用时并没有被阻塞,只有对await的调用阻塞直至计数值到达0。
CountDownLatch可以理解为一个计数器在初始化时设置初始值,当一个线程需要等待某些操作先完成时,需要调用await()方法。这个方法让线程进入休眠状态直到等待的所有线程都执行完成。每调用一次countDown()方法内部计数器减1,直到计数器为0时唤醒。这个可以理解为特殊的CyclicBarrier。线程同步点比较特殊,为内部计数器值为0时开始。
实现原理:使用共享锁实现。

举例:比如10人共同报名旅行社去外地游玩,在出发的时候必须等所有人到齐然后才能一起出发。package com.soecode.lyf.web.test;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchTest {
public static void main(String[] args) {

//获取CountDownLatch的方法
//countDown():使CountDownLatch维护的内部计数器减1,每个被等待的线程完成的时候调用
//await():线程在执行到CountDownLatch的时候会将此线程置于休眠
Method[] method = CountDownLatch.class.getDeclaredMethods();
for(Method m : method){
System.out.println(m.getName());
}

//旅行社班车荷载10人
CountDownLatch countDown = new CountDownLatch(10);
System.out.println("本次报团人数是:"+countDown.getCount());
//登记10名旅客的姓名
List<Tourist> list = new ArrayList<Tourist>();
for(int i=0;i<10;i++){
list.add(new Tourist("游客"+i));
}
//等待游客上车
ExecutorService service = Executors.newCachedThreadPool();
for(Tourist t:list){
Bus bus = new Bus(countDown);
bus.guidCheck(t);
service.execute(bus);
}

}
}
//旅行社班车
class Bus implements Runnable{

private final CountDownLatch countDown;

public Bus(CountDownLatch countDown){
this.countDown=countDown;
}

public void guidCheck(Tourist tourist){
countDown.countDown();
System.out.println("上车人的姓名:"+tourist.getName());
System.out.println("导游报道:获取剩余人数是 "+countDown.getCount());
}

@Override
public void run() {
try {
//放行李找位置,和送行的人告别
Thread.sleep(3000);
//等待其游客
countDown.await();
System.out.println("呀吼,出发了~~");
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}
//组团旅游的人
class Tourist{
private String name;

public Tourist(String name){
this.name=name;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

}

打印的日志如下:toString
getCount
countDown
await
await
本次报团人数是:10
上车人的姓名:游客0
导游报道:获取剩余人数是 9
上车人的姓名:游客1
导游报道:获取剩余人数是 8
上车人的姓名:游客2
导游报道:获取剩余人数是 7
上车人的姓名:游客3
导游报道:获取剩余人数是 6
上车人的姓名:游客4
导游报道:获取剩余人数是 5
上车人的姓名:游客5
导游报道:获取剩余人数是 4
上车人的姓名:游客6
导游报道:获取剩余人数是 3
上车人的姓名:游客7
导游报道:获取剩余人数是 2
上车人的姓名:游客8
导游报道:获取剩余人数是 1
上车人的姓名:游客9
导游报道:获取剩余人数是 0
呀吼,出发了~~
呀吼,出发了~~
呀吼,出发了~~
呀吼,出发了~~
呀吼,出发了~~
呀吼,出发了~~
呀吼,出发了~~
呀吼,出发了~~
呀吼,出发了~~
呀吼,出发了~~
从日志可以看出,等最后一个游客上车,即getCount()等于0的时候所有的方法再一起往下执行。
3、CyclicBarrier
关键词(Barrier)栅栏、Cyclic(循环)。当你希望创建一组任务,它们并行的执行工作,然后在执行下一个步骤之前等待,直至所有的任务都完成(看起来比较像join)。它使得所有的并行任务都将在栅栏处列队,因此可以一直地向前移动。这非常像CountDownLatch,只是CountDownLatchs是只触发一次事件,而CyclicBarrier可以多次重用。
栅栏允许两个或者多个线程在某个集合点同步。当一个线程到达集合点时,它将调用await()方法等待其它的线程。线程调用await()方法后,CyclicBarrier将阻塞这个线程并将它置入休眠状态等待其它线程的到来。等最后一个线程调用await()方法时,CyclicBarrier将唤醒所有等待的线程然后这些线程将继续执行。CyclicBarrier可以传入另一个Runnable对象作为初始化参数。当所有的线程都到达集合点后,CyclicBarrier类将Runnable对象作为线程执行。
举例:流水线作业的汽车行业必须要等上一个流程安装完毕才开始安装下一个流程,比如先安装四个轮子再安装四个车门,每一个流程安装完都需要质检员检查一次,合格之后才流向下一个流程。package com.soecode.lyf.web.test;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {
public static void main(String[] args) {
//待组装的车辆
Car car =new Car();
//质检员
Inspectorinspector = new Inspector(car);
//栅栏
CyclicBarrier cyclicBarrier = new CyclicBarrier(4,inspector);
//操作工
Jockey jockey = new Jockey(cyclicBarrier,car);
for(int i=0;i<4;i++){
Thread thread = new Thread(jockey);
thread.start();
}

}
}
//操作工
class Jockey implements Runnable{

final CyclicBarrier cyclicBarrier;

private Car car;

public Jockey(CyclicBarrier cyclicBarrier,Car car){
this.cyclicBarrier = cyclicBarrier;
this.car=car;
}

@Override
public void run() {
car.setWheel();
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}

car.setWindow();

try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}

System.out.println("准备材料组装下一辆车...");
}

}
//待组装的车辆
class Car{
//车轮
private int wheel=0;
//车窗
private int window=0;

public int getWheel() {
return wheel;
}

public synchronized void setWheel() {
wheel = ++wheel;
System.out.println("安装第"+wheel+"个车轮");
}

public int getWindow() {
return window;
}

public synchronized void setWindow() {
window = ++window;
System.out.println("安装第"+window+"个车窗");
}

}
//质检员
class Inspectorimplements Runnable{

private Car car;

public Inspector(Car car){
this.car = car;
}

@Override
public void run() {
System.out.println("质检员检查安装质量......检测结果(车轮:"+car.getWheel()+",车窗:"+car.getWindow()+"),流向下一岗");
}

}

打印结果如下:安装第1个车轮
安装第2个车轮
安装第3个车轮
安装第4个车轮
质检员检查安装质量......检测结果(车轮:4,车窗:0),流向下一岗
安装第1个车窗
安装第2个车窗
安装第3个车窗
安装第4个车窗
质检员检查安装质量......检测结果(车轮:4,车窗:4),流向下一岗
准备材料组装下一辆车...
准备材料组装下一辆车...
准备材料组装下一辆车...
准备材料组装下一辆车...
如上打印结果,每一次执行到
cyclicBarrier.await();
都会等4个任务执行完之后再执行如下的任务一次。并且可以多次调用。问题也是很明显如果我安装方向盘或者观后镜不需要执行4次线程怎么办用CyclicBarrier显然不好办到,这个例子大致可以说清CyclicBarrier的使用步骤。
Inspector

4、Phaser

更加复杂和强大的同步辅助类。它允许并发执行多阶段任务。当我们有并发任务并且需要分解成几步执行时,(CyclicBarrier是分成两步),就可以选择使用Phaser。Phaser类机制是在每一步结束的位置对线程进行同步,当所有的线程都完成了这一步,才允许执行下一步。跟其他同步工具一样,必须对Phaser类中参与同步操作的任务数进行初始化,不同的是,可以动态的增加和删除任务数。方法讲解可以参考:http://shift-alt-ctrl.iteye.com/blog/2302923
举例:以下模拟其中两个场景,初中教学中,一群学生一起入学升级和毕业。也有不同时入学并且中途退学的场景。package com.soecode.lyf.web.test;

import java.util.concurrent.Phaser;

public class PhaserTest {
public static void main(String[] args) {
Phaser phaser = new Phaser();
/*
//比较蒙逼的是每执行五次都会有一次是乱序
SchoolStudentTest1 school ;
for(int i=0;i<5;i++){
Student s = new Student();
s.setId(i);
s.setName("学生"+i);
school = new SchoolStudentTest1(phaser,s);
Thread thread = new Thread(school);
thread.start();
//将一个新的参与者注册到Phaser中,这个新的参与者将被当成没有执行完本阶段的线程
phaser.register();
}*/

//此时使用arriveAndAwaitAdvance不能保证同步
SchoolStudentTest2 school2 ;
for(int i=0;i<3;i++){
Student s = new Student();
s.setId(i);
s.setName("学生"+i);
school2 = new SchoolStudentTest2(phaser,s);
Thread thread = new Thread(school2);
thread.start();
//将一个新的参与者注册到Phaser中,这个新的参与者将被当成没有执行完本阶段的线程
phaser.register();
}

}
}
class SchoolStudentTest1 implements Runnable{

final Phaser phaser;

private Student student;

public SchoolStudentTest1(Phaser phaser,Student student
){
this.phaser = phaser;
this.student = student;
}

@Override
public void run() {

System.out.println(student.getName()+"在读初一");
//类似于CyclicBarrier的await()方法,等待其它线程都到来之后同步继续执行
phaser.arriveAndAwaitAdvance();
System.out.println(student.getName()+"在读初二");
phaser.arriveAndAwaitAdvance();
System.out.println(student.getName()+"在读初三");
phaser.arriveAndAwaitAdvance();
phaser.arriveAndDeregister();
System.out.println(student.getName()+"毕业典礼");
//注销掉所有的线程
}

}
class SchoolStudentTest2 implements Runnable{

final Phaser phaser;

private Student student;

public SchoolStudentTest2(Phaser phaser,Student stude
d0bf
nt
){
this.phaser = phaser;
this.student = student;
}

@Override
public void run() {

System.out.println(student.getName()+"在读初一");
System.out.println(student.getName()+"在读初二");
if(checkExam(student,1)){
System.out.println(student.getName()+"的成绩不合格予以退学");
//强制Phaser进入终止态
/*if(!phaser.isTerminated()){
phaser.forceTermination();
}*/
//把执行到此处的线程注销掉
phaser.arriveAndDeregister();
//不加return不能实现效果
return;
}
System.out.println(student.getName()+"在读初三");
System.out.println(student.getName()+"毕业典礼");
}

private boolean checkExam(Student student2, int i) {
if(student2.getId()==i){
return true;
}
return false;
}

}
class Student{

private int id;

private String name;

public int getId() {
return id;
}

public void setId(int id) {
this.id = id;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}
}

两个场景的打印日志如下:学生0在读初一
学生3在读初一
学生2在读初一
学生1在读初一
学生4在读初一
学生4在读初二
学生2在读初二
学生0在读初二
学生1在读初二
学生3在读初二
学生0在读初三
学生4在读初三
学生2在读初三
学生1在读初三
学生3在读初三
学生4毕业典礼
学生2毕业典礼
学生3毕业典礼
学生1毕业典礼
学生0毕业典礼
学生0在读初一
学生1在读初一
学生1在读初二
学生1的成绩不合格予以退学
学生0在读初二
学生0在读初三
学生0毕业典礼
学生2在读初一
学生2在读初二
学生2在读初三
学生2毕业典礼
这个辅助类的功能比较多,欢迎分享交流。

5、Exchanger
Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据, 如果第一个线程先执行exchange方法,它会一直等待第二个线程也执行exchange,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。
举例:比如A、B两家公司要进行股权交换,必须两公司法人代表碰面交换股权协议,先到场的等待后到场的然后交换文件。package com.soecode.lyf.web.test;

import java.lang.reflect.Method;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExchangerTest {
public static void main(String[] args) {

Method[] method = Exchanger.class.getDeclaredMethods();
for(Method m : method){
System.out.println(m.getName());
}

System.out.println("====================================");
Exchanger<String> exchanger = new Exchanger<String>();
CompanyA a = new CompanyA(exchanger);
CompanyB b = new CompanyB(exchanger);
ExecutorService service = Executors.newCachedThreadPool();
service.execute(a);
service.execute(b);
}
}
class CompanyA implements Runnable{

private Exchanger<String> exchange;

public CompanyA(Exchanger<String> exchange){
this.exchange = exchange;
}

@Override
public void run() {
System.out.println("A公司的法人代表正在赶来");
try {
Thread.sleep(2000);
String str = exchange.exchange("A11");
System.out.println("A公司带回的资料文件是:"+str);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}
class CompanyB implements Runnable{

private Exchanger<String> exchange;

public CompanyB(Exchanger<String> exchanger){
this.exchange=exchanger;
}

@Override
public void run() {
System.out.println("B公司的法人代表正在赶来");
try {
Thread.sleep(5000);
String str = exchange.exchange("B22");
System.out.println("B公司带回的文件是:"+str);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}打印的日志如下:await
awaitNanos
createSlot
doExchange
exchange
exchange
hashIndex
scanOnTimeout
spinWait
tryCancel
====================================
A公司的法人代表正在赶来
B公司的法人代表正在赶来
B公司带回的文件是:A11
A公司带回的资料文件是:B22
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  多线程
相关文章推荐