您的位置:首页 > 编程语言 > C语言/C++

Windows下C++实现多线程之线程同步

2017-08-15 17:23 387 查看
转自: http://blog.csdn.net/leonwei/article/details/8956632

上一篇文章windows编程 使用C++实现多线程类仅仅是介绍了怎样用类来实现多线程,这篇文章则重点介绍多线程中数据同步的问题。好了,废话不多说,进入主题。

    问题场景:这里我们假设有这样一个工作流水线(CWorkPipeline),它不断的生成一个SquareInfo的对象,这个对象包含x和y坐标,同时包括一个未得到结果的平方和(squareSum),这些流水线类似于现实世界的工厂不断产出产品(SquareInfo对象),很多个这样的流水线把产生的SquareInfo对象汇集给处理中心。

    处理中心(CProcessCenter)是一个多线程类,它不断接收流水线上的数据,并计算每个SquareInfo对象的squareSum,同时把对应的信息打印出来。

    我们看CWorkPipeline的定义:

[cpp]
view plain
copy

print?

/************************************************************************/  
/* FileName: WorkPipeline.h 
 * Date: 2015-5-13 
 * Author: huangtianba 
 * Description: 模拟工作流水线类 
 */  
/************************************************************************/  
  
#pragma once  
  
#include <Windows.h>  
#include <functional>  
  
struct SquareInfo  {  
    int x;  
    int y;  
    int squareSum;  
};  
  
typedef std::tr1::function<void (const SquareInfo &squareInfo)> FoundItemFn;  
  
class CWorkPipeline  
{  
public:  
    CWorkPipeline();  
    ~CWorkPipeline();  
  
  
    bool Init(const FoundItemFn &foundItemFn);  
    bool UnInit();  
  
    static DWORD CALLBACK WorkThread(LPVOID lpParam);  
    DWORD WorkProc();  
  
private:  
    HANDLE m_hThread;  
    FoundItemFn m_foundItemFn;  
};  

/************************************************************************/
/* FileName: WorkPipeline.h
* Date: 2015-5-13
* Author: huangtianba
* Description: 模拟工作流水线类
*/
/************************************************************************/

#pragma once

#include <Windows.h>
#include <functional>

struct SquareInfo
{
int x;
int y;
int squareSum;
};

typedef std::tr1::function<void (const SquareInfo &squareInfo)> FoundItemFn;

class CWorkPipeline
{
public:
CWorkPipeline();
~CWorkPipeline();

bool Init(const FoundItemFn &foundItemFn);
bool UnInit();

static DWORD CALLBACK WorkThread(LPVOID lpParam);
DWORD WorkProc();

private:
HANDLE m_hThread;
FoundItemFn m_foundItemFn;
};


    这里需要注意的是Init()函数接受一个FoundItemFn的函数对象,这个对象是CProcessCenter出给它的回调,用来接收流水线产生的SquareInfo对象。

    下面是CWorkPipeline类各个函数的实现:

[cpp]
view plain
copy

print?

/************************************************************************/  
/* FileName: WorkPipeline.cpp 
 * Date: 2015-5-14 
 * Author: chenzba 
 * Description: 
 */  
/************************************************************************/  
  
#include "stdafx.h"  
#include "WorkPipeline.h"  
#include <time.h>  
  
CWorkPipeline::CWorkPipeline(): m_hThread(NULL)  
{  
  
}  
  
  
CWorkPipeline::~CWorkPipeline()  
{  
  
}  
  
  
bool CWorkPipeline::Init(const FoundItemFn &foundItemFn)  
{  
    srand(unsigned int(time(NULL)));  
  
    // 创建线程  
    m_hThread = CreateThread(NULL, 0, &CWorkPipeline::WorkThread, this, CREATE_SUSPENDED, NULL);  
    if (NULL == m_hThread) {  
        return false;  
    }  
  
    m_foundItemFn = foundItemFn;  
    ResumeThread(m_hThread);  
    return true;  
}  
  
  
bool CWorkPipeline::UnInit()  
{  
    if (m_hThread != NULL)   
    {  
        CloseHandle(m_hThread);  
        m_hThread = NULL;  
    }  
  
    m_foundItemFn = FoundItemFn();  
    return true;  
}  
  
  
DWORD CALLBACK CWorkPipeline::WorkThread(LPVOID lpParam)  
{  
    if (NULL == lpParam) {  
        return 0;  
    }  
  
    CWorkPipeline *lpThis = reinterpret_cast<CWorkPipeline *> (lpParam);  
    return lpThis->WorkProc();  
}  
  
  
// 线程处理函数  
DWORD CWorkPipeline::WorkProc()  
{  
    while (true)  
    {  
        // 声明一个SquareInfo对象,给x和y随机赋值  
        SquareInfo squareInfo = {};  
        squareInfo.x = rand() % 10000;  
        squareInfo.y = rand() % 10000;  
  
        // 将squreInfo通知给回调  
        m_foundItemFn(squareInfo);  
  
        // Sleep一段时间  
        Sleep(max(20, rand() % 2000));  
    }  
          
    return 0;  
}  

/************************************************************************/
/* FileName: WorkPipeline.cpp
* Date: 2015-5-14
* Author: chenzba
* Description:
*/
/************************************************************************/

#include "stdafx.h"
#include "WorkPipeline.h"
#include <time.h>

CWorkPipeline::CWorkPipeline(): m_hThread(NULL)
{

}

CWorkPipeline::~CWorkPipeline()
{

}

bool CWorkPipeline::Init(const FoundItemFn &foundItemFn)
{
srand(unsigned int(time(NULL)));

// 创建线程
m_hThread = CreateThread(NULL, 0, &CWorkPipeline::WorkThread, this, CREATE_SUSPENDED, NULL);
if (NULL == m_hThread) {
return false;
}

m_foundItemFn = foundItemFn;
ResumeThread(m_hThread);
return true;
}

bool CWorkPipeline::UnInit()
{
if (m_hThread != NULL)
{
CloseHandle(m_hThread);
m_hThread = NULL;
}

m_foundItemFn = FoundItemFn();
return true;
}

DWORD CALLBACK CWorkPipeline::WorkThread(LPVOID lpParam)
{
if (NULL == lpParam) {
return 0;
}

CWorkPipeline *lpThis = reinterpret_cast<CWorkPipeline *> (lpParam);
return lpThis->WorkProc();
}

// 线程处理函数
DWORD CWorkPipeline::WorkProc()
{
while (true)
{
// 声明一个SquareInfo对象,给x和y随机赋值
SquareInfo squareInfo = {};
squareInfo.x = rand() % 10000;
squareInfo.y = rand() % 10000;

// 将squreInfo通知给回调
m_foundItemFn(squareInfo);

// Sleep一段时间
Sleep(max(20, rand() % 2000));
}

return 0;
}


    每个流水线对象都有一个线程,线程的工作就是没隔一个不确定的时间产生一个SquareInfo对象,并把它抛给外面(CProcessCenter类对象)。这段代码就是m_foundItemFn(squareInfo)。

    这里值得注意的一点是:UnInit()函数中必须将m_foundItemFn对象置为空对象,假如不这样做,如果CProcessCenter对象已经从栈释放,而流水线线程还没退出,继续调用m_foundItemFn(squareInfo)将导致不可预料的错误。

    下面看看CProcessCenter类的定义:

[cpp]
view plain
copy

print?

/************************************************************************/  
/* FileName: ProcessCenter.h 
 * Date: 2015-5-14 
 * Author: chenzba 
 * Description: 数据处理汇集中心,处理多个线程汇集过来的数据 
 */  
/************************************************************************/  
  
#pragma once  
#include <atlbase.h>  
#include <atlsync.h>  
#include <vector>  
#include <list>  
  
struct SquareInfo;  
class CWorkPipeline;  
  
class CProcessCenter  
{  
public:  
    typedef ATL::CComAutoCriticalSection DataLock;  
  
public:  
    CProcessCenter();  
    ~CProcessCenter();  
  
    bool Init();  
    bool UnInit();  
    void DoWork();  
  
    static DWORD CALLBACK ProcessThread(LPVOID lpParam);  
    DWORD ProcessProc();  
  
    //  
    // 被CWorkPileline回调的函数  
    //  
    void NotifyFoundItem(const SquareInfo &squareInfo);  
  
    //  
    // 通知线程退出  
    //  
    void NotifyExit();  
  
    void ProcessData();  
private:  
    std::vector<CWorkPipeline *>    m_workLineList;     // 工作流水线线程列表  
    std::list<SquareInfo>           m_dataList;         // 流水线产生的数据列表  
    DataLock                        m_dataLock;         // m_dataList数据锁  
    HANDLE                          m_hThread;  
    ATL::CEvent                     m_pipeEvent;        // 通知处理数据事件  
    bool                            m_isExit;  
};  

/************************************************************************/
/* FileName: ProcessCenter.h
* Date: 2015-5-14
* Author: chenzba
* Description: 数据处理汇集中心,处理多个线程汇集过来的数据
*/
/************************************************************************/

#pragma once
#include <atlbase.h>
#include <atlsync.h>
#include <vector>
#include <list>

struct SquareInfo;
class CWorkPipeline;

class CProcessCenter
{
public:
typedef ATL::CComAutoCriticalSection DataLock;

public:
CProcessCenter();
~CProcessCenter();

bool Init();
bool UnInit();
void DoWork();

static DWORD CALLBACK ProcessThread(LPVOID lpParam);
DWORD ProcessProc();

//
// 被CWorkPileline回调的函数
//
void NotifyFoundItem(const SquareInfo &squareInfo);

//
// 通知线程退出
//
void NotifyExit();

void ProcessData();
private:
std::vector<CWorkPipeline *>    m_workLineList;     // 工作流水线线程列表
std::list<SquareInfo>           m_dataList;         // 流水线产生的数据列表
DataLock                        m_dataLock;         // m_dataList数据锁
HANDLE                          m_hThread;
ATL::CEvent                     m_pipeEvent;        // 通知处理数据事件
bool                            m_isExit;
};


    首先我们看Init()和UnInit()函数的实现:

[cpp]
view plain
copy

print?

/************************************************************************/  
/* FileName: ProcessCenter.cpp 
 * Date: 2015-5-14 
 * Author: chenzba 
 * Description: The implement of CProcessCenter class. 
 */  
/************************************************************************/  
  
#include "stdafx.h"  
#include "ProcessCenter.h"  
#include "WorkPipeline.h"  
#include <functional>  
#include <iostream>  
#include "../boost/tr1/functional.hpp"  
  
CProcessCenter::CProcessCenter(): m_hThread(NULL), m_isExit(false)  
{  
  
}  
  
  
CProcessCenter::~CProcessCenter()  
{  
  
}  
  
  
bool CProcessCenter::Init()  
{  
    // 创建事件  
    BOOL created = m_pipeEvent.Create(NULL, TRUE, FALSE, NULL);  
    if (!created) {  
        return false;  
    }  
  
    m_hThread = CreateThread(NULL, 0, &CProcessCenter::ProcessThread, this, CREATE_SUSPENDED, NULL);  
    if (NULL == m_hThread)  
    {  
        UnInit();  
        return false;  
    }  
  
    // 创建10个工作流水线  
    for (int i = 0; i < 10; i++)  
    {  
        CWorkPipeline *pipeLine = new(std::nothrow) CWorkPipeline();  
        if (pipeLine != NULL)  
        {  
            pipeLine->Init(std::tr1::bind(&CProcessCenter::NotifyFoundItem, this, std::tr1::placeholders::_1));  
            m_workLineList.push_back(pipeLine);  
        }  
    }  
  
    m_isExit = false;  
    ResumeThread(m_hThread);  
     
    return true;  
}  
  
  
bool CProcessCenter::UnInit()  
{  
    // 释放流水线资源  
    std::vector<CWorkPipeline *>::iterator it;  
    for (it = m_workLineList.begin(); it != m_workLineList.end(); it++)  
    {  
        if ((*it) != NULL)  
        {  
            (*it)->UnInit();  
            delete (*it);  
            (*it) = NULL;  
        }  
    }  
    m_workLineList.clear();  
  
    if (m_pipeEvent != NULL) {  
        m_pipeEvent.Set();  
    }  
  
    if (m_hThread != NULL)  
    {  
        WaitForSingleObject(m_hThread, 100);  
        CloseHandle(m_hThread);  
        m_hThread = NULL;  
    }  
    m_pipeEvent.Close();  

/************************************************************************/
/* FileName: ProcessCenter.cpp
* Date: 2015-5-14
* Author: chenzba
* Description: The implement of CProcessCenter class.
*/
/************************************************************************/

#include "stdafx.h"
#include "ProcessCenter.h"
#include "WorkPipeline.h"
#include <functional>
#include <iostream>
#include "../boost/tr1/functional.hpp"

CProcessCenter::CProcessCenter(): m_hThread(NULL), m_isExit(false)
{

}

CProcessCenter::~CProcessCenter()
{

}

bool CProcessCenter::Init()
{
// 创建事件
BOOL created = m_pipeEvent.Create(NULL, TRUE, FALSE, NULL);
if (!created) {
return false;
}

m_hThread = CreateThread(NULL, 0, &CProcessCenter::ProcessThread, this, CREATE_SUSPENDED, NULL);
if (NULL == m_hThread)
{
UnInit();
return false;
}

// 创建10个工作流水线
for (int i = 0; i < 10; i++)
{
CWorkPipeline *pipeLine = new(std::nothrow) CWorkPipeline();
if (pipeLine != NULL)
{
pipeLine->Init(std::tr1::bind(&CProcessCenter::NotifyFoundItem, this, std::tr1::placeholders::_1));
m_workLineList.push_back(pipeLine);
}
}

m_isExit = false;
ResumeThread(m_hThread);

return true;
}

bool CProcessCenter::UnInit()
{
// 释放流水线资源
std::vector<CWorkPipeline *>::iterator it;
for (it = m_workLineList.begin(); it != m_workLineList.end(); it++)
{
if ((*it) != NULL)
{
(*it)->UnInit();
delete (*it);
(*it) = NULL;
}
}
m_workLineList.clear();

if (m_pipeEvent != NULL) {
m_pipeEvent.Set();
}

if (m_hThread != NULL)
{
WaitForSingleObject(m_hThread, 100);
CloseHandle(m_hThread);
m_hThread = NULL;
}
m_pipeEvent.Close();


[cpp]
view plain
copy

print?

    m_isExit = true;  
  
    return true;  
}  

m_isExit = true;

return true;
}


    这里模拟数据处理中心拥有10个工作流水线,将它们分配在堆上,并用一个vector的指针列表存放。注意事件对象和线程的创建顺序,ResumeThread要放到最后,这意味着当所有资源创建完成后才启动线程。

    要说明一点,

[cpp]
view plain
copy

print?

std::tr1::bind(&CProcessCenter::NotifyFoundItem, this, std::tr1::placeholders::_1)  

std::tr1::bind(&CProcessCenter::NotifyFoundItem, this, std::tr1::placeholders::_1)
    使用bind方法产生一个函数对象,有些人编译的时候可能找不到bind函数。如果编译环境支持c++11(vs2012以上),bind函数属于c++标准库,直接在std::中就能找到;但是在vs2008或者以下,不支持c++11的编译环境(也就是我现在写代码的环境),bind函数是属于boost库的函数。Boost库可以跟C++标准库完美共同工作,并且为其提供扩展功能,因此,这里需要引用Boost库,相关库大家可以到网上了解相关信息。

    下面我们继续看NotifyFoundItem函数的实现:

[cpp]
view plain
copy

print?

void CProcessCenter::NotifyFoundItem(const SquareInfo &squareInfo)  
{  
    // 数据同步 加锁  
    CCritSecLock locker(m_dataLock.m_sec);  
    m_dataList.push_back(squareInfo);  
    m_pipeEvent.Set();  
}  

void CProcessCenter::NotifyFoundItem(const SquareInfo &squareInfo)
{
// 数据同步 加锁
CCritSecLock locker(m_dataLock.m_sec);
m_dataList.push_back(squareInfo);
m_pipeEvent.Set();
}
    我们回到CWorkPipeline中的线程函数,m_foundItemFn(squareInfo),这句代码就进入NotifyFoundItem函数,在这里找个函数被10线程调用,m_dataList是共享数据,这里给m_dataList加锁。下面继续,看看加锁的原因。

[cpp]
view plain
copy

print?

DWORD CALLBACK CProcessCenter::ProcessThread(LPVOID lpParam)  
{  
    if (NULL == lpParam) {  
        return 0;  
    }  
  
    CProcessCenter *lpThis = reinterpret_cast<CProcessCenter *> (lpParam);  
    return lpThis->ProcessProc();  
}  
  
  
DWORD CProcessCenter::ProcessProc()  
{  
    while (true)  
    {  
        DWORD dwResult = WaitForSingleObject(m_pipeEvent, 2000);  
        if (dwResult == WAIT_TIMEOUT && (!m_isExit)) {  
            continue;  
        }  
  
        // 处理数据  
        ProcessData();  
  
        if (m_isExit) {  
            break;  
        }  
    }  
  
    return 0;  
}  

DWORD CALLBACK CProcessCenter::ProcessThread(LPVOID lpParam)
{
if (NULL == lpParam) {
return 0;
}

CProcessCenter *lpThis = reinterpret_cast<CProcessCenter *> (lpParam);
return lpThis->ProcessProc();
}

DWORD CProcessCenter::ProcessProc()
{
while (true)
{
DWORD dwResult = WaitForSingleObject(m_pipeEvent, 2000);
if (dwResult == WAIT_TIMEOUT && (!m_isExit)) {
continue;
}

// 处理数据
ProcessData();

if (m_isExit) {
break;
}
}

return 0;
}


[cpp]
view plain
copy

print?

void CProcessCenter::ProcessData()  
{  
    std::list<SquareInfo> tempList;  
  
    m_dataLock.Lock();  
    tempList = m_dataList;  
    m_dataList.clear();  
    m_pipeEvent.Reset();  
    m_dataLock.Unlock();  
  
    int counter = 0;  
    std::list<SquareInfo>::iterator it;  
    for (it = tempList.begin(); it != tempList.end(); it++)  
    {  
        counter++;  
        (*it).squareSum = (*it).x * (*it).x + (*it).y * (*it).y;  
        std::cout <<"x: " <<(*it).x <<" y: " <<(*it).y <<" square sum: " <<(*it).squareSum <<std::endl;  
    }  
  
    std::cout <<"-------------------------------------------------------" <<counter <<std::endl;  
}  

void CProcessCenter::ProcessData()
{
std::list<SquareInfo> tempList;

m_dataLock.Lock();
tempList = m_dataList;
m_dataList.clear();
m_pipeEvent.Reset();
m_dataLock.Unlock();

int counter = 0;
std::list<SquareInfo>::iterator it;
for (it = tempList.begin(); it != tempList.end(); it++)
{
counter++;
(*it).squareSum = (*it).x * (*it).x + (*it).y * (*it).y;
std::cout <<"x: " <<(*it).x <<" y: " <<(*it).y <<" square sum: " <<(*it).squareSum <<std::endl;
}

std::cout <<"-------------------------------------------------------" <<counter <<std::endl;
}


[cpp]
view plain
copy

print?

void CProcessCenter::NotifyExit()  
{  
    m_isExit = true;  
}  
  
  
void CProcessCenter::DoWork()  
{  
    Sleep(20000);  
}  

void CProcessCenter::NotifyExit()
{
m_isExit = true;
}

void CProcessCenter::DoWork()
{
Sleep(20000);
}


    NotifyItemFound函数接收到有数据加入到m_dataList中,就设置m_pipeEvent事件,通知线程处理数据。ProcessProc判断m_isExit为false并且WaitForSingleObject返回不是WAIT_TIMEOUT,进入ProcessData()函数。
    ProcessData声明一个tempList临时对象,然后把m_dataList的值赋给tempList,最后把m_dataList清空。

    首先我们先探讨线程同步问题,假如不给m_dataList加锁,当线程执行到tempList = m_dataList之后,另外一个CWorkPipeline线程获得cpu时间片,执行m_dataList.push(),这时候m_dataList新增一个未处理的数据,接下来返回到tempList = m_dataList后,执行m_dataList.clear(),这样新增的未处理的数据就丢失了,因此这里必须给m_dataList上锁,保证任何时刻只有一个线程获得m_dataList的使用权限。

    这里有人可能会疑问,为什么要另外声明一个tempList对象呢?为什么不直接使用m_dataList用于计算呢?


    原因是,在多线程同步中,我们为了提高程序的执行效率,尽可能的让线程获得共享数据(m_dataList)的使用权限时间最短,假如我们直接使用m_dataList计算数据,那么我们必须对一大堆计算逻辑的代码进行加锁或者在每次访问m_dataList的时候加锁,这样效率是相当慢的。我想大家应该很容易理解明白。

    好了,多线程就暂时讲到这里,语言冗余,请大家多多包涵,也希望大家多多指正其中可能的错误。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: