您的位置:首页 > 编程语言 > Java开发

java 并发编程 CountDownLatch、CyclicBarrier和 Semaphore

2018-01-22 18:04 796 查看

1   简单介绍

在java1.5中,java.util.concurrent提供了一些辅助类帮助我们进行编发编程,本文主要介绍CountDownLatch、CyclicBarrier和 Semaphore的使用和区别。

2  CountDownLatch说明和用法

2.1     说明

CountDownLatch类位于java.util.concurrent包下,利用他可以实现“计数器”的功能。比如,有一个任务A,他必须要等待其他4个任务执行完之后,他才能执行,就可以利用CountDownLatch来实现。

2.2      CountDownLatch中的方法

CountDownLatch的构造方法

//参数count为计数值
public CountDownLatch(int count) {
        if (count < 0)thrownew
IllegalArgumentException("count < 0");
        this.sync =new
Sync(count);
    }

 

CountDownLatch中的常用重要方法

//调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
publicvoid await()throws InterruptedException
{
        sync.acquireSharedInterruptibly(1);
    }

//和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
publicboolean await(long timeout, TimeUnit
unit)
        throws InterruptedException {
        returnsync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

//将count值减1
publicvoid countDown() {
        sync.releaseShared(1);
    }

 

2.3    实例

publicclass CountDownLatchDemo
{
 
    privatestaticfinalintCOUNT
= 5;  
    privatestatic CountDownLatchcountDownLatch;  
   
    publicstaticvoid main( String[] args )
    {
        try
        {
            countDownLatch =new CountDownLatch(COUNT
);          
            for(int i = 0; i < 6;i++){
                new Thread(new Runnable()
                {
                    @Override
                    publicvoid run()
                    {
                        try
                        {
                            Thread.sleep( 5000 );
                        } catch ( Exception e )
                        {
                            e.printStackTrace();
                        }                        
                        System.out.println(Thread.currentThread().getName() +"sleep 1000ms");
                        countDownLatch.countDown();                      
                    }
                }).start();               
            }           
            System.out.println("main wait begin...");          
            countDownLatch.await();           
            System.out.println("main wait end...");          
        } catch ( Exception e )
        {
            e.printStackTrace();
        }            
    }
}

 

运行结果:

main wait begin...
Thread-0sleep 1000ms
Thread-2sleep 1000ms
Thread-4sleep 1000ms
Thread-5sleep 1000ms
Thread-1sleep 1000ms
Thread-3sleep 1000ms
main wait end...
 

 

3  CyclicBarrier说明和用法

3.1     说明

CyclicBarrier:回环栅栏。通过它可以实现让一组线程互相等待至某个状态之后再全部同时执行。叫做回环是因为当所有等待线程都被释放后,CyclicBarrier可以被重用。当调用 await()方法之后,线程就处于barrier了。

3.2     CyclicBarrier中的方法

CyclicBarrier类位于java.util.concurrent包下。

构造方法:

//参数parties指让多少个线程或者任务等待至barrier状态
public CyclicBarrier(int parties) {
        this(parties,null);
    }

//参数barrierAction为当这些线程都达到barrier状态时会执行的内容。
public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0)thrownew
IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

 

 

 

常用方法:

最重要的方法就是await()方法了,他有2个重载版本:

publicintawait()throws
InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            thrownew Error(toe);// cannot happen;
        }
    }

publicint await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

 

第一个版本比较常用,用来挂起当前线程,直到所有线程都到达barrier状态再同时执行后续任务;

第二个版本,让这些线程等待至一定时间,如果还有线程没有到达barrier状态,就直接让到达barrier的线程执行后续任务。

3.3     实例

publicclass CyclicBarrierDemo
{
    privatestaticintSIZE
= 5; 
    privatestatic CyclicBarriercBarrier;
    publicstaticvoid main( String[] args )
    {
       cBarrier =new CyclicBarrier(SIZE);      
       //新建5个线程
       for (int i = 0; i <SIZE;
i++ ) {
           new Thread(new Runnable(){
            @Override
            publicvoid run()
            {
                try
                {
                    System.out.println(Thread.currentThread().getName() +"正在写入数据...");
                    Thread.sleep( 2000 );
                    System.out.println(Thread.currentThread().getName() +"写入数据完毕...");
                    //将cBarrier的参与量+1
                    cBarrier.await();
                    System.out.println(Thread.currentThread().getName() +" continued.");                  
                } catch ( Exception e )
                {
                    e.printStackTrace();
                }                                                      
            }
        }).start();       
       }
    }
}

运行结果:

Thread-2正在写入数据...
Thread-1正在写入数据...
Thread-4正在写入数据...
Thread-0写入数据完毕...
Thread-3写入数据完毕...
Thread-2写入数据完毕...
Thread-1写入数据完毕...
Thread-4写入数据完毕...
Thread-4 continued.
Thread-1 continued.
Thread-2 continued.
Thread-3 continued.
Thread-0 continued.

从上述输出结果可看出,每个写入线程执行完写数据操作之后,就在等待其他线程写入操作完毕;当所有线程都写入超市做完毕之后,所有线程就继续进行后续操作了。

 

如果想在所有线程写入操作之后,进行额外的其他操作,就可以为 CyclicBarrier 提供Runnable参数。

publicclass CyclicBarrierDemo02
{
    privatestaticintSIZE
= 5;   
    privatestatic CyclicBarriercBarrier;  
    publicstaticvoid main( String[] args )
    {
       cBarrier =new CyclicBarrier(SIZE,new
Runnable()
    {
        @Override
        publicvoid run()
        {
               System.out.println("CyclicBarrier's parties is: "+cBarrier.getParties());
        }
    });      
       //新建5个线程
       for (int i = 0; i <SIZE;
i++ ) {
           new Thread(new Runnable(){
            @Override
            publicvoid run()
            {
                try
                {
                    System.out.println(Thread.currentThread().getName() +"wait for CyclicBarrier");
                    //将cBarrier的参与量+1
                    cBarrier.await();
                    System.out.println(Thread.currentThread().getName() +" continued.");                   
                } catch ( Exception e )
                {
                    e.printStackTrace();
                }                                                    
            }
        }).start();      
       }
    }
}
 

 

运行结果:

Thread-0wait for CyclicBarrier
Thread-2wait for CyclicBarrier
Thread-1wait for CyclicBarrier
Thread-3wait for CyclicBarrier
Thread-4wait for CyclicBarrier
CyclicBarrier's parties is: 5
Thread-2 continued.
Thread-1 continued.
Thread-4 continued.
Thread-0 continued.
Thread-3 continued.

 

4  Semaphore说明和用法

4.1    说明

semaphore可以控制同时访问的线程个数,通过acquire()获取一个许可,如果没有就等待,而release()释放一个许可。

4.2    常用方法

构造方法

//参数permits表示许可数目,即同时可以允许多少线程进行访问
public Semaphore(intpermits) {

    sync =newNonfairSync(permits);

}
 

//参数fair表示是否是公平的,即等待时间越久的越先获取许可

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

 

其他常用方法

//获取一个许可

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

 

//获取permits个许可

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

 

//释放一个许可

public void release() {
    sync.releaseShared(1);
}

//释放permits个许可

public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

 

 

acquire()用来获取一个许可,若无许可能够获得,则会一直等待,直到获得许可;

release()用来释放许可。注意,在释放许可之前,必须先获得许可。

 

4.3    实例

publicclass SemaphoreDemo
{
    privatestaticfinalintSEX_MAX
= 10;
   
    publicstaticvoid main(String args[]){
        Semaphore semaphore = new Semaphore(SEX_MAX );
        //创建线程池
        ExecutorService threadPool = Executors.newFixedThreadPool( 3 );
        //在线程池中执行任务
        threadPool.execute( new MyThread( semaphore, 10 ) );
        threadPool.execute( new MyThread( semaphore, 4 ) );
        threadPool.execute( new MyThread( semaphore, 7 ) );
        //关闭线程池
        threadPool.shutdown();
    }
}
 
class MyThreadimplements Runnable{
    //信号量
    privatevolatile Semaphoresemaphore;
    //申请信号量的大小
    privateintcount;
    public MyThread(Semaphore semaphore,int count)
    {
       this.semaphore = semaphore;
       this.count = count;
    }
 
    @Override
    publicvoid run()
    {
        try{
            //从信号量中获取count个许可
            semaphore.acquire(count);
            Thread.sleep( 2000 );
            System.out.println(Thread.currentThread().getName() +" acquire count = "
+count );
           
        }catch (Exception e) {
            e.printStackTrace();
        }finally{
            semaphore.release(count);
            System.out.println(Thread.currentThread().getName() +" release count = "
+count);
        }
    }
}

运行结果

pool-1-thread-1 acquire count = 10
pool-1-thread-1 release count = 10
pool-1-thread-2 acquire count = 4
pool-1-thread-2 release count = 4
pool-1-thread-3 acquire count = 7
pool-1-thread-3 release count = 7

    信号量semaphore的许可总数是10个;共3个线程,分别需要获取的信号量许可数是:10,4,7.前面第1个线程获取到的许可量的许可后,
semaphore
剩余的可用的许可数是0;因此,后两个线程必须等待第一个线程释放了它所持有的信号量许可之后,才能获得信号量4,同理第三个线程也是一样。

 

5   总结

(1)CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过他们侧重点不同;CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;而
CyclicBarrier 一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;另外,CountDownLatch是不能够重用的,而 CyclicBarrier是可以重用的。

(2)Semaphore起始和锁有点类似,它一般用于控制对某组资源的访问权限。

 

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: