
Clustering Data into Groups: the K-means Algorithm

2011-11-11 12:34
In many practical applications, a set of data points must be partitioned into groups, called clusters, and the center of each cluster must be computed. This is the task solved by the well-known k-means algorithm.

The input to k-means is N d-dimensional data points x_1, ..., x_N, together with the desired number of clusters k. The result is the k cluster centers m_1, ..., m_k; the algorithm can also report which data points belong to each cluster. It first chooses initial center positions, either at random or by a heuristic search, and then alternates between the following two steps:

Assignment: given the current k centers, determine which points belong to each center's cluster. That is, compute the distance from every point to each center and assign the point to the cluster of the nearest center;

Update: recompute each cluster's center as the mean of the coordinates of all points in that cluster.

The result depends on the initial centers and may be only a locally optimal clustering, so the algorithm is usually run several times with different initial points and the best result is kept.
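Before turning to the parallel version, the two alternating steps can be sketched serially. The helper below is a minimal 1-dimensional illustration; the function name `kmeans_1d` and the cap of 16 clusters are choices made here for brevity, not part of the program shown later:

```c
#include <math.h>

/* One run of Lloyd iterations in 1-D: assign each point to its nearest
 * center, then move each center to the mean of its assigned points.
 * Assumes k <= 16; performs a fixed number of passes. */
void kmeans_1d(int n, const double *x, int k, double *c, int iters)
{
    for (int it = 0; it < iters; it++) {
        double sum[16] = {0};   /* per-cluster coordinate sums */
        int    cnt[16] = {0};   /* per-cluster point counts    */

        /* Assignment step */
        for (int i = 0; i < n; i++) {
            int best = 0;
            for (int j = 1; j < k; j++)
                if (fabs(x[i] - c[j]) < fabs(x[i] - c[best]))
                    best = j;
            sum[best] += x[i];
            cnt[best]++;
        }

        /* Update step: empty clusters keep their previous center */
        for (int j = 0; j < k; j++)
            if (cnt[j] > 0)
                c[j] = sum[j] / cnt[j];
    }
}
```

For two well-separated groups of points, a few passes suffice for the centers to settle on the group means; the MPI program below distributes exactly these two steps across processes.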

The k-means algorithm parallelizes naturally. In a master-worker scheme, one process is responsible for distributing and scheduling the data, while the others compute local partial sums for the centers and send their results to the master. This maps directly onto MPI. The outline is as follows:

1. Process P_0 partitions the data set into p blocks D_1, ..., D_p and distributes them to the p processes P_1, ..., P_p;

2. P_0 generates the initial k centers (m_1, ..., m_k) and broadcasts them to all processes;

3. Each process P_r computes, for every point in its block D_r, the distance to each of the k centers (m_1, ..., m_k) and assigns the point to the cluster of the nearest center;

4. Each process P_r computes, for each cluster, the sum and the count of its local points in D_r, and sends these to P_0;

5. After receiving all partial sums and counts, P_0 computes the new centers (m_1, ..., m_k) and broadcasts them to all processes;

6. Steps 3-5 are repeated until convergence (the listing below simply runs a fixed number of iterations).

The full source code follows:

// Parallel k-means with MPI (master-worker scheme).
// Build: mpicc -std=c99 kmeans.c -o kmeans -lm
// Run:   mpirun -np 4 ./kmeans
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <math.h>

#define N 10000    // Number of data points
#define D 2        // Dimension of each data point
#define K 3        // Number of clusters
#define M 1000     // Number of iterations (fixed; no convergence test)

typedef double DataPoint[D];

// Generate a random point uniformly distributed in [0, 1]^D
void GenRandomPoint(DataPoint point)
{
    for (int i = 0; i < D; i++)
        point[i] = (double)rand() / RAND_MAX;
}

// Reset the point to the origin
void ResetPoint(DataPoint point)
{
    for (int i = 0; i < D; i++)
        point[i] = 0.0;
}

// Print the point's coordinates on one line
void PrintPoint(DataPoint point)
{
    for (int i = 0; i < D; i++)
        printf("%g ", point[i]);
    putchar('\n');
}

// Euclidean distance between two points
double Distance(DataPoint p1, DataPoint p2)
{
    double s = 0.0;
    for (int i = 0; i < D; i++)
        s += (p1[i] - p2[i]) * (p1[i] - p2[i]);
    return sqrt(s);
}

// Index of the centroid nearest to p
int GetNearestIndex(DataPoint centroid[K], DataPoint p)
{
    double min = Distance(centroid[0], p);
    int min_i = 0;
    for (int i = 1; i < K; i++) {
        double dist = Distance(centroid[i], p);
        if (dist < min) {
            min = dist;
            min_i = i;
        }
    }
    return min_i;
}

// Accumulate point p into sum, component-wise
void AddPointToSum(DataPoint sum, DataPoint p)
{
    for (int i = 0; i < D; i++)
        sum[i] += p[i];
}

int main(int argc, char *argv[])
{
    int rank, size;
    MPI_Datatype point_type;   // derived type: one D-dimensional point
                               // (the MPI_ name prefix is reserved by the
                               // standard, so a plain identifier is used)
    MPI_Status status;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    // The master-worker scheme needs at least one worker
    if (size < 2) {
        fprintf(stderr, "At least two processes are required. Terminating.\n");
        MPI_Finalize();
        return 1;
    }

    MPI_Type_contiguous(D, MPI_DOUBLE, &point_type);
    MPI_Type_commit(&point_type);

    DataPoint *full_data;   // all N points (master only)
    DataPoint *data;        // local block of points (workers only)
    DataPoint centroid[K];

    DataPoint sum[K];       // per-cluster coordinate sums (workers)
    int number[K];          // per-cluster point counts (workers)

    // Master generates the points and the initial centroids
    if (rank == 0) {
        srand((unsigned)time(NULL));

        data = NULL;
        full_data = malloc(sizeof(DataPoint) * N);

        for (int i = 0; i < N; i++)
            GenRandomPoint(full_data[i]);

        for (int i = 0; i < K; i++)
            GenRandomPoint(centroid[i]);
    } else {
        // Each worker holds at most ceil(N / (size - 1)) points
        data = malloc(sizeof(DataPoint) * ((N + size - 2) / (size - 1)));
        full_data = NULL;
    }

    // Broadcast the initial centroids
    MPI_Bcast(centroid, K, point_type, 0, MPI_COMM_WORLD);

    // Scatter the data points to the workers; the master keeps none
    int *sendcounts = malloc(sizeof(int) * size);
    int *displs = malloc(sizeof(int) * size);
    sendcounts[0] = displs[0] = 0;

    int remain = N;
    for (int i = 1; i < size; i++) {
        sendcounts[i] = (N + size - 2) / (size - 1);  // ceil(N / (size - 1))
        if (sendcounts[i] > remain)
            sendcounts[i] = remain;
        displs[i] = displs[i - 1] + sendcounts[i - 1];
        remain -= sendcounts[i];
    }
    MPI_Scatterv(full_data, sendcounts, displs, point_type,
                 data, sendcounts[rank], point_type, 0, MPI_COMM_WORLD);

    // Main loop: a fixed number of Lloyd iterations
    for (int iteration = 0; iteration < M; iteration++) {
        if (rank == 0) {
            DataPoint new_centroid[K];
            int total_number[K];

            for (int i = 0; i < K; i++) {
                total_number[i] = 0;
                ResetPoint(new_centroid[i]);
            }

            // Collect the partial sums and counts from every worker
            for (int i = 1; i < size; i++) {
                if (sendcounts[i] > 0) {
                    MPI_Recv(number, K, MPI_INT, i, iteration * 2,
                             MPI_COMM_WORLD, &status);
                    MPI_Recv(sum, K, point_type, i, iteration * 2 + 1,
                             MPI_COMM_WORLD, &status);

                    for (int j = 0; j < K; j++) {
                        total_number[j] += number[j];
                        AddPointToSum(new_centroid[j], sum[j]);
                    }
                }
            }

            // New centroid = mean of its assigned points; an empty
            // cluster keeps its previous centroid (avoids dividing by 0)
            for (int i = 0; i < K; i++)
                if (total_number[i] > 0)
                    for (int j = 0; j < D; j++)
                        centroid[i][j] = new_centroid[i][j] / total_number[i];
        } else if (sendcounts[rank] > 0) {
            // Workers that received no points send nothing; the master
            // skips them, so no unmatched sends are left pending
            for (int i = 0; i < K; i++) {
                ResetPoint(sum[i]);
                number[i] = 0;
            }

            // Assign each local point to its nearest centroid
            for (int i = 0; i < sendcounts[rank]; i++) {
                int index = GetNearestIndex(centroid, data[i]);
                AddPointToSum(sum[index], data[i]);
                number[index]++;
            }

            MPI_Send(number, K, MPI_INT, 0, iteration * 2, MPI_COMM_WORLD);
            MPI_Send(sum, K, point_type, 0, iteration * 2 + 1, MPI_COMM_WORLD);
        }

        // Broadcast the updated centroids for the next iteration
        MPI_Bcast(centroid, K, point_type, 0, MPI_COMM_WORLD);
    }

    if (rank == 0) {
        printf("K-means centroids:\n");
        for (int i = 0; i < K; i++)
            PrintPoint(centroid[i]);
    }

    if (rank == 0)
        free(full_data);
    else
        free(data);
    free(sendcounts);
    free(displs);

    MPI_Type_free(&point_type);

    MPI_Finalize();
    return 0;
}