使用线程池ExecutorService实现并行查询
2017-08-02 20:12
302 查看
基于spring-boot框架,设计一个实现并行查询,汇总的业务。
使用ExecutorService的FixedThreadPool实现并行,通过future的get()方法收集结果。
1、在spring-boot里面注入一个线程池管理类。
通过FixedThreadPool指定大致上使用的线程数量,来创建一个池的实例。
关于注入,spring-boot十分智能,在要注入的类上面加上@Configuration并且在方法上加上@Bean就可以在程序初始化的时候,实现注入。这个@Configuration相当于mvc配置文件中的
2、在service层接收controller一组请求参数,发放到线程池中。
这个是简单实现的代码,数据库接口就是根据id查询数据库操作。
首先通过 @Autowired的方式托管pool。
pool.submit()方法中放入一个带有返回值的Callable的线程,通过他的run方法请求数据。
用Future对象来接收pool.submit()方法返回的数据,这里的future接收的类型是List类型,list里面是返回的一个个查询的实体对象。
关于Future,它可以拿到callable的执行结果,如果是一个很耗时的操作,我们可以做其他的操作,或者继续添加线程去执行,在未来某一个时间拿到结果。
我们把提交的任务返回值添加到集合中,futureList。这个过程时间是忽略不计的,瞬间完成,真正耗时的操作是future的get()方法,所以我们循环futureList,拿到future的结果,就是我们想要的结果。
做个测试
我们在每个线程中选择sleep几秒,这里直接用业务的逻辑去睡觉,每个线程睡的时间不一样,并且打印出当前线程的名字。然后我们同样把任务加到future集合里面。
设置一个总的等待时间,设置一个超时的异常。设置总的时间为10s,还有设置一个计算已经用掉的时间,因为每个线程的返回时间不一致。同时在循环future集合执行get()方法前后设置两个时间节点,before,after来记录执行的总时间。后面捕获一个timeout异常,如果总响应时间超过10s,要抛一个自定义的异常。自定义异常,通过不同的异常表示出不同的错误码,以便于在controller层,处理自己的异常的时候,判断相应的错误码来执行相应的业务逻辑,把异常与业务逻辑结合起来。
执行结果:
可以看到,执行时间大概是4s出头,因为睡的最久的线程是4s时间,使用这个方法,总的时间就大致上取决于响应时间最长的那个线程。
如果把一个线程睡眠时间设置为10s就会抛出timeout异常。
使用ExecutorService的FixedThreadPool实现并行,通过future的get()方法收集结果。
1、在spring-boot里面注入一个线程池管理类。
@Configuration public class ExecutorServiceHolder { @Bean public ExecutorService getPool() { ExecutorService pool = Executors.newFixedThreadPool(4); System.out.println("load ExecutorServiceHolder。。。。。。。"); return pool; } }
通过FixedThreadPool指定大致上使用的线程数量,来创建一个池的实例。
关于注入,spring-boot十分智能,在要注入的类上面加上@Configuration并且在方法上加上@Bean就可以在程序初始化的时候,实现注入。这个@Configuration相当于mvc配置文件中的
<beans></beans>,而@Bean相当于
<bean></bean>。
2、在service层接收controller一组请求参数,发放到线程池中。
@Service public class PathService { private static final Logger logger = LoggerFactory.getLogger(PathService.class); @SuppressWarnings("SpringJavaAutowiringInspection") @Autowired private PathMapper pathMapper; @Autowired private ExecutorService pool; public List<Path> getPool(List<Integer> cates) throws OperationException { ArrayList<Future<List<Path>>> futureList = new ArrayList(4); Future<List<Path>> future; for (Integer cateid : cates) { future = pool.submit(new Callable<List<Path>>() { @Override public List<Path> call() throws Exception { return pathMapper.get(cateid); } }); futureList.add(future); } List<Path> returnList = new ArrayList<>(); List<Path> singleThreadResult; try{ for (Future<List<Path>> f : futureList) { singleThreadResult = f.get(); returnList.addAll(singleThreadResult); } }catch (Exception e){ throw new OperationException(e,3); } return returnList; } }
这个是简单实现的代码,数据库接口就是根据id查询数据库操作。
首先通过 @Autowired的方式托管pool。
pool.submit()方法中放入一个带有返回值的Callable的线程,通过他的run方法请求数据。
用Future对象来接收pool.submit()方法返回的数据,这里的future接收的类型是List类型,list里面是返回的一个个查询的实体对象。
关于Future,它可以拿到callable的执行结果,如果是一个很耗时的操作,我们可以做其他的操作,或者继续添加线程去执行,在未来某一个时间拿到结果。
我们把提交的任务返回值添加到集合中,futureList。这个过程时间是忽略不计的,瞬间完成,真正耗时的操作是future的get()方法,所以我们循环futureList,拿到future的结果,就是我们想要的结果。
做个测试
@Service public class PathService { private static final Logger logger = LoggerFactory.getLogger(PathService.class); @SuppressWarnings("SpringJavaAutowiringInspection") @Autowired private PathMapper pathMapper; @Autowired private ExecutorService pool; public List<Path> getPool(List<Integer> cates) throws OperationException { ArrayList<Future<List<Path>>> futureList = new ArrayList(4); Future<List<Path>> future; for (Integer cateid : cates) { future = pool.submit(new Callable<List<Path>>() { @Override public List<Path> call() throws Exception { if(cateid==63){ TimeUnit.SECONDS.sleep(1); }else if(cateid==8603){ TimeUnit.SECONDS.sleep(2); }else if(cateid==65){ TimeUnit.SECONDS.sleep(3); }else if(cateid==20){ TimeUnit.SECONDS.sleep(4); } //获取当前循环的使用的线程池中线程的名字 System.out.println(Thread.currentThread().getName()); logger.info(Thread.currentThread().getName()); return pathMapper.get(cateid); } }); futureList.add(future); } List<Path> returnList = new ArrayList<>(); //设置总等待时间,4个并行的查询,最大时间取决于最长的一个那一个查询。 long timeout = 10000, elapse = 0; long before = System.currentTimeMillis(); List<Path> singleThreadResult; try{ for (Future<List<Path>> f : futureList) { long t1 = System.currentTimeMillis(); singleThreadResult = f.get(timeout - elapse, TimeUnit.MILLISECONDS); long t2 = System.currentTimeMillis(); returnList.addAll(singleThreadResult); elapse += (t2 - t1); } }catch (TimeoutException e){ throw new OperationException(e,1); }catch (ExecutionException e){ throw new OperationException(e,2); }catch (Exception e){ throw new OperationException(e,3); } long after = System.currentTimeMillis(); System.out.println(after - before); return returnList; } }
我们在每个线程中选择sleep几秒,这里直接用业务的逻辑去睡觉,每个线程睡的时间不一样,并且打印出当前线程的名字。然后我们同样把任务加到future集合里面。
设置一个总的等待时间,设置一个超时的异常。设置总的时间为10s,还有设置一个计算已经用掉的时间,因为每个线程的返回时间不一致。同时在循环future集合执行get()方法前后设置两个时间节点,before,after来记录执行的总时间。后面捕获一个timeout异常,如果总响应时间超过10s,要抛一个自定义的异常。自定义异常,通过不同的异常表示出不同的错误码,以便于在controller层,处理自己的异常的时候,判断相应的错误码来执行相应的业务逻辑,把异常与业务逻辑结合起来。
执行结果:
可以看到,执行时间大概是4s出头,因为睡的最久的线程是4s时间,使用这个方法,总的时间就大致上取决于响应时间最长的那个线程。
如果把一个线程睡眠时间设置为10s就会抛出timeout异常。
相关文章推荐
- 线程池的原理以及实现线程池的类ExecutorService中方法的使用
- 线程池的原理以及实现线程池的类ExecutorService中方法的使用
- ExecutorService线程池的使用
- java利用线程池(ExecutorService)配合Callable和Future实现执行方法超时的阻断
- 运用JAVA的concurrent.ExecutorService线程池实现socket的TCP和UDP连接
- Android中线程池ExecutorService的使用
- Android(Java) 线程池ExecutorService 使用的注意事项
- 线程池使用之ScheduledExecutorService
- JAVA并行异步编程,线程池+FutureTask+callable+ExecutorService
- 线程池 ExecutorService的使用
- JAVA多线程实现的三种方式(继承Thread类、实现Runnable接口、使用ExecutorService、Callable、Future实现有返回结果的多线程)
- 使用ScheduledExecutorService实现Timer
- ExecutorService来进行线程池的方式实现多线程
- 使用SPRING中的线程池ThreadPoolTaskExecutor实现JAVA并发
- 在线程池(ExecutorService)中使用Threadlocal时,需要注意他的复位操作
- android线程池ExecutorService的使用
- Android线程池:ExecutorService和Executors使用
- ExecutorService实现线程池
- 使用ScheduledExecutorService接口实现Web轮询项目
- java的ExecutorService 实现线程池