您的位置:首页 > 其它

并行设计模式_Master-Worker_Future_生产者消-费者模型

2017-12-01 02:48 330 查看
关于并行设计模式-Master-Worker,future,

一.Master-Worker



1.一个pojo用于定义任务内容

package com.wpx.thread.demo05;

public class Task {

private Integer id;
private String taskName;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}

}
2.Master

package com.wpx.thread.demo05;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
*  Master-Worker模式
*  aster-Worker模式是常用的并行计算模式,核心思想是由两类进程协作工作Master和Worker
*
*	Master负责接收任务和分配任务,Worker负责处理子任务,当每个任务将结果返回geiMaster,由Master做归纳和总计
*  类似于一个包工头包了一块工程,找来多个工人一起完成,工人将工作汇报,包工头根据实际场景发个工资
*  好处:将一个大任务分解为若干个小任务,并行执行,从而提高系统的吞吐量
* @author wangpx
*
*/
public class Master {
//承装任务的集合
private ConcurrentLinkedQueue<Task> workQueue=new ConcurrentLinkedQueue<>();
//承装所有worker对象
private HashMap<String,Thread> works=new HashMap<>();
//承装每一个Worker执行任务的结果集
private ConcurrentHashMap<String,Object> resultMap=new ConcurrentHashMap<>();
//一个构造方法来统计worker
public Master(Worker worker,int workerCount) {
worker.setWorkerQueue(this.workQueue);
worker.setResultMap(this.resultMap);

for(int i=0;i<workerCount;i++) {
//key表示一个worker的名字,value表示线程
works.put("节点"+Integer.toString(i), new Thread(worker));
}
}
//提交方法
public void submit(Task task) {
this.workQueue.add(task);
}
//一个执行方法,让所有的Worker工作
public  void excute() {
for(Map.Entry<String, Thread> me: works.entrySet()) {
me.getValue().start();
}
}
//判断线程是否执行完毕
public boolean isComplete() {
for(Map.Entry<String, Thread> me: works.entrySet()) {
if(	me.getValue().getState() != Thread.State.TERMINATED) {
return false;
}
}
return true;
}
public List<Object> getResult(){
List<Object> list=new ArrayList<>();
for(Map.Entry<String, Object> me: resultMap.entrySet()) {
list.add(me.getValue());
}
return list;
}

}
3.Worker
package com.wpx.thread.demo05;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Worker implements Runnable{
private ConcurrentLinkedQueue<Task> workQueue;

private ConcurrentHashMap<String, Object> resultMap;

public void setWorkerQueue(ConcurrentLinkedQueue<Task> workQueue) {
this.workQueue=workQueue;
}

public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
this.resultMap=resultMap;
}

@Override
public void run() {
while(true) {
Task input=this.workQueue.poll();
if(input == null) break;
//真正的去处理业务
Object output=handle(input);
this.resultMap.put(Integer.toString(input.getId()), output);
}
}

private Object handle(Task input) {
Object output=null;
try {
//处理业务耗时
Thread.sleep(1000);
output=input.getTaskName();
} catch (InterruptedException e) {
e.printStackTrace();
}
return output;

}

}


4.Main函数

package com.wpx.thread.demo05;
/**
* 10102
任务88任务89任务90任务91任务92任务93任务
94任务95任务96任务97任务10任务98任务
11任务99任务12任务13任务14任务15任务16任务17任务
18任务19任务1任务2任务3任务4任务5任务6任务7任务8任务
9任务20任务21任务22任务23任务24任务25任务26任务27
任务28任务29任务30任务31任务32任务33任务34任务35
任务36任务37任务38任务39任务40任务41任务42任务43
任务44任务45任务46任务47任务48任务49任务50任务51
任务52任务53任务54任务55任务56任务57任务58任务59
任务60任务61任务62任务63任务64任务65任务66任务67
任务68任务69任务70任务71任务72任务73任务74任务75
任务76任务77任务78任务79任务100任务80任务81任务82
任务83任务84任务85任务86任务87
* @author wangpx
*/
public class Main {
public static void main(String[] args) {

Master master =new Master(new Worker(), 10);
for(int i=1;i<101;i++) {
Task t=new Task();
t.setId(i);
t.setTaskName("任务"+i);
master.submit(t);
}
master.excute();
long start =System.currentTimeMillis();
while(true) {
if(master.isComplete()) {
long result=System.currentTimeMillis()-start;
System.out.println(result);
master.getResult().stream().forEach(System.out::print);
break;
}

}

}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息