您的位置:首页 > 运维架构 > Linux

Linux平台用C++实现信号量,同步线程

2012-07-17 10:46 435 查看
使用Linux平台上现有的信号量sem_t相关的一组API,可以方便地进行线程同步。现在用pthread_mutex_t和pthread_cond_t相关的一组API实现信号量机制。这组API包括:pthread_mutex_init,pthread_cond_init,pthread_mutex_lock,pthread_cond_signal,pthread_mutex_unlock,pthread_cond_wait,pthread_cond_timedwait,pthread_cond_destroy和pthread_mutex_destroy,可以在linux.com">http://www.9linux.com找到各API的说明。下边,是封装的信号量类,以及测试代码。使用VS2005编辑,在虚拟机
Fedora 13中编译,测试通过。

MySemaphore.h

view plain

#ifndef Semaphore_Header

#define Semaphore_Header

#include <iostream>

#include <pthread.h>

#include <errno.h>

#include <assert.h>

using namespace std;

//------------------------------------------------------------------------

class CSemaphoreImpl

{

protected:

    CSemaphoreImpl(int n, int max);

    ~CSemaphoreImpl();

    void SetImpl();

    void WaitImpl();

    bool WaitImpl(long lMilliseconds);

private:

    volatile int    m_n;

    int             m_max;

    pthread_mutex_t m_mutex;

    pthread_cond_t  m_cond;

};

inline void CSemaphoreImpl::SetImpl()

{

    if (pthread_mutex_lock(&m_mutex))

        cout<<"cannot signal semaphore (lock)"<<endl;

    if (m_n < m_max)

    {

        ++m_n;

    }

    else

    {

        pthread_mutex_unlock(&m_mutex);

        cout<<"cannot signal semaphore: count would exceed maximum"<<" m_n = "<<m_n<<"m_max = "<<m_max<<endl;

    }

    //重新开始等待m_cond的线程,可被调度

    if (pthread_cond_signal(&m_cond))

    {

        pthread_mutex_unlock(&m_mutex);

        cout<<"cannot signal semaphore"<<endl;

    }

    pthread_mutex_unlock(&m_mutex);

}

//------------------------------------------------------------------------

/*

信号量同步机制

信号量提供一个计数值,可以进行原子操作。V 将计数值加1,使得

等待该信号量的线程可以被调用(调用Set()),P 将计数值减1,使

当前线程被挂起,进行睡眠(调用Wait())。

当信号量的计数值被初始化为0时,调用P操作,将挂起当前线程。

当信号量被激活,即调用V操作后,被挂起的线程就有机会被重新调度了。

*/

class CMySemaphore: private CSemaphoreImpl

{

public:

    /*

     创建一个信号量,信号量计数值当前值为参数n,最大值为max。

     如果只有n,则n必须大于0;如果同时有n和max,则n必须不小

     于0,且不大于max

    */

    CMySemaphore(int n);

    CMySemaphore(int n, int max);

    /*

     销毁一个信号量

    */

    ~CMySemaphore();

    /*

     对信号量计数值做加1动作,信号量变为有信号状态,使得

     另一个等待该信号量的线程可以被调度

    */

    void Set();

    /*

     对信号量计数值做减1动作,信号量变为无信号状态。若

     计数值变得大于0时,信号量才会变为有信号状态。

    */

    void Wait();

    /*

     在给定的时间间隔里等待信号量变为有信号状态,若成功,

     则将计数值减1,否则将发生超时。

    */

    void Wait(long lMilliseconds);

    /*

     在给定的时间间隔里等待信号量变为有信号状态,若成功,

     则将计数值减1,返回true;否则返回false。

    */

    bool TryWait(long lMilliseconds);

private:

    CMySemaphore();

    CMySemaphore(const CMySemaphore&);

    CMySemaphore& operator = (const CMySemaphore&);

};

inline void CMySemaphore::Set()

{

    SetImpl();

}

inline void CMySemaphore::Wait()

{

    WaitImpl();

}

inline void CMySemaphore::Wait(long lMilliseconds)

{

    if (!WaitImpl(lMilliseconds))

        cout<<"time out"<<endl;

}

inline bool CMySemaphore::TryWait(long lMilliseconds)

{

    return WaitImpl(lMilliseconds);

}

#endif

MySemaphore.cpp

view plain

#include "MySemaphore.h"

#include <sys/time.h>

CSemaphoreImpl::CSemaphoreImpl(int n, int max): m_n(n), m_max(max)

{

    assert (n >= 0 && max > 0 && n <= max);

    if (pthread_mutex_init(&m_mutex, NULL))

        cout<<"cannot create semaphore (mutex)"<<endl;

    if (pthread_cond_init(&m_cond, NULL))

        cout<<"cannot create semaphore (condition)"<<endl;

}

CSemaphoreImpl::~CSemaphoreImpl()

{

    pthread_cond_destroy(&m_cond);

    pthread_mutex_destroy(&m_mutex);

}

void CSemaphoreImpl::WaitImpl()

{

    if (pthread_mutex_lock(&m_mutex))

        cout<<"wait for semaphore failed (lock)"<<endl;

    while (m_n < 1)

    {

        //对互斥体进行原子的解锁工作,然后等待状态信号

        if (pthread_cond_wait(&m_cond, &m_mutex))

        {

            pthread_mutex_unlock(&m_mutex);

            cout<<"wait for semaphore failed"<<endl;

        }

    }

    --m_n;

    pthread_mutex_unlock(&m_mutex);

}

bool CSemaphoreImpl::WaitImpl(long lMilliseconds)

{

    int rc = 0;

    struct timespec abstime;

    struct timeval tv;

    gettimeofday(&tv, NULL);

    abstime.tv_sec  = tv.tv_sec + lMilliseconds / 1000;

    abstime.tv_nsec = tv.tv_usec*1000 + (lMilliseconds % 1000)*1000000;

    if (abstime.tv_nsec >= 1000000000)

    {

        abstime.tv_nsec -= 1000000000;

        abstime.tv_sec++;

    }

    if (pthread_mutex_lock(&m_mutex) != 0)

        cout<<"wait for semaphore failed (lock)"<<endl;

    while (m_n < 1)

    {

        //自动释放互斥体并且等待m_cond状态,并且限制了最大的等待时间

        if ((rc = pthread_cond_timedwait(&m_cond, &m_mutex, &abstime)))

        {

            if (rc == ETIMEDOUT) break;

            pthread_mutex_unlock(&m_mutex);

            cout<<"cannot wait for semaphore"<<endl;

        }

    }

    if (rc == 0) --m_n;

    pthread_mutex_unlock(&m_mutex);

    return rc == 0;

}

CMySemaphore::CMySemaphore(int n): CSemaphoreImpl(n, n)

{

}

CMySemaphore::CMySemaphore(int n, int max): CSemaphoreImpl(n, max)

{

}

CMySemaphore::~CMySemaphore()

{

}

    下边是测试代码

view plain

// pthread_semaphore.cpp : 定义控制台应用程序的入口点。

//

#include "MySemaphore.h"

//创建一个信号量,其计数值当前值为0,最大值为3

CMySemaphore g_MySem(0, 3);

//线程函数

void * StartThread(void *pParam)

{

    //休眠1秒,确保主线程函数main中

    //创建工作线程下一句g_MySem.Set();先执行

    sleep(1);

    g_MySem.Wait(); //信号量计数值减1

    cout<<"Do print StartThread"<<endl;

    return (void *)0;

}

int main(int argc, char* argv[])

{

    pthread_t thread;

    pthread_attr_t attr;

    assert ( !g_MySem.TryWait(10) );

    g_MySem.Set(); //信号量计数值加1

    g_MySem.Wait(); //信号量计数值减1

    try

    {

        g_MySem.Wait(100);

        cout<<"must timeout"<<endl; //此处发生超时

    }

    catch (...)

    {

        cout<<"wrong exception"<<endl;

    }

    g_MySem.Set();

    g_MySem.Set();

    assert ( g_MySem.TryWait(0) );

    g_MySem.Wait();

    assert ( !g_MySem.TryWait(10) );

    //创建工作线程

    pthread_attr_init(&attr);

    pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_JOINABLE);

    if (pthread_create(&thread,&attr, StartThread,NULL) == -1)

    {

        cout<<"StartThread: create failed"<<endl;

    }

    g_MySem.Set();

    //等待线程结束

    void *result;

    pthread_join(thread,&result);

    assert ( !g_MySem.TryWait(10) ); //若将断言中的 ! 去掉,则会发生断言错误

    //关闭线程句柄,释放资源

    pthread_attr_destroy(&attr);

    int iWait;

    cin>>iWait;

    return 0;

}

    编译,运行。可以看到,与Win32平台上的测试结果相同

    由此可见,信号量机制很关键的一点就是计数值 m_n
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息