您的位置:首页 > 编程语言 > Java开发

java并发编程实战 第五章(2)合并任务的结果

2015-07-24 09:53 645 查看
2、合并任务的结果

Fork/Join框架提供了执行任务并返回结果的能力,这类任务都是通过RecursiveTask实现的,并且该类实现了Future接口,

所以通过Future接口得到返回结果。

RecursiveTask类有个抽象方法compute该方法返回由子类决定的泛型类型,并且该方法是执行任务的主要方法。

可以在该方法执行任务的分解与合并并且

返回执行任务后的最终结果。

可以通过Future的get()方法获取每个任务的结果并且合并任务的结果。

该方法的重载方法:

get(long timeout,TimeUnit unit):如果任务的结果没有准备好,将等待指定的时间。如果等待指定时间超出,

而结果还没有出现,则该方法就会返回null值。

实例代码:

[code]/**
 * 
 * @author fcs
 * @date 2015-7-24
 * 描述:生成字符串文档
 * 说明:
 */
public class Document {
    private String words []= {"the","hello","goodbye","packts","java","thread","pool","random","zhangsan","lisi","forkjoin","executors"};
    public String [][] generateDocument(int numLines,int numWords,String word){
        int counter = 0;
        String document  [][] = new String [numLines][numWords];
        Random random = new Random();
        for(int i = 0;i<numLines;i++){
            for(int j = 0;j<numWords;j++){
                int index = random.nextInt(words.length);
                document[i][j] = words[index];
                if(document[i][j].equals(word)){
                    counter++;
                }
            }
        }
        System.out.println("DocumentMock : The word appears "+counter+" times in the document");
        return document;
    }
}


[code]/**
 * 
 * @author fcs
 * @date 2015-6-17
 * 描述:文档任务,将遍历文档中的每一行来查找这个词
 * 说明:
 */
public class DocumentTask extends RecursiveTask<Integer>{
    private static final long serialVersionUID = -3309913898436499241L;

    private String document  [][];
    private int start ,end;
    private String word;

    public DocumentTask(String[][] document, int start, int end, String word) {
        this.document = document;
        this.start = start;
        this.end = end;
        this.word = word;
    }

    /**
     * 任务分解
     */
    @Override
    protected Integer compute() {
        Integer  result = null;
        if(end - start< 10){
            result = processLine(document, start, end, word);
        } else{
            int mid = (start + end) / 2;
            DocumentTask task1 = new DocumentTask(document,start,mid,word);
            DocumentTask task2 = new DocumentTask(document,mid,end,word);
            invokeAll(task1,task2);
            try {
                result = groupResults(task1.get(),task2.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        return result;
    }
    /**
     * 
     * 作者:fcs
     * 说明:查找字符串
     * 返回:该文档有多少单词
     * 参数:
     * 时间:2015-7-24
     */
    private Integer processLine(String [][] document,int start,int end,String word){
        List<LineTask>  tasks = new ArrayList<LineTask>();
        for(int  i=start;i<end;i++){
            LineTask  task = new LineTask(document[i],0,document[i].length,word);
            tasks.add(task);
        }
        invokeAll(tasks);
        int result = 0;
        //合计这些任务的返回值,并返回结果
        for(int  i =0;i<tasks.size();i++){
            LineTask task = tasks.get(i);
            try {
                result = result+task.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        return new Integer(result);
    }

    //计算两个数字的和,并返回结果
    private Integer groupResults(Integer number1,Integer number2){
        Integer result; 
        result = number1+number2;
        return result;
    }

}


[code]/**
 * 
 * @author fcs
 * @date 2015-6-17
 * 描述:行任务,将在文档的一部分中查找这个词
 * 说明:
 */
public class LineTask extends RecursiveTask<Integer>   {

    private static final long serialVersionUID = -5169618143429955903L;
    private String line[];
    private int start,end;
    private String word;

    public LineTask(String[] line, int start, int end, String word) {
        this.line = line;
        this.start = start;
        this.end = end;
        this.word = word;
    }

    //如果end和start属性的差异小于100,将这一组词拆分成两组,然后创建两个
    //新的LineTask对象处理这两组,调用invokeAll()方法在线程池中执行他们。
    @Override
    protected Integer compute() {
        Integer result = null;
        if(end - start < 100){
            result = count(line,start,end, word);

        }else{
            int mid = (start + end) / 2;
            LineTask task1 = new LineTask(line,start,mid, word);
            LineTask  task2 = new LineTask(line,mid, end ,word);
            //调用该方法在线程池中执行该方法
            invokeAll(task1,task2);
            try {
                //调用该方法将两个任务返回的值相加,最后返回任务计算的结果
                result = groupResults(task1.get(), task2.get());

            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }

        }
        return result;
    }

    //查找每一行的单词个数
    private Integer count(String [] line,int start,int end ,String word){
        int counter = 0;
        for(int  i =start;i < end; i++){
            if(line[i].equals(word)){
                counter++;
            }
        }
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return counter;
    }

    //计算两个数字的和,并返回结果
    private Integer groupResults(Integer number1,Integer number2){
        Integer result; 
        result = number1+number2;
        return result;
    }
}


[code]public class Main {
    public static void main(String[] args) {
        Document doc = new Document();
        String  [][] document = doc.generateDocument(100,100, "the");
        DocumentTask task = new DocumentTask(document,0,100,"the");
        ForkJoinPool  pool = new ForkJoinPool();
        pool.execute(task);

        do{
            System.out.println("*************************************");
            System.out.printf("Main: Parallelism : %d\n",pool.getParallelism());
            System.out.printf("Main: Active Threads:%d\n",pool.getActiveThreadCount());
            System.out.printf("Main: Task Count: %d\n",pool.getQueuedTaskCount());
            System.out.printf("Main: Steal Count: %d\n",pool.getStealCount());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }while(!task.isDone());
        pool.shutdown();

        try {
            //调用awaitTermination等待任务执行结束
            pool.awaitTermination(1, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            System.out.printf("Main: The word appears %d in the document",task.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}


运行截图:

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: