您的位置:首页 > 产品设计 > UI/UE

3种消费者生产者实现方式 {Condition(signal), BlockingQueue, synchronized}

2014-08-10 15:03 381 查看
经典的消费者生产者问题:

一、利用阻塞队列实现:这种方式最为简单

/**
 * Producer/consumer demo built on a bounded {@link BlockingDeque}.
 *
 * <p>The deque does all the coordination: {@code put} blocks while the deque is full and
 * {@code take} blocks while it is empty, so no explicit locking is required here.
 * Both workers run until their thread is interrupted.
 */
public class App {

    /** Maximum number of elements the shared deque may hold. */
    private static final int CAPACITY = 10;

    /** Pause between two produced values, in milliseconds. */
    private static final long PRODUCE_INTERVAL_MILLIS = 1000L;

    public static void main(String[] args) throws FileNotFoundException {
        new App().run();
    }

    /** Starts one consumer and one producer sharing a single bounded deque. */
    public void run() {
        ExecutorService service = Executors.newCachedThreadPool();
        BlockingDeque<Integer> blockingDeque = new LinkedBlockingDeque<Integer>(CAPACITY);
        service.submit(new Consumer(blockingDeque));
        service.submit(new Producer(blockingDeque));
        // shutdown() only rejects new submissions; the two tasks keep running
        // until their threads are interrupted.
        service.shutdown();
    }

    /** Takes values from the deque and prints them until interrupted. */
    static class Consumer implements Runnable {

        private final BlockingDeque<Integer> blockingDeque;

        Consumer(BlockingDeque<Integer> blockingDeque) {
            this.blockingDeque = blockingDeque;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    System.out.println("consumer: " + blockingDeque.take());
                }
            } catch (InterruptedException e) {
                // Interruption is the cancellation signal: restore the flag for the
                // owner of this thread and stop instead of looping forever.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Puts an increasing counter into the deque once per interval until interrupted. */
    static class Producer implements Runnable {

        private final BlockingDeque<Integer> blockingDeque;
        private int count = 0;

        Producer(BlockingDeque<Integer> blockingDeque) {
            this.blockingDeque = blockingDeque;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    count++;
                    System.out.println("producer: " + count);
                    blockingDeque.put(count);
                    Thread.sleep(PRODUCE_INTERVAL_MILLIS);
                }
            } catch (InterruptedException e) {
                // Same cancellation policy as the consumer: keep the flag, leave the loop.
                Thread.currentThread().interrupt();
            }
        }
    }
}

二、利用synchronized实现

/**
 * Producer/consumer demo using intrinsic locking ({@code synchronized}) together with
 * {@code wait}/{@code notifyAll} on one shared monitor.
 *
 * <p>Producers append to {@link #integerList} while it holds fewer than {@code MAX_SIZE}
 * elements; consumers remove the head while it is non-empty. A worker that cannot make
 * progress waits on the monitor instead of spinning, and every change to the list wakes
 * the waiting workers. Workers run until their thread is interrupted.
 */
public class App {

    /** Monitor guarding every access to {@link #integerList}. */
    private static final Object SYNFLAG = new Object();

    /** Upper bound on the number of buffered elements. */
    private static final int MAX_SIZE = 3;

    /** Number of producer/consumer pairs started by {@link #run()}. */
    private static final int WORKER_PAIRS = 10;

    public static List<Integer> integerList = new ArrayList<Integer>();

    public static void main(String[] args) {
        App app = new App();
        app.run();
    }

    /** Starts the producer/consumer pairs on a cached thread pool. */
    public void run() {
        ExecutorService executorService = Executors.newCachedThreadPool();

        for (int i = 0; i < WORKER_PAIRS; i++) {
            Producer producer = new Producer("producer" + i);
            Consumer consumer = new Consumer("consumer" + i);
            executorService.submit(producer);
            executorService.submit(consumer);
        }
        // Only rejects new submissions; running workers continue until interrupted.
        executorService.shutdown();
    }

    /** Appends the current list size to the list whenever there is room. */
    class Producer implements Runnable {

        private final String name;

