您的位置:首页 > 理论基础 > 计算机网络

UNIX网络编程:读写锁

2016-06-01 17:13 288 查看
之前我们整理了互斥锁与条件变量问题它保证了共享资源的安全,但在多线程中我们也会经常对共享数据进行读、写操作。也就是说对某些资源的访问会 存在两种可能的情况,一种是访问必须是排查性的,就是独占的意思,这称作写操作;另一种情况就是访问方式可以是共享的,就是说可以有多个线程同时去访问某个资源,这种就称作读操作。这个问题模型是从对文件的读写操作中引申出来的。

读写锁比起mutex具有更高的适用性,具有更高的并行性,可以有多个线程同时占用读模式的读写锁,但是只能有一个线程占用写模式的读写锁,读写锁的三种状态:

1.当读写锁是写加锁状态时,在这个锁被解锁之前,所有试图对这个锁加锁的线程都会被阻塞

2.当读写锁在读加锁状态时,所有试图以读模式对它进行加锁的线程都可以得到访问权,但是以写模式对它进行加锁的线程将会被阻塞

3.当读写锁在读模式的锁状态时,如果有另外的线程试图以写模式加锁,读写锁通常会阻塞随后的读模式锁的请求,这样可以避免读模式锁长期占用,而等待的写模式锁请求则长期阻塞。

读写锁最适用于对数据结构的读操作次数多于写操作的场合,因为,读模式锁定时可以共享,而写模式锁定时只能某个线程独占资源,因而,读写锁也可以叫做个共享-独占锁。

处理读-写问题的两种常见策略是强读者同步和强写者同步

强读者同步中,总是给读者更高的优先权,只要写者当前没有进行写操作,读者就可以获得访问权限;而在强写者同步中,则往往将优先权交付给写者,而读者只能等到所有正在等待的或者是正在执行的写者结束以后才能执行。系统中的读写锁时用的强写者,这是为了避免再修改数据时,读操作先执行导致读出的数据无效。

下面分别是强读者和强写者的函数实现代码

强写者实现代码::

pthread_rwlock.h:

#pragma once                  //只编译一次
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>

typedef struct
{
pthread_mutex_t rw_mutex;         //定义一个读写互斥量
pthread_cond_t  rw_condreaders;   //定义一个读者条件变量
pthread_cond_t  rw_condwriters;   //定义一个写着条件变量
int             rw_magic;         //定义一个魔术值,来标记是否空闲
int             rw_nwaitwriters;  //定义一个读等待标记等待读的线程数
int             rw_nwaitreaders;  //定义一个写等待标记等待写的线程数
int             rw_refcount;      //标记现在所使用的个数,只能为  -1(代表有线程进行写操作)、0(没有线程对共享数据进行读写操作)、
//>0(代表现在进行读操作的线程数,因为可以同时多个线程进行读操作,所以可以一个大于0的整数)。
}my_pthread_rwlock_t;

#define RW_MAGIC   0x19283746         //初始魔数值

//初始化my_pthread_rwlock_t。
#define
PTHREAD_RWLOCK_INITIALIZER{PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER,\
PTHREAD_COND_INITIALIZER, RW_MAGIC, 0, 0, 0};

typedef int my_pthread_rwlockattr_t;

int my_pthread_rwlock_init(my_pthread_rwlock_t *, my_pthread_rwlockattr_t *);      //读写锁初始化函数
int my_pthread_rwlock_rdlock(my_pthread_rwlock_t *rw);   //读锁函数
int my_pthread_rwlock_wrlock(my_pthread_rwlock_t *rw);  //写锁函数
int my_pthread_rwlock_tryrdlock(my_pthread_rwlock_t *rw);  //尝试上读锁
int my_pthread_rwlock_trywrlock(my_pthread_rwlock_t *rw);  //尝试上写锁
int my_pthread_rwlock_unlock(my_pthread_rwlock_t *rw);  //解锁函数
int my_pthread_rwlock_destroy(my_pthread_rwlock_t *rw);  //销毁读写锁


pthread_rwlock.cpp:

#include "pthread_rwlock.h"

//读写锁初始化函数
int my_pthread_rwlock_init(my_pthread_rwlock_t *rw, my_pthread_rwlockattr_t *attr)
{
int res;
if(attr != NULL){       //如果创建属性不为NULL,就返回不可用
return(EINVAL);
}
if(res = pthread_mutex_init(&rw->rw_mutex, NULL) != 0){            //初始化互斥锁,如果成功则返回0,否则初始化互斥锁失败
goto err1;      //初始化失败调转到err1处理情况
}
if(res = pthread_cond_init(&rw->rw_condreaders, NULL) != 0){        //初始化读条件变量,如果成功则返回0,否则初始化读条件变量失败
goto err2;    //初始化失败后调转到err2处理方法
}
if(res = pthread_cond_init(&rw->rw_condwriters, NULL) != 0){       //初始化写条件变量,如果成功返回0,
goto err3;   //初始化失败后跳转到err3处理方法。
}

rw->rw_nwaitreaders = 0;      //初始化等待读操作的线程数为0
rw->rw_nwaitwriters = 0;      //初始化等待写操作的线程数为0
rw->rw_refcount = 0;          //初始化正在操作共享数据的线程数为0
rw->rw_magic = RW_MAGIC;      //将初始值赋给检验变量魔数

return 0;

err3:  //因为执行err3是创建了读条件变量和互斥锁,锁以销毁读条件变量,然后顺序执行进入err2销毁互斥量,
//最后顺序执行err1,返回错误参数。这就是如此设计的原因。
pthread_cond_destroy(&rw->rw_condreaders);
err2:  //如果是直接跳入err2的,那此时就只有互斥量了,直接销毁它然后返回错误参数。
pthread_mutex_destroy(&rw->rw_mutex);
err1:  //此时还没有创建成功任何变量或所创建变量已在err3和err2中销毁,直接返回错误参数。
return res;
}

//销毁读写锁函数
int my_pthread_rwlock_destroy(my_pthread_rwlock_t *rw)
{
if(rw->rw_magic != RW_MAGIC){       //如果rw_magic不等于初始值,说明有线程在使用,直接返回
return EINVAL;
}
//如果有线程等待执行读操作或有线程在等待执行写操作,则返回EBUSY错误,并结束函数
if(rw->rw_nwaitreaders != 0 || rw->rw_nwaitwriters != 0 || rw->rw_refcount != 0){
return  EBUSY;
}
//如果rw_magic为初始值,且没有线程再等待读操作或写操作时,将互斥量、读条件变量、写条件变量分别销毁
pthread_mutex_destroy(&rw->rw_mutex);
pthread_cond_destroy(&rw->rw_condreaders);
pthread_cond_destroy(&rw->rw_condwriters);

rw->rw_magic = 0;   //最后将rw_magic置为0

return 0;
}

//读锁的实现函数
int my_pthread_rwlock_rdlock(my_pthread_rwlock_t *rw)
{
int res;
if(rw->rw_magic != RW_MAGIC){   //如果rw_magic不为初始值说明有线程正在进行操作,直接返回EINVAL错误并结束函数
return EINVAL;
}
//无论何时操作my_pthread_rwlock结构体都要对rw_mutex成员上锁
if((res = pthread_mutex_lock(&rw->rw_mutex)) != 0){
return res;
}
//查看rw_refcount的值看是否小于0(即是否有线程对其进行写操作),或是否有线程正在等待对其进行写操作,
//如果有则将rw_nwaitreaders值加1,并将程序阻塞等待读条件变量后在对其上所进行读操作
//(因为此程序实现的是写优先,所以要等到没有写操作线程或等待写操作的线程时,才能让进行读的线程获得资源)
while(rw->rw_refcount < 0 || rw->rw_nwaitwriters > 0){
rw->rw_nwaitreaders++;
res = pthread_cond_wait(&rw->rw_condreaders, &rw->rw_mutex);
if(res != 0){
break;
}
}
//如果读锁加成功后,给refcount加1,因为正数值代表的时进行读操作的线程个数
if(res == 0){
rw->rw_refcount++;
}
//对控制my_pthread_rwlock结构体操作的rw_mutex成员解锁,
//保证让其他线程可以操作my_pthread_rwlock结构体
pthread_mutex_unlock(&rw->rw_mutex);
return res;
}

//写操作上锁函数
int my_pthread_rwlock_wrlock(my_pthread_rwlock_t *rw)
{
int res;
if(rw->rw_magic != RW_MAGIC){   //如果rw_magic不为初始值说明有线程正在进行操作,直接返回EINVAL错误并结束函数
return EINVAL;
}
//无论何时操作my_pthread_rwlock结构体都要对rw_mutex成员上锁
if(res = pthread_mutex_lock(&rw->rw_mutex) != 0){
return res;
}

//查看rw_refcount的值看是否为0(即是否有线程对其进行操作),如果rw_refcount不为0,则将rw_nwaitwriters值加1,
//并将程序阻塞等待接收到写条件变量后再对其上所进行读操作(因为此程序实现的是写优先,所以只要没有线程操作时,就让该线程进行写操作)
while(rw->rw_refcount != 0){
rw->rw_nwaitwriters ++;
res = pthread_cond_wait(&rw->rw_condwriters, &rw->rw_mutex);
rw->rw_nwaitwriters--;
if(res != 0){
break;
}
}
//如果读锁加成功后,给refcount置为-1,因为-1代表写操作(因为只能同时有且仅有一个线程进行写操作,所以,加读锁后rw_refcount只能为-1)
if(res == 0){
rw->rw_refcount = -1;
}
//对控制my_pthread_rwlock结构体操作的rw_mutex成员解锁,保证让其他线程可以操作my_pthread_rwlock结构体
pthread_mutex_unlock(&rw->rw_mutex);

return res;
}

int my_pthread_rwlock_tryrdlock(my_pthread_rwlock_t *rw)
{
int res;
if(rw->rw_magic != RW_MAGIC){   //如果rw_magic不为初始值说明有线程正在进行操作,直接返回EINVAL错误并结束函数
return EINVAL;
}
//无论何时操作my_pthread_rwlock结构体都要对rw_mutex成员上锁
if((res = pthread_mutex_lock(&rw->rw_mutex)) != 0){
return res;
}
//此处是尝试上所函数中读写优先的不同地方。
//如果有写线程正在执行或有写线程在等待,则返回EBUSY
if(rw->rw_refcount < 0 || rw->nwaitwriters > 0){
res = EBUSY;
}else{    //否则,让rw_refcount加1,让其准确的记录当前正在执行的读进程个数
rw->rw_refcount ++;
}
//对控制my_pthread_rwlock结构体操作的rw_mutex成员解锁,
//保证让其他线程可以操作my_pthread_rwlock结构体
pthread_mutex_unclock(&rw->rw_mutex);
return res;
}

int my_pthread_rwlock_trywrlock(my_pthread_rwlock_t *rw)
{
int res;
if(rw->rw_magic != RW_MAGIC){
//如果rw_magic不为初始值说明有线程正在进行操作,直接返回EINVAL错误并结束函数
return EINVAL;
}
//无论何时操作my_pthread_rwlock结构体都要对rw_mutex成员上锁
if((res = pthread_mutex_lock(&rw->rw_mutex)) != 0){
return res;
}
//如果正在操作的线程数不为0,则返回EBUSY
if(rw->rw_refcount != 0){
res = EBUSY;
}else{    //否则,让rw_refcount = -1,让其状态为有写进程操作
rw->rw_refcount = -1;
}
//对控制my_pthread_rwlock结构体操作的rw_mutex成员解锁,
//保证让其他线程可以操作my_pthread_rwlock结构体
pthread_mutex_unclock(&rw->rw_mutex);
return res;
}
//解锁函数
int my_pthread_rwlock_unlock(my_pthread_rwlock_t *rw)
{
int res;
if(rw->rw_magic != RW_MAGIC){
//如果rw_magic不为初始值说明有线程正在进行操作,直接返回EINVAL错误并结束函数
return EINVAL;
}
//无论何时操作my_pthread_rwlock结构体都要对rw_mutex成员上锁
if((res = pthread_mutex_lock(&rw->rw_mutex)) != 0){
return res;
}
//如果此时rw_refcount大于0,所以现在再执行的操作为读操作,给rw_refcount减1,解除读线程的锁
if(rw->rw_refcount > 0){
rw->rw_refcount--;
}else if(rw->rw_refcount == -1){  //如果rw_refcount = -1,说明执行的是写操作,将其值赋为0,解除写线程的锁
rw->rw_refcount = 0;
}else{
rw->rw_refcount = 0;
}
//此处是写着优先和读者优先实现的第二处不同
if(rw->rw_nwaitwriters > 0){    //当等待写操作的线程数不为0
if(rw->rw_refcount == 0){   //rw_refcount正在执行的线程数为0
//发送写条件变量,通知写线程有空闲资源
res = pthread_cond_signal(&rw->rw_condwriters);
}
}else if(rw->rw_nwaitreaders > 0){
//再没有写线程等待操作且有可用资源的前提下,广播发送读条件变量,通知所有等待读操作的线程有空闲资源
res = pthread_cond_broadcast(&rw->rw_condreaders);
}
//对控制my_pthread_rwlock结构体操作的rw_mutex成员解锁,
//保证让其他线程可以操作my_pthread_rwlock结构体
pthread_mutex_unlock(&rw->rw_mutex);
return res;
}


读者优先程序代码:

pthread_rwlock.cpp:

#include "pthread_rwlock.h"

//读写锁初始化函数
int my_pthread_rwlock_init(my_pthread_rwlock_t *rw, my_pthread_rwlockattr_t *attr)
{
int res;
if(attr != NULL){       //如果创建属性不为NULL,就返回不可用
return(EINVAL);
}
if(res = pthread_mutex_init(&rw->rw_mutex, NULL) != 0){            /初始化互斥锁,如果成功则返回0,否则初始化互斥锁失败
goto err1;      //初始化失败调转到err1处理情况
}
if(res = pthread_cond_init(&rw->rw_condreaders, NULL) != 0){        //初始化读条件变量,如果成功则返回0,否则初始化读条件变量失败
goto err2;    //初始化失败后调转到err2处理方法
}
if(res = pthread_cond_init(&rw->rw_condwriters, NULL) != 0){       //初始化写条件变量,如果成功返回0,
goto err3;   //初始化失败后跳转到err3处理方法。
}

rw->rw_nwaitreaders = 0;      //初始化等待读操作的线程数为0
rw->rw_nwaitwriters = 0;      //初始化等待写操作的线程数为0
rw->rw_refcount = 0;          //初始化正在操作共享数据的线程数为0
rw->rw_magic = RW_MAGIC;      //将初始值赋给检验变量魔数

return 0;

err3:  //因为执行err3是创建了读条件变量和互斥锁,锁以销毁读条件变量,然后顺序执行进入err2销毁互斥量,
//最后顺序执行err1,返回错误参数。这就是如此设计的原因。
pthread_cond_destroy(&rw->rw_condreaders);
err2:  //如果是直接跳入err2的,那此时就只有互斥量了,直接销毁它然后返回错误参数。
pthread_mutex_destroy(&rw->rw_mutex);
err1:  //此时还没有创建成功任何变量或所创建变量已在err3和err2中销毁,直接返回错误参数。
return res;
}

//销毁读写锁函数
int my_pthread_rwlock_destroy(my_pthread_rwlock_t *rw)
{
if(rw->rw_magic != RW_MAGIC){       //如果rw_magic不等于初始值,说明有线程在使用,直接返回
return EINVAL;
}
//如果有线程等待执行读操作或有线程在等待执行写操作,则返回EBUSY错误,并结束函数
if(rw->rw_nwaitreaders != 0 || rw->rw_nwaitwriters != 0 || rw->rw_refcount != 0){
return  EBUSY;
}
//如果rw_magic为初始值,且没有线程再等待读操作或写操作时,将互斥量、读条件变量、写条件变量分别销毁
pthread_mutex_destroy(&rw->rw_mutex);
pthread_cond_destroy(&rw->rw_condreaders);
pthread_cond_destroy(&rw->rw_condwriters);

rw->rw_magic = 0;   //最后将rw_magic置为0

return 0;
}

//读锁的实现函数
int my_pthread_rwlock_rdlock(my_pthread_rwlock_t *rw)
{
int res;
if(rw->rw_magic != RW_MAGIC){   //如果rw_magic不为初始值说明有线程正在进行操作,直接返回EINVAL错误并结束函数
return EINVAL;
}
//无论何时操作my_pthread_rwlock结构体都要对rw_mutex成员上锁
if((res = pthread_mutex_lock(&rw->rw_mutex)) != 0){
return res;
}
//查看rw_refcount的值看是否小于0(即是否有线程对其进行写操作),如果有写线程正在执行,则将rw_nwaitreaders值加1,
//并将程序阻塞等待读条件变量后在对其上所进行读操作
//(因为此程序实现的是读优先,所以只要没有写线程正在执行,就让进行读的线程获得资源)
while(rw->rw_refcount < 0){
rw->rw_nwaitreaders++;
res = pthread_cond_wait(&rw->rw_condreaders, &rw->rw_mutex);
if(res != 0){
break;
}
}
//如果读锁加成功后,给refcount加1,因为正数值代表的时进行读操作的线程个数
if(res == 0){
rw->rw_refcount++;
}
//对控制my_pthread_rwlock结构体操作的rw_mutex成员解锁,保证让其他线程可以操作my_pthread_rwlock结构体
pthread_mutex_unlock(&rw->rw_mutex);
return res;
}

//写操作上锁函数
int my_pthread_rwlock_wrlock(my_pthread_rwlock_t *rw)
{
int res;
if(rw->rw_magic != RW_MAGIC){
//如果rw_magic不为初始值说明有线程正在进行操作,直接返回EINVAL错误并结束函数
return EINVAL;
}
//无论何时操作my_pthread_rwlock结构体都要对rw_mutex成员上锁
if(res = pthread_mutex_lock(&rw->rw_mutex) != 0){
return res;
}

//查看rw_refcount的值看是否为0(即是否有线程对其进行操作),如果rw_refcount不为0或有读线程在等待,
//则将rw_nwaitwriters值加1,并将程序阻塞等待接收到写条件变量后再对其上锁进行读操作
//(因为此程序实现的是读优先,所以只有当没有线程操作且没有读线程等待时,才能让写线程获得资源进行写操作)
while(rw->rw_refcount != 0 || rw->rw_nwaitreaders > 0){
rw->rw_nwaitwriters ++;
res = pthread_cond_wait(&rw->rw_condwriters, &rw->rw_mutex);
rw->rw_nwaitwriters--;
if(res != 0){
break;
}
}
//如果读锁加成功后,给refcount置为-1,因为-1代表写操作
//(因为只能同时有且仅有一个线程进行写操作,所以,加读锁后rw_refcount只能为-1)
if(res == 0){
rw->rw_refcount = -1;
}
//对控制my_pthread_rwlock结构体操作的rw_mutex成员解锁,保证让其他线程可以操作my_pthread_rwlock结构体
pthread_mutex_unlock(&rw->rw_mutex);

return res;
}

int my_pthread_rwlock_tryrdlock(my_pthread_rwlock_t *rw)
{
int res;
if(rw->rw_magic != RW_MAGIC){
//如果rw_magic不为初始值说明有线程正在进行操作,直接返回EINVAL错误并结束函数
return EINVAL;
}
//无论何时操作my_pthread_rwlock结构体都要对rw_mutex成员上锁
if((res = pthread_mutex_lock(&rw->rw_mutex)) != 0){
return res;
}
//此处是尝试上所函数中读写优先的不同地方。
//如果有写线程正在执行,则返回EBUSY
if(rw->rw_refcount < 0){
res = EBUSY;
}else{    //否则,让rw_refcount加1,让其准确的记录当前正在执行的读进程个数
rw->rw_refcount ++;
}
//对控制my_pthread_rwlock结构体操作的rw_mutex成员解锁,保证让其他线程可以操作my_pthread_rwlock结构体
pthread_mutex_unclock(&rw->rw_mutex);
return res;
}

int my_pthread_rwlock_trywrlock(my_pthread_rwlock_t *rw)
{
int res;
if(rw->rw_magic != RW_MAGIC){   //如果rw_magic不为初始值说明有线程正在进行操作,直接返回EINVAL错误并结束函数
return EINVAL;
}
//无论何时操作my_pthread_rwlock结构体都要对rw_mutex成员上锁
if((res = pthread_mutex_lock(&rw->rw_mutex)) != 0){
return res;
}
//如果正在操作的线程数不为0或等待读操作的线程个数不为0,则返回EBUSY
if(rw->rw_refcount != 0 || rw->nwaitreaders > 0){
res = EBUSY;
}else{    //否则,让rw_refcount = -1,让其状态为有写进程操作
rw->rw_refcount = -1;
}
//对控制my_pthread_rwlock结构体操作的rw_mutex成员解锁,保证让其他线程可以操作my_pthread_rwlock结构体
pthread_mutex_unclock(&rw->rw_mutex);
return res;
}
//解锁函数
int my_pthread_rwlock_unlock(my_pthread_rwlock_t *rw)
{
int res;
if(rw->rw_magic != RW_MAGIC){
//如果rw_magic不为初始值说明有线程正在进行操作,直接返回EINVAL错误并结束函数
return EINVAL;
}
//无论何时操作my_pthread_rwlock结构体都要对rw_mutex成员上锁
if((res = pthread_mutex_lock(&rw->rw_mutex)) != 0){
return res;
}

if(rw->rw_refcount == -1){  //如果rw_refcount = -1,说明执行的是写操作,将其值赋为0,解除写线程的锁
rw->rw_refcount = 0;
}else if(rw->rw_refcount > 0){  //如果此时rw_refcount大于0,所以现在再执行的操作为读操作,给rw_refcount减1,解除读线程的锁
rw->rw_refcount--;
}else{
rw->rw_refcount = 0;
}
if(rw->rw_nwaitreaders > 0){  //如果有读线程等待则广播发送读条件变量,通知所有等待读操作的线程有空闲资源
res = pthread_cond_broadcast(&rw->rw_condreaders);
}else if(rw->rw_nwaitwriters > 0){  //如果没有读线程等待且等待写操作的线程数不为0时,
//发送写条件变量,通知写线程有空闲资源
if(rw->rw_refcount == 0){
res = pthread_cond_signal(&rw->rw_condwriters);
}
}

//对控制my_pthread_rwlock结构体操作的rw_mutex成员解锁,保证让其他线程可以操作my_pthread_rwlock结构体
pthread_mutex_unlock(&rw->rw_mutex);
return res;
}


测试代码:

test.cpp:

#include<unistd.h>
#include<stdio.h>
#include<pthread.h>
#include "pthread_rwlock.h"

my_pthread_rwlock_t rwlock;
//
void* thread_fun1(void *arg)
{
my_pthread_rwlock_wrlock(&rwlock);
printf("This is fun1.\n");
sleep(5);
my_pthread_rwlock_unlock(&rwlock);
}
void* thread_fun2(void *arg)
{
printf("thread2 is run......\n");
my_pthread_rwlock_rdlock(&rwlock);
printf("This is fun2.\n");
my_pthread_rwlock_unlock(&rwlock);
}
void* thread_fun3(void *arg)
{
printf("thread3 is run......\n");
my_pthread_rwlock_rdlock(&rwlock);
printf("This is fun3.\n");
my_pthread_rwlock_unlock(&rwlock);
}
void* thread_fun4(void *arg)
{
printf("thread4 is run......\n");
my_pthread_rwlock_wrlock(&rwlock);
printf("This is fun4.\n");
my_pthread_rwlock_unlock(&rwlock);
}

int main()
{
my_pthread_rwlock_init(&rwlock, NULL);
pthread_t tid[4];
pthread_create(&tid[0], NULL, thread_fun1, NULL);
sleep(1);
pthread_create(&tid[1], NULL, thread_fun2, NULL);
pthread_create(&tid[2], NULL, thread_fun3, NULL);
pthread_create(&tid[3], NULL, thread_fun4, NULL);

pthread_join(tid[0], NULL);
pthread_join(tid[1], NULL);
pthread_join(tid[2], NULL);
pthread_join(tid[3], NULL);
my_pthread_rwlock_destroy(&rwlock);
return 0;
}


运行结果:

用写者优先程序测试结果:



用读者优先程序测试结果:

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息