您的位置:首页 > 其它

利用MPI求解全源最短路径的并行算法实现

2008-11-27 19:54 579 查看

利用MPI求解全源最短路径的并行算法实现

一、     
问题描述

随着计算机的出现和发展,对图论的研究不断深入。最短路径问题作为图论中的一个经典问题,被应用于众多领域。在网络通信领域,信息路径选择问题也与最短路径问题密切相关。解决最短路径问题的算法在诸多工程领域都有较强的实用价值。一般来说最短路径问题分为单源最短路径问题和全源最短路径,
公认的比较好的算法Dijkstra算法比较适合于单源最短路径,而Floyd算法适合用于求解全源最短路径。

本文所关注的所有点对之间最短路径问题也就是全源最短路径问题,就是要求解一个有向带权图中所有顶点之间的最短路径,因此选用Floyd算法作为算法基础。对《一种实用的所有点对之间最短路径并行算法》认真分析研读后,将所述并行算法予以实现。

二、     
串行Floyd算法描述及分析

Floyd算法,又称传递闭包方法,其实质就是邻接矩阵自乘n次,算法的时间复杂度为

。下面就是扩充了路径矩阵的Floyd串行算法的描述:

Floyd[/i]串行算法[/i]

输入:[/i]顶点数目n; [/i]图的邻接矩阵Array; [/i]初始化后的路径矩阵Path[/i]

计算: [/i]

/*Serialed
Floyd Algorithm*/


void
Floyd_Proc(unsigned int **Array, unsigned int **Path){


       int i,j,k;

       for ( k = 0; k < Nodes; k ++)

              for ( i = 0; i < Nodes; i ++)

                     for ( j = 0; j < Nodes;
j ++)


                            if ( Array[i][j]
> ( Array[i][k] + Array[k][j]) )


                            {

                                   Array[i][j] =
Array[i][k] + Array[k][j];


                                   Path [i][j] =
Path[i][k];


                            }

}

输出:[/i]变换后的矩阵Array[/i]和Path[/i]

邻接矩阵是一个n ×n的矩阵Array,在具体的实现中用非常大的值(如所能表示的最大整数) 来表示不存在的边。为了表示求得的任意两个顶点最短路径具体经过途径中各个顶点的顺序,构造路径矩阵Path,来实现对最短路径途径中各个顶点的追踪和记录。顶点路径矩阵path

的初始化为path[j] = j(0≤i, j≤n - 1)。对于大规模的图求解所有点对之间最短路径,Floyd算法的串行算法

这样的时间复杂度要花费较大的运算时间。

三、     
并行Floyd算法描述

在并行算法的设计中,问题的分解通常可以有两种形式:一种是域分解,即将问题分解为若干个较小的问题区域,然后分别对其求解;另一种是功能分解,即将问题按功能分解为若干个子问题,而后对各个子问题并行求解。对于所有点对之间最短路径问题,在采用了Floyd的算法思想后,选择前者比较方便。

在二维划分的模式中可以观察到, 更新各个处理机的任务数据元素其实只需要array矩阵的第k行中的长度为

的一段。同理,也只需要array矩阵第k列的长度为

的一段和path矩阵第k列的长度为

的一段(正交)。

四、     
并行Floyd算法实现

为了方便起见,约定处理机数为p = p×p,数据规模n,p可以整除n。如果有p台处理机,那么就划分p个任务,也保证p台处理机所负担的任务数据量相同。下面是二维均匀块分配方法分解任务的Floyd并行算法。

[i]二维任务划分方法的Floyd
并行算法

输入:顶点数目n; 图的邻接矩阵array; 初始化后的路径矩阵path到主机(0号进程)

计算:

(1) 如果是主机(0号进程) ,根据数据规模n和处理机数目p来划分任务,按照棋盘式均匀的分解的方法,发送array和path的数据块到相应的处理机,各处理机在本地存储为jobarra y和jobpath

(2) for ( k = 0; k < n; k ++) 循环开始。

(3) 各个处理器计算当前需要广播的第k行array所在的处理机

号。如果在本地, 那么从jobarray矩阵中选取出相应的一行数据,存储在temparrayA [],并向同一列处理机广播。各个处理器计算当前需要广播的第k列array和path所在的处理机号。如果在本地,那么从jobarray和jobpath矩阵中选取出相应的一列数据,分别存储在tem parrayB []和temppath [ ],并向同一行处理机广播。

(4)各个处理器执行并行计算:

for ( i = 0; i <

; i ++)

       for ( j = 0; j <

; j ++)


           if
( jobarray[i][j] > ( temparrayB[i] + temparrayA[j] ) )


           {


                  jobarray[i][j]
= temparrayB[i] + temparrayA [j];


                  path[i][j]
= temppath[i];


           }


(5) 跳(2) 直至k的循环结束转(6) ;

(6) 各个处理器返回运算结果jobarray和jobpath两个矩阵到主机(0号进程),主机在将其归并到本地的array和path矩阵。释放空间。

输出:变换后的矩阵array和path

五、     
并行Floyd算法复杂度分析

 

在上述算法中,第(4)步是用来更新矩阵array和path的判断和赋值语句,
时间复杂度为

;在中间循环,也就是第(3) 步,从一个处理器构造三个长度为

的消息到另外一个处理器的时间复杂度为

。由于广播到

个处理器需要

个消息传递步骤, 所以每个循环迭代的广播操作总的时间复杂度为

。并行算法的最外层循环,也就是第(2) 步,执行了n次迭代,因此并行算法总的时间复杂度为:



根据串行运行时间复杂度为

, 得到加速比和效率如下:





六、     
串行/并行Floyd算法性能分析

采用MPI实现后的程序,在测试时采用了32x32的矩阵进行分析,以便尽可能的使其降低通讯时延在总运行时间中的比例。分别在两台配置均为Pentium IV 3.0G,内存为768MHz的PC机上进行测试。以下所得数据,运行时间单位均为秒(s)。

串行Floyd算法运行时间:wall clock time = 0.169347

并行Floyd算法运行时间:


可见在相同的、较小的问题规模下,并行算法未必会带来性能大的提升。相反,由于通信开销增大,多机下,程序运行时延增大,性能可能会下降。但在更大规模的问题上并行算法效率上更有优势。

七、     
小结

针对所有点对之间最短路径问题的Floyd扩充路径并行算法, 采用MPI将该算法实现,并在多处理机下进行了性能上的简单测试,实验结果表明,该并行算法降低了计算所有点对之间最短路径问题的计算时间,提高了计算效率,具有良好的并行加速比。

八、     
参考文献

1.         周益民等, 一种实用的所有点对之间最短路径并行算法.北京. 计算机应用, 2005.12.2.         谭国真,隋春丽, PC 机群环境下最短路径并行算法的研究, 小型微型计算机系统, 2001.11.

九、     
部分源码      

注:程序部分系原创,如有转载请注明作者kingonehappy

及出处http://blog.csdn.net/kingonehappy

    1.串行程序核心代码

/*Serialed Floyd Algorithm*/
void Floyd_Proc(unsigned int **Array, unsigned int **Path)
{
    int i,j,k;
    for ( k = 0; k < Nodes; k ++)
      for ( i = 0; i < Nodes; i ++)
         for ( j = 0; j < Nodes; j ++)
        if ( Array[i][j] > ( Array[i][k] + Array[k][j]) )
        {
            Array[i][j] = Array[i][k] + Array[k][j];
            Path [i][j] = Path[i][k];
        }
}

2.并行程序核心代码

        pNArray tempblock = NULL;
    unsigned int ** TempA, ** TempB;
    unsigned int ** TempEndBuff;
    NArray JobArray,JobPath;
    int dest,tag,i,j,k , m,n;
    MPI_Status status;
    double startwtime = 0.0, endwtime;

    MPI_Datatype My_MPI_JobArr , My_MPI_JobLine;

    /*MPI Initial*/
    MPI_Init(&argc,&argv);
    startwtime = MPI_Wtime();
    MPI_Comm_size(MPI_COMM_WORLD,&NumProcs);
    MPI_Comm_rank(MPI_COMM_WORLD,&MyID);
    MPI_Get_processor_name(processor_name,&namelen);

    /*Wrong Processor numbers,exit*/
    if( !CheckProcessor(NumProcs) )
    {
        AppErrorExit(Err_APP_PROCS);
    }
    MPI_Type_contiguous(Sides*Sides,MPI_UNSIGNED,&My_MPI_JobArr);
    MPI_Type_commit(&My_MPI_JobArr);
    MPI_Type_contiguous(Sides,MPI_UNSIGNED,&My_MPI_JobLine);
    MPI_Type_commit(&My_MPI_JobLine);

    Row = MyID/SqrtProcs;
    Column = MyID%SqrtProcs;

    tag = 1;
    if(MyID == 0){
        
        ReadFile();
        Floyd_InitPath();
        
        for( i = 0 ; i < NumProcs ; i++ )
        {
            tempblock = GetJobArray(i);
            ArrayConvertToBuff(Sides,Sides,tempblock->array);
            MPI_Send(Buff, 1, My_MPI_JobArr, i, tag, MPI_COMM_WORLD);
            NArray_Free(tempblock);
        }

    }
    //printf("%d/t%d/t%d/t%d/t/n",Sides,NumProcs,SqrtProcs,Nodes);
    JobArray.array = CreateArray(Sides, Sides);
    MPI_Recv(Buff ,1 ,My_MPI_JobArr , 0 , tag ,MPI_COMM_WORLD, &status);
    printf("+++++++++++++++  %u   +++++++++++++++++/n",MyID);

    BuffConvertToArray(Sides,Sides,JobArray.array);
    //Array_Print(Sides,Sides,JobArray.array);
    
    
    TempA = CreateArray(1, Sides);
    TempB = CreateArray(1, Sides);

    for(k = 0 ; k < Nodes ; k ++)
    //for(k = 12 ; k < 13 ; k ++)
    {
        /*发送第k行数据*/
        int seq = k/Sides;
        
        if( seq == MyID/SqrtProcs )
        {
            i = k%Sides;
            for(j = 0 ; j < Sides ; j++)
            {
                TempA[0][j] = JobArray.array[i][j];
            }
            ArrayConvertToBuff(1,Sides,TempA);
            for( m  = 0 ; m < SqrtProcs ; m ++)
            {
                //printf("///t%d/t///n",m*SqrtProcs+Column);
                MPI_Send(Buff, 1, My_MPI_JobLine, m*SqrtProcs+Column, MyID , MPI_COMM_WORLD);
            }
        }
        MPI_Recv(Buff ,1 ,My_MPI_JobLine , seq*SqrtProcs+Column , seq*SqrtProcs+Column,MPI_COMM_WORLD, &status);
        BuffConvertToArray(1,Sides,TempA);
        //Array_Print(1,Sides,TempA);

        /*发送第k列数据*/
        if( seq == MyID%SqrtProcs)
        {
            i = k%Sides;
            for(j = 0 ; j < Sides ; j++)
            {
                TempB[0][j] = JobArray.array[j][i];
            }
            ArrayConvertToBuff(1,Sides,TempB);
            for( m  = 0 ; m < SqrtProcs ; m ++)
            {
                //printf("///t%d/t///n",Row*SqrtProcs+m);
                MPI_Send(Buff, 1, My_MPI_JobLine, Row*SqrtProcs+m, MyID , MPI_COMM_WORLD);
            }
        }
        MPI_Recv(Buff ,1 ,My_MPI_JobLine , Row*SqrtProcs+seq , Row*SqrtProcs+seq,MPI_COMM_WORLD, &status);
        BuffConvertToArray(1,Sides,TempB);
        //Array_Print(1,Sides,TempB);

        /*各个块内进行运算*/
        for(i = 0 ; i <Sides ; i++ )
        {
            for(j = 0 ; j <Sides ; j++)
            {
                if(JobArray.array[i][j]> (TempB[0][i] + TempA[0][j]))
                    JobArray.array[i][j] =  TempB[0][i] + TempA[0][j];
                if(JobArray.array[i][j]>METRY_MAX)
                    JobArray.array[i][j] = METRY_MAX;
            }
        }
    }

    ArrayConvertToBuff(Sides,Sides,JobArray.array);
    TempEndBuff = (unsigned int*)malloc(Nodes*Nodes*sizeof(unsigned int));
    MPI_Gather(Buff, Sides*Sides, MPI_UNSIGNED,
            &TempEndBuff[MyID*Sides*Sides], Sides*Sides, MPI_UNSIGNED,
                0, MPI_COMM_WORLD);

    if(MyID == 0)
    {

            m = 0;
            for( k = 0 ; k < NumProcs ; k++)
                for(i = 0 ; i < Sides ; i++)
                {
                    for(j = 0 ; j < Sides ; j++)
                    {
                        Array[i+k/SqrtProcs*Sides][j+k%SqrtProcs*Sides] = TempEndBuff[m++]; 
                    }
                }
            printf("////////////////////n");
            Array_Print(Nodes,Nodes,Array);
                   
    }
    //printf("%d/n",Sides);
    free(TempEndBuff);

    FreeArray(1,Sides,TempA);
    FreeArray(1,Sides,TempB);
    FreeArray(Sides,Sides,JobArray.array);

    if(MyID == 0)
    {
        FreeArray(Nodes , Nodes, Path);
        FreeArray(Nodes , Nodes, Array);
        endwtime = MPI_Wtime();
        printf("wall clock time = %f/n", endwtime-startwtime);
    }

    fprintf(stderr,"Process %d on %s/n",
        MyID, processor_name);
    MPI_Type_free(&My_MPI_JobArr);
    MPI_Type_free(&My_MPI_JobLine);

    /*MPI finalize*/
    MPI_Finalize();

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: