ListeningExecutorService配合CountDownLatch多线程并发处理数据
2017-06-05 16:56
507 查看
如今项目里面有个日终任务,会得到一大堆list,如果一个线程去执行的话会花费很长时间,看了不少资料, 决定用如下方式解决,现将200以单位,分为若干个线程,并行执行,等待所有线程执行完之后再执行别的操作。 这种情况下会出现两种情况,一种是需要list对象有返回值得,另外一种是数据处理。 我们先说数据处理的,这种较简单,日终得到一大堆数据,需要并发一个线程执行200,代码处有省略,有基础的应该都可以看得懂。 将list/200+1,决定并发几个线程,for循环执行,并用计数器同步,当所有的并发走完之后,计数器值减为0,此时,统一执行下个任务。 times,如果配置的线程过少,则队列会持续变大,消耗过多内存。而过多的线程又会 由于频繁的上下文切换导致整个 系统的速度变缓——殊途而同归
static ListeningExecutorService Service= null; static CountDownLatch Latch =null; //线程计数器 如果不用执行完后统一走流程,可以不要这个 public static void main(String[] args) { List <User> userlist =new ArrayList<User>();//得到一个list,开启多线程执行 int start=0; int end=200; int times=userlist.size()/200+1; //初始化5的线程池,newFixedThreadPool适用于稳定的并发 Service=MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5)); //Service=MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());不建议用,有生命周期 Latch=new CountDownLatch(times);//计数器 for(int i=0;i<times;i++){ final List <User> part=userlist.subList(start, start+end);//此处省略分割,自己写循环 Service.execute(new Runnable(){ @Override public void run() { //此处添加业务逻辑 System.out.println(part.size()); } }); Latch.countDown();//计数器会减1; } try { Latch.await(); } catch (InterruptedException e) { e.printStackTrace(); }finally{ Service.shutdown(); System.out.println("所有的线程在计数器值不为0的时候都会停在Latch.await(),等计 数器值减为0后才会统一执行"); } }
接下来是数据处理后并返回的
static ListeningExecutorService Service = null; // 线程计数器 如果不用执行完后统一走业务,可以不要这个。 static CountDownLatch Latch = null; public static void main(String[] args) { final List <User> userlist =new ArrayList<User>();//得到一个list,开启多线程执行 final List <User> reuserlist =new ArrayList<User>();//得到一个list,开启多线程执行 int times=userlist.size()/200+1; //初始化5的线程池,newFixedThreadPool适用于稳定的并发 Service=MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5)); Latch=new CountDownLatch(userlist.size());//计数器 for (final User user : reuserlist) { final ListenableFuture<User> submit = Service.submit(new Callable<User>() {//不必与循环对象一致 @Override public User call() throws Exception { //各种逻辑 System.out.println(user.getClass()); return user; } }); Futures.addCallback(submit, new FutureCallback<User>() { //与submit(new Callable<T>())一致 @Override public void onSuccess(User user) { reuserlist.add(user);//将数据处理完的user进行add } @Override public void onFailure(Throwable t) { reuserlist.add(user); log.info(user.getId()+"这个对象执行失败,可以用分布线程池写入日志,此处就不举例了"); t.printStackTrace(); } }); Latch.countDown();//计数器会减1; } try { Latch.await(); reuserlist.size();//此处的reuserlist就是全部经过数据处理的userlist } catch (InterruptedException e) { e.printStackTrace(); }finally{ Service.shutdown(); System.out.println("可以返回这个list:reuserlist"); } }
相关文章推荐
- Java多线程:Callable、ExecutorService、CountDownLatch、Future和FutureTask
- Java多线程——6 并发流程控制CountDownLatch、CycliBarrier
- java多线程并发executorservice(任务调度)类
- 并发工具类(一)等待多线程完成的CountDownLatch
- 共同学习Java源代码-多线程与并发-AbstractExecutorService类(二)
- 多线程和并发中新的任务执行架构Executor、ExecutorService、ScheduledExecutorService
- 【Java多线程与并发库】14.java5的CountDownLatch同步工具
- 线程池使用ExecutorService 多线程处理队列任务
- Java核心知识点学习----多线程 倒计时记数器CountDownLatch和数据交换的Exchanger
- 多线程并发常用类:condition,semaphore,CyclicBarrier,countdownlatch,exchanger使用整理
- [Java并发]使用CountDownLatch和CyclicBarrier等待多线程完成
- C# 多线程并发处理数据库数据,发送信号等待处理完统一插入.(转)
- Java Socket 多线程编程,处理百万级的数据并发。
- Java/Android多线程并发、同步,线程之间通信,主、子线程的一些问题(CountDownLatch、CyclicBarrier和Semaphore)
- 共同学习Java源代码-多线程与并发-Executor、ExecutorService接口
- 一直被忽略的java 多线程并发工具类 CountDownLatch、Semaphore
- Java多线程并发之CountDownLatch
- 多线程并发快速处理数据
- C# 多线程并发处理数据库数据,发送信号等待处理完统一插入
- Java核心知识点学习----多线程 倒计时记数器CountDownLatch和数据交换的Exchanger