您的位置:首页 > 其它

用CountDownLatch控制主线程等待所有多线程结束

2016-11-21 14:45 781 查看
程序需要从数据库抓取大量数据,有170万+的记录,经过处理后写到一个CSV 文件,然后调用R 程序执行匹配算法,之前测试的是单个主线程执行,光R 程序就耗费了70分钟左右。所以,我考虑使用多线程来缩减执行时间。

(这里,大致阐述下我们的需求,假设病例组有3000人,而对照组有170万人,如果按照 1:4 的比例,从这170万人中找出 3000 × 4 =12000人出来,R程序就是读取这个含有170万条记录的CSV文件,然后返回 15000 条记录,其中包含 病例组 3000人。)

我的算法是:根据病例组和对照组的人数比例,来确定线程多少,比如如果对照组人数是病例组的 16倍以上,那么就开 8 个线程,每个线程读入病例组,然后基本均分 1/8 的对照组,比如我这个情况就是每组20多万人。线程都执行完后,每个线程返回15000,然后用 Set 接这些记录,也就是99000人的样子(每个组都包含3000病例组人数),再次执行一次匹配程序,就可以获得最终的非常匹配病例组的 12000人。

===============================================================

看下面的程序,有那么几点我花了心思,犯了错:

1. CountDownLatch的countDown() 必须在 call() 方法里调用,否则不起作用;如果不用CountDownLatch或者其他的方式,主线程会继续执行,其实我们需要主线程必须等待所有子线程执行完,然后汇总他们的匹配结果

2. 为了在 call() 方法里可以调用 countDown() 方法,需要定义一个类,类里面传入 CountDownLatch实例。我定义了一个抽象的方法内部类,并且泛型是 T ,在真正调用的时候使用 <List<String> 并且即时实现 call() 方法

3. 然后调用 await() 方法,挂起主程序,等待所有子线程结束;结束后,就销毁线程池

4. 从 Future 里面拿结果的时候,我这里用了 while loop, 其实有了前面的 await 同步后,while loop 是多余的,为了警示,我还是要了,并且加了 break 退出循环。特别提出这个break 语句,之前没加,导致程序一直报 java.lang.OutOfMemoryError: Java heap space 

Set<String> reducedPersonIdList = new HashSet<>();
int groupSize = controlList.size() / threadNum + 1; // 加 1 是防止最后一组的limit会漏掉几个人,因为int型是floor截断的

logger.debug("先执行一次多线程跑匹配程序。");
CountDownLatch runningThreadNum = new CountDownLatch(threadNum);
ExecutorService es = Executors.newFixedThreadPool(threadNum);
List<Future<List<String>>> futureResults = new ArrayList<>();

abstract class MyCallable<T> implements Callable<T> {
CountDownLatch latch;
int matchRatio;
List<String> matchVarNames;
Collection<LinkedHashMap<String, String>> caseList;
Collection<LinkedHashMap<String, String>> subControlList;
String dirStr;

public MyCallable(List<String> matchVarNames,
Collection<LinkedHashMap<String, String>> caseList,
Collection<LinkedHashMap<String, String>> subControlList,
int matchRatio, String dirStr, CountDownLatch latch) {
super();
this.latch = latch;
this.matchRatio = matchRatio;
this.matchVarNames = matchVarNames;
this.caseList = caseList;
this.subControlList = subControlList;
this.dirStr = dirStr;
}
}

for (int i = 0; i < threadNum; i++){
Collection<LinkedHashMap<String, String>> groupControlList =
controlList.stream().skip(i * groupSize).limit(groupSize).collect(Collectors.toList());

Future<List<String>> threadRet = es.submit(new MyCallable<List<String>>(
matchVarNames, caseList, groupControlList, matchRatio, dirStr, runningThreadNum){
@Override
public List<String> call() throws Exception {
String[] ret = executeRInThreads(matchVarNames, caseList, subControlList, matchRatio, dirStr);
latch.countDown();
return Lists.newArrayList(ret);
}
});
futureResults.add(threadRet);
}

try {
runningThreadNum.await();
} catch (InterruptedException e) {
logger.debug("等待所有子线程执行完毕才能走主线程,但是却被打断了,发生了异常: ", e);
}
es.shutdown(); // 关闭线程池,释放资源

logger.debug("所有子线程都已执行完毕,开始从Future里面拿初次匹配后的用户ID列表。");
for (Future<List<String>> result : futureResults){
try {
// 其实CountDownLatch已经控制了所有子线程同步结束,所以isDone()肯定返回true
while(result.isDone()){
List<String> pIdList = result.get();
reducedPersonIdList.addAll(pIdList);
break; // 此处必须退出循环,否则会不断执行,直到报出java.lang.OutOfMemoryError: Java heap space
}
} catch (Exception e) {
throw new RuntimeException("执行多线程匹配的时候,在获取Future.get()的时候发出异常:", e);
}
}

logger.debug("Map-Reduce后,可以汇总初次匹配后的对照组。");
List<LinkedHashMap<String, String>> reducedControlList = new ArrayList<>();
for (LinkedHashMap<String, String> item : controlList){
if (reducedPersonIdList.contains(item.get(VinciConstants.VARIABLE_PERSON_ID_NAME))){
reducedControlList.add(item);
}
}

logger.debug("执行最后一次匹配程序,将返回最终匹配的对照组用户ID列表。");
personIdsAfterMatch = executeRInThreads(matchVarNames, caseList, reducedControlList, matchRatio, dirStr);
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息