您的位置:首页 > 编程语言 > Go语言

多线程技术模拟并行计算之二:数组前缀和(Prefix Sum)

2014-09-06 20:59 435 查看
一、前缀和(Prefix Sum)定义:
给定一个数组A[1..n],前缀和数组PrefixSum[1..n]定义为:PrefixSum[i] = A[0]+A[1]+...+A[i-1];
例如:A[5,6,7,8] --> PrefixSum[5,11,18,26]
PrefixSum[0] =A[0] ;
PrefixSum[1] =A[0] + A[1] ; 
PrefixSum[2] =A[0] + A[1] + A[2] ;
PrefixSum[3] =A[0] + A[1] + A[2] + A[3] ;
 
二、串行程序(Sequential Program):
时间复杂度:O(n);空间复杂度:O(n)。n是数组A的长度。
伪代码:

Procedure PS(A,n,S)
S[0] = A[0];
if n = 1 then exit;
else { for j = 1 to n do S[j] = S[j-1] +A[j] }</span>


串行代码sequence.c:

#include<stdio.h>
#include<time.h>
#include<stdlib.h>
#include<unistd.h>
#include<memory.h>

#define n 1000
#define RANGE 2

int A
={0};
intprefix_sum
={0};
int num;

void work()
{
int i ;
prefix_sum[0]=A[0];
for(i=1;i<num;i++)
{
prefix_sum[i] = prefix_sum[i-1]+A[i];
}
}

void generate()
{
FILE *file;
if( (file=fopen("/home/huawei/leaguenew/parallel_computing/A.txt","wt"))==NULL )
{
perror("open fail\n");
exit(1);
}
int i;
srand((unsigned)time(NULL)) ;
for( i =0 ; i < num ; i++ )
fprintf(file,"%-4d",rand()%RANGE);
//fprintf(file,"\n");
fclose(file);
}

void Read()
{
FILE *file;
if( (file =fopen("/home/huawei/leaguenew/parallel_computing/A.txt","rt"))==NULL )
{
perror("open fail\n");
exit(1);
}
int i;
srand((unsigned)time(NULL));
for(i=0;i<num;i++)
fscanf(file,"%d",&A[i]);
fclose(file);
}

int main()
{
printf("please input the scale ofinput :  ");
scanf("%d",&num);
generate();
Read();
int h;

clock_t start = clock();
work();
clock_t finish = clock();
printf("the time cost is : %.6fseconds\n",(double)(finish-start)/CLOCKS_PER_SEC);

for(h=0;h<num;h++)
printf("%d ",A[h]);
printf("\nThe prefix sum is :\n");
for(h=0;h<num;h++)
printf("%d ",prefix_sum[h]);
printf("\n");
return 0;
}

代码中的文件路径需要自行修改。

 
递归实现:

#include<iostream>
using namespace std;

int ps[10];

int prefix_sum(inta[] ,int n)
{
if( n == 0 )
{
ps[0] = a[0];
return ps[0];
}
ps
=  a
+prefix_sum(a,n-1);
return ps
;
}

int main()
{
int a[] = {3,1,2,5,7,4,9};
int len = sizeof(a)/sizeof(a[0]);
prefix_sum(a,len-1) ;

for(int i=0;i<len;i++)
{
cout<<ps[i]<<"";
}
cout<<endl;
system("pause");
return 0;
}


三、并行算法
3.1.Non RecursivePrefix_Sum
首先让A[1..n]来表示n个整数的数组。让B[h][j]作为辅助数据在二叉树的前向遍历中保存中间信息(后面会说明方法),其中1<=h<=log2(n) and 1<=j<=n/2^h,然而数组C被用作后向遍历树下存放结果。
 
Input:大小为n=2^h的一个数组A,h是一个非负整数
Output:矩阵C存放结果,C[0][j]就是前缀和的第j项,1<=j<=n
 
伪代码:

begin
for 1<=j<=n <strong>pardo</strong>
set B[0,j] = A[j];

for h = 1 to log2(n) do
for 1<=j<=n/2^h <strong>pardo</strong>
set B[h,j] = B[h-1,2*j-1] + B[h-1,2*j];

for h = log2(n) to 0 do
for 1<=j<n/2^h <strong>pardo</strong>
{
j is even : set C[h,j] = C[h+1,j/2];
j = 1     : set C[h,1] = B[h,1];
j is odd  : set C[h,j] = C[h+1,(j-1)/2] + B[h,j];
}
end


说明:伪代码中的pardo是parallel do的意思,即并行处理。



                                         Data Flow(Forward)
运行时间:并行的运行时间就是O(log2(n))其实也就是树的高度,每一层并行一次时间复杂度为O(1)。
注意:这是一个前向遍历,从底而上。对于C的后向遍历同样的,区别在于从上而下。
 


                                           DataFlow(Backward)
数组C的生成根据上述伪代码生成,最终的C[0][j](1<=j<=n)就是最后的前缀和的结果。

上代码:

#include<stdio.h>
#include<time.h>
#include<pthread.h>
#include<stdlib.h>
#include<unistd.h>
#include<memory.h>
#include<stdlib.h>
#include<math.h>

#define RANGE 99
#define M 1000

void generate(int n);
void Read(int n);
void *func1(void *arg);
void *func2(void *arg);
void *func3(void *arg);
void calculate();
void resultToFile();

int A[M];
int B[20][1000];
int prefix_sum[20][1000]={0};
pthread_t tids[20][1000];
pthread_mutex_t mutex;
int n;

struct S
{
int i ;
int j ;
};

int main()
{
printf("please input the scale of input :  ");
scanf("%d",&n);
generate(n);
Read(n);

clock_t  start = clock();
calculate();
resultToFile();
clock_t finish = clock();

printf("the time cost is : %.6f seconds\n",(double)(finish-start)/CLOCKS_PER_SEC);
return 0;
}

void calculate()
{
int h,i,j,k,l,flag;
/*======set B[0][j]=A[j] in parallel==================*/
for(i=1;i<=n;i++)
{
struct S *s;
s = (struct S *)malloc(sizeof(struct S));
s->i = i;
if( pthread_create(&tids[0][i],NULL,func1,s) )
{
perror("Fail to create thread!");
exit(1);
}
}
for(j=1;j<=n;j++)
pthread_join(tids[0][j],NULL);

/***Show the B[0][j]***************************/
#if 1
printf("Show the A[i]:   ");
for(j=1;j<=n;j++)
printf("%d ",A[j]);
printf("\nShow the B[0][j]:");
for(j=1;j<=n;j++)
printf("%d ",B[0][j]);
printf("\n");
#endif
/***********************************************/

/**Data Flow Forward Algorithm:build the tree***/
for(h=1;h<=log2(n);h++)
{
for(j=1;j<=n/pow(2,h);j++)
{
struct S *s;
s = (struct S*)malloc(sizeof(struct S));
s->i = h;
s->j = j;
if( pthread_create(&tids[h][j],NULL,func2,s) )
{
perror("sth is wrong");
exit(1);
}
}
/*****wait all the threads terminate to continue*****/
for(j=1;j<=n/pow(2,h);j++)
pthread_join(tids[h][j],NULL);
}

#if 1
/******Print Data Flow Forward************/
printf("Data Flow Forward:\n");
for(h=0;h<=log2(n);h++)
{
for(l=1;l<=2*h+1;l++)
printf(" ");
for(j=1;j<=n/pow(2,h);j++)
{
printf("%d",B[h][j]);
for(l=1;l<=1+h;l++)
printf(" ");
}
printf("\n");
}
#endif

/*********************************************/

/***********Data Flow Backward:Traverse*******/
for(h=log2(n);h>=0;h--)
{
for(j=1;j<=n/pow(2,h);j++)
{
struct S *s;
s = (struct S*)malloc(sizeof(struct S));
s->i = h;
s->j = j;
if( pthread_create(&tids[h][j],NULL,func3,s) )
{
perror("sth is wrong");
exit(1);
}
}
for(j=1;j<=n/pow(2,h);j++)
pthread_join(tids[h][j],NULL);
}

#if 1
/********Print Data Flow Backward************/
printf("Data Flow Backward:\n");
for(h=log2(n);h>=0;h--)
{
for(l=1;l<=2*h+1;l++)
printf(" ");
for(j=1;j<=n/pow(2,h);j++)
{
printf("%d ",prefix_sum[h][j]);
for(l=1;l<=h;l++)
printf(" ");
}
printf("\n");
}
#endif
/*********************************************/
}

void resultToFile()
{
int i;
/***********write the result to file**********/
FILE *file	;
file = fopen("/home/leaguenew/parallel_computing/prefixSumResult.txt","wt");
for(i=1;i<=n;i++)
{
fprintf(file,"%-6d",prefix_sum[0][i]);
}
/*********************************************/

}

void *func1(void *arg)//set B[0][j]=A[j]
{
int i;
struct S *p;
p = (struct S*)arg;
i = p->i;
pthread_mutex_lock(&mutex);
B[0][i]=A[i];
pthread_mutex_unlock(&mutex);
pthread_exit(NULL);
}

void *func2(void *args)//B[h][j]=B[h-1][2*j-1]+B[h-1][2*j];
{
int h , j;
struct S *p;
p = (struct S*)args;
h = p->i;
j = p->j;
pthread_mutex_lock(&mutex);
B[h][j]=B[h-1][2*j-1]+B[h-1][2*j];
pthread_mutex_unlock(&mutex);
pthread_exit(NULL);
}

void *func3(void *arg)//get the prefix sum
{
int h,j;
struct S *p;
p = (struct S*)arg;
h = p->i;
j = p->j;
pthread_mutex_lock(&mutex);
if(j==1) prefix_sum[h][1] = B[h][1];
else if (j%2==0)  prefix_sum[h][j] = prefix_sum[h+1][j/2] ;
else prefix_sum[h][j] = prefix_sum[h+1][(j-1)/2] + B[h][j] ;
pthread_mutex_unlock(&mutex);
pthread_exit(NULL);
}

void generate(int n)
{
FILE *file1;
if( (file1=fopen("/home/leaguenew/parallel_computing/arrayA.txt","wt") )==NULL )
{
perror("fopen");
exit(1);
}

int i,j;
srand( (unsigned)time(NULL) );
for( i = 1 ; i <= n ;i++)
fprintf(file1,"%-8d",rand()%RANGE);
fprintf(file1,"\n");
fclose(file1);
}

void Read(int n)
{
FILE *file1;
if( (file1=fopen("/home/leaguenew/parallel_computing/arrayA.txt","rt") )==NULL )
{
perror("fopen");
exit(1);
}
int i,j;
srand( (unsigned)time(NULL) );
for( i = 1 ; i <= n ;i++)
fscanf(file1,"%d",&A[i]);
fclose(file1);
}


程序的流程是:

Main()
{
Generate();//生成随机数据
Read();//读取随机数据到数组A中
Calculate()
{
Pthread_create(func1);
Pthread_create(func2);
Pthread_create(func3);
}
ResultToFile();//把结果写入文件
}
Func1();//计算:B[0][j] = A[j]
Func2();//计算:B[h][j]=B[h-1][2*j-1]+B[h-1][2*j]
Func3();//计算:C[i][j]


描述:首先通过随机数生成一组随机数,存放到文件中去,然后通过读取文件中的数据到数组A中。然后开始计时,调用计算函数calculate,创建多线程计算:

<span style="font-family:Microsoft YaHei;font-size:14px;">B[0][j] = A[j];
B[h][j]=B[h-1][2*j-1]+B[h-1][2*j];
{
if(j==1) prefix_sum[h][1]= B[h][1];
elseif (j%2==0)  prefix_sum[h][j] =prefix_sum[h+1][j/2] ;
elseprefix_sum[h][j] = prefix_sum[h+1][(j-1)/2] + B[h][j] ;
}</span>


最后输出结果到文件。

结果截图:
n=2:



n=4:



n=8:



n=16:



n=500:



n=600:



n=800:



3.2.Get rid of the 2-D array
这是计算prefix sum的另外一种算法,去除二维数组。
伪代码:

in:A[1..n],out:Prefix[1..n]

for 1<=i<=n do in parallel
Prefix[i] = A[i-1] + A[i];
end in parallel
k = 2:
while(k<n) do
for k+1<=i<=n do in parallel
Prefix[i] = Prefix[i-k] + Prefix[i]
end in parallel
k = k + k;
end while


代码:

#include<stdio.h>
#include<time.h>
#include<pthread.h>
#include<stdlib.h>
#include<unistd.h>
#include<memory.h>
#include<stdlib.h>
#include<math.h>

#define RANGE 99
#define M 1000

void generate();
void Read();
void *func1(void *arg);
void *func2(void *arg);
void calculate();
void resultToFile();

int A[M]={0};
int prefix_sum[M]={0};
int temp[M]={0};
int n;
pthread_t tids[1000][1000];
pthread_mutex_t mutex;

struct S
{
int i ;
int j ;
};

int main()
{
printf("please input the scale of input : ");
scanf("%d",&n);
generate();
Read();

clock_t start = clock();
calculate();
resultToFile();
clock_t finish = clock();

printf("The time cost is : %.6f second\n",(double)(finish-start)/CLOCKS_PER_SEC);
return 0 ;
}

void resultToFile()
{
int i;
/***********write the result to file**************/
FILE *file;
file = fopen("/home/leaguenew/parallel_computing/GetRidOf2DArray.txt","wt");
for(i=1;i<=n;i++)
{
fprintf(file,"%-6d",prefix_sum[i]);
}
/*************************************************/
}

void calculate()
{
int h,i,j,k,l ;
/************************************************/
for(i=1;i<=n;i++)
{
struct S* s;
s = (struct S*)malloc(sizeof(struct S));
s->i = i;
if( pthread_create(&tids[0][i],NULL,func1,s))
{
perror("Fail to create thread!");
exit(1);
}
}
for(j=1;j<=n;j++)
pthread_join(tids[0][j],NULL);
/************************************************/

k=2;
while(k<n)
{
for(i=1;i<=n;i++)
temp[i] = prefix_sum[i];
for(i=k+1;i<=n;i++)
{
struct S* s;
s = (struct S*)malloc(sizeof(struct S));
s->i = i;
s->j = k;
if( pthread_create(&tids[k][i],NULL,func2,s))
{
perror("Fail to create thread!");
exit(1);
}
}
for(j=k+1;j<=n;j++)
pthread_join(tids[k][j],NULL);

for(i=1;i<=n;i++)
prefix_sum[i] = temp[i];
k = k + k;
}

#if 0
/************show the result***************/
printf("A: \n");
for(i=1;i<=n;i++)
printf("%d ",A[i]);
printf("\nprefix_sum:\n");
for(i=1;i<=n;i++)
printf("%d ",prefix_sum[i]);
printf("\n");
/******************************************/
#endif

}

void *func1(void *arg)
{
int i ;
struct S *p;
p = (struct S*)arg;
i = p->i;
pthread_mutex_lock(&mutex);
if(i==1)  prefix_sum[i] = A[i];
else	  prefix_sum[i] = A[i-1] + A[i];
pthread_mutex_unlock(&mutex);
pthread_exit(NULL);
}

void *func2(void *arg)
{
int i ,k ;
struct S *p;
p = (struct S*)arg;
i = p->i;
k = p->j;
pthread_mutex_lock(&mutex);
temp[i] = prefix_sum[i-k] + prefix_sum[i];
pthread_mutex_unlock(&mutex);
pthread_exit(NULL);
}

void generate()
{
FILE *file;
if( (file = fopen("/home/leaguenew/parallel_computing/arrayB.txt","wt"))==NULL )
{
perror("fopen");
exit(1);
}
int i , j ;
srand((unsigned)time(NULL) );
for( i=1;i<=n;i++ )
fprintf(file,"%-6d",rand()%RANGE);
fprintf(file , "\n");
fclose(file);
}

void Read()
{
FILE *file;
if( (file = fopen("/home/leaguenew/parallel_computing/arrayB.txt","rt"))==NULL )
{
perror("fopen");
exit(1);
}
int i , j ;
srand((unsigned)time(NULL) );
for( i=1;i<=n;i++ )
fscanf(file,"%d",&A[i]);
fclose(file);
}

程序的流程是:

Main()
{
Generate();//生成随机数据写入文件
Read();//读取文件中的数据到数组中
Calculate()
{
Pthread_create(func1);
Pthread_create(func2);
}
ResultToFile();//把结果写入文件
}
Func1();//计算:prefix_sum[i] = A[i-1] + A[i];
Func2();//计算:prefix[i] = prefix_sum[i-k] + prefix_sum[i];<strong>

结果截图:

n=4:



n=6:



n=16:





四、总结

a.pthread的链接库问题:编译的时候要添加–lpthread。

b.Scanf参数没有添加&造成段错误。

c.通过打印调试调试程序,及打印相关的输出语句来判断哪个位置发生错误。

d.sleep()不占计算clock()时间。

f.在UNIX和LINUX系统中在要命令中加入-lm这个功能是把数学库与源程序链接在一起。
g.Pthread_join的理解,等待所有创建的线程结束后继续进行。

h.通过动态创建结构体来给pthread_create()传递多个参数。

转载注明:http://blog.csdn.net/lavorange/article/details/39103609
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息