您的位置:首页 > 编程语言 > Java开发

JAVA中的并发工具类(二)---同步屏障类CyclicBarrier

2016-08-11 18:49 405 查看
本文涉及到的源码可以在我的github找到。

CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障点时均被阻塞,只要规定数目的线程都到达该处,然后同时放行。

应用场景:多线程计算数据,最后合并结果的场景。例如,用一个Excel保存用户所有银行流水,每个sheet保存一个账户近一年的每笔流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet,都执行完之后,得到每个sheet的日均流水,最后,再用这些计算结果进行汇总计算。

首先是一个比较容易理解的Demo:

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CyclicBarrierDemo
{

public static void main(String[] args)
{
ExecutorService pool = Executors.newFixedThreadPool(3);

CyclicBarrier barrier = new CyclicBarrier(3);

for (int i = 0; i < 3; i++)
{
Runnable runable = new Runnable()
{

@Override
public void run()
{
try
{
// TODO Auto-generated method stub
Thread.sleep((long) (Math.random() * 10000));
System.out.println(Thread.currentThread().getName() + "已经到达地点AAAA,当前到达数:"
+ (barrier.getNumberWaiting() + 1)
+ (barrier.getNumberWaiting() == 2 ? "  全部到达,继续走。" : "等待伙伴..."));
barrier.await();

Thread.sleep((long) (Math.random() * 10000));
System.out.println(Thread.currentThread().getName() + "已经到达地点BBBB,当前到达数:"
+ (barrier.getNumberWaiting() + 1)
+ (barrier.getNumberWaiting() == 2 ? "  全部到达,继续走。" : "等待伙伴..."));
barrier.await();
Thread.sleep((long) (Math.random() * 10000));
System.out.println(Thread.currentThread().getName() + "已经到达地点CCCC,当前到达数:"
+ (barrier.getNumberWaiting() + 1)
+ (barrier.getNumberWaiting() == 2 ? "  全部到达,继续走。" : "等待伙伴..."));
barrier.await();
} catch (Exception e)
{
e.printStackTrace();
}
}
};

pool.execute(runable);

}
pool.shutdown();
}
}


运行结果如下:

pool-1-thread-3已经到达地点AAAA,当前到达数:1等待伙伴...
pool-1-thread-1已经到达地点AAAA,当前到达数:2等待伙伴...
pool-1-thread-2已经到达地点AAAA,当前到达数:3  全部到达,继续走。
pool-1-thread-3已经到达地点BBBB,当前到达数:1等待伙伴...
pool-1-thread-2已经到达地点BBBB,当前到达数:2等待伙伴...
pool-1-thread-1已经到达地点BBBB,当前到达数:3  全部到达,继续走。
pool-1-thread-3已经到达地点CCCC,当前到达数:1等待伙伴...
pool-1-thread-1已经到达地点CCCC,当前到达数:2等待伙伴...
pool-1-thread-2已经到达地点CCCC,当前到达数:3  全部到达,继续走。


下面是一个应用场景:

import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BankWaterService implements Runnable
{

// 屏障
private CyclicBarrier barrier = new CyclicBarrier(4);
// 线程池
private ExecutorService pool = Executors.newFixedThreadPool(4);
// 保存结果的map
private ConcurrentHashMap<String, Integer> order = new ConcurrentHashMap<>();

private void count()
{
for (int i = 0; i < 4; i++)
{
Runnable action = new Runnable()
{

@Override
public void run()
{
// TODO Auto-generated method stub
order.put(Thread.currentThread().getName(), 4);// 模拟计算出的结果放入map
try
{
barrier.await();
} catch (Exception e)
{
// TODO: handle exception
}
}
};

pool.execute(action);

}

pool.shutdown();
}

@Override
public void run()
{
int result = 0;
for (Entry<String, Integer> entry : order.entrySet())
{
result += entry.getValue();// 新开启线程对结果进行统计
}

System.out.println("总结果:"+result);
}

public static void main(String[] args)
{
BankWaterService service = new BankWaterService();
service.count();
Thread thread = new Thread(service);

try
{
Thread.sleep(10000);//使得线程充分运行。
} catch (InterruptedException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}

thread.start();
}

}


结果:

总结果:16
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java 多线程