线程池和阻塞队列的组合使用
2016-07-04 19:09
330 查看
场景
通过MQ接收消息队列,或者订阅了MQ的消息
消费端接收消息后的处理存在耗时操作,此时考虑多线程并发处理
通过线程池和阻塞队列实现
线程池是为了便于管理多线程
阻塞队列是为了确保程序运行在可控范围内,不至于因为资源耗尽而崩溃
// 将消息存储于阻塞队列中
ConsumerBlockingQueue.queue.put(textMsg.getText());
public class ConsumerBlockingQueue {
// 声明一个容量为10的缓存队列
public static BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
// 申请一个固定数量为2的线程池
private static ExecutorService executor = Executors.newFixedThreadPool(2);
/**
* 处理消息
*/
public static void handle() {
// 启动线程
executor.execute(new Consumer(queue));
}
}
public class Consumer implements Runnable {
private static final Logger logger = Logger.getLogger(Consumer.class);
BlockingQueue<String> queue;
public Consumer(BlockingQueue<String> initQ) {
queue = initQ;
}
public void run() {
logger.info(Thread.currentThread().getName() + "启动消费者线程!");
boolean isRunning = true;
try {
while (isRunning) {
logger.info("正从队列获取数据...");
// waiting if necessary until an element becomes available.
String data = queue.take();
if (null != data) {
logger.info("拿到数据:" + data);
logger.info(Thread.currentThread().getName() + "正在消费数据:" + data);
} else {
// 超过2s还没数据,认为所有生产线程都已经退出,自动退出消费线程。
isRunning = false;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
logger.info(Thread.currentThread().getName() + "退出消费者线程!");
}
}
}
通过MQ接收消息队列,或者订阅了MQ的消息
消费端接收消息后的处理存在耗时操作,此时考虑多线程并发处理
通过线程池和阻塞队列实现
线程池是为了便于管理多线程
阻塞队列是为了确保程序运行在可控范围内,不至于因为资源耗尽而崩溃
// 将消息存储于阻塞队列中
ConsumerBlockingQueue.queue.put(textMsg.getText());
public class ConsumerBlockingQueue {
// 声明一个容量为10的缓存队列
public static BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
// 申请一个固定数量为2的线程池
private static ExecutorService executor = Executors.newFixedThreadPool(2);
/**
* 处理消息
*/
public static void handle() {
// 启动线程
executor.execute(new Consumer(queue));
}
}
public class Consumer implements Runnable {
private static final Logger logger = Logger.getLogger(Consumer.class);
BlockingQueue<String> queue;
public Consumer(BlockingQueue<String> initQ) {
queue = initQ;
}
public void run() {
logger.info(Thread.currentThread().getName() + "启动消费者线程!");
boolean isRunning = true;
try {
while (isRunning) {
logger.info("正从队列获取数据...");
// waiting if necessary until an element becomes available.
String data = queue.take();
if (null != data) {
logger.info("拿到数据:" + data);
logger.info(Thread.currentThread().getName() + "正在消费数据:" + data);
} else {
// 超过2s还没数据,认为所有生产线程都已经退出,自动退出消费线程。
isRunning = false;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
logger.info(Thread.currentThread().getName() + "退出消费者线程!");
}
}
}
相关文章推荐
- input输入浮动提示
- iOS navigationController颜色透明问题
- 获取系统信息(CPU、内存等)
- 【iOS开发】关于homebrew的安装和测试
- Java实现几种常见排序方法
- du和sort命令
- 获取系统信息(CPU、内存等)
- C语言一维数组、二维数组、结构体的初始化
- 指针变量作为函数参数
- 【H5】Canvas 如何自适应屏幕大小
- 第11章 认识和学习bash
- 一周总结(17周)
- shell截取字符串
- Jetty配置
- 单例设计模式
- Android UncaughtExceptionHandler捕获crash的全局异常
- Angular1.3.X中embed标签使用ng-src的bug
- linux make
- 网上一些 MFC 精确定时1us的批判
- 【原创】《精益创业思维》分享会总结