您的位置:首页 > 其它

线程数量与并行应用性能相关性的测试

2015-10-06 22:29 363 查看
一直被一个问题所困扰,在并行应用中,任务划分的粒度达到多少合适?或者说,采用多线程时,启用多少线程能够达到最佳性能?

网上有一些资料给出了参考:

如果是CPU密集型任务,就需要尽量压榨CPU,线程数的参考值可以设为 NCPU+1(NCPU 为 CPU 核心数)

如果是IO密集型任务,线程数的参考值可以设置为 2*NCPU

那么,我们就使用示例程序来实地测试一下吧!

ComputePerformanceTest.java

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/*
* @author pzy
* @version 20151006
* @function test the performance of serial and parallel compute tasks
*/

/**
 * Benchmark driver: times the summation workload of {@link ComputeTaskManager} once on the
 * calling thread and then split into 20, 10, 4 and 2 sub-tasks, each time with a pool size
 * of 20, printing the elapsed milliseconds of every run.
 */
public class ComputePerformanceTest {

    /** Nanoseconds per millisecond, used to report elapsed time in milliseconds. */
    private static final long NANOS_PER_MILLI = 1000000L;

    /**
     * Prints the number of available processors, then runs the serial baseline followed by
     * the parallel runs.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        System.out.println("ComputePerformanceTest is running..");

        int processors = Runtime.getRuntime().availableProcessors();
        System.out.println(Integer.toString(processors) + " processor"
                + (processors != 1 ? "s are " : " is ")
                + "available");

        // Serial computation (baseline).
        ComputeTaskManager manager = new ComputeTaskManager(20, 20);
        try {
            long startNanos = System.nanoTime();
            manager.computeInSerial();
            System.out.printf("computeInSerial finished ,costs %d milliseconds%n",
                    elapsedMillis(startNanos));
        } finally {
            // Always release the pool, even if the run throws.
            manager.close();
        }

        // Parallel computation with 20, 10, 4 and 2 sub-tasks.
        timeParallelRun(20, 20);
        timeParallelRun(10, 20);
        timeParallelRun(4, 20);
        timeParallelRun(2, 20);
    }

    /**
     * Runs one parallel computation and prints how long it took.
     *
     * @param taskNum  number of sub-tasks the workload is split into
     * @param poolSize size of the thread pool executing the sub-tasks
     */
    private static void timeParallelRun(int taskNum, int poolSize) {
        ComputeTaskManager manager = new ComputeTaskManager(taskNum, poolSize);
        try {
            long startNanos = System.nanoTime();
            manager.computeInParallel();
            System.out.printf(
                    "result of computeInParallel(%d,%d) finished ,costs %d milliseconds%n",
                    taskNum, poolSize, elapsedMillis(startNanos));
        } finally {
            manager.close();
        }
    }

    /**
     * Returns the milliseconds elapsed since {@code startNanos}, a value previously obtained
     * from {@link System#nanoTime()}. nanoTime is used instead of currentTimeMillis because
     * it is meant for measuring elapsed time and is unaffected by wall-clock adjustments.
     */
    private static long elapsedMillis(long startNanos) {
        return (System.nanoTime() - startNanos) / NANOS_PER_MILLI;
    }
}

/**
 * Runs a fixed summation workload over the integer range {@code [start, end]}, either on the
 * calling thread or split into {@code taskNum} sub-ranges executed on a fixed-size thread pool.
 */
class ComputeTaskManager {
    private final long start = 1;
    private final long end = 1000000000;
    private final long taskNum;
    ExecutorService pool;

    /**
     * @param taskNum number of sub-tasks {@link #computeInParallel()} splits the range into
     * @param pooSize number of threads in the fixed thread pool
     */
    public ComputeTaskManager(int taskNum, int pooSize) {
        this.taskNum = taskNum;
        pool = Executors.newFixedThreadPool(pooSize);
    }

    /** Shuts the thread pool down; already submitted tasks are still allowed to finish. */
    public void close() {
        pool.shutdown();
    }

    /**
     * Computes the whole range with a single {@link SumTask} on the calling thread.
     *
     * @return the value produced by {@link SumTask#sum()} for the full range
     */
    public Long computeInSerial() {
        System.out.printf("=====computeInSerial() start=====%n");
        long sum = new SumTask(start, end).sum();
        System.out.printf("=====computeInSerial() end=====%n");
        return sum;
    }

    /**
     * Splits the range into {@code taskNum} contiguous sub-ranges, submits one {@link SumTask}
     * per sub-range to the pool, and adds up the partial results.
     *
     * @return the sum of the partial results that completed successfully; a failed or
     *         interrupted sub-task is reported on stderr and its contribution is missing
     */
    public Long computeInParallel() {
        System.out.printf("=====computeInParallel(%d) start=====%n", taskNum);

        // Size of each sub-range; the last sub-task also takes the division remainder.
        long aver = (end - start + 1) / taskNum;

        // Submit every sub-task BEFORE waiting for any result. Calling get() right after
        // submit() would block until that task finished and serialize the whole run.
        List<Future<Long>> futures = new ArrayList<Future<Long>>();
        for (int i = 0; i < taskNum; i++) {
            long startNum = start + i * aver;
            long endNum = (i == taskNum - 1) ? end : startNum + aver - 1;
            futures.add(pool.submit(new SumTask(startNum, endNum)));
        }

        long sum = 0;
        for (Future<Long> future : futures) {
            try {
                sum += future.get();
            } catch (InterruptedException e) {
                // Restore the interrupt flag for the caller and stop waiting: any further
                // get() would fail immediately while the flag is set.
                Thread.currentThread().interrupt();
                e.printStackTrace();
                break;
            } catch (ExecutionException e) {
                // Best effort, as before: report the failed sub-task and keep going.
                e.printStackTrace();
            }
        }

        System.out.printf("=====computeInParallel(%d) end=====%n", taskNum);
        return sum;
    }
}

/**
 * CPU-bound task that adds up every integer in {@code [start, end]}, repeating the pass
 * {@link #SUM_COUNT} times and accumulating all passes into one total.
 *
 * <p>The total is a plain {@code long}, so it silently wraps around on overflow.
 */
class SumTask implements Callable<Long> {

    /** Number of passes over the range (presumably to make the workload long enough to time). */
    private static final int SUM_COUNT = 50;

    private final long start;
    private final long end;

    /**
     * @param start first integer of the range (inclusive)
     * @param end   last integer of the range (inclusive); a value below {@code start}
     *              yields an empty range
     */
    public SumTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    /**
     * Performs the summation on the calling thread.
     *
     * @return {@code SUM_COUNT} times the sum of the integers in {@code [start, end]}
     */
    public Long sum() {
        long sum = 0;
        for (int i = 0; i < SUM_COUNT; i++) {
            for (long j = start; j <= end; j++) {
                sum += j;
            }
        }
        return sum;
    }

    /** Delegates to {@link #sum()} so the task can be submitted to an executor. */
    @Override
    public Long call() throws Exception {
        return sum();
    }
}


IOPerformanceTest.java

import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

/*
* @author pzy
* @version 20151006
* @function test the performance of serial and parallel IO tasks
*/

/**
 * Benchmark driver: times the blocking tasks of {@link IOTaskManager} run one after another
 * on the calling thread and then as 20, 10, 4 and 2 tasks on a pool of 20 threads, printing
 * the elapsed milliseconds of every run.
 */
public class IOPerformanceTest {

    /** Nanoseconds per millisecond, used to report elapsed time in milliseconds. */
    private static final long NANOS_PER_MILLI = 1000000L;

    /**
     * Prints the number of available processors, then runs the serial baseline followed by
     * the parallel runs.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        System.out.println("IOPerformanceTest is running..");

        // Print the number of CPU cores.
        int processors = Runtime.getRuntime().availableProcessors();
        System.out.println(Integer.toString(processors) + " processor"
                + (processors != 1 ? "s are " : " is ")
                + "available");

        // Serial run (baseline): 20 tasks executed one after another.
        IOTaskManager manager = new IOTaskManager(20, 10);
        try {
            long startNanos = System.nanoTime();
            manager.computeInSerial();
            System.out.printf("computeInSerial finished ,costs %d milliseconds%n",
                    elapsedMillis(startNanos));
        } finally {
            // Always release the pool, even if the run throws.
            manager.close();
        }

        // Parallel runs with 20, 10, 4 and 2 tasks.
        timeParallelRun(20, 20);
        timeParallelRun(10, 20);
        timeParallelRun(4, 20);
        timeParallelRun(2, 20);
    }

    /**
     * Runs one parallel batch and prints how long it took.
     *
     * @param taskNum  number of tasks in the batch
     * @param poolSize size of the thread pool executing the tasks
     */
    private static void timeParallelRun(int taskNum, int poolSize) {
        IOTaskManager manager = new IOTaskManager(taskNum, poolSize);
        try {
            long startNanos = System.nanoTime();
            manager.computeInParallel();
            System.out.printf(
                    "result of computeInParallel(%d,%d) finished ,costs %d milliseconds%n",
                    taskNum, poolSize, elapsedMillis(startNanos));
        } finally {
            manager.close();
        }
    }

    /**
     * Returns the milliseconds elapsed since {@code startNanos}, a value previously obtained
     * from {@link System#nanoTime()}. nanoTime is used instead of currentTimeMillis because
     * it is meant for measuring elapsed time and is unaffected by wall-clock adjustments.
     */
    private static long elapsedMillis(long startNanos) {
        return (System.nanoTime() - startNanos) / NANOS_PER_MILLI;
    }
}

/**
 * Runs {@code taskNum} {@link IOTask} instances either one after another on the calling
 * thread or concurrently on a fixed-size thread pool, waiting until all of them finish.
 */
class IOTaskManager {
    private final int taskNum;
    ExecutorService pool;
    // Latch of the most recent parallel run (initially one sized for the first run).
    CountDownLatch countDownLatch;

    /**
     * @param taskNum number of tasks executed per run
     * @param pooSize number of threads in the fixed thread pool
     */
    public IOTaskManager(int taskNum, int pooSize) {
        this.taskNum = taskNum;
        countDownLatch = new CountDownLatch(taskNum);
        pool = Executors.newFixedThreadPool(pooSize);
    }

    /** Shuts the thread pool down; already submitted tasks are still allowed to finish. */
    public void close() {
        pool.shutdown();
    }

    /** Executes the tasks one after another on the calling thread. */
    void computeInSerial() {
        System.out.printf("=====computeInSerial(%d) start=====%n", taskNum);
        for (int i = 0; i < taskNum; i++) {
            new IOTask(i).compute();
        }
        System.out.printf("=====computeInSerial(%d) end=====%n", taskNum);
    }

    /**
     * Hands all tasks to the pool and blocks until every one of them has counted the latch
     * down. If the waiting thread is interrupted, the wait ends early and the interrupt flag
     * is left set for the caller.
     */
    void computeInParallel() {
        System.out.printf("=====computeInParallel(%d) start=====%n", taskNum);

        // A CountDownLatch cannot be reset, so every run gets a fresh one; reusing the latch
        // of a finished run would make await() return immediately.
        CountDownLatch latch = new CountDownLatch(taskNum);
        countDownLatch = latch;

        for (int i = 0; i < taskNum; i++) {
            pool.execute(new IOTask(i, latch));
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        System.out.printf("=====computeInParallel(%d) end=====%n", taskNum);
    }
}

/**
 * Stand-in for a blocking I/O operation: sleeps for a fixed time and, when a latch was
 * supplied, reports completion on stdout and counts the latch down.
 */
class IOTask implements Runnable {

    /** How long one task blocks, in milliseconds. */
    private static final long SLEEP_MILLIS = 3000;

    final int num;
    // Null when the task is run directly without anyone waiting on a latch.
    final CountDownLatch countDownLatch;

    /**
     * Creates a task that is not tracked by a latch.
     *
     * @param num task number, used only in the completion message
     */
    public IOTask(int num) {
        this(num, null);
    }

    /**
     * @param num            task number, used only in the completion message
     * @param countDownLatch latch to count down once the task is done; may be {@code null}
     */
    public IOTask(int num, CountDownLatch countDownLatch) {
        this.num = num;
        this.countDownLatch = countDownLatch;
    }

    /**
     * Blocks for {@link #SLEEP_MILLIS} milliseconds, then signals completion. If the thread
     * is interrupted the sleep ends early, the interrupt flag is restored for the caller,
     * and completion is still signalled so that a thread waiting on the latch is released.
     */
    public void compute() {
        try {
            Thread.sleep(SLEEP_MILLIS);
        } catch (InterruptedException e) {
            // Thread.sleep clears the flag when it throws; set it again so code further up
            // the stack (e.g. the executor's worker) can still see the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        if (countDownLatch != null) {
            System.out.printf("task %d has finished %n", num);
            countDownLatch.countDown();
        }
    }

    /** Runs {@link #compute()} so the task can be handed to an executor. */
    @Override
    public void run() {
        compute();
    }
}


测试环境:

PC1:联想Y450/INTEL CORE2 T6500 双核2线程/UBUNTU 14.04 64bit

PC2:联想G50-70/INTEL CORE I7-4510U 双核4线程/WIN8.1 64bit

测试过程:

在两台PC上分别运行测试程序,并统计性能数据,结果如下:

《未完待续》
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: