您的位置:首页 > 其它

线程池和阻塞队列的组合使用

2016-07-04 19:09 330 查看
场景

通过MQ接收消息队列,或者订阅了MQ的消息

消费端接收消息后的处理存在耗时操作,此时考虑多线程并发处理

通过线程池和阻塞队列实现

线程池是为了便于管理多线程

阻塞队列是为了确保程序运行在可控范围内,不至于因为资源耗尽而崩溃

// 将消息存储于阻塞队列中

ConsumerBlockingQueue.queue.put(textMsg.getText());

public class ConsumerBlockingQueue {

// 声明一个容量为10的缓存队列
public static BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
// 申请一个固定数量为2的线程池
private static ExecutorService executor = Executors.newFixedThreadPool(2);

/**
* 处理消息
*/
public static void handle() {
// 启动线程
executor.execute(new Consumer(queue));
}

}

public class Consumer implements Runnable {

private static final Logger logger = Logger.getLogger(Consumer.class);

BlockingQueue<String> queue;

public Consumer(BlockingQueue<String> initQ) {
queue = initQ;
}

public void run() {
logger.info(Thread.currentThread().getName() + "启动消费者线程!");
boolean isRunning = true;
try {
while (isRunning) {
logger.info("正从队列获取数据...");
// waiting if necessary until an element becomes available.
String data = queue.take();
if (null != data) {
logger.info("拿到数据:" + data);
logger.info(Thread.currentThread().getName() + "正在消费数据:" + data);
} else {
// 超过2s还没数据,认为所有生产线程都已经退出,自动退出消费线程。
isRunning = false;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
logger.info(Thread.currentThread().getName() + "退出消费者线程!");
}
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: