您的位置:首页 > 编程语言

并发编程复习(十):master和worker模式

2017-09-24 16:52 274 查看
首先说说这个模式的特点与原理,看图:



原理就是master用来管理调度worker并负责结果集的处理,实际开发中你可以任意的开启任意多的worker去执行任务,取决于你的CPU得性能,下面来看看模拟:

任务类:Task:

package com.zkingsoft.masterandworker;

/**
* @Author Lee_Hoo
* @DATE Created in 2017/9/24
* @Description: 任务类
*/
public class Task {
private int id;
private String name;
private int price;

public Task(int id, String name, int price) {
this.id = id;
this.name = name;
this.price = price;
}

public int getId() {
return id;
}

public void setId(int id) {
this.id = id;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public int getPrice() {
return price;
}

public void setPrice(int price) {
this.price = price;
}
}


调度管理类master:
package com.zkingsoft.masterandworker;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
* @Author Lee_Hoo
* @DATE Created in 2017/9/24
* @Description: master负责调度worker,以及进行结果集的汇总
*/
public class Master {

//存放任务的并发容器
private ConcurrentLinkedQueue<Task> queueTask = new ConcurrentLinkedQueue<>();
//存放worker对象
private HashMap<String,Thread> hashMap = new HashMap<>();
//存放执行的结果集
private ConcurrentHashMap<String,Object> resultMap = new ConcurrentHashMap<>();

//将引用传递给worker类
public Master(Worker workers,int workerCount){
workers.setQueueTask(queueTask);
workers.setResultMap(resultMap);
for (int i = 0; i < workerCount; i++) {
hashMap.put("worker"+Integer.toString(i),new Thread(workers));
}
}
//提交任务方法
public void submit(Task task){
this.queueTask.add(task);
}
//启动方法
public void execute(){
for (Map.Entry<String,Thread> me : hashMap.entrySet()){
me.getValue().start();
}
}
//判断线程是否执行完毕
public boolean isCompelete(){
for(Map.Entry<String,Thread> me : hashMap.entrySet()){
if (me.getValue().getState() != Thread.State.TERMINATED){
return false;
}
}
return true;
}
public int getResult(){
int res = 0;
for (Map.Entry<String,Object> o:resultMap.entrySet()){
res += (int) o.getValue();
}
return res;
}
}


执行任务的类worker:
package com.zkingsoft.masterandworker;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
* @Author Lee_Hoo
* @DATE Created in 2017/9/24
* @Description: 具体工作的worker类
*/
public class Worker implements Runnable {
private ConcurrentLinkedQueue<Task> queueTask;
private ConcurrentHashMap<String, Object> resultMap;

public void setQueueTask(ConcurrentLinkedQueue<Task> queueTask) {
this.queueTask = queueTask;
}

public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
this.resultMap = resultMap;
}

@Override
public void run() {
while (true) {
Task task = this.queueTask.poll();
if (task==null) break;
Object output = handler(task);
this.resultMap.put(Integer.toString(task.getId()),output);
}
}
//任务执行的真实代码,实际中可以定义一个空方法,由其子类去真正的执行任务
public Object handler(Task input){
Object output = null;
try {
Thread.sleep(500);
output = input.getPrice();
} catch (InterruptedException e) {
e.printStackTrace();
}

return output;
}
}
测试类:Main
package com.zkingsoft.masterandworker;

import java.util.Random;

/**
* @Author Lee_Hoo
* @DATE Created in 2017/9/24
* @Description:
*/
public class Main {
public static void main(String[] args) {
Master master = new Master(new Worker(),20);
Random random = new Random();
for (int i = 1; i <= 100; i++) {
Task task = new Task(i,"任务"+i,random.nextInt(10000));
master.submit(task);
}
long startTime = System.currentTimeMillis();
master.execute();
while (true){
if (master.isCompelete()){
long end = System.currentTimeMillis()-startTime;
int res = master.getResult();
System.out.println("最终结果"+res+";执行耗时"+end+"毫秒");
break;
}
}
}
}
当我开启10个worker的时候,结果是:
最终结果505935;执行耗时5078毫秒
当我开启20个worker的时候,结果是:

最终结果476217;执行耗时2704毫秒
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息