您的位置:首页 > 编程语言 > Java开发

[转载] java多线程学习-java.util.concurrent详解(二)Semaphore/FutureTask/Exchanger

2015-07-02 00:34 591 查看
转载自http://janeky.iteye.com/blog/770393

-----------------------------------------------------------------------------

3. Semaphore
我们先来学习一下JDK1.5 API中关于这个类的详细介绍:
“一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。”

我们一般用它来控制某个对象的线程访问对象

例如,对于某个容器,我们规定,最多只能容纳n个线程同时操作
使用信号量来模拟实现

具体代码如下(参考 [JCIP])

Java代码


import java.util.Collections;

import java.util.HashSet;

import java.util.Set;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Semaphore;

public class TestSemaphore {

public static void main(String[] args) {

ExecutorService exec = Executors.newCachedThreadPool();

TestSemaphore t = new TestSemaphore();

final BoundedHashSet<String> set = t.getSet();

for (int i = 0; i < 3; i++) {//三个线程同时操作add

exec.execute(new Runnable() {

public void run() {

try {

set.add(Thread.currentThread().getName());

} catch (InterruptedException e) {

e.printStackTrace();

}

}

});

}

for (int j = 0; j < 3; j++) {//三个线程同时操作remove

exec.execute(new Runnable() {

public void run() {

set.remove(Thread.currentThread().getName());

}

});

}

exec.shutdown();

}

public BoundedHashSet<String> getSet() {

return new BoundedHashSet<String>(2);//定义一个边界约束为2的线程

}

class BoundedHashSet<T> {

private final Set<T> set;

private final Semaphore semaphore;

public BoundedHashSet(int bound) {

this.set = Collections.synchronizedSet(new HashSet<T>());

this.semaphore = new Semaphore(bound, true);

}

public void add(T o) throws InterruptedException {

semaphore.acquire();//信号量控制可访问的线程数目

set.add(o);

System.out.printf("add:%s%n",o);

}

public void remove(T o) {

if (set.remove(o))

semaphore.release();//释放掉信号量

System.out.printf("remove:%s%n",o);

}

}

}

总结:Semaphore通常用于对象池的控制

4.FutureTask
我们先来学习一下JDK1.5 API中关于这个类的详细介绍:

“取消的异步计算。利用开始和取消计算的方法、查询计算是否完成的方法和获取计算结果的方法,此类提供了对 Future 的基本实现。仅在计算完成时才能获取结果;如果计算尚未完成,则阻塞 get 方法。一旦计算完成,就不能再重新开始或取消计算。
可使用 FutureTask 包装 Callable 或 Runnable 对象。因为 FutureTask 实现了 Runnable,所以可将 FutureTask 提交给 Executor 执行。
除了作为一个独立的类外,此类还提供了 protected 功能,这在创建自定义任务类时可能很有用。 “

应用举例:我们的算法中有一个很耗时的操作,在编程的是,我们希望将它独立成一个模块,调用的时候当做它是立刻返回的,并且可以随时取消的

具体代码如下(参考 [JCIP])

Java代码


import java.util.concurrent.Callable;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.FutureTask;

public class TestFutureTask {

public static void main(String[] args) {

ExecutorService exec=Executors.newCachedThreadPool();

FutureTask<String> task=new FutureTask<String>(new Callable<String>(){//FutrueTask的构造参数是一个Callable接口

@Override

public String call() throws Exception {

return Thread.currentThread().getName();//这里可以是一个异步操作

}});

try {

exec.execute(task);//FutureTask实际上也是一个线程

String result=task.get();//取得异步计算的结果,如果没有返回,就会一直阻塞等待

System.out.printf("get:%s%n",result);

} catch (InterruptedException e) {

e.printStackTrace();

} catch (ExecutionException e) {

e.printStackTrace();

}

}

}

总结:FutureTask其实就是新建了一个线程单独执行,使得线程有一个返回值,方便程序的编写

5. Exchanger
我们先来学习一下JDK1.5 API中关于这个类的详细介绍:
“可以在pair中对元素进行配对和交换的线程的同步点。每个线程将条目上的某个方法呈现给 exchange 方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象。Exchanger 可能被视为 SynchronousQueue 的双向形式。Exchanger 可能在应用程序(比如遗传算法和管道设计)中很有用。 “

应用举例:有两个缓存区,两个线程分别向两个缓存区fill和take,当且仅当一个满了,两个缓存区交换

代码如下(参考了网上给的示例 http://hi.baidu.com/webidea/blog/item/2995e731e53ad5a55fdf0e7d.html)
Java代码


import java.util.ArrayList;

import java.util.concurrent.Exchanger;

public class TestExchanger {

public static void main(String[] args) {

final Exchanger<ArrayList<Integer>> exchanger = new Exchanger<ArrayList<Integer>>();

final ArrayList<Integer> buff1 = new ArrayList<Integer>(10);

final ArrayList<Integer> buff2 = new ArrayList<Integer>(10);

new Thread(new Runnable() {

@Override

public void run() {

ArrayList<Integer> buff = buff1;

try {

while (true) {

if (buff.size() >= 10) {

buff = exchanger.exchange(buff);//开始跟另外一个线程交互数据

System.out.println("exchange buff1");

buff.clear();

}

buff.add((int)(Math.random()*100));

Thread.sleep((long)(Math.random()*1000));

}

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}).start();

new Thread(new Runnable(){

@Override

public void run() {

ArrayList<Integer> buff=buff2;

while(true){

try {

for(Integer i:buff){

System.out.println(i);

}

Thread.sleep(1000);

buff=exchanger.exchange(buff);//开始跟另外一个线程交换数据

System.out.println("exchange buff2");

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}}).start();

}

}

总结:Exchanger在特定的使用场景比较有用(两个伙伴线程之间的数据交互)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: