并发编程复习(十):master和worker模式
2017-09-24 16:52
274 查看
首先说说这个模式的特点与原理,看图:
原理就是master用来管理调度worker并负责结果集的处理,实际开发中你可以任意的开启任意多的worker去执行任务,取决于你的CPU得性能,下面来看看模拟:
任务类:Task:
package com.zkingsoft.masterandworker;
/**
* @Author Lee_Hoo
* @DATE Created in 2017/9/24
* @Description: 任务类
*/
public class Task {
private int id;
private String name;
private int price;
public Task(int id, String name, int price) {
this.id = id;
this.name = name;
this.price = price;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getPrice() {
return price;
}
public void setPrice(int price) {
this.price = price;
}
}
调度管理类master:
package com.zkingsoft.masterandworker;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* @Author Lee_Hoo
* @DATE Created in 2017/9/24
* @Description: master负责调度worker,以及进行结果集的汇总
*/
public class Master {
//存放任务的并发容器
private ConcurrentLinkedQueue<Task> queueTask = new ConcurrentLinkedQueue<>();
//存放worker对象
private HashMap<String,Thread> hashMap = new HashMap<>();
//存放执行的结果集
private ConcurrentHashMap<String,Object> resultMap = new ConcurrentHashMap<>();
//将引用传递给worker类
public Master(Worker workers,int workerCount){
workers.setQueueTask(queueTask);
workers.setResultMap(resultMap);
for (int i = 0; i < workerCount; i++) {
hashMap.put("worker"+Integer.toString(i),new Thread(workers));
}
}
//提交任务方法
public void submit(Task task){
this.queueTask.add(task);
}
//启动方法
public void execute(){
for (Map.Entry<String,Thread> me : hashMap.entrySet()){
me.getValue().start();
}
}
//判断线程是否执行完毕
public boolean isCompelete(){
for(Map.Entry<String,Thread> me : hashMap.entrySet()){
if (me.getValue().getState() != Thread.State.TERMINATED){
return false;
}
}
return true;
}
public int getResult(){
int res = 0;
for (Map.Entry<String,Object> o:resultMap.entrySet()){
res += (int) o.getValue();
}
return res;
}
}
执行任务的类worker:
package com.zkingsoft.masterandworker;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* @Author Lee_Hoo
* @DATE Created in 2017/9/24
* @Description: 具体工作的worker类
*/
public class Worker implements Runnable {
private ConcurrentLinkedQueue<Task> queueTask;
private ConcurrentHashMap<String, Object> resultMap;
public void setQueueTask(ConcurrentLinkedQueue<Task> queueTask) {
this.queueTask = queueTask;
}
public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
this.resultMap = resultMap;
}
@Override
public void run() {
while (true) {
Task task = this.queueTask.poll();
if (task==null) break;
Object output = handler(task);
this.resultMap.put(Integer.toString(task.getId()),output);
}
}
//任务执行的真实代码,实际中可以定义一个空方法,由其子类去真正的执行任务
public Object handler(Task input){
Object output = null;
try {
Thread.sleep(500);
output = input.getPrice();
} catch (InterruptedException e) {
e.printStackTrace();
}
return output;
}
}
测试类:Main
package com.zkingsoft.masterandworker;
import java.util.Random;
/**
* @Author Lee_Hoo
* @DATE Created in 2017/9/24
* @Description:
*/
public class Main {
public static void main(String[] args) {
Master master = new Master(new Worker(),20);
Random random = new Random();
for (int i = 1; i <= 100; i++) {
Task task = new Task(i,"任务"+i,random.nextInt(10000));
master.submit(task);
}
long startTime = System.currentTimeMillis();
master.execute();
while (true){
if (master.isCompelete()){
long end = System.currentTimeMillis()-startTime;
int res = master.getResult();
System.out.println("最终结果"+res+";执行耗时"+end+"毫秒");
break;
}
}
}
}
当我开启10个worker的时候,结果是:
最终结果505935;执行耗时5078毫秒
当我开启20个worker的时候,结果是:
最终结果476217;执行耗时2704毫秒
原理就是master用来管理调度worker并负责结果集的处理,实际开发中你可以任意的开启任意多的worker去执行任务,取决于你的CPU得性能,下面来看看模拟:
任务类:Task:
package com.zkingsoft.masterandworker;
/**
* @Author Lee_Hoo
* @DATE Created in 2017/9/24
* @Description: 任务类
*/
public class Task {
private int id;
private String name;
private int price;
public Task(int id, String name, int price) {
this.id = id;
this.name = name;
this.price = price;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getPrice() {
return price;
}
public void setPrice(int price) {
this.price = price;
}
}
调度管理类master:
package com.zkingsoft.masterandworker;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* @Author Lee_Hoo
* @DATE Created in 2017/9/24
* @Description: master负责调度worker,以及进行结果集的汇总
*/
public class Master {
//存放任务的并发容器
private ConcurrentLinkedQueue<Task> queueTask = new ConcurrentLinkedQueue<>();
//存放worker对象
private HashMap<String,Thread> hashMap = new HashMap<>();
//存放执行的结果集
private ConcurrentHashMap<String,Object> resultMap = new ConcurrentHashMap<>();
//将引用传递给worker类
public Master(Worker workers,int workerCount){
workers.setQueueTask(queueTask);
workers.setResultMap(resultMap);
for (int i = 0; i < workerCount; i++) {
hashMap.put("worker"+Integer.toString(i),new Thread(workers));
}
}
//提交任务方法
public void submit(Task task){
this.queueTask.add(task);
}
//启动方法
public void execute(){
for (Map.Entry<String,Thread> me : hashMap.entrySet()){
me.getValue().start();
}
}
//判断线程是否执行完毕
public boolean isCompelete(){
for(Map.Entry<String,Thread> me : hashMap.entrySet()){
if (me.getValue().getState() != Thread.State.TERMINATED){
return false;
}
}
return true;
}
public int getResult(){
int res = 0;
for (Map.Entry<String,Object> o:resultMap.entrySet()){
res += (int) o.getValue();
}
return res;
}
}
执行任务的类worker:
package com.zkingsoft.masterandworker;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* @Author Lee_Hoo
* @DATE Created in 2017/9/24
* @Description: 具体工作的worker类
*/
public class Worker implements Runnable {
private ConcurrentLinkedQueue<Task> queueTask;
private ConcurrentHashMap<String, Object> resultMap;
public void setQueueTask(ConcurrentLinkedQueue<Task> queueTask) {
this.queueTask = queueTask;
}
public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
this.resultMap = resultMap;
}
@Override
public void run() {
while (true) {
Task task = this.queueTask.poll();
if (task==null) break;
Object output = handler(task);
this.resultMap.put(Integer.toString(task.getId()),output);
}
}
//任务执行的真实代码,实际中可以定义一个空方法,由其子类去真正的执行任务
public Object handler(Task input){
Object output = null;
try {
Thread.sleep(500);
output = input.getPrice();
} catch (InterruptedException e) {
e.printStackTrace();
}
return output;
}
}
测试类:Main
package com.zkingsoft.masterandworker;
import java.util.Random;
/**
* @Author Lee_Hoo
* @DATE Created in 2017/9/24
* @Description:
*/
public class Main {
public static void main(String[] args) {
Master master = new Master(new Worker(),20);
Random random = new Random();
for (int i = 1; i <= 100; i++) {
Task task = new Task(i,"任务"+i,random.nextInt(10000));
master.submit(task);
}
long startTime = System.currentTimeMillis();
master.execute();
while (true){
if (master.isCompelete()){
long end = System.currentTimeMillis()-startTime;
int res = master.getResult();
System.out.println("最终结果"+res+";执行耗时"+end+"毫秒");
break;
}
}
}
}
当我开启10个worker的时候,结果是:
最终结果505935;执行耗时5078毫秒
当我开启20个worker的时候,结果是:
最终结果476217;执行耗时2704毫秒
相关文章推荐
- 并发编程之Master-Worker模式
- 并发编程-Master-Worker模式
- java并发编程之Master-Worker模式
- Master-Worker模式--并发编程
- Java并发编程---Master-Worker模式
- 并发模型(二)——Master-Worker模式
- Java 并发模式之Master-Worker
- 并发模型(二)——Master-Worker模式
- 并发模型(二)——Master-Worker模式
- 并发模型(二)——Master-Worker模式
- 并发模型(二)——Master-Worker模式
- 并发模式(二)Master-Worker模式
- 并发模型(二)——Master-Worker模式
- 并发模型(二)——Master-Worker模式
- 并发编程复习(九):Future模式
- Go 自带的 http/server.go 的连接解析 与 如何结合 master-worker 并发模式,提高单机并发能力
- 并发模式(二)Master-Worker模式
- 并发模型之Master-Worker设计模式
- 并发编程实现模型之(二)Master-Worker模式