您的位置:首页 > 编程语言 > Java开发

java线程深度解析(四)——并发模型(Master-Worker)

2016-12-20 00:53 453 查看
http://blog.csdn.net/daybreak1209/article/details/51372929

二、Master-worker ——分而治之

     Master-worker常用的并行模式之一,核心思想是由两个进程协作工作,master负责接收和分配任务,worker负责处理任务,并把处理结果返回给Master进程,由Master进行汇总,返回给客户端。

     它的好处在于能把一个大任务分解成若干个小任务,并行执行,提高系统吞吐量。而对于客户端而言,一旦提交任务,mater进程立刻返回一个处理结果,并非等待系统处理完毕再返回。



     下面利用Master-Worker模型实现一个计算1-100立方和,思路如下:

1、将计算任务分配成100个子任务,每个子任务用于计算单独数字的立方和

2、master产生固定个数的worker用于处理这个子任务

3、worker开始计算,并把结果写入resultMap中

4、master负责汇总map中的数据,求和后将最终结果返回给客户端。

Worker类的实现

[html] view
plain copy

 





public class Worker implements Runnable{  

    //任务队列,用于每个子任务  

    protected Queue<Object> workQueue;  

    //子任务处理结果集  

    protected Map<String,Object> resultMap;  

  

    public void setWorkQueue(Queue<Object> workQueue)  

    {  

        this.workQueue=workQueue;  

    }  

      

    public void setResultMap(Map<String, Object> resultMap) {  

        this.resultMap = resultMap;  

    }  

      

    //子任务处理逻辑,在子类中具体实现  

    public Object handle(Object input)  

    {  

        return input;  

    }  

      

    @Override  

    public void run() {  

        while(true)  

        {  

            //获取子任务  

            Object  input =workQueue.poll();//remove the head of queue  

            if(input==null) break;   

            //处理子任务  

            Object re=handle(input);  

            //将处理结果写入结果集  

            resultMap.put(Integer.toString(input.hashCode()),re);  

        }  

    }  

}  

Worker子类实现:单个数字立方计算,重写worker的handle方法

[html] view
plain copy

 





public class SubWorker extends Worker{  

    public Object handle(Object input)  

    {  

        Integer i=(Integer)input;  

        return i*i*i;  

    }  

}  

Master类的实现

[html] view
plain copy

 





public class Master {  

    //任务队列  

    protected Queue<Object> workQueue=new ConcurrentLinkedQueue<Object>();  

    //work进程队列  

    protected  Map<String,Thread> threadMap=new HashMap<String,Thread>();  

    //子任务处理结果集  

    protected Map<String,Object> resultMap=new ConcurrentHashMap<String,Object>();  

      

    //是否所有的子任务都结束了  

    public boolean isComplete()  

    {  

        for(Map.Entry<String, Thread> entry:threadMap.entrySet())  

        {  

            if(entry.getValue().getState()!=Thread.State.TERMINATED)  

            {  

                return false;  

            }  

        }  

        return true;  

    }  

      

    //master的构造,需要一个worker线程和worker的进程书香  

    public Master(Worker worker,int countWorker)  

    {  

        worker.setWorkQueue(workQueue);  

        worker.setResultMap(resultMap);  

        for(int i=0;i<countWorker;i++)  

        {  

            threadMap.put(Integer.toString(i), new Thread(worker,Integer.toString(i)));  

        }  

    }  

      

    //提交任务-放入进程队列中  

    public void submit(Object job)  

    {  

        workQueue.add(job);  

        System.out.println("任务队列size:"+workQueue.size());  

    }  

      

    //返回子任务结果集  

    public Map<String,Object> getResultMap()  

    {  

        return resultMap;  

  

    }  

      

    //开始运行所有的worker进程  

    public void execute()  

    {  

        for(Map.Entry<String, Thread> entry:threadMap.entrySet())  

        {  

            entry.getValue().start();//调用子线程 worker.run  

            System.out.println(entry.getValue());  

        }  

    }  

}  

客户端实现

[html] view
plain copy

 





public class Client {  

    public static void main(String[] args) {  

        Master master=new Master(new SubWorker(), 5);//指定5个  

        for(int i=0;i<100;i++)  

            master.submit(i);  

            master.execute();  

            int re=0;  

            Map<String,Object> resultMap=master.getResultMap();  

            while(resultMap.size()>0 || !master.isComplete())  

            {  

                //不需要等待所有的worker执行完就可以计算结果  

                Set<String> keys=resultMap.keySet();  

                String key=null;  

                for(String k:keys)  

                {  

                    key=k;  

                    break;  

                }  

                Integer i=null;  

                if(key!=null)  

                    i=(Integer)resultMap.get(key);  

                if(i!=null)  

                    re+=i;//最终计算结果  

                  

                if(key!=null)  

                    resultMap.remove(key);  

            }  

            System.out.println(re); //打印最后计算结果  

        }  

}  

最终结果:

任务队列大小

size:1-100
五个线程数:

Thread[0,5,main]

Thread[1,5,main]

Thread[2,5,main]

Thread[3,5,main]

Thread[4,5,main]

最终计算结果:

24502500     

      在整个计算中,master和worker 的执行完全是异步的,master不必等到每所有worker完成,就可以进行求和操作。在获得部分子任务结果时,就已经可以对结果进行计算,从而提高并发度和吞吐量。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: