java并发编程实战 第五章(2)合并任务的结果
2015-07-24 09:53
645 查看
2、合并任务的结果
Fork/Join框架提供了执行任务并返回结果的能力,这类任务都是通过RecursiveTask实现的,并且该类实现了Future接口,
所以通过Future接口得到返回结果。
RecursiveTask类有个抽象方法compute该方法返回由子类决定的泛型类型,并且该方法是执行任务的主要方法。
可以在该方法执行任务的分解与合并并且
返回执行任务后的最终结果。
可以通过Future的get()方法获取每个任务的结果并且合并任务的结果。
该方法的重载方法:
get(long timeout,TimeUnit unit):如果任务的结果没有准备好,将等待指定的时间。如果等待指定时间超出,
而结果还没有出现,则该方法就会返回null值。
实例代码:
运行截图:
Fork/Join框架提供了执行任务并返回结果的能力,这类任务都是通过RecursiveTask实现的,并且该类实现了Future接口,
所以通过Future接口得到返回结果。
RecursiveTask类有个抽象方法compute该方法返回由子类决定的泛型类型,并且该方法是执行任务的主要方法。
可以在该方法执行任务的分解与合并并且
返回执行任务后的最终结果。
可以通过Future的get()方法获取每个任务的结果并且合并任务的结果。
该方法的重载方法:
get(long timeout,TimeUnit unit):如果任务的结果没有准备好,将等待指定的时间。如果等待指定时间超出,
而结果还没有出现,则该方法就会返回null值。
实例代码:
[code]/** * * @author fcs * @date 2015-7-24 * 描述:生成字符串文档 * 说明: */ public class Document { private String words []= {"the","hello","goodbye","packts","java","thread","pool","random","zhangsan","lisi","forkjoin","executors"}; public String [][] generateDocument(int numLines,int numWords,String word){ int counter = 0; String document [][] = new String [numLines][numWords]; Random random = new Random(); for(int i = 0;i<numLines;i++){ for(int j = 0;j<numWords;j++){ int index = random.nextInt(words.length); document[i][j] = words[index]; if(document[i][j].equals(word)){ counter++; } } } System.out.println("DocumentMock : The word appears "+counter+" times in the document"); return document; } }
[code]/** * * @author fcs * @date 2015-6-17 * 描述:文档任务,将遍历文档中的每一行来查找这个词 * 说明: */ public class DocumentTask extends RecursiveTask<Integer>{ private static final long serialVersionUID = -3309913898436499241L; private String document [][]; private int start ,end; private String word; public DocumentTask(String[][] document, int start, int end, String word) { this.document = document; this.start = start; this.end = end; this.word = word; } /** * 任务分解 */ @Override protected Integer compute() { Integer result = null; if(end - start< 10){ result = processLine(document, start, end, word); } else{ int mid = (start + end) / 2; DocumentTask task1 = new DocumentTask(document,start,mid,word); DocumentTask task2 = new DocumentTask(document,mid,end,word); invokeAll(task1,task2); try { result = groupResults(task1.get(),task2.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } return result; } /** * * 作者:fcs * 说明:查找字符串 * 返回:该文档有多少单词 * 参数: * 时间:2015-7-24 */ private Integer processLine(String [][] document,int start,int end,String word){ List<LineTask> tasks = new ArrayList<LineTask>(); for(int i=start;i<end;i++){ LineTask task = new LineTask(document[i],0,document[i].length,word); tasks.add(task); } invokeAll(tasks); int result = 0; //合计这些任务的返回值,并返回结果 for(int i =0;i<tasks.size();i++){ LineTask task = tasks.get(i); try { result = result+task.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } return new Integer(result); } //计算两个数字的和,并返回结果 private Integer groupResults(Integer number1,Integer number2){ Integer result; result = number1+number2; return result; } }
[code]/** * * @author fcs * @date 2015-6-17 * 描述:行任务,将在文档的一部分中查找这个词 * 说明: */ public class LineTask extends RecursiveTask<Integer> { private static final long serialVersionUID = -5169618143429955903L; private String line[]; private int start,end; private String word; public LineTask(String[] line, int start, int end, String word) { this.line = line; this.start = start; this.end = end; this.word = word; } //如果end和start属性的差异小于100,将这一组词拆分成两组,然后创建两个 //新的LineTask对象处理这两组,调用invokeAll()方法在线程池中执行他们。 @Override protected Integer compute() { Integer result = null; if(end - start < 100){ result = count(line,start,end, word); }else{ int mid = (start + end) / 2; LineTask task1 = new LineTask(line,start,mid, word); LineTask task2 = new LineTask(line,mid, end ,word); //调用该方法在线程池中执行该方法 invokeAll(task1,task2); try { //调用该方法将两个任务返回的值相加,最后返回任务计算的结果 result = groupResults(task1.get(), task2.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } return result; } //查找每一行的单词个数 private Integer count(String [] line,int start,int end ,String word){ int counter = 0; for(int i =start;i < end; i++){ if(line[i].equals(word)){ counter++; } } try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } return counter; } //计算两个数字的和,并返回结果 private Integer groupResults(Integer number1,Integer number2){ Integer result; result = number1+number2; return result; } }
[code]public class Main { public static void main(String[] args) { Document doc = new Document(); String [][] document = doc.generateDocument(100,100, "the"); DocumentTask task = new DocumentTask(document,0,100,"the"); ForkJoinPool pool = new ForkJoinPool(); pool.execute(task); do{ System.out.println("*************************************"); System.out.printf("Main: Parallelism : %d\n",pool.getParallelism()); System.out.printf("Main: Active Threads:%d\n",pool.getActiveThreadCount()); System.out.printf("Main: Task Count: %d\n",pool.getQueuedTaskCount()); System.out.printf("Main: Steal Count: %d\n",pool.getStealCount()); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } }while(!task.isDone()); pool.shutdown(); try { //调用awaitTermination等待任务执行结束 pool.awaitTermination(1, TimeUnit.DAYS); } catch (InterruptedException e) { e.printStackTrace(); } try { System.out.printf("Main: The word appears %d in the document",task.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
运行截图:
相关文章推荐
- Struts result param详细设置
- 深入Java关键字null
- 深入了解Struts2返回JSON数据的原理及具体应用范例
- Eclipse和InteliJ IDEA 使用区别
- java回调
- eclipse 上传svn 不传的内容
- 蘑菇街2015校招 Java研发笔试题 详解
- 【java】五子棋小游戏
- java 虚拟机的理解
- org.springframework.transaction.CannotCreateTransactionException
- 【连载】Maven系列(四)——配置私服
- JDK环境变量配置
- Java Socket编程
- Java语言系列03——基本数据类型
- 一种最简单的Java环境变量配置
- eclipse运行maven的jetty插件内存溢出
- Spring读取配置文件,获取bean的几种方式
- 性能分析工具Eclipse Mat
- String在java编程中使用细则
- Eclipse debug高级 技巧(转)