您的位置:首页 > 编程语言 > Java开发

Java线程池ThreadPoolExecutor简介(二)

2016-04-24 22:35 447 查看
由于ThreadPoolExecutor实在太常用了,以致于我不得不将一些常用的例子与心得记下来。在上一篇《Java线程池ThreadPoolExecutor简介》中没讲完,这篇继续。

生产者-消费者模式

在使用了那么多次线程池后,我发现了原来大部分都是生产者-消费者的场景。归根到底,线程池本身就是生产者_消费者的设计实现!
于是我慢慢的就形成了一种强迫症:在 生产者-消费者的场景用线程池,或者用线程池时在思考这是不是生产者-消费者场景。比如我们要设计一个简单的消息推送服务,供其他模块调用:
public class PushMsgService {
private static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
10, 10 ,5L, TimeUnit.MINUTES,
new LinkedBlockingQueue<Runnable>());

public static void push(String message) {
threadPool.submit(new PushMsgRunnable(message));
}

//调整线程并发数
public static void setPoolSize(int num) {
threadPool.setCorePoolSize(num);
threadPool.setMaximumPoolSize(num);
}

static class PushMsgRunnable implements Runnable {
private String message;

public PushMsgRunnable(String message) {
this.message = message;
}

@Override
public void run() {
// 实现消息推送业务逻辑
}
}
}


这是个典型的应用场景!调用方相当于生产者,而消费者是线程池中的线程。至于线程需要处理的业务逻辑,封装了在PushMsgRunnable中。而且这里是异步调用,不会导致调用方阻塞。
一开始我们设定线程池的并发数是10,并且使用了无界队列,保证不会丢任务。但在某个时间段,需要推送的消息数量剧增,导致任务迟迟得到不处理。因此这里设计成线程池的并发数是支持修改的,在很多任务系统这个参数都是做成可配置的!
说多几句,其实有动态配置这个意识真的是经验,因为来到大公司后开发者与生产环境是隔离的,你要是写在配置文件中,意味着要是并发数不合适时,就是重启机器,就要蛋疼地找运维走流程。。。

将大任务拆分为小任务

若完成一个任务需要很长的时间,我们就需要考虑将该任务拆分成多个小任务,然后当然是并发执行,以缩短时间。另外,也许还需要等待所有子任务执行完,根据执行结果来进行下一步的业务处理。

通过场景来学习无疑是最好的。比如我每个月1日都需要统计上个月的销售总额,但我这边只有一张用户信息表,需要调用第三方接口来根据用户ID查询该用户的消费总额。

首先当然是拆分这个任务,但怎么拆分呢?既然我们需要从数据库表读用户信息,可以通过用户ID来对表中的数据进行分片。分片的方式可以是简单的分段,比如1-100,101-200……或者是hash。然后一个片对于一个线程,并行处理。先定义任务类:

class MyTask implements Callable<Long> {
private long begin;
private long end;

public MyTask(long begin, long end) {
super();
this.begin = begin;
this.end = end;
}

@Override
public Long call() throws Exception {
long count = 0;
for(long i=begin ;i<=end ;i++) {
count += queryForAmount(i);
}
return count;
}
// 根据用户Id查询对应销售额
private long queryForAmount(long userId) throws Exception {
System.out.println("query for userId: "+userId);
Thread.sleep(100);
return new Random(100000).nextLong();
}
}


这里假设查询出表中数据共有1000条,且启动10个线程处理,根据用户ID分段处理:

private static void doTask() {
int threadNum = 10;
ExecutorService pool = Executors.newFixedThreadPool(threadNum);
List<Future<Long>> resultList = new ArrayList<Future<Long>>();

int begin = 1;
for(int i=begin ;i<=threadNum ;i++) {
resultList.add( pool.submit(new MyTask(begin,i*100)) );
begin = i*100 +1;
}

long total = 0;
for(Future<Long> item : resultList) {
try {
total += item.get();

} catch (Exception e) {
e.printStackTrace();
break;
}
}
System.out.println("total :"+total);
}
这个例子充分说明了任务类选择继承Callable接口的好处:

1. 主线程能够等待子线程返回执行结果

2. 子线程若抛出异常,能通过在主线程调用Future.get()方法捕获

关闭线程池

我们都知道Thread是不支持强制停止的,所以shutdownNow()也只是向每个线程发送一下中断信号,关于如何处理中断可参考《线程的中断与InterruptedException》。其实官方的API说明都很详细,但要是不写个demo验证一下,我是不会安心的,继续使用上面的任务类。

首先我们通过下面示例测试一下线程池正在执行任务的时候关闭,可以看到未执行完的任务还是继续执行。
private static void testShutdown() {
ExecutorService pool = Executors.newFixedThreadPool(5);
for(int i=0 ;i<10 ;i++) {
pool.submit(new MyTask(1,1000));
}

pool.shutdown();
if( !pool.isTerminated() ) {
try {
pool.awaitTermination(10, TimeUnit.MINUTES);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(pool.isTerminated()) {
System.out.println("pool is terminated");
}else {
System.out.println("pool is not terminated");
}
}


然后我们继续测试一下强制停止会怎么样:
private static void testShutdownNow() {
ExecutorService pool = Executors.newFixedThreadPool(5);
for(int i=0 ;i<10 ;i++) {
pool.submit(new MyTask(1,1000));
}

List<Runnable> unBeginTask = pool.shutdownNow();
System.out.println("unBeginTask size: "+unBeginTask.size());
}
嗯,由于会调用到Thread.interrupt(),故有部分休眠的线程抛InterruptedException 异常了。

任务对象:单例OR非单例

后面我遇到了个很有意思的问题,如果我有个任务(可以是Runnable的子类)需要并发执行,程序说明:
public static void main(String[] args) {

int threadNum = 5;
ExecutorService pool_1 = Executors.newFixedThreadPool(threadNum);
class MyTask implements Runnable {
@Override
public void run() {
while(true) {
System.out.println(Thread.currentThread().getName() + " doTask");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

// 构造新的任务实例提交
//		for(int i=0 ;i<threadNum ;i++) {
//			pool_1.submit(new MyTask());
//		}

// 重复提交同一个任务实例
MyTask task = new MyTask();
for(int i=0 ;i<threadNum ;i++) {
pool_1.submit(task);
}
}
我之所以觉得有意思是因为就算在线程池中重复提交同一个任务实例,也是能够并发执行的!要知道spring的bean通常是单例的。但这里有个坑,就是多个线程同时操作同一个任务实例,要求该任务实例是线程安全的。相比之下,我当然觉得是new一个新的比较好。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: