您的位置:首页 > 编程语言 > Java开发

Java:多线程等待所有线程结束(CountDownLatch/CyclicBarrier) .

2013-11-29 10:40 926 查看
本文主要是参考官方文档做一学习用途。

官方链接:

http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/CountDownLatch.html

http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/CyclicBarrier.html


多线程设计过程中,经常会遇到需要等待其它线程结束以后再做其他事情的情况,比如多线程下载文件,每个线程都会下载文件的一部分,在所有线程结束以后,需要将各部分再次拼接成一个完整的文件。

有几种方案:

1.在主线程中设置一自定义全局计数标志,在工作线程完成时,计数减一。主线程侦测该标志是否为0,一旦为0,表示所有工作线程已经完成。

2.使用Java标准的类CountDownLatch来完成这项工作,原理是一样的,计数。

CountDownLatch

CountDownLatch 初始化设置count,即等待(await)count个线程或一个线程count次计数,通过工作线程来countDown计数减一,直到计数为0,await阻塞结束。

设置的count不可更改,如需要动态设置计数的线程数,可以使用CyclicBarrier.

下面的例子,所有的工作线程中准备就绪以后,并不是直接运行,而是等待主线程的信号后再执行具体的操作。
package com.example.multithread;  
  
import java.util.concurrent.CountDownLatch;  
  
class Driver  
{  
    private static final int TOTAL_THREADS = 10;  
    private final CountDownLatch mStartSignal = new CountDownLatch(1);  
    private final CountDownLatch mDoneSignal = new CountDownLatch(TOTAL_THREADS);  
  
    void main()  
    {  
        for (int i = 0; i < TOTAL_THREADS; i++)  
        {  
            new Thread(new Worker(mStartSignal, mDoneSignal, i)).start();  
        }  
        System.out.println("Main Thread Now:" + System.currentTimeMillis());  
        doPrepareWork();// 准备工作   
        mStartSignal.countDown();// 计数减一为0,工作线程真正启动具体操作   
        doSomethingElse();//做点自己的事情   
        try  
        {  
            mDoneSignal.await();// 等待所有工作线程结束   
        }  
        catch (InterruptedException e)  
        {  
            // TODO Auto-generated catch block   
            e.printStackTrace();  
        }  
        System.out.println("All workers have finished now.");  
        System.out.println("Main Thread Now:" + System.currentTimeMillis());  
    }  
  
    void doPrepareWork()  
    {  
        System.out.println("Ready,GO!");  
    }  
  
    void doSomethingElse()  
    {  
        for (int i = 0; i < 100000; i++)  
        {  
            ;// delay   
        }  
        System.out.println("Main Thread Do something else.");  
    }  
}  
  
class Worker implements Runnable  
{  
    private final CountDownLatch mStartSignal;  
    private final CountDownLatch mDoneSignal;  
    private final int mThreadIndex;  
  
    Worker(final CountDownLatch startSignal, final CountDownLatch doneSignal,  
            final int threadIndex)  
    {  
        this.mDoneSignal = doneSignal;  
        this.mStartSignal = startSignal;  
        this.mThreadIndex = threadIndex;  
    }  
  
    @Override  
    public void run()  
    {  
        // TODO Auto-generated method stub   
        try  
        {  
            mStartSignal.await();// 阻塞,等待mStartSignal计数为0运行后面的代码   
                                    // 所有的工作线程都在等待同一个启动的命令   
            doWork();// 具体操作   
            System.out.println("Thread " + mThreadIndex + " Done Now:"  
                    + System.currentTimeMillis());  
            mDoneSignal.countDown();// 完成以后计数减一   
        }  
        catch (InterruptedException e)  
        {  
            // TODO Auto-generated catch block   
            e.printStackTrace();  
        }  
    }  
  
    public void doWork()  
    {  
        for (int i = 0; i < 1000000; i++)  
        {  
            ;// 耗时操作   
        }  
        System.out.println("Thread " + mThreadIndex + ":do work");  
    }  
}  
  
public class CountDownLatchTest  
{  
    public static void main(String[] args)  
    {  
        // TODO Auto-generated method stub   
        new Driver().main();  
    }  
  
}  

通过Executor启动线程:
class CountDownLatchDriver2  
{  
    private static final int TOTAL_THREADS = 10;  
    private final CountDownLatch mDoneSignal = new CountDownLatch(TOTAL_THREADS);  

  
    void main()  
    {  
        System.out.println("Main Thread Now:" + System.currentTimeMillis());  
        doPrepareWork();// 准备工作   
  
        Executor executor = Executors.newFixedThreadPool(TOTAL_THREADS);  
        for (int i = 0; i < TOTAL_THREADS; i++)  
        {  
            // 通过内建的线程池维护创建的线程   
            executor.execute(new RunnableWorker(mDoneSignal, i));  
        }  
        doSomethingElse();// 做点自己的事情   
        try  
        {  
            mDoneSignal.await();// 等待所有工作线程结束   
        }  
        catch (InterruptedException e)  
        {  
            // TODO Auto-generated catch block   
            e.printStackTrace();  
        }  
        System.out.println("All workers have finished now.");  
        System.out.println("Main Thread Now:" + System.currentTimeMillis());  
    }  
  
    void doPrepareWork()  
    {  
        System.out.println("Ready,GO!");  
    }  
  
    void doSomethingElse()  
    {  
        for (int i = 0; i < 100000; i++)  
        {  
            ;// delay   
        }  
        System.out.println("Main Thread Do something else.");  
    }  
}  
  
class RunnableWorker implements Runnable  
{  
  
    private final CountDownLatch mDoneSignal;  
    private final int mThreadIndex;  
  
    RunnableWorker(final CountDownLatch doneSignal, final int threadIndex)  
    {  
        this.mDoneSignal = doneSignal;  
        this.mThreadIndex = threadIndex;  
    }  
  
    @Override  
    public void run()  
    {  
        // TODO Auto-generated method stub   
  
        doWork();// 具体操作   
        System.out.println("Thread " + mThreadIndex + " Done Now:"  
                + System.currentTimeMillis());  
        mDoneSignal.countDown();// 完成以后计数减一   
                                // 计数为0时,主线程接触阻塞,继续执行其他任务   
        try  
        {  
            // 可以继续做点其他的事情,与主线程无关了   
            Thread.sleep(5000);  
            System.out.println("Thread " + mThreadIndex  
                    + " Do something else after notifing main thread");  
  
        }  
        catch (InterruptedException e)  
        {  
            // TODO Auto-generated catch block   
            e.printStackTrace();  
        }  
  
    }  
  
    public void doWork()  
    {  
        for (int i = 0; i < 1000000; i++)  
        {  
            ;// 耗时操作   
        }  
        System.out.println("Thread " + mThreadIndex + ":do work");  
    }  
}  

输出:
Main Thread Now:1359959480786
Ready,GO!
Thread 0:do work
Thread 0 Done Now:1359959480808
Thread 1:do work
Thread 1 Done Now:1359959480811
Thread 2:do work
Thread 2 Done Now:1359959480813
Main Thread Do something else.
Thread 3:do work
Thread 3 Done Now:1359959480825
Thread 5:do work
Thread 5 Done Now:1359959480827
Thread 7:do work
Thread 7 Done Now:1359959480829
Thread 9:do work
Thread 9 Done Now:1359959480831
Thread 4:do work
Thread 4 Done Now:1359959480833
Thread 6:do work
Thread 6 Done Now:1359959480835
Thread 8:do work
Thread 8 Done Now:1359959480837
All workers have finished now.
Main Thread Now:1359959480838
Thread 0 Do something else after notifing main thread
Thread 1 Do something else after notifing main thread
Thread 2 Do something else after notifing main thread
Thread 3 Do something else after notifing main thread
Thread 9 Do something else after notifing main thread
Thread 7 Do something else after notifing main thread
Thread 5 Do something else after notifing main thread
Thread 4 Do something else after notifing main thread
Thread 6 Do something else after notifing main thread
Thread 8 Do something else after notifing main thread

CyclicBarrier

使用CyclickBarrier的例子:

class WalkTarget  
{  
    private final int mCount = 5;  
    private final CyclicBarrier mBarrier;  
    ExecutorService mExecutor;  
  
    class BarrierAction implements Runnable  
    {  
        @Override  
        public void run()  
        {  
            // TODO Auto-generated method stub   
            System.out.println("所有线程都已经完成任务,计数达到预设值");  
            //mBarrier.reset();//恢复到初始化状态          
              
        }  
    }  
  
    WalkTarget()  
    {  
        //初始化CyclicBarrier   
        mBarrier = new CyclicBarrier(mCount, new BarrierAction());  
        mExecutor = Executors.newFixedThreadPool(mCount);  
  
        for (int i = 0; i < mCount; i++)  
        {  
            //启动工作线程   
            mExecutor.execute(new Walker(mBarrier, i));  
        }  
    }  
}  
  
//工作线程   
class Walker implements Runnable  
{  
    private final CyclicBarrier mBarrier;  
    private final int mThreadIndex;  
  
    Walker(final CyclicBarrier barrier, final int threadIndex)  

    {  
        mBarrier = barrier;  
        mThreadIndex = threadIndex;  
    }  
  
    @Override  
    public void run()  
    {  
        // TODO Auto-generated method stub   
        System.out.println("Thread " + mThreadIndex + " is running...");  
        // 执行任务   
        try  
        {  
            TimeUnit.MILLISECONDS.sleep(5000);  
            // do task   
        }  
        catch (InterruptedException e)  
        {  
            // TODO Auto-generated catch block   
            e.printStackTrace();  
        }  
  
        // 完成任务以后,等待其他线程完成任务   
        try  
        {  
            mBarrier.await();  
        }  
        catch (InterruptedException e)  
        {  
            // TODO Auto-generated catch block   
            e.printStackTrace();  
        }  
        catch (BrokenBarrierException e)  
        {  
            // TODO Auto-generated catch block   
            e.printStackTrace();  
        }  
        // 其他线程任务都完成以后,阻塞解除,可以继续接下来的任务   
        System.out.println("Thread " + mThreadIndex + " do something else");  
    }  
  
}  
  
public class CountDownLatchTest  
{  
    public static void main(String[] args)  
    {  
        // TODO Auto-generated method stub   
        //new CountDownLatchDriver2().main();   
        new WalkTarget();  
    }  
  
}  

输出(注意,只有所有的线程barrier.await之后才能继续执行其他的操作):
Thread 0 is running... Thread 2 is running... Thread 3 is running... Thread 1 is running... Thread 4 is running... 所有线程都已经完成任务,计数达到预设值 Thread 4 do something else Thread 0 do something else Thread 2 do something else Thread 3 do something else Thread 1 do something else


CountDownLatch和CyclicBarrier简单比较:
 
CountDownLatch
CyclicBarrier
软件包
java.util.concurrent
java.util.concurrent
适用情景
主线程等待多个工作线程结束
多个线程之间互相等待,直到所有线程达到一个障碍点(Barrier point)
主要方法
CountDownLatch(int count) (主线程调用)

初始化计数

CountDownLatch.await (主线程调用)

阻塞,直到等待计数为0解除阻塞

CountDownLatch.countDown

计数减一(工作线程调用)
CyclicBarrier(int parties, Runnable barrierAction) //初始化参与者数量和障碍点执行Action,Action可选。由主线程初始化

CyclicBarrier.await() //由参与者调用

阻塞,直到所有线程达到屏障点
等待结束
各线程之间不再互相影响,可以继续做自己的事情。不再执行下一个目标工作。
在屏障点达到后,允许所有线程继续执行,达到下一个目标。可以重复使用CyclicBarrier
异常
 
如果其中一个线程由于中断,错误,或超时导致永久离开屏障点,其他线程也将抛出异常。
其他
 
如果BarrierAction不依赖于任何Party中的所有线程,那么在任何party中的一个线程被释放的时候,可以直接运行这个Action。

If(barrier.await()==2)

{

//do action

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: