您的位置:首页 > 编程语言

并发编程实战手册-线程同步辅助类之CyclicBarrier

2015-09-01 17:24 369 查看
CyclicBarrier
是一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。
CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作很有用。

方法摘要
 int
await()

          在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
 int
await(long timeout, TimeUnit unit)

          在所有参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间。
 int
getNumberWaiting()

          返回当前在屏障处等待的参与者数目。
 int
getParties()
          返回要求启动此 barrier 的参与者数目。
 boolean
isBroken()

          查询此屏障是否处于损坏状态。
 void
reset()
          将屏障重置为其初始状态。
下面的例子是在一个矩阵中查找指定的数字:
MatrixMock.java
import java.util.Random;
public class MatrixMock {
private int data[][];

public MatrixMock(int size,int length,int number){
int counter=0;
data=new int[size][length];
Random random=new Random();
for(int i=0;i<size;i++){
for(int j=0;j<length;j++){
data[i][j]=random.nextInt(10);
if(data[i][j]==number){
counter++;
}
}
}

System.out.println("MOCK: There are "+counter +" ocurrences of numbers in mock!");
}

public int[] getRow(int row){
if((row>=0)&&(row<data.length)){
return data[row];
}
return null;
}
}


Results.java
public class Results {
private int data[];

public Results(int size){
data=new int[size];
}

public int[] getData() {
return data;
}

public void setData(int position,int value) {
data[position] = value;
}

}



Searcher.java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class Searcher implements Runnable{

private int firstRow;
private int lastRow;
private MatrixMock matrixMock;
private Results results;
private int number;
private final CyclicBarrier barrier;

public Searcher(int firstRow,int lastRow,MatrixMock matrixMock,Results results,int number,CyclicBarrier barrier){
this.firstRow=firstRow;
this.lastRow=lastRow;
this.matrixMock=matrixMock;
this.results=results;
this.number=number;
this.barrier=barrier;
}

@Override
public void run() {
int counter;
System.out.println(Thread.currentThread().getName()+"processing line from "+firstRow+" to "+lastRow);
for(int i=firstRow;i<lastRow;i++){
//获取二维数组的某一行
int row[]=matrixMock.getRow(i);
counter=0;
//在特定行中查找值
for(int j=0;j<row.length;j++){
if(row[j]==number){
counter++;
}
}
results.setData(i, counter);
}
System.out.println(Thread.currentThread().getName()+"processed!");

try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}


Grouper.java

public class Grouper implements Runnable{
private Results results;

public Grouper(Results results){
this.results=results;
}

@Override
public void run() {
int sum=0;
int data[]=results.getData();
for(int number:data){
sum+=number;
}
System.out.println("Grouper: Total result: "+sum);
}
}

Main.java
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class Main {
public static void main(String[] args) {
final int ROWS=10000;
final int NUMBERS=10000;
final int SEARCHER=5;
final int THREADS=5;

//初始化二维数组
MatrixMock matrixMock=new MatrixMock(ROWS, NUMBERS, SEARCHER);
//初始化结果集,results的大小是和二维数组的总行数对应的,results[i]的值即为二维数组matrixMock的第i中要查找值的个数。
Results results=new Results(10000);
//初始化住线程
Grouper grouper=new Grouper(results);
//创建一个新的 CyclicBarrier,在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作即grouper
CyclicBarrier barrier=new CyclicBarrier(THREADS, grouper);
//初始化线程池
Executor exe=Executors.newFixedThreadPool(THREADS);
Searcher []searchers=new Searcher[THREADS];
for(int i=0;i<THREADS;i++){
//在不同的线程中进行统计
searchers[i]=new Searcher(i*2000, (i*2000)+2000, matrixMock, results, SEARCHER, barrier);
exe.execute(searchers[i]);
}
System.out.println("Main: The main thread has finised!");
}
}

执行结果如图:
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: