您的位置:首页 > 其它

ListeningExecutorService配合CountDownLatch多线程并发处理数据

2017-06-05 16:56 507 查看
如今项目里面有个日终任务,会得到一大堆list,如果一个线程去执行的话会花费很长时间,看了不少资料,
决定用如下方式解决,现将200以单位,分为若干个线程,并行执行,等待所有线程执行完之后再执行别的操作。
这种情况下会出现两种情况,一种是需要list对象有返回值得,另外一种是数据处理。

我们先说数据处理的,这种较简单,日终得到一大堆数据,需要并发一个线程执行200,代码处有省略,有基础的应该都可以看得懂。
将list/200+1,决定并发几个线程,for循环执行,并用计数器同步,当所有的并发走完之后,计数器值减为0,此时,统一执行下个任务。

times,如果配置的线程过少,则队列会持续变大,消耗过多内存。而过多的线程又会 由于频繁的上下文切换导致整个
系统的速度变缓——殊途而同归


static ListeningExecutorService Service= null;
static CountDownLatch Latch =null; //线程计数器          如果不用执行完后统一走流程,可以不要这个
public static void main(String[] args) {
List <User> userlist =new ArrayList<User>();//得到一个list,开启多线程执行
int start=0;
int end=200;
int times=userlist.size()/200+1;
//初始化5的线程池,newFixedThreadPool适用于稳定的并发
Service=MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5));
//Service=MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());不建议用,有生命周期
Latch=new CountDownLatch(times);//计数器
for(int i=0;i<times;i++){
final List <User> part=userlist.subList(start, start+end);//此处省略分割,自己写循环
Service.execute(new Runnable(){
@Override
public void run() {
//此处添加业务逻辑
System.out.println(part.size());
}
});
Latch.countDown();//计数器会减1;
}
try {
Latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
Service.shutdown();
System.out.println("所有的线程在计数器值不为0的时候都会停在Latch.await(),等计
数器值减为0后才会统一执行");
}
}


接下来是数据处理后并返回的


static ListeningExecutorService Service = null;
// 线程计数器 如果不用执行完后统一走业务,可以不要这个。
static CountDownLatch Latch = null;

public static void main(String[] args) {
final List <User> userlist =new ArrayList<User>();//得到一个list,开启多线程执行
final List <User> reuserlist =new ArrayList<User>();//得到一个list,开启多线程执行
int times=userlist.size()/200+1;
//初始化5的线程池,newFixedThreadPool适用于稳定的并发
Service=MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5));
Latch=new CountDownLatch(userlist.size());//计数器
for (final User user : reuserlist) {
final ListenableFuture<User> submit = Service.submit(new Callable<User>() {//不必与循环对象一致
@Override
public User call() throws Exception {
//各种逻辑
System.out.println(user.getClass());
return user;
}
});

Futures.addCallback(submit, new FutureCallback<User>() {    //与submit(new Callable<T>())一致
@Override
public void onSuccess(User user) {
reuserlist.add(user);//将数据处理完的user进行add
}

@Override
public void onFailure(Throwable t) {
reuserlist.add(user);
log.info(user.getId()+"这个对象执行失败,可以用分布线程池写入日志,此处就不举例了");
t.printStackTrace();
}
});
Latch.countDown();//计数器会减1;
}
try {
Latch.await();
reuserlist.size();//此处的reuserlist就是全部经过数据处理的userlist
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
Service.shutdown();
System.out.println("可以返回这个list:reuserlist");
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