
Introduction to Parallel Computing and a Multi-core CPU Programming Demo



Author: 侯思松 (HouSisong)    Date: May 5, 2009, 5:18 PM

http://software.intel.com/zh-cn/blogs/2009/05/05/cpudemo/

Tags: multithreading, parallel computing, OpenMP, multi-core programming, worker thread pool

(Updated 2008.01.19: since readers reported that the code was hard to follow, the article and the implementation have been rewritten to be easier to read)

(Updated 2007.09.04: thread startup is now controlled with critical sections instead of events)

2006 was the year dual-core went mainstream: shipments of dual-core processors began to overtake single-core processors. In November 2006 Intel started shipping quad-core parts; AMD will also release quad-core chips this year and plans to ship 8-core parts in the second half of the year.

According to an Intel document: "Assume a 13 mm processor die at the 22 nm process node carrying 4 billion transistors and 48 MB of cache, with a power budget of 100 W. With that many transistors we could design a processor with 12 large cores, 48 medium-sized cores (multi-core), or 144 small cores (many-core)."

Intel has also completed a prototype 80-core processor that reaches one trillion floating-point operations per second.

As multi-core CPUs reach ordinary PCs, making full use of their performance is now a problem facing many developers.

With earlier CPU upgrades, software often became faster automatically. With multi-core CPUs the free lunch is over: developers have to parallelize their software by hand to benefit from the explosive growth in CPU performance.

(PS: I suspect future CPUs may well integrate a number of special-purpose cores (probably designed in a fairly general way), such as GPU cores, image-processing cores, vector cores, encryption/codec cores, FFT cores, physics cores, neural-network cores, and so on :D )

First, a look at parallelism within a single CPU.

Common forms of parallelism inside a single CPU: multi-stage pipelines (the key tool for raising clock frequency), superscalar execution (multiple pipelines issuing several instructions at once), out-of-order execution (instruction reordering), SIMD (single instruction, multiple data), VLIW processors (which rely on compiler analysis), and so on.
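To make the SIMD item concrete, here is a minimal sketch (not part of the original article, and assuming an SSE2-capable x86 CPU and the <emmintrin.h> intrinsics header) that sums an array of doubles two elements per instruction:

#include <emmintrin.h>  //SSE2 intrinsics

//Sum an array of doubles with SSE2: each _mm_add_pd adds two doubles at once.
double simd_sum(const double* data, long count)
{
    __m128d acc = _mm_setzero_pd();                      //two partial sums in one register
    long i = 0;
    for (; i + 2 <= count; i += 2)
        acc = _mm_add_pd(acc, _mm_loadu_pd(&data[i]));   //unaligned load of 2 doubles, then add
    double parts[2];
    _mm_storeu_pd(parts, acc);                           //write the two partial sums back
    double result = parts[0] + parts[1];
    for (; i < count; ++i)                               //handle a possible leftover element
        result += data[i];
    return result;
}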

Introduction to parallel computing

Communication models of parallel platforms: shared memory (POSIX threads, Windows threads, OpenMP) and message passing (MPI, PVM).

Parallel algorithm models: the data-parallel model, task dependency graph model, work pool model, manager-worker model, and consumer model.

Issues a task may raise for parallel computation: task decomposition, task dependencies, task granularity, degree of concurrency, and task interaction.

Common measures of parallel algorithm performance: parallel overhead, speedup, efficiency (speedup / number of CPUs), and cost (parallel running time × number of CPUs).
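As a quick worked example of these measures, using the timings reported later in this article on the dual-core test machine: the serial Sum0 run takes 6.156 s and the OpenMP run takes 3.078 s, so speedup = 6.156 / 3.078 ≈ 2.0, efficiency = 2.0 / 2 CPUs = 1.0, and cost = 3.078 s × 2 CPUs ≈ 6.16 CPU-seconds, essentially the same as the serial running time.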

A: A simple computation demo

The real work in the demo is done by the Sum0 function (the work itself is meaningless; it simply burns time to stand in for something that needs doing). We then parallelize that function with OpenMP (supported by the VC and ICC compilers) and with a hand-written threading utility, and look at the effect of the multi-core optimization. The compiler used for the tests is VC2005; the CPU is a dual-core AMD64x2 4200+ (2.37 GHz); memory is 2 GB of dual-channel DDR2 677 MHz.

The original code is as follows:

#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <math.h>

//a simple time-consuming task
double Sum0(double* data,long data_count);

int main()
{
    long data_count=200000;
    double* data=new double[data_count];
    long i;
    //initialize the test data
    for (i=0;i<data_count;++i)
        data[i]=(double)(rand()*(1.0/RAND_MAX));

    const long test_count=200*2;//run the function many times so that the elapsed time is measurable
    double sumresult=0;
    double runtime=(double)clock();
    for( i=0; i<test_count; ++i)
    {
        sumresult+=Sum0(data,data_count);
    }
    runtime=((double)clock()-runtime)/CLOCKS_PER_SEC;

    printf ("< Sum0 >\n");
    printf (" result = %10.4f \n",sumresult);
    printf (" time (seconds) = %f \n",runtime);

    delete [] data;
    return 0;
}

double Sum0(double* data,long data_count)
{
    double result=0;
    for (long i=0;i<data_count;++i)
    {
        data[i]=(double)sin(cos(data[i]));
        result+=data[i];
    }
    return result;
}

Running it on my machine prints:

< Sum0 >
 result = 55590743.4039
 time (seconds) = 6.156000

B: Using OpenMP to optimize (parallelize) the Sum0 function

OpenMP is a compiler-directive-based parallel programming standard using the shared-memory model; it is currently available for C/C++ and Fortran. OpenMP directives provide support for concurrency, synchronization, and data access.

(You need to enable multithreading and OpenMP support in the project settings, and run on a multi-core CPU, to see the benefit of multi-CPU parallelism.)

The OpenMP implementation is as follows:

#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <math.h>
//multithreading and OpenMP support must be enabled in the project settings
#include <omp.h>

//OpenMP implementation
double Sum_OpenMP(double* data,long data_count);

int main()
{
    long data_count=200000;
    double* data=new double[data_count];
    long i;
    //initialize the test data
    for (i=0;i<data_count;++i)
        data[i]=(double)(rand()*(1.0/RAND_MAX));

    const long test_count=200*2;//run the function many times so that the elapsed time is measurable
    double sumresult=0;
    double runtime=(double)clock();
    for( i=0; i<test_count; ++i)
    {
        sumresult+=Sum_OpenMP(data,data_count);
    }
    runtime=((double)clock()-runtime)/CLOCKS_PER_SEC;

    printf ("< Sum_OpenMP >\n");
    printf (" result = %10.4f \n",sumresult);
    printf (" time (seconds) = %f \n",runtime);

    delete [] data;
    return 0;
}

double Sum_OpenMP(double* data,long data_count)
{
    double result=0;
#pragma omp parallel for schedule(static) reduction(+: result)
    for (long i=0;i<data_count;++i)
    {
        data[i]=(double)sin(cos(data[i]));
        result+=data[i];
    }
    return result;
}

Compared with Sum0, Sum_OpenMP adds only one line: "#pragma omp parallel for schedule(static) reduction(+: result)". It tells the compiler to parallelize the for loop that follows and to combine the per-thread copies of result with +. (See the OpenMP documentation for the rest of the syntax.)
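If you want to check or limit how many threads the OpenMP runtime uses for the parallel for, the standard runtime functions omp_get_max_threads, omp_set_num_threads, and omp_get_num_threads (declared in omp.h) can be used. A small stand-alone sketch, not part of the original demo:

#include <stdio.h>
#include <omp.h>

int main()
{
    printf("default max threads: %d\n", omp_get_max_threads());
    omp_set_num_threads(2);            //e.g. cap the team size at 2 threads
    #pragma omp parallel
    {
        #pragma omp master             //only one thread reports the team size
        printf("threads in this region: %d\n", omp_get_num_threads());
    }
    return 0;
}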

The program prints:

< Sum_OpenMP >
 result = 55590743.4039
 time (seconds) = 3.078000

On my dual-core machine, the OpenMP-parallelized code made the program about 100% faster!

C: Parallelizing Sum0 with threads (using my CWorkThreadPool helper class; the complete source code is given at the end)

Multithreading support must be enabled in the project settings. The multithreaded parallel implementation is as follows:

#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <math.h>
#include <vector>
#include "WorkThreadPool.h" //uses the CWorkThreadPool class

double Sum_WorkThreadPool(double* data,long data_count);

int main()
{
    long data_count=200000;
    double* data=new double[data_count];
    long i;
    //initialize the test data
    for (i=0;i<data_count;++i)
        data[i]=(double)(rand()*(1.0/RAND_MAX));

    const long test_count=200*2;//run the function many times so that the elapsed time is measurable
    double sumresult=0;
    double runtime=(double)clock();
    for( i=0; i<test_count; ++i)
    {
        sumresult+=Sum_WorkThreadPool(data,data_count);
    }
    runtime=((double)clock()-runtime)/CLOCKS_PER_SEC;

    printf ("< Sum_WorkThreadPool >\n");
    printf (" result = %10.4f \n",sumresult);
    printf (" time (seconds) = %f \n",runtime);

    delete [] data;
    return 0;
}

double Sum0(double* data,long data_count)
{
    double result=0;
    for (long i=0;i<data_count;++i)
    {
        data[i]=(double)sin(cos(data[i]));
        result+=data[i];
    }
    return result;
}

struct TWorkData
{
    double* part_data;
    long    part_data_count;
    double  result;
};

void sum_callback(TWorkData* wd)
{
    wd->result=Sum0(wd->part_data,wd->part_data_count);
}

double Sum_WorkThreadPool(double* data,long data_count)
{
    long work_count=CWorkThreadPool::best_work_count();
    std::vector<TWorkData>  work_list(work_count);
    std::vector<TWorkData*> pwork_list(work_count);
    long i;

    //split the data between the work items
    long part_data_count=data_count/work_count;
    for (i=0;i<work_count;++i)
    {
        work_list[i].part_data=&data[part_data_count*i];
        work_list[i].part_data_count=part_data_count;
    }
    //the last work item gets the remainder
    work_list[work_count-1].part_data_count=data_count-part_data_count*(work_count-1);
    for (i=0;i<work_count;++i)
        pwork_list[i]=&work_list[i];

    //execute the work items on multiple threads; this call blocks until all of them are done
    CWorkThreadPool::work_execute((TThreadCallBack)sum_callback,(void**)&pwork_list[0],(int)pwork_list.size());

    double result=0;
    for (i=0;i<work_count;++i)
        result+=work_list[i].result;
    return result;
}

Parallelizing the code with threads to exploit several CPU cores is more flexible than OpenMP, but clearly not as convenient: most of the extra code in Sum_WorkThreadPool is concerned with splitting the computation into independent work items and handing them to CWorkThreadPool. The program prints:

< Sum_WorkThreadPool >
 result = 55590743.4039
 time (seconds) = 3.063000

On my dual-core machine, the thread-parallelized code made the program about 101% faster!

D: Appendix: the complete source code of the CWorkThreadPool class

(Improvements to CWorkThreadPool are welcome, so that it can cover more kinds of parallel workloads.)

//Declaration file for CWorkThreadPool: WorkThreadPool.h
//WorkThreadPool.h
/////////////////////////////////////////////////////////////
//Worker thread pool CWorkThreadPool
//Splits one task into several thread tasks so that multiple CPUs can be used
//HouSisong@GMail.com
////////////////////////////
//todo: switch to a model where threads claim work items themselves
//Guidelines: 1. when splitting, make the work items roughly equal in size
//            2. do not make the items too small, or the threading overhead may outweigh the parallel gain
//            3. the number of items is ideally a multiple of the number of CPUs

#ifndef _WorkThreadPool_H_
#define _WorkThreadPool_H_

typedef void (*TThreadCallBack)(void * pData);

class CWorkThreadPool
{
public:
    static long best_work_count();  //best number of work items to split into; the current implementation returns the number of CPUs
    static void work_execute(const TThreadCallBack work_proc,void** word_data_list,int work_count);             //execute the work items in parallel and wait for all of them to finish
    static void work_execute_multi(const TThreadCallBack* work_proc_list,void** word_data_list,int work_count); //same as above, but each work item calls its own function
    static void work_execute_single_thread(const TThreadCallBack work_proc,void** word_data_list,int work_count)  //execute the work items on a single thread and wait; useful for debugging
    {
        for (long i=0;i<work_count;++i)
            work_proc(word_data_list[i]);
    }
    static void work_execute_single_thread_multi(const TThreadCallBack* work_proc_list,void** word_data_list,int work_count)  //execute the work items on a single thread and wait; useful for debugging
    {
        for (long i=0;i<work_count;++i)
            work_proc_list[i](word_data_list[i]);
    }
};

#endif //_WorkThreadPool_H_
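As a minimal usage sketch of this interface (not from the original article; it assumes the WorkThreadPool.h/.cpp files shown here are added to the project), each work item below simply doubles one value in place:

#include <vector>
#include "WorkThreadPool.h"

//a trivial work callback matching TThreadCallBack: double the value it is given
static void double_it(void* p) { *(double*)p *= 2; }

void pool_demo()
{
    std::vector<double> values(CWorkThreadPool::best_work_count(), 1.0);
    std::vector<void*>  work(values.size());
    for (long i = 0; i < (long)values.size(); ++i)
        work[i] = &values[i];
    //run one work item per CPU core and block until all of them have finished
    CWorkThreadPool::work_execute(double_it, &work[0], (int)work.size());
}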

//Implementation file for CWorkThreadPool: WorkThreadPool.cpp
/////////////////////////////////////////////////////////////
//Worker thread pool TWorkThreadPool

#include <vector>
#include <algorithm>   //std::swap
#include <string.h>    //memset
#include <process.h>   //_beginthread / _endthread
#include "windows.h"
#include "WorkThreadPool.h"

//#define _IS_SetThreadAffinity_
//If this flag is defined, each worker thread is pinned to a different CPU to reduce
//thread-switching overhead; not recommended.

class TCriticalSection
{
private:
    RTL_CRITICAL_SECTION m_data;
public:
    TCriticalSection()  { InitializeCriticalSection(&m_data); }
    ~TCriticalSection() { DeleteCriticalSection(&m_data); }
    inline void Enter() { EnterCriticalSection(&m_data); }
    inline void Leave() { LeaveCriticalSection(&m_data); }
};

class TWorkThreadPool;

//thread states
enum TThreadState{ thrStartup=0, thrReady, thrBusy, thrTerminate, thrDeath };

class TWorkThread
{
public:
    volatile HANDLE            thread_handle;
    volatile enum TThreadState state;
    volatile TThreadCallBack   func;
    volatile void *            pdata;  //work data
    TCriticalSection*          CriticalSection;
    TCriticalSection*          CriticalSection_back;
    TWorkThreadPool*           pool;
    volatile DWORD             thread_ThreadAffinityMask;

    TWorkThread() { memset(this,0,sizeof(TWorkThread)); }
};

void do_work_end(TWorkThread* thread_data);

void __cdecl thread_dowork(TWorkThread* thread_data) //void __stdcall thread_dowork(TWorkThread* thread_data)
{
    volatile TThreadState& state=thread_data->state;
#ifdef _IS_SetThreadAffinity_
    SetThreadAffinityMask(GetCurrentThread(),thread_data->thread_ThreadAffinityMask);
#endif
    state = thrStartup;

    while(true)
    {
        //block until the pool releases this thread's critical section, i.e. until new work arrives
        thread_data->CriticalSection->Enter();
        thread_data->CriticalSection->Leave();
        if(state == thrTerminate)
            break;

        state = thrBusy;
        volatile TThreadCallBack& func=thread_data->func;
        if (func!=0)
            func((void *)thread_data->pdata);
        do_work_end(thread_data);
    }
    state = thrDeath;
    _endthread();
    //ExitThread(0);
}

class TWorkThreadPool
{
private:
    std::vector<TCriticalSection*> CriticalSections;
    std::vector<TCriticalSection*> CriticalSections_back;
    std::vector<TWorkThread>       work_threads;
    mutable long                   cpu_count;

    inline long get_cpu_count() const {
        if (cpu_count>0) return cpu_count;
        SYSTEM_INFO SystemInfo;
        GetSystemInfo(&SystemInfo);
        cpu_count=SystemInfo.dwNumberOfProcessors;
        return cpu_count;
    }
    //number of work items per pass: the worker threads plus the calling thread
    inline long passel_count() const { return (long)work_threads.size()+1; }

    void inti_threads()
    {
        long best_count =get_cpu_count();
        long newthrcount=best_count - 1;  //the calling thread also works, so create one thread fewer than the CPU count
        work_threads.resize(newthrcount);
        CriticalSections.resize(newthrcount);
        CriticalSections_back.resize(newthrcount);
        long i;
        for( i= 0; i < newthrcount; ++i)
        {
            CriticalSections[i]=new TCriticalSection();
            CriticalSections_back[i]=new TCriticalSection();
            work_threads[i].CriticalSection=CriticalSections[i];
            work_threads[i].CriticalSection_back=CriticalSections_back[i];
            CriticalSections[i]->Enter();        //keep the worker blocked until work is handed out
            CriticalSections_back[i]->Enter();
            work_threads[i].state = thrTerminate;
            work_threads[i].pool=this;
            work_threads[i].thread_ThreadAffinityMask=1<<(i+1);
            work_threads[i].thread_handle =(HANDLE)_beginthread((void (__cdecl *)(void *))thread_dowork, 0, (void*)&work_threads[i]);
            //CreateThread(0, 0, (LPTHREAD_START_ROUTINE)thread_dowork,(void*) &work_threads[i], 0, &thr_id);
            //todo: error handling for _beginthread
        }
#ifdef _IS_SetThreadAffinity_
        SetThreadAffinityMask(GetCurrentThread(),0x01);
#endif
        for(i = 0; i < newthrcount; ++i)
        {
            while(true) {
                if (work_threads[i].state == thrStartup) break;
                else Sleep(0);
            }
            work_threads[i].state = thrReady;
        }
    }

    void free_threads(void)
    {
        long thr_count=(long)work_threads.size();
        long i;
        //wait for every worker to become ready, then ask it to terminate
        for(i = 0; i < thr_count; ++i)
        {
            while(true) {
                if (work_threads[i].state == thrReady) break;
                else Sleep(0);
            }
            work_threads[i].state=thrTerminate;
        }
        for (i=0;i<thr_count;++i)
        {
            CriticalSections[i]->Leave();
            CriticalSections_back[i]->Leave();
        }
        //wait for every worker thread to exit
        for(i = 0; i < thr_count; ++i)
        {
            while(true) {
                if (work_threads[i].state == thrDeath) break;
                else Sleep(0);
            }
        }
        work_threads.clear();
        for (i=0;i<thr_count;++i)
        {
            delete CriticalSections[i];
            delete CriticalSections_back[i];
        }
        CriticalSections.clear();
        CriticalSections_back.clear();
    }

    void passel_work(const TThreadCallBack* work_proc,int work_proc_inc,void** word_data_list,int work_count) {
        if (work_count==1)
            (*work_proc)(word_data_list[0]);
        else
        {
            const TThreadCallBack* pthwork_proc=work_proc;
            pthwork_proc+=work_proc_inc;
            long i;
            long thr_count=(long)work_threads.size();
            //hand one work item to each worker thread
            for(i = 0; i < work_count-1; ++i)
            {
                work_threads[i].func  = *pthwork_proc;
                work_threads[i].pdata =word_data_list[i+1];
                work_threads[i].state = thrBusy;
                pthwork_proc+=work_proc_inc;
            }
            //workers that get no item in this pass receive an empty job
            for(i = work_count-1; i < thr_count; ++i)
            {
                work_threads[i].func  = 0;
                work_threads[i].pdata =0;
                work_threads[i].state = thrBusy;
            }
            //release the critical sections so the worker threads start running
            for (i=0;i<thr_count;++i)
                CriticalSections[i]->Leave();
            //current thread do a work
            (*work_proc)(word_data_list[0]);
            //wait for work finish
            for(i = 0; i < thr_count; ++i)
            {
                while(true) {
                    if (work_threads[i].state == thrReady) break;
                    else Sleep(0);
                }
            }
            //swap to the back critical sections so the workers block again before the next pass
            CriticalSections.swap(CriticalSections_back);
            for (i=0;i<thr_count;++i)
                CriticalSections_back[i]->Enter();
        }
    }

    void private_work_execute(TThreadCallBack* pwork_proc,int work_proc_inc,void** word_data_list,int work_count) {
        //execute the work items batch by batch; each batch is at most passel_count() items
        while (work_count>0)
        {
            long passel_work_count;
            if (work_count>=passel_count())
                passel_work_count=passel_count();
            else
                passel_work_count=work_count;

            passel_work(pwork_proc,work_proc_inc,word_data_list,passel_work_count);

            pwork_proc+=(work_proc_inc*passel_work_count);
            word_data_list=&word_data_list[passel_work_count];
            work_count-=passel_work_count;
        }
    }

public:
    explicit TWorkThreadPool():work_threads(),cpu_count(0) { inti_threads(); }
    ~TWorkThreadPool() { free_threads(); }

    inline long best_work_count() const { return passel_count(); }

    inline void DoWorkEnd(TWorkThread* thread_data){
        thread_data->func=0;
        thread_data->state = thrReady;
        //swap to the back critical section so this worker blocks again before the next pass
        std::swap(thread_data->CriticalSection,thread_data->CriticalSection_back);
    }

    inline void work_execute_multi(TThreadCallBack* pwork_proc,void** word_data_list,int work_count) {
        private_work_execute(pwork_proc,1,word_data_list,work_count);
    }
    inline void work_execute(TThreadCallBack work_proc,void** word_data_list,int work_count) {
        private_work_execute(&work_proc,0,word_data_list,work_count);
    }
};

void do_work_end(TWorkThread* thread_data)
{
    thread_data->pool->DoWorkEnd(thread_data);
}
//TWorkThreadPool end;

////////////////////////////////////////

TWorkThreadPool g_work_thread_pool;  //the global worker thread pool

long CWorkThreadPool::best_work_count() { return g_work_thread_pool.best_work_count(); }

void CWorkThreadPool::work_execute(const TThreadCallBack work_proc,void** word_data_list,int work_count)
{
    g_work_thread_pool.work_execute(work_proc,word_data_list,work_count);
}

void CWorkThreadPool::work_execute_multi(const TThreadCallBack* work_proc_list,void** word_data_list,int work_count)
{
    g_work_thread_pool.work_execute_multi((TThreadCallBack*)work_proc_list,word_data_list,work_count);
}