您的位置:首页 > 其它

使用线程池ExecutorService实现并行查询

2017-08-02 20:12 302 查看
基于spring-boot框架,设计一个实现并行查询,汇总的业务。

使用ExecutorService的FixedThreadPool实现并行,通过future的get()方法收集结果。

1、在spring-boot里面注入一个线程池管理类。

@Configuration
public class ExecutorServiceHolder {

@Bean
public ExecutorService getPool() {
ExecutorService pool = Executors.newFixedThreadPool(4);
System.out.println("load ExecutorServiceHolder。。。。。。。");
return pool;
}
}


通过FixedThreadPool指定大致上使用的线程数量,来创建一个池的实例。

关于注入,spring-boot十分智能,在要注入的类上面加上@Configuration并且在方法上加上@Bean就可以在程序初始化的时候,实现注入。这个@Configuration相当于mvc配置文件中的
<beans></beans>
,而@Bean相当于
<bean></bean>


2、在service层接收controller一组请求参数,发放到线程池中。

@Service
public class PathService {

private static final Logger logger = LoggerFactory.getLogger(PathService.class);
@SuppressWarnings("SpringJavaAutowiringInspection")
@Autowired
private PathMapper pathMapper;

@Autowired
private ExecutorService pool;

public List<Path> getPool(List<Integer> cates) throws OperationException {
ArrayList<Future<List<Path>>> futureList = new ArrayList(4);
Future<List<Path>> future;
for (Integer cateid : cates) {
future = pool.submit(new Callable<List<Path>>() {
@Override
public List<Path> call() throws Exception {

return pathMapper.get(cateid);
}
});
futureList.add(future);
}

List<Path> returnList = new ArrayList<>();
List<Path> singleThreadResult;

try{
for (Future<List<Path>> f : futureList) {
singleThreadResult = f.get();
returnList.addAll(singleThreadResult);
}
}catch (Exception e){
throw new OperationException(e,3);
}
return returnList;
}

}


这个是简单实现的代码,数据库接口就是根据id查询数据库操作。

首先通过 @Autowired的方式托管pool。

pool.submit()方法中放入一个带有返回值的Callable的线程,通过他的run方法请求数据。

用Future对象来接收pool.submit()方法返回的数据,这里的future接收的类型是List类型,list里面是返回的一个个查询的实体对象。

关于Future,它可以拿到callable的执行结果,如果是一个很耗时的操作,我们可以做其他的操作,或者继续添加线程去执行,在未来某一个时间拿到结果。

我们把提交的任务返回值添加到集合中,futureList。这个过程时间是忽略不计的,瞬间完成,真正耗时的操作是future的get()方法,所以我们循环futureList,拿到future的结果,就是我们想要的结果。

做个测试

@Service
public class PathService {

private static final Logger logger = LoggerFactory.getLogger(PathService.class);
@SuppressWarnings("SpringJavaAutowiringInspection")
@Autowired
private PathMapper pathMapper;

@Autowired
private ExecutorService pool;

public List<Path> getPool(List<Integer> cates) throws OperationException {
ArrayList<Future<List<Path>>> futureList = new ArrayList(4);
Future<List<Path>> future;
for (Integer cateid : cates) {
future = pool.submit(new Callable<List<Path>>() {
@Override
public List<Path> call() throws Exception {

if(cateid==63){
TimeUnit.SECONDS.sleep(1);
}else if(cateid==8603){
TimeUnit.SECONDS.sleep(2);
}else if(cateid==65){
TimeUnit.SECONDS.sleep(3);
}else if(cateid==20){
TimeUnit.SECONDS.sleep(4);
}
//获取当前循环的使用的线程池中线程的名字
System.out.println(Thread.currentThread().getName());
logger.info(Thread.currentThread().getName());

return pathMapper.get(cateid);
}
});
futureList.add(future);
}

List<Path> returnList = new ArrayList<>();
//设置总等待时间,4个并行的查询,最大时间取决于最长的一个那一个查询。
long timeout = 10000, elapse = 0;

long before = System.currentTimeMillis();
List<Path> singleThreadResult;

try{
for (Future<List<Path>> f : futureList) {
long t1 = System.currentTimeMillis();
singleThreadResult = f.get(timeout - elapse, TimeUnit.MILLISECONDS);
long t2 = System.currentTimeMillis();
returnList.addAll(singleThreadResult);
elapse += (t2 - t1);
}
}catch (TimeoutException e){
throw new OperationException(e,1);
}catch (ExecutionException e){
throw new OperationException(e,2);
}catch (Exception e){
throw new OperationException(e,3);
}

long after = System.currentTimeMillis();
System.out.println(after - before);
return returnList;
}

}


我们在每个线程中选择sleep几秒,这里直接用业务的逻辑去睡觉,每个线程睡的时间不一样,并且打印出当前线程的名字。然后我们同样把任务加到future集合里面。

设置一个总的等待时间,设置一个超时的异常。设置总的时间为10s,还有设置一个计算已经用掉的时间,因为每个线程的返回时间不一致。同时在循环future集合执行get()方法前后设置两个时间节点,before,after来记录执行的总时间。后面捕获一个timeout异常,如果总响应时间超过10s,要抛一个自定义的异常。自定义异常,通过不同的异常表示出不同的错误码,以便于在controller层,处理自己的异常的时候,判断相应的错误码来执行相应的业务逻辑,把异常与业务逻辑结合起来。

执行结果:



可以看到,执行时间大概是4s出头,因为睡的最久的线程是4s时间,使用这个方法,总的时间就大致上取决于响应时间最长的那个线程。

如果把一个线程睡眠时间设置为10s就会抛出timeout异常。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: