您的位置:首页 > 其它

多个线程barrier操作--条件变量

2014-03-26 22:54 274 查看
在多个线程协同工作时,有一种应用为,当多个线程都完成工作之后,才进行下一阶段的工作,最典型的为hashjoin算法中的build阶段并行化。

在build阶段,多个线程同时读取一个relation建立hash表,在此我们不考虑线程之间写hash表冲突造成的性能降低。N个线程读relation然后build hash表,N个线程都完成工作之后,才能开始下一步的probe阶段,否则probe阶段“见到”的不是全局的数据。因此通过条件变量来控制N个线程之间的同步可以觉得这个问题。

本程序中采用的是linux的pthreads函数库,在编译程序时,需要加上-lpthread

以下为一个简单的三个线程都到达之后才打开barrier的测试

Barrier类:

/*
* Barrier.cpp
*
*      Author: casa
*/

#include "Barrier.h"

Barrier::Barrier(int nThreads) {
m_nThreads = nThreads;
int ret;
ret = pthread_mutex_init(&m_l_SyncLock, NULL);
if(ret!=0)
printf("初始化互斥量失败!\n");
printf("初始化barrier,所有的线程到达barrier才打开!\n");
ret = pthread_cond_init(&m_cv_SyncCV, NULL);
if(ret!=0)
printf("初始化条件变量失败!\n");
m_nSyncCount = 0;
}

Barrier::~Barrier() {
pthread_mutex_destroy(&m_l_SyncLock);
pthread_cond_destroy(&m_cv_SyncCV);
}

void Barrier::Arrive() {
pthread_mutex_lock(&m_l_SyncLock);
m_nSyncCount++;
if(m_nSyncCount == m_nThreads) {
printf("最后一个线程到达,开始broadcast!\n");
pthread_cond_broadcast(&m_cv_SyncCV);
m_nSyncCount = 0;
}
else {
cout<<"等待达到线程个数以致栅栏打开!"<<endl;
pthread_cond_wait(&m_cv_SyncCV, &m_l_SyncLock);
}
pthread_mutex_unlock(&m_l_SyncLock);
}


/*
* Barrier.h
*
*      Author: casa
*/

#ifndef BARRIER_H_
#define BARRIER_H_

#include <iostream>
using namespace std;

#include <pthread.h>
#include <stdio.h>

class Barrier {
public:
Barrier(int nThreads);
virtual ~Barrier();

void Arrive();

private:
/*线程个数*/
int             m_nThreads;
/*互斥量*/
pthread_mutex_t m_l_SyncLock;
/*条件变量,用条件变量的意义是,当达到线程个数这个条件时barrier打开*/
pthread_cond_t  m_cv_SyncCV;
/*当前线程计数*/
volatile int    m_nSyncCount;
};

#endif /* BARRIER_H_ */


Barrier测试类:

/*
* BarrierTest.cpp
*
*      Author: casa
*/

#include "BarrierTest.h"

BarrierTest::BarrierTest(int nThreads)
:nThreads(nThreads){

}

BarrierTest::~BarrierTest() {

}

void * BarrierTest::func(void *args){
/*静态函数调用成员变量通过this指针*/
BarrierTest *bt=(BarrierTest *)(args);
bt->barrier->Arrive();
return (void *)(0);
}

void BarrierTest::funcAdd(){
pthread_t p1;
pthread_create(&p1,NULL,func,this);
cout<<"创建线程: "<<p1<<endl;
ps.push_back(p1);
}

void BarrierTest::init(){
/*这里的barrier是含有三个线程到达即可以pass过barrier的*/
barrier=new Barrier(nThreads);
}


/*
* BarrierTest.h
*
*      Author: casa
*/

#ifndef BARRIERTEST_H_
#define BARRIERTEST_H_

#include "Barrier.h"

#include <iostream>
#include <vector>
using namespace std;

#include <pthread.h>

class BarrierTest {
public:
BarrierTest(int nThreads);
virtual ~BarrierTest();

/*添加一个func,也是在添加一个线程*/
void funcAdd();
/*初始化barrier,这个barrier中含有三个线程*/
void init();
/* 静态函数调用成员变量,要传this指针
* 静态函数被调用可以直接进行
* */
static void * func(void *args);

private:
Barrier *barrier;
int nThreads;

public:
vector<pthread_t> ps;
};
#endif /* BARRIERTEST_H_ */

测试:

int test_barrier(){
BarrierTest *bt=new BarrierTest(3);
bt->init();
bt->funcAdd();
bt->funcAdd();
bt->funcAdd();
for(unsigned i=0;i<(bt->ps).size();i++){
pthread_join((bt->ps)[i],NULL);
}
return 0;
}
(本文完)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: