您的位置:首页 > 编程语言 > Java开发

Java中的线程(七)- 生产者消费者模式

2013-11-17 15:51 393 查看

什么是生产者消费者模式

某个线程或进程负责产生数据,这些数据由另一个线程或进程来负责处理。产生数据的线程称为生产者;而处理数据的线程称为消费者。在生产者与消费者之间在加个缓冲区,我们形象的称之为仓库,生产者负责往仓库了进商品,而消费者负责从仓库里拿商品,这就构成了生产者消费者模式。

结构图如下:



生产者消费者的参与者

角色作用
生产者提交请求,将任务装入内存缓冲区
消费者在内存缓冲区提取并处理任务
内存缓冲区缓存生产者提交的任务供给消费者使用
任务需要完成的任务
主函数调用生产者和消费者的客户端

代码实现

(1)Data
public class Data {

private final int data;

public Data(int s) {

data = Integer.valueOf(s);

}

public int getData(){

return data;

}

public String toString() {

return "data:"+data;

}

}

(2)Producer
public class Producer implements Runnable{
private volatile boolean isRun =true;

private BlockingQueue<Data> queue;

private static AtomicInteger count = new AtomicInteger();

private static final int SLEEPTIME = 1000;

public Producer(BlockingQueue<Data> queue) {

this.queue = queue;

}

public void run() {

Data data = null;

Random r = new Random();

System.out.println("start producer id="+Thread.currentThread().getId());

try {

while(isRun){

Thread.sleep(r.nextInt(SLEEPTIME));

data = new Data(count.incrementAndGet());

queue.offer(data, 2, TimeUnit.SECONDS);

}

} catch (Exception e) {

}

}

public void stop(){

isRun=false;

}

}

(3)Consumer
public class Consumer implements Runnable{

private BlockingQueue<Data> queue;

private static final int SLEEPTIME=1000;

public Consumer(BlockingQueue<Data> queue) {

this.queue = queue;

}

public void run(){

System.out.println("start Consumer id="+Thread.currentThread().getId());

Random r = new Random();

try{

while(true){

Data data = queue.take();

if(null != data){

int result = data.getData()*data.getData();

System.out.println("result is "+result);

Thread.sleep(r.nextInt(SLEEPTIME));

}

}

}catch(Exception e){

}

}

}

(4)Main
public static void main(String[] args) {

BlockingQueue<Data> queue = new LinkedBlockingDeque<Data>(10);

Producer p1 = new Producer(queue);

Producer p2 = new Producer(queue);

Producer p3 = new Producer(queue);

Consumer c1 = new Consumer(queue);

Consumer c2 = new Consumer(queue);

Consumer c3 = new Consumer(queue);

ExecutorService service = Executors.newCachedThreadPool();

service.execute(p1);

service.execute(p2);

service.execute(p3);

service.execute(c1);

service.execute(c2);

service.execute(c3);

try {

Thread.sleep(10*1000);

p1.stop();

p2.stop();

p3.stop();

Thread.sleep(3000);

service.shutdown();

} catch (InterruptedException e) {

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: