JAVA中的并发工具类(二)---同步屏障类CyclicBarrier
2016-08-11 18:49
405 查看
本文涉及到的源码可以在我的github找到。
CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障点时均被阻塞,只要规定数目的线程都到达该处,然后同时放行。
应用场景:多线程计算数据,最后合并结果的场景。例如,用一个Excel保存用户所有银行流水,每个sheet保存一个账户近一年的每笔流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet,都执行完之后,得到每个sheet的日均流水,最后,再用这些计算结果进行汇总计算。
首先是一个比较容易理解的Demo:
运行结果如下:
下面是一个应用场景:
结果:
CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障点时均被阻塞,只要规定数目的线程都到达该处,然后同时放行。
应用场景:多线程计算数据,最后合并结果的场景。例如,用一个Excel保存用户所有银行流水,每个sheet保存一个账户近一年的每笔流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet,都执行完之后,得到每个sheet的日均流水,最后,再用这些计算结果进行汇总计算。
首先是一个比较容易理解的Demo:
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CyclicBarrierDemo { public static void main(String[] args) { ExecutorService pool = Executors.newFixedThreadPool(3); CyclicBarrier barrier = new CyclicBarrier(3); for (int i = 0; i < 3; i++) { Runnable runable = new Runnable() { @Override public void run() { try { // TODO Auto-generated method stub Thread.sleep((long) (Math.random() * 10000)); System.out.println(Thread.currentThread().getName() + "已经到达地点AAAA,当前到达数:" + (barrier.getNumberWaiting() + 1) + (barrier.getNumberWaiting() == 2 ? " 全部到达,继续走。" : "等待伙伴...")); barrier.await(); Thread.sleep((long) (Math.random() * 10000)); System.out.println(Thread.currentThread().getName() + "已经到达地点BBBB,当前到达数:" + (barrier.getNumberWaiting() + 1) + (barrier.getNumberWaiting() == 2 ? " 全部到达,继续走。" : "等待伙伴...")); barrier.await(); Thread.sleep((long) (Math.random() * 10000)); System.out.println(Thread.currentThread().getName() + "已经到达地点CCCC,当前到达数:" + (barrier.getNumberWaiting() + 1) + (barrier.getNumberWaiting() == 2 ? " 全部到达,继续走。" : "等待伙伴...")); barrier.await(); } catch (Exception e) { e.printStackTrace(); } } }; pool.execute(runable); } pool.shutdown(); } }
运行结果如下:
pool-1-thread-3已经到达地点AAAA,当前到达数:1等待伙伴... pool-1-thread-1已经到达地点AAAA,当前到达数:2等待伙伴... pool-1-thread-2已经到达地点AAAA,当前到达数:3 全部到达,继续走。 pool-1-thread-3已经到达地点BBBB,当前到达数:1等待伙伴... pool-1-thread-2已经到达地点BBBB,当前到达数:2等待伙伴... pool-1-thread-1已经到达地点BBBB,当前到达数:3 全部到达,继续走。 pool-1-thread-3已经到达地点CCCC,当前到达数:1等待伙伴... pool-1-thread-1已经到达地点CCCC,当前到达数:2等待伙伴... pool-1-thread-2已经到达地点CCCC,当前到达数:3 全部到达,继续走。
下面是一个应用场景:
import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class BankWaterService implements Runnable { // 屏障 private CyclicBarrier barrier = new CyclicBarrier(4); // 线程池 private ExecutorService pool = Executors.newFixedThreadPool(4); // 保存结果的map private ConcurrentHashMap<String, Integer> order = new ConcurrentHashMap<>(); private void count() { for (int i = 0; i < 4; i++) { Runnable action = new Runnable() { @Override public void run() { // TODO Auto-generated method stub order.put(Thread.currentThread().getName(), 4);// 模拟计算出的结果放入map try { barrier.await(); } catch (Exception e) { // TODO: handle exception } } }; pool.execute(action); } pool.shutdown(); } @Override public void run() { int result = 0; for (Entry<String, Integer> entry : order.entrySet()) { result += entry.getValue();// 新开启线程对结果进行统计 } System.out.println("总结果:"+result); } public static void main(String[] args) { BankWaterService service = new BankWaterService(); service.count(); Thread thread = new Thread(service); try { Thread.sleep(10000);//使得线程充分运行。 } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } thread.start(); } }
结果:
总结果:16
相关文章推荐
- java对世界各个时区(TimeZone)的通用转换处理方法(转载)
- java-注解annotation
- java-模拟tomcat服务器
- java-用HttpURLConnection发送Http请求.
- java-WEB中的监听器Lisener
- Android IPC进程间通讯机制
- Android Native 绘图方法
- Android java 与 javascript互访(相互调用)的方法例子
- Python3写爬虫(四)多线程实现数据爬取
- 介绍一款信息管理系统的开源框架---jeecg
- 聚类算法之kmeans算法java版本
- java实现 PageRank算法
- PropertyChangeListener简单理解
- c++11 + SDL2 + ffmpeg +OpenAL + java = Android播放器
- 插入排序
- 冒泡排序