您的位置:首页 > 其它

理解CountDownLatch和CyclicBarrier(主线程等待所有子线程执行完毕在执行)

2019-02-12 18:32 225 查看

1.sleep()
用sleep方法,让主线程睡眠一段时间。

Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
long id = Thread.currentThread().getId();
String name = Thread.currentThread().getName();
int count = 0;

while (true) {
System.out.println("---线程id" + id + "|name|" + name + "第" + count++ + "次执行!");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
});
t1.setDaemon(true);
t1.start();

2.join()
使用Thread的join()等待所有的子线程执行完毕,主线程在执行,thread.join()把指定的线程加入到当前线程,可以将两个交替执行的线程合并为顺序执行的线程。比如在线程B中调用了线程A的Join()方法,直到线程A执行完毕后,才会继续执行线程B。

public static void main(String[] args) throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(2);
Vector<Thread> vector = new Vector<>();

for (int i = 0; i < 3; i++) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
long id = Thread.currentThread().getId();
String name = Thread.currentThread().getName();
for (int j = 0; j < 5; j++) {
System.out.println(id + "|" + name + "|" + j);
countDownLatch.countDown();
System.out.println("countDowncount:" + countDownLatch.getCount());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
vector.add(thread);
thread.setDaemon(false);
thread.start();
}
for(Thread t : vector){
t.join();
}
countDownLatch.await();
System.out.println("main start");
}

下面介绍一些并发包里非常有用的并发工具类

3.等待多线程完成的CountDownLatch
在这里说明一点,countDownLatch不可能重新初始化或者修改CountDownLatch对象内部计数器的值,一个线程调用countdown方法(计数减1)happen-before另外一个线程调用await方法,直到计数为0执行await方法之后的程序。
示例:

package com;

import java.util.Scanner;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test {
public static void main(String[] args) throws InterruptedException {
// 计数器
final CountDownLatch countDownLatch = new CountDownLatch(5);
// 线程池
ExecutorService es = Executors.newFixedThreadPool(50);
// 线程1
Thread t1 = new MyCommon(countDownLatch);
es.submit(t1);

// 线程2
Thread t2 = new MyCommon2(countDownLatch);
es.submit(t2);

// 线程3--输入线程
Thread t3 = new Thread(new Runnable() {
@Override
public void run() {
Scanner scanner = new Scanner(System.in);
while (true) {
String in = scanner.next();
if ("1".equals(in)) {
countDownLatch.countDown();
System.out.println("count==="+countDownLatch.getCount());
} else if ("0".equals(in)) {

} else if ("3".equals(in)) {
break;
}
}
System.out.println("输入线程执行结束==="+countDownLatch.getCount());
}
});
es.submit(t3);
es.shutdown();
Thread.sleep(1000 * 15);

System.out.println("主线程执行结束");
}
}

class MyCommon extends Thread {

CountDownLatch countDownLatch;

public CountDownLatch getCountDownLatch() {
return countDownLatch;
}

public void setCountDownLatch(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}

public MyCommon() {

}

public MyCommon(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}

public void run() {
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
long id = Thread.currentThread().getId();
String name = Thread.currentThread().getName();
int count = 0;

while (true) {
System.out.println("---线程id" + id + "|name|" + name + "第" + count++ + "次执行!");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
});
t1.setDaemon(true);
t1.start();
long id = Thread.currentThread().getId();
String name = Thread.currentThread().getName();

/*for (int i = 0; i < 5; i++) {
System.out.println("线程id"+id+"|name"+name+"第" + i + "次执行!");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}*/
System.out.println("---*****线程id" + id + "|name" + name + "第执行结束!");

}

}

class MyCommon2 extends Thread {

CountDownLatch countDownLatch;

public CountDownLatch getCountDownLatch() {
return countDownLatch;
}

public void setCountDownLatch(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}

public MyCommon2() {

}

public MyCommon2(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}

public void run() {
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
long id = Thread.currentThread().getId();
String name = Thread.currentThread().getName();
int count = 0;

while (true) {
System.out.println("++++线程id" + id + "|name" + name + "第" + count++ + "次执行!");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
});
t1.setDaemon(false);
t1.start();

long id = Thread.currentThread().getId();
String name = Thread.currentThread().getName();

try {
Thread.sleep(1000 * 10);
} catch (InterruptedException e) {
e.printStackTrace();
}
t1.interrupt();
System.out.println("+++***线程id" + id + "|name" + name + "第执行结束!");

}

}

class MyDaemon implements Runnable {
public void run() {
for (long i = 0; i < 6L; i++) {
System.out.println("后台线程第" + i + "次执行!");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("后台线程执行结束!");
}
}

注意调大睡眠时间,给控制台输入点击“enter”余留时间;
MyCommon2线程run会创建非守护线程t1并在一定时间后中断它;

4.同步屏障CyclicBarrier
字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。我们暂且把这个状态就叫做barrier,当调用await()方法之后,线程就处于barrier了。
  CyclicBarrier类位于java.util.concurrent包下,CyclicBarrier提供2个构造器:
public CyclicBarrier(int parties, Runnable barrierAction) {
}
public CyclicBarrier(int parties) {
}
  参数parties指让多少个线程或者任务等待至barrier状态;参数barrierAction为当这些线程都达到barrier状态时会执行的内容。
  然后CyclicBarrier中最重要的方法就是await方法,它有2个重载版本:
public int await() throws InterruptedException, BrokenBarrierException { };
public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException { };
  第一个版本比较常用,用来挂起当前线程,直至所有线程都到达barrier状态再同时执行后续任务;
  第二个版本是让这些线程等待至一定的时间,如果还有线程没有到达barrier状态就直接让到达barrier的线程执行后续任务。
  
写到这里大家不免有些疑问,countDownLatch和cyclicBarrier有什么区别呢,他们的区别:countDownLatch只能使用一次,而CyclicBarrier方法可以使用reset()方法重置,所以CyclicBarrier方法可以能处理更为复杂的业务场景。

大概可以这么理解:
countDownLatch 是调用countdown方法计数到达0时唤醒所有await 的地方,CyclicBarrier是await到达设定值时候唤醒所有await 的地方。
CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;

而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;

另外,CountDownLatch是不能够重用的,而CyclicBarrier是可以重用的。

上代码:

package com;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class CyclicBarrierTest {

public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
//final  CyclicBarrier cb = new CyclicBarrier(3);//创建CyclicBarrier对象并设置3个公共屏障点
final CyclicBarrier cb = new CyclicBarrier(3, new Runnable() {
@Override
public void run() {
System.out.println("********我最先执行***********threadId"+Thread.currentThread().getId());
//                try {
//                    Thread.sleep(1000*2);
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
System.out.println("********我执行完了***********threadId"+Thread.currentThread().getId());
}
});
for (int i = 0; i < 3; i++) {
Runnable runnable = new Runnable() {
public void run() {
try {
Thread.sleep((long) (2000));
System.out.println("线程" + Thread.currentThread().getName() +
"即将到达集合地点1,当前已有" + cb.getNumberWaiting() + "个已经到达,正在等候");
cb.await();//到此如果没有达到公共屏障点,则该线程处于等待状态,如果达到公共屏障点则所有处于等待的线程都继续往下运行

System.out.println("线程" + Thread.currentThread().getName() +
"即将到达集合地点2,当前已有" + cb.getNumberWaiting() + "完结");
Thread.sleep((long) (2000));
System.out.println("线程" + Thread.currentThread().getName() +
"即将到达集合地点2,当前已有" + cb.getNumberWaiting() + "个已经到达,正在等候");
cb.await();    //这里CyclicBarrier对象又可以重用

System.out.println("线程" + Thread.currentThread().getName() +
"即将到达集合地点2,当前已有" + cb.getNumberWaiting() + "完结");
Thread.sleep((long) (2000));
System.out.println("线程" + Thread.currentThread().getName() +
"即将到达集合地点3,当前已有" + cb.getNumberWaiting() + "个已经到达,正在等候");
cb.await();

System.out.println("线程" + Thread.currentThread().getName() +
"即将到达集合地点2,当前已有" + cb.getNumberWaiting() + "完结");
} catch (Exception e) {
e.printStackTrace();
}
}
};
service.execute(runnable);
}
service.shutdown();
}
}
package com;

import java.util.Scanner;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest2 {

public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
final CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() {
@Override
public void run() {
System.out.println("我是CyclicBarrier,优先执行...");
}
});

Scanner scanner = new Scanner(System.in);
while (true) {
String in = scanner.next();
if ("1".equals(in)) {

new Thread(new Runnable() {
@Override
public void run() {
try {
cyclicBarrier.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
System.out.println("线程===" + Thread.currentThread().getId() + "|" + Thread.currentThread().getName());
System.out.println("getNumberWaiting===" + cyclicBarrier.getNumberWaiting());
System.out.println("getParties===" + cyclicBarrier.getParties());
}
}).start();

} else if ("p".equals(in)) {
System.out.println();
System.out.println("getNumberWaiting===" + cyclicBarrier.getNumberWaiting());
System.out.println("getParties===" + cyclicBarrier.getParties());
} else if ("3".equals(in)) {
break;
} else if ("2".equals(in)) {
cyclicBarrier.reset();
}
}
System.out.println("输入线程执行结束===" + cyclicBarrier);

System.out.println("main done");
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: