学会使用线程的并发工具类(ForkJoin、CountdownLatch、CyclicBarrier、Semaphore、Exchange)
线程的并发工具类
ForkJoin
工作原理
工作密取
即当前线程的Task已经全被执行完毕,则自动取到其他线程的Task池中取出Task继续执行。
ForkJoinPool中维护着多个线程(一般为CPU核数)在不断地执行Task,每个线程除了执行自己职务内的Task之外,还会根据自己工作线程的闲置情况去获取其他繁忙的工作线程的Task,如此一来就能能够减少线程阻塞或是闲置的时间,提高CPU利用率。
使用范式
我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork和join的操作机制,通常我们不直接继承ForkjoinTask类,只需要直接继承其子类。
- RecursiveAction,用于没有返回结果的任务。
- RecursiveTask< V >,用于有返回值的任务。
ForkjoinTask要通过ForkJoinPool来执行,使用submit 或 invoke 提交。
两者的区别是:
- invoke是同步执行,调用之后需要等待任务完成,才能执行后面的代码。
- submit是异步执行。
join和get方法当任务完成的时候返回计算结果。
在我们自己实现的compute方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务在调用invokeAll方法时,又会进入compute方法,看看当前子任务是否需要继续分割成孙任务,如果不需要继续分割,则执行当前子任务并返回结果。使用join方法会等待子任务执行完并得到其结果。
示例如下:
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; /** * ForkJoin执行累加 */ public class SumArray { private static class SumTask extends RecursiveTask<Integer>{ /*阈值*/ private final static int THRESHOLD = MakeArray.ARRAY_LENGTH/10; private int[] src; private int fromIndex; private int toIndex; public SumTask(int[] src, int fromIndex, int toIndex) { this.src = src; this.fromIndex = fromIndex; this.toIndex = toIndex; } @Override protected Integer compute() { //任务的大小是否合适 if (toIndex - fromIndex < THRESHOLD){ //合适,做我们自己的工作 System.out.println(" from index = "+fromIndex +" toIndex="+toIndex); int count = 0; for(int i= fromIndex;i<=toIndex;i++){ count = count + src[i]; } //提交 return count; }else{ //不满足,拆分任务,这里数量为2个 int mid = (fromIndex+toIndex)/2; SumTask left = new SumTask(src,fromIndex,mid); SumTask right = new SumTask(src,mid+1,toIndex); //提交任务ForkJoinPool invokeAll(left,right); //任务完成返回计算结果 return left.join()+right.join(); } } } public static void main(String[] args) { int[] src = MakeArray.makeArray(); //创建ForkJoin池 ForkJoinPool pool = new ForkJoinPool(); //创建任务 SumTask innerFind = new SumTask(src,0,src.length-1); long start = System.currentTimeMillis(); //异步提交任务 pool.submit(innerFind); //同步提交任务 //pool.invoke(innerFind); System.out.println("Task is Running....."); System.out.println("The count is "+innerFind.join() +" spend time:"+(System.currentTimeMillis()-start)+"ms"); } }
import java.util.Random; public class MakeArray { //数组长度 public static final int ARRAY_LENGTH = 4000000; public static int[] makeArray() { //new一个随机数发生器 Random r = new Random(); int[] result = new int[ARRAY_LENGTH]; for(int i=0;i<ARRAY_LENGTH;i++){ //用随机数填充数组 result[i] = r.nextInt(ARRAY_LENGTH*3); } return result; } }
结果:
Task is Running..... from index = 0 toIndex=249999 from index = 1000000 toIndex=1249999 from index = 3000000 toIndex=3249999 from index = 2000000 toIndex=2249999 from index = 500000 toIndex=749999 from index = 1500000 toIndex=1749999 from index = 3250000 toIndex=3499999 from index = 2250000 toIndex=2499999 from index = 1250000 toIndex=1499999 from index = 250000 toIndex=499999 from index = 1750000 toIndex=1999999 from index = 750000 toIndex=999999 from index = 2750000 toIndex=2999999 from index = 2500000 toIndex=2749999 from index = 3500000 toIndex=3749999 from index = 3750000 toIndex=3999999 The count is -951988472 spend time:25ms
若改为同步提交,结果:
from index = 0 toIndex=249999 from index = 2000000 toIndex=2249999 from index = 3000000 toIndex=3249999 from index = 500000 toIndex=749999 from index = 1000000 toIndex=1249999 from index = 1500000 toIndex=1749999 from index = 250000 toIndex=499999 from index = 2250000 toIndex=2499999 from index = 1250000 toIndex=1499999 from index = 750000 toIndex=999999 from index = 1750000 toIndex=1999999 from index = 3250000 toIndex=3499999 from index = 3500000 toIndex=3749999 from index = 3750000 toIndex=3999999 from index = 2750000 toIndex=2999999 from index = 2500000 toIndex=2749999 Task is Running..... The count is -1751947229 spend time:14ms
CountDownLatch
闭锁,CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。
工作原理
CountDownLatch是通过一个计数器来实现的,计数器的初始值为初始任务的数量。每当完成了一个任务后,计数器的值就会减1(CountDownLatch.countDown()方法)。当计数器值到达0时,它表示所有的已经完成了任务,然后在闭锁上等待CountDownLatch.await()方法的线程就可以恢复执行任务。
示例:
import java.util.concurrent.CountDownLatch; /** *类说明:演示CountDownLatch用法, * 共5个初始化子线程,6个闭锁扣除点,扣除完毕后,主线程和业务线程才能继续执行 */ public class UseCountDownLatch { static CountDownLatch latch = new CountDownLatch(6); /*初始化线程*/ private static class InitThread implements Runnable{ public void run() { System.out.println("Thread_"+Thread.currentThread().getId() +" ready init work......"); latch.countDown(); for(int i =0;i<2;i++) { System.out.println("Thread_"+Thread.currentThread().getId() +" ........continue do its work"); } } } /*业务线程等待latch的计数器为0完成*/ private static class BusiThread implements Runnable{ public void run() { try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } for(int i =0;i<3;i++) { System.out.println("BusiThread_"+Thread.currentThread().getId() +" do business-----"); } } } public static void main(String[] args) throws InterruptedException { new Thread(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Thread_"+Thread.currentThread().getId() +" ready init work step 1st......"); latch.countDown(); System.out.println("begin step 2nd......."); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Thread_"+Thread.currentThread().getId() +" ready init work step 2nd......"); latch.countDown(); }).start(); new Thread(new BusiThread()).start(); for(int i=0;i<=3;i++){ Thread thread = new Thread(new InitThread()); thread.start(); } //主线程等待 latch.await(); System.out.println("Main do work........"); } }
结果:
Thread_14 ready init work...... Thread_17 ready init work...... Thread_17 ........continue do its work Thread_17 ........continue do its work Thread_15 ready init work...... Thread_16 ready init work...... Thread_15 ........continue do its work Thread_14 ........continue do its work Thread_15 ........continue do its work Thread_16 ........continue do its work Thread_16 ........continue do its work Thread_14 ........continue do its work Thread_12 ready init work step 1st...... begin step 2nd....... Thread_12 ready init work step 2nd...... Main do work........ BusiThread_13 do business----- BusiThread_13 do business----- BusiThread_13 do business-----
应用场景
实现最大的并行性:有时我们想同时启动多个线程,实现最大程度的并行性。例如,我们想测试一个单例类。如果我们创建一个初始计数为1的CountDownLatch,并让所有线程都在这个锁上等待,那么我们可以很轻松地完成测试。我们只需调用 一次countDown()方法就可以让所有的等待线程同时恢复执行。
开始执行前等待n个线程完成各自任务:例如应用程序启动类要确保在处理用户请求前,所有N个外部系统已经启动和运行了,例如处理excel中多个表单。
CyclicBarrier
工作原理
CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。
示例:
import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CyclicBarrier; /** *类说明:演示CyclicBarrier用法,共4个子线程,他们全部完成工作后,交出自己结果, *再被统一释放去做自己的事情,而交出的结果被另外的线程拿来拼接字符串 */ public class UseCyclicBarrier { private static CyclicBarrier barrier = new CyclicBarrier(4,new CollectThread()); //存放子线程工作结果的容器 private static ConcurrentHashMap<String,Long> resultMap = new ConcurrentHashMap<>(); /*汇总的任务*/ private static class CollectThread implements Runnable{ @Override public void run() { StringBuilder result = new StringBuilder(); for(Map.Entry<String,Long> workResult:resultMap.entrySet()){ result.append("["+workResult.getValue()+"]"); } System.out.println(" the result = "+ result); System.out.println("do other business........"); } } /*相互等待的子线程*/ private static class SubThread implements Runnable{ @Override public void run() { long id = Thread.currentThread().getId(); resultMap.put(id+"",id); try { Thread.sleep(1000+id); System.out.println("Thread_"+id+" ....do something "); barrier.await(); Thread.sleep(1000+id); System.out.println("Thread_"+id+" ....do its business "); //可以多次调用 barrier.await(); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { for(int i=0;i<4;i++){ Thread thread = new Thread(new SubThread()); thread.start(); } } } }
结果:
Thread_12 ....do something Thread_13 ....do something Thread_14 ....do something Thread_15 ....do something the result = [12][13][14][15] do other business........ Thread_12 ....do its business Thread_13 ....do its business Thread_14 ....do its business Thread_15 ....do its business the result = [12][13][14][15] do other business........
应用场景
CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。
CountDownLatch和CyclicBarrier辨析
CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以反复使用。
CountDownLatch.await一般阻塞工作线程,所有的进行预备工作的线程执行countDown,而CyclicBarrier通过工作线程调用await从而自行阻塞,直到所有工作线程达到指定屏障,再大家一起往下走。
在控制多个线程同时运行上,CountDownLatch可以不限线程数量,而CyclicBarrier是固定线程数。
同时,CyclicBarrier还可以提供一个barrierAction,合并多线程计算结果。
Semaphore
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
应用场景
Semaphore可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,就可以使用Semaphore来做流量控制。。Semaphore的构造方法Semaphore(int permits)接受一个整型的数字,表示可用的许可证数量。Semaphore的用法也很简单,首先线程使用Semaphore的acquire()方法获取一个许可证,使用完之后调用release()方法归还许可证。还可以用tryAcquire()方法尝试获取许可证。
示例:
import java.sql.Connection; import java.util.LinkedList; import java.util.concurrent.Semaphore; /** *类说明:演示Semaphore用法,一个数据库连接池的实现 */ public class DBPoolSemaphore { private final static int POOL_SIZE = 10; //两个指示器,分别表示池子还有可用连接和已用连接 private final Semaphore useful,useless; //存放数据库连接的容器 private static LinkedList<Connection> pool = new LinkedList<Connection>(); //初始化池 static { for (int i = 0; i < POOL_SIZE; i++) { pool.addLast(SqlConnectImpl.fetchConnection()); } } public DBPoolSemaphore() { this.useful = new Semaphore(10); this.useless = new Semaphore(0); } /*归还连接*/ public void returnConnect(Connection connection) throws InterruptedException { if(connection!=null) { System.out.println("当前有"+useful.getQueueLength()+"个线程等待数据库连接!!" +"可用连接数:"+useful.availablePermits()); useless.acquire(); synchronized (pool) { pool.addLast(connection); } useful.release(); } } /*从池子拿连接*/ public Connection takeConnect() throws InterruptedException { useful.acquire(); Connection connection; synchronized (pool) { connection = pool.removeFirst(); } useless.release(); return connection; } }
import java.sql.*; import java.util.Map; import java.util.Properties; import java.util.concurrent.Executor; /** *类说明:数据库连接的平庸实现 */ public class SqlConnectImpl implements Connection{ /*拿一个数据库连接*/ public static final Connection fetchConnection(){ return new SqlConnectImpl(); } //省略重写的方法 }
import java.sql.Connection; import java.util.Random; /** *类说明:测试数据库连接池 */ public class AppTest { private static DBPoolSemaphore dbPool = new DBPoolSemaphore(); private static class BusiThread extends Thread{ @Override public void run() { Random r = new Random();//让每个线程持有连接的时间不一样 long start = System.currentTimeMillis(); try { Connection connect = dbPool.takeConnect(); System.out.println("Thread_"+Thread.currentThread().getId() +"_获取数据库连接共耗时【"+(System.currentTimeMillis()-start)+"】ms."); //模拟业务操作,线程持有连接查询数据 Thread.sleep(2000+r.nextInt(1000)); System.out.println("查询数据完成,归还连接!"); dbPool.returnConnect(connect); } catch (InterruptedException e) { } } } public static void main(String[] args) { for (int i = 0; i < 50; i++) { Thread thread = new BusiThread(); thread.start(); } } }
其他方法
•intavailablePermits():返回此信号量中当前可用的许可证数。
•intgetQueueLength():返回正在等待获取许可证的线程数。
•booleanhasQueuedThreads():是否有线程正在等待获取许可证。
•void reducePermits(int reduction):减少reduction个许可证,是个protected方法。
•Collection getQueuedThreads():返回所有等待获取许可证的线程集合,是个protected方法。
Exchange
Exchange(交换)是一个用于线程间协作的工具类。Exchange用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。
示例:
import java.util.HashSet; import java.util.Set; import java.util.concurrent.Exchanger; /** *类说明:演示Exchange用法 */ public class UseExchange { private static final Exchanger<Set<String>> exchange = new Exchanger<>(); public static void main(String[] args) { new Thread(() -> { Set<String> setA = new HashSet<>();//存放数据的容器 try { setA.add("A1"); setA.add("A2"); setA = exchange.exchange(setA);//交换set /*处理交换后的数据*/ } catch (InterruptedException e) { } for (String a:setA) { System.out.println(a + "in setA"); } }).start(); new Thread(() -> { Set<String> setB = new HashSet<>();//存放数据的容器 try { setB.add("B1"); setB.add("B2"); setB = exchange.exchange(setB);//交换set /*处理交换后的数据*/ } catch (InterruptedException e) { } for (String b:setB) { System.out.println(b + "in setB"); } }).start(); } }
结果:
B2in setA B1in setA A1in setB A2in setB
Callable< V >、Future< V >和FutureTask< V >
Runnable是一个接口,在它里面只声明了一个run()方法,由于run()方法返回值为void类型,所以在执行完任务之后无法返回任何结果。
Callable位于java.util.concurrent包下,它也是一个接口,在它里面也只声明了一个方法,只不过这个方法叫做call(),这是一个泛型接口,call()函数返回的类型就是传递进来的V类型。
Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。
我们通过一个线程运行Callable,但是Thread不支持构造方法中传递Callable的实例,所以我们需要通过FutureTask把一个Callable包装成Runnable,然后再通过这个FutureTask拿到Callable运行后的返回值。
要new一个FutureTask的实例,有两种方法:
示例:
import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; /** *类说明:演示Future等的使用 */ public class UseFuture { /*实现Callable接口,允许有返回值*/ private static class UseCallable implements Callable<Integer>{ private int sum; @Override public Integer call() { System.out.println("Callable子线程开始计算!"); for(int i=0 ;i<5000;i++){ if(Thread.currentThread().isInterrupted()) { System.out.println("Callable子线程计算任务中断!"); return null; } sum=sum+i; System.out.println("sum="+sum); } System.out.println("Callable子线程计算结束!结果为: "+sum); return sum; } } public static void main(String[] args) throws InterruptedException, ExecutionException { UseCallable useCallable = new UseCallable(); //包装 FutureTask<Integer> futureTask = new FutureTask<>(useCallable); Random r = new Random(); new Thread(futureTask).start(); Thread.sleep(1); System.out.println("Get UseCallable result = "+futureTask.get()); // System.out.println("Cancel................. "); // futureTask.cancel(true); } }
结果:
- Java并发编程之2——同步工具类的使用(CountDownLatch,CyclicBarrier,BlockungQueue,Semaphore)
- Java多线程—JAVA中并发的工具类CountDownLatch、CyclicBarrier、Semaphore、Exchanger
- java并发编程2.2并发工具类——CountDownLatch和CyclicBarrier使用及比较
- Java线程知识__其他几种线程同步的工具类的使用(CyclicBarrier,CountDownLatch,Exchanger)
- 并发工具类CountDownLatch、CyclicBarrier、Semaphore、Exchanger详解
- 线程并发工具类之CountDownLatch的使用及原理分析
- Java_并发线程_Semaphore、CountDownLatch、CyclicBarrier、Exchanger
- Java线程(CountDownLatch、CyclicBarrier、Semaphore)并发控制工具类
- Java中的并发工具类--CountDownLatch、CyclicBarrier、Semaphore和Exchanger
- Java/Android多线程并发、同步,线程之间通信,主、子线程的一些问题(CountDownLatch、CyclicBarrier和Semaphore)
- Java中的并发工具类:CountDownLatch、CyclicBarrier和Semaphore
- [Java并发]使用CountDownLatch和CyclicBarrier等待多线程完成
- java高并发之CountDownLatch,CyclicBarrier和join
- 并发包下常见的同步工具类详解(CountDownLatch,CyclicBarrier,Semaphore)
- Java_并发线程_Semaphore、CountDownLatch、CyclicBarrier、Exchanger
- 线程执行顺序——CountDownLatch、CyclicBarrier 、join()、线程池
- 四个并发工具类CountDownLatch,CyclicBarrier,Semaphore,Exchanger
- 使用Java辅助类(CountDownLatch、CyclicBarrier、Semaphore)并发编程
- java 并发工具类CountDownLatch & CyclicBarrier