您的位置:首页 > 编程语言 > Java开发

java多线程编程--工作线程模式《java多线程编程调试模式》

2017-03-23 12:56 423 查看
一.本例来自《java多线程编程调试模式》:

题意:模拟流水线上的工人,

 工人一直在流水线上作业,零件(可看作客户端发的请求)一到达,工人就开始进行工作,无零件时工作处于等待状态

具体的业务代码有详注释:



一测试类:
package worker.thread.pattern;

public class Main {
public static void main(String[] args) {
Channel channel = new Channel(5);   // Worker Thread数量
channel.startWorkers();
ClientThread alice = new ClientThread("Alice", channel);
ClientThread bobby = new ClientThread("Bobby", channel);
ClientThread chris = new ClientThread("Chris", channel);
alice.start();
bobby.start();
chris.start();

try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
alice.stopThread();
bobby.stopThread();
chris.stopThread();
channel.stopAllWorkers();
}
}


2.客户端类:主要发送工作请求
package worker.thread.pattern;

import java.util.Random;

/**
* 不停的发送请求给工人类
*
* 1.创建请求实例,并将实例发送给管理工人线程的类
*
* @author lxb
*
*/
public class ClientThread extends Thread {

private final Channel channel;
private static final Random random = new Random();

private volatile boolean terminated = false; // 停止请求标志

public ClientThread(String name, Channel channel) {
super(name);
this.channel = channel;
}

public void run() {
try {
for (int i = 0; !terminated; i++) {
try {
Request request = new Request(getName(), i);
channel.putRequest(request);
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
terminated = true;
}
}
} finally {
System.out.println(Thread.currentThread().getName()
+ " is terminated.");
}
}

public void stopThread() {
terminated = true;
interrupt();
}
}


3.请求对象:

package worker.thread.pattern;

import java.util.Random;

/**
* 请求实体类
* @author lxb
*
*/
public class Request {

private final String name; //
private final int number;  //
private static final Random random = new Random();
public Request(String name, int number) {
this.name = name;
this.number = number;
}
public void execute() {
System.out.println(Thread.currentThread().getName() + " executes " + this);
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
}
}
public String toString() {
return "[ Request from " + name + " No." + number + " ]";
}
}


4.请求队列与工人线程管理类:
package worker.thread.pattern;

/**
* 用来接受客户端线程发来的请求、将请求传递给工人线程,所以至少需要以下数据结构:
* 1.一个队列来保存客户端发来的请求,还必须提供两个接口,一个putRequest保存请求, 另一个是取出请求takeRequest
* 2.线程池来管理各个工人线程,这个池可以是一个线程数组,数组大小可自定义
*
* @author lxb
*
*/
public class Channel {

private static final int MAX_REQUEST = 100; // 假设最大的请求队列值

private final Request[] requestQueue; // 保存的请求队列

private int tail;
private int head;
private int count; // 请求队列中的请求数量

private final WorkerThread[] threadPool; // 工人线程池

public Channel(int threads) {

this.requestQueue = new Request[MAX_REQUEST];
this.head = 0;
this.count = 0;
this.tail = 0;
threadPool = new WorkerThread[threads];

for (int i = 0; i < threadPool.length; i++) {
threadPool[i] = new WorkerThread("Worker-" + i, this);
}
}

/**
* 开启工作线程
*/
public void startWorkers() {
for (int i = 0; i < threadPool.length; i++) {
threadPool[i].start();
}
}

public void stopAllWorkers(){
for(int i=0;i<threadPool.length;i++){
threadPool[i].stopThread();
}
}

/**
* 保存客户端发来的工作请求
*
* @param request
* @throws InterruptedException
*/
public synchronized void putRequest(Request request) throws InterruptedException {
while (count >= requestQueue.length) {
wait();
}

requestQueue[tail] = request;

tail = (tail + 1) % requestQueue.length;

count++;

notifyAll();
}

public synchronized Request takeRequest() throws InterruptedException {
while (count <= 0) {
wait();
}

Request request = requestQueue[head];
head = (head + 1) % requestQueue.length;

count--;
notifyAll();
return request;
}

}


5工人线程:

package worker.thread.pattern;

/**
* 工作线程,处理客户端发来的具体的工作请求
* @author lxb
*
*/
public class WorkerThread extends Thread{

private final Channel channel;
private volatile boolean terminated = false;	//停止请求标志

public WorkerThread(String name,Channel channel){
super(name);
this.channel = channel;
}

public void run() {
try {
while (!terminated) {
try {
Request request = channel.takeRequest();
request.execute();
} catch (InterruptedException e) {
terminated = true;
}
}
} finally {
System.out.println(Thread.currentThread().getName() + " is terminated.");
}
}
public void stopThread() {
terminated = true;
interrupt();
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  线程 多线程