        Producer(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    synchronized (SYNFLAG) {
                        // Guarded wait: re-check the condition after every wake-up,
                        // since notifyAll wakes producers and consumers alike.
                        while (integerList.size() >= MAX_SIZE) {
                            SYNFLAG.wait();
                        }
                        Integer integer = integerList.size();
                        integerList.add(integer);
                        System.out.println(name + " insert " + integer);
                        SYNFLAG.notifyAll();
                    }
                }
            } catch (InterruptedException e) {
                // Treat interruption as a stop request and preserve the flag.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Removes and prints the head of the list whenever one is available. */
    class Consumer implements Runnable {

        private final String name;

        Consumer(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    synchronized (SYNFLAG) {
                        while (integerList.isEmpty()) {
                            SYNFLAG.wait();
                        }
                        Integer integer = integerList.remove(0);
                        System.out.println(name + " remove " + integer);
                        SYNFLAG.notifyAll();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}


三、利用Condition实现, 这种方式稍微复杂一点,需要借助 ReentrantLock 封装一个阻塞队列

首先实现模拟阻塞队列封装:
/**
 * A minimal blocking, bounded FIFO buffer of integers built on a {@link ReentrantLock}
 * and two {@link Condition}s.
 *
 * <p>{@link #put(Integer)} blocks while the buffer holds {@code capacity} elements and
 * {@link #take()} blocks while it is empty. All state is accessed only while holding
 * {@code lock}.
 */
public class BoundedBuffer {

    private final Lock lock = new ReentrantLock();

    /** Signalled after a removal; producers await it while the buffer is full. */
    private final Condition notFull = lock.newCondition();

    /** Signalled after an insertion; consumers await it while the buffer is empty. */
    private final Condition notEmpty = lock.newCondition();

    private final int capacity;
    private final List<Integer> integerList = new ArrayList<Integer>();

    /**
     * Creates an empty buffer.
     *
     * @param capacity maximum number of buffered elements; must be at least 1
     * @throws IllegalArgumentException if {@code capacity} is {@code null} or less than 1,
     *         because such a buffer could never accept an element
     */
    public BoundedBuffer(Integer capacity) {
        if (capacity == null || capacity < 1) {
            throw new IllegalArgumentException("capacity must be >= 1, got " + capacity);
        }
        this.capacity = capacity;
    }

    /**
     * Appends {@code integer} to the tail of the buffer, waiting while the buffer is full.
     *
     * @param integer the value to store
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void put(Integer integer) throws InterruptedException {
        // Acquire before entering try: if lock() itself failed, the finally block
        // would otherwise call unlock() on a lock this thread does not hold.
        lock.lock();
        try {
            while (integerList.size() >= capacity) {
                notFull.await();
            }
            integerList.add(integer);
            notEmpty.signal();
            System.out.println("insert " + integer);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the head of the buffer, waiting while the buffer is empty.
     *
     * @return the oldest buffered value
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public Integer take() throws InterruptedException {
        lock.lock();
        try {
            while (integerList.isEmpty()) {
                notEmpty.await();
            }
            Integer head = integerList.remove(0);
            notFull.signal();
            return head;
        } finally {
            lock.unlock();
        }
    }
}


消费者、生产者部分:

/**
 * Producer/consumer demo that drives a single shared {@code BoundedBuffer}.
 *
 * <p>{@link #run()} starts many producer/consumer pairs on a cached thread pool. Producers
 * put an increasing counter into the buffer and consumers print whatever they take. All
 * blocking is delegated to the buffer's {@code put}/{@code take}. Workers run until their
 * thread is interrupted.
 */
public class App {

    /** Number of producer/consumer pairs started by {@link #run()}. */
    private static final int WORKER_PAIRS = 100;

    /** The single buffer shared by every worker; capacity 1 forces strict hand-off. */
    private static final BoundedBuffer storage = new BoundedBuffer(1);

    public static void main(String[] args) {
        new App().run();
    }

    /** Submits the producer/consumer pairs and then stops accepting new tasks. */
    public void run() {
        ExecutorService service = Executors.newCachedThreadPool();

        for (int i = 0; i < WORKER_PAIRS; i++) {
            Producer producer = new Producer("xxx", 1);
            Consumer consumer = new Consumer("xxx");

            service.submit(producer);
            service.submit(consumer);
        }
        // Only rejects new submissions; running workers continue until interrupted.
        service.shutdown();
    }

    /** Puts successive integers, starting just above the initial id, into the buffer. */
    class Producer implements Runnable {

        private final String name;
        private int putId;

        Producer(String name, Integer putId) {
            this.name = name;
            this.putId = putId;
        }

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    putId += 1;
                    storage.put(putId);
                }
            } catch (InterruptedException e) {
                // Interruption is the stop signal: keep the flag set and leave the loop
                // rather than printing a trace and retrying forever.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Takes values from the buffer and prints them. */
    class Consumer implements Runnable {

        private final String name;

        Consumer(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    System.out.println("take " + storage.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}



                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: