您的位置:首页 > 运维架构 > Linux

linux线程之间的通信

2016-06-14 11:28 330 查看
线程之间的通信:

1、  互斥量

用的api函数有:pthread_mutex_init、pthread_mutex_lock、pthread_mutex_unlock、pthread_mutex_trylock

示例:
//thread_mutex.cpp

#include <iostream>
#include<queue>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#include<signal.h>
using namespace std;

// Short integer aliases used by the job record and its helpers.
// NOTE(review): with `using namespace std;` in effect, an unqualified `byte`
// may become ambiguous with std::byte when built as C++17 or later - confirm
// against the language standard this example is compiled with.
typedef unsigned int u32;
typedef unsigned char byte;

// Shutdown request: set by the SIGUSR1 handler, polled by both worker loops.
// volatile sig_atomic_t is the object type a signal handler may portably
// write. It does not by itself order memory accesses between threads.
volatile sig_atomic_t flag_exit = 0;

// Payload copied into every job. A string literal is immutable, so the
// pointer is const-qualified (binding a literal to `char *` is ill-formed
// since C++11).
const char *data = "test pthread mutex!";

// Variable-length job record exchanged between the two worker threads.
// packjob() allocates sizeof(JOB) + payload length bytes, so `data` is only
// the first byte of a longer trailing buffer.
struct job
{
    u32  id;       // sequence number supplied by the producer
    u32  len;      // filled in by packjob() with the total allocation size
    byte data[1];  // start of the payload bytes
};

typedef struct job  JOB;
typedef struct job *PJOB;

// Signature of a signal handler. Not referenced again in this listing.
typedef void (*sighandler_t)(int);

// Guards every access to g_thrd_queue. Statically initialised, so no
// pthread_mutex_init call is required.
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

// FIFO of heap-allocated jobs handed from the producer to the consumer.
queue<PJOB> g_thrd_queue;

// SIGUSR1 handler: announces itself and asks the worker loops to stop.
//
// Only async-signal-safe operations are performed here. The original used
// iostream output, which may allocate or take internal locks and is therefore
// not safe to call from a signal handler. write(2) is used instead.
//
// signal: number of the delivered signal (unused).
void handlesignal(int signal)
{
    (void)signal;

    static const char banner[] = "+++handlesignal+++\n";
    const ssize_t written = write(STDOUT_FILENO, banner, sizeof(banner) - 1);
    (void)written;  // nothing useful can be done about a failed write here

    flag_exit = true;
}

// Allocates a job record and copies `len` payload bytes into it.
//
// id    : identifier stored in the record.
// pdata : source bytes; may be NULL only when len is 0.
// len   : number of payload bytes to copy.
//
// Returns a malloc()-ed record the caller must free(), or NULL when the
// arguments are invalid, the size does not fit, or allocation fails.
//
// The whole allocation is zeroed first, so the payload is always followed by
// at least one zero byte and can be printed as a C string.
//
// NOTE(review): `len` in the record receives the total allocation size, not
// the payload length. That is kept as in the original - confirm it is intended.
// NOTE(review): memset/memcpy need <string.h>, which this listing does not
// include directly.
PJOB packjob(u32 id,byte *pdata,u32 len )
{
    if (pdata == NULL && len != 0)
    {
        return NULL;
    }

    // Compute the size in size_t: the original `int` could truncate
    // sizeof(JOB) + len and under-allocate before copying `len` bytes.
    const size_t total = sizeof(JOB) + static_cast<size_t>(len);
    if (total < len || total > static_cast<u32>(-1))
    {
        return NULL;  // wrapped around, or would not fit in the u32 len field
    }

    PJOB pret = static_cast<PJOB>(malloc(total));
    if (pret == NULL)
    {
        return NULL;
    }

    memset(pret, 0, total);
    pret->id = id;
    pret->len = static_cast<u32>(total);
    if (len != 0)
    {
        memcpy(pret->data, pdata, len);
    }
    return pret;
}

// Pointer to a function with the signature pthread_create expects for a
// thread start routine: takes a void* argument and returns a void* result.
typedef void *(*pthrd_fun)(void *);

// Producer thread: once per second builds a job from the global payload and
// appends it to g_thrd_queue under the mutex, until flag_exit is set.
//
// parg : unused.
// Returns NULL. The original passed the address of a stack local to
// pthread_exit, which dangles once the thread has terminated.
void * thrd_function_1(void *parg)
{
    (void)parg;
    int count = 0;

    cout << "thrd id is:  " << pthread_self() << " run..." << endl;

    while (!flag_exit)
    {
        PJOB pjob = packjob(count++,
                            reinterpret_cast<byte *>(const_cast<char *>(data)),
                            static_cast<u32>(strlen(data)));

        // Never queue a failed allocation: the consumer dereferences
        // whatever it pops.
        if (pjob != NULL)
        {
            pthread_mutex_lock(&mutex);
            g_thrd_queue.push(pjob);
            pthread_mutex_unlock(&mutex);
        }

        // Pace production: one job per second.
        sleep(1);
    }

    cout << "thrd id is:  " << pthread_self() << " exit!" << endl;
    return NULL;
}

// Consumer thread: once per second takes at most one job from g_thrd_queue,
// prints it and frees it, until flag_exit is set.
//
// parg : unused.
// Returns NULL. The original passed the address of a stack local to
// pthread_exit, which dangles once the thread has terminated.
void * thrd_function_2(void *parg)
{
    (void)parg;

    cout << "thrd id is:  " << pthread_self() << " run..." << endl;

    while (!flag_exit)
    {
        PJOB pjob = NULL;

        // Only the queue manipulation happens under the lock.
        pthread_mutex_lock(&mutex);
        if (!g_thrd_queue.empty())
        {
            // front()/pop() on an empty queue is undefined behaviour, and the
            // consumer can easily run before the producer has queued anything.
            pjob = g_thrd_queue.front();
            g_thrd_queue.pop();
        }
        pthread_mutex_unlock(&mutex);

        if (pjob != NULL)
        {
            cout << "id :" << pjob->id << endl;
            cout << "len :" << pjob->len << endl;
            cout << "data :" << reinterpret_cast<char *>(pjob->data) << endl;

            // The consumer owns the job once it has been dequeued.
            free(pjob);
        }

        // Poll the queue once per second.
        sleep(1);
    }

    cout << "thrd id is:  " << pthread_self() << " exit!" << endl;
    return NULL;
}

int main(int argc,char *argv[])
{
pthread_t thread_id1;
pthread_t thread_id2;
int iret;

cout << "pid id is:  " << getpid() << endl;

//安置信号处理函数
signal(SIGUSR1,handlesignal);

iret = pthread_create(&thread_id1,NULL,thrd_function_1,NULL);
if(iret != 0)
{
iret = 1;
}

iret = pthread_create(&thread_id2,NULL,thrd_function_2,NULL);
if(iret != 0)
{
iret = 1;
}

//等待子线程的退出
pthread_join(thread_id1,NULL);
pthread_join(thread_id2,NULL);

//释放内存
while(!g_thrd_queue.empty())
{
PJOB pjob;
pjob = g_thrd_queue.front();
g_thrd_queue.pop();

free(pjob);
}
cout << "main thread is exit!" << endl;
return iret;
}


注意:有时候我们需要检测一个互斥体的状态却不希望被阻塞,这时可以使用 pthread_mutex_trylock。当你对一个处于解锁状态的互斥体调用 pthread_mutex_trylock 时,它会像 pthread_mutex_lock 一样锁定这个互斥体,并返回 0。而当互斥体已经被其它线程锁定时,pthread_mutex_trylock 不会阻塞,而是立即返回错误码 EBUSY,持有锁的其它线程不会受到影响。你可以稍后再次尝试锁定这个互斥体。

2、 信号量

用的api函数有:sem_init、sem_wait、sem_post、sem_destroy、sem_trywait

示例:
//thread_sem.cc

#include<pthread.h>
#include<semaphore.h>
#include<signal.h>
#include<stdlib.h>
#include<string.h>
#include<time.h>
#include<unistd.h>

#include<iostream>
#include<queue>

using namespace std;

// Short integer aliases used by the job record and its helpers.
// NOTE(review): with `using namespace std;` in effect, an unqualified `byte`
// may become ambiguous with std::byte when built as C++17 or later - confirm
// against the language standard this example is compiled with.
typedef unsigned int u32;
typedef unsigned char byte;

// Shutdown request: set by the SIGUSR1 handler, polled by both worker loops.
// volatile sig_atomic_t is the object type a signal handler may portably
// write. It does not by itself order memory accesses between threads.
volatile sig_atomic_t flag_exit = 0;

// Payload copied into every job. A string literal is immutable, so the
// pointer is const-qualified (binding a literal to `char *` is ill-formed
// since C++11).
const char *data = "test pthread mutex!";

// Variable-length job record exchanged between the two worker threads.
// packjob() allocates sizeof(JOB) + payload length bytes, so `data` is only
// the first byte of a longer trailing buffer.
struct job
{
    u32  id;       // sequence number supplied by the producer
    u32  len;      // filled in by packjob() with the total allocation size
    byte data[1];  // start of the payload bytes
};

typedef struct job  JOB;
typedef struct job *PJOB;

// Signature of a signal handler. Not referenced again in this listing.
typedef void (*sighandler_t)(int);

// Counts the jobs available in g_thrd_queue. Initialised to 0 by sem_init in
// main(), incremented by the producer and waited on by the consumer.
sem_t sem_job_count;

// Guards every access to g_thrd_queue. Statically initialised, so no
// pthread_mutex_init call is required.
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

// FIFO of heap-allocated jobs handed from the producer to the consumer.
queue<PJOB> g_thrd_queue;

// SIGUSR1 handler: announces itself and asks the worker loops to stop.
//
// Only async-signal-safe operations are performed here. The original used
// iostream output, which may allocate or take internal locks and is therefore
// not safe to call from a signal handler. write(2) is used instead.
//
// signal: number of the delivered signal (unused).
void handlesignal(int signal)
{
    (void)signal;

    static const char banner[] = "+++handlesignal+++\n";
    const ssize_t written = write(STDOUT_FILENO, banner, sizeof(banner) - 1);
    (void)written;  // nothing useful can be done about a failed write here

    flag_exit = true;
}

// Allocates a job record and copies `len` payload bytes into it.
//
// id    : identifier stored in the record.
// pdata : source bytes; may be NULL only when len is 0.
// len   : number of payload bytes to copy.
//
// Returns a malloc()-ed record the caller must free(), or NULL when the
// arguments are invalid, the size does not fit, or allocation fails.
//
// The whole allocation is zeroed first, so the payload is always followed by
// at least one zero byte and can be printed as a C string.
//
// NOTE(review): `len` in the record receives the total allocation size, not
// the payload length. That is kept as in the original - confirm it is intended.
PJOB packjob(u32 id,byte *pdata,u32 len )
{
    if (pdata == NULL && len != 0)
    {
        return NULL;
    }

    // Compute the size in size_t: the original `int` could truncate
    // sizeof(JOB) + len and under-allocate before copying `len` bytes.
    const size_t total = sizeof(JOB) + static_cast<size_t>(len);
    if (total < len || total > static_cast<u32>(-1))
    {
        return NULL;  // wrapped around, or would not fit in the u32 len field
    }

    PJOB pret = static_cast<PJOB>(malloc(total));
    if (pret == NULL)
    {
        return NULL;
    }

    memset(pret, 0, total);
    pret->id = id;
    pret->len = static_cast<u32>(total);
    if (len != 0)
    {
        memcpy(pret->data, pdata, len);
    }
    return pret;
}

// Pointer to a function with the signature pthread_create expects for a
// thread start routine: takes a void* argument and returns a void* result.
typedef void *(*pthrd_fun)(void *);

// Producer thread: builds jobs from the global payload, appends each one to
// g_thrd_queue under the mutex and posts the semaphore once per queued job,
// until flag_exit is set.
//
// parg : unused.
// Returns NULL. The original passed the address of a stack local to
// pthread_exit, which dangles once the thread has terminated.
//
// NOTE(review): there is no pacing in this loop, so jobs are produced as fast
// as memory can be allocated - confirm that is intended for the example.
void * thrd_function_1(void *parg)
{
    (void)parg;
    int count = 0;

    cout << "thrd id is:  " << pthread_self() << " run..." << endl;

    while (!flag_exit)
    {
        PJOB pjob = packjob(count++,
                            reinterpret_cast<byte *>(const_cast<char *>(data)),
                            static_cast<u32>(strlen(data)));

        // Never queue (or announce) a failed allocation: the consumer
        // dereferences whatever it pops.
        if (pjob == NULL)
        {
            continue;
        }

        pthread_mutex_lock(&mutex);
        g_thrd_queue.push(pjob);
        // One post per queued job keeps the semaphore count equal to the
        // number of jobs available to the consumer.
        sem_post(&sem_job_count);
        pthread_mutex_unlock(&mutex);
    }

    cout << "thrd id is:  " << pthread_self() << " exit!" << endl;
    return NULL;
}

// Consumer thread: waits on the job-count semaphore, then removes one job
// from g_thrd_queue, prints it and frees it, until flag_exit is set.
//
// parg : unused.
// Returns NULL. The original passed the address of a stack local to
// pthread_exit, which dangles once the thread has terminated.
void * thrd_function_2(void *parg)
{
    (void)parg;

    cout << "thrd id is:  " << pthread_self() << " run..." << endl;

    while (!flag_exit)
    {
        // Wait with a one-second deadline instead of blocking indefinitely:
        // once the producer has stopped posting, an unbounded sem_wait would
        // never return and the thread could never notice flag_exit.
        struct timespec deadline;
        if (clock_gettime(CLOCK_REALTIME, &deadline) != 0)
        {
            break;  // no usable clock; give up rather than spin
        }
        deadline.tv_sec += 1;

        if (sem_timedwait(&sem_job_count, &deadline) != 0)
        {
            // Timed out or interrupted by a signal: no job was claimed, so
            // go back and re-check flag_exit.
            continue;
        }

        PJOB pjob = NULL;

        // Only the queue manipulation happens under the lock.
        pthread_mutex_lock(&mutex);
        if (!g_thrd_queue.empty())
        {
            pjob = g_thrd_queue.front();
            g_thrd_queue.pop();
        }
        pthread_mutex_unlock(&mutex);

        if (pjob != NULL)
        {
            cout << "id :" << pjob->id << endl;
            cout << "len :" << pjob->len << endl;
            cout << "data :" << reinterpret_cast<char *>(pjob->data) << endl;

            // The consumer owns the job once it has been dequeued.
            free(pjob);
        }
    }

    cout << "thrd id is:  " << pthread_self() << " exit!" << endl;
    return NULL;
}

int main(int argc,char *argv[])
{
pthread_t thread_id1;
pthread_t thread_id2;
int iret;

cout << "pid id is:  " << getpid() << endl;

//安置信号处理函数
signal(SIGUSR1,handlesignal);

//初始化信号量
sem_init(&sem_job_count,0,0);

iret = pthread_create(&thread_id1,NULL,thrd_function_1,NULL);

if(iret != 0)
{
iret = 1;
}

iret = pthread_create(&thread_id2,NULL,thrd_function_2,NULL);
if(iret != 0)
{
iret = 1;
}

//等待子线程的退出
pthread_join(thread_id1,NULL);
pthread_join(thread_id2,NULL);

//释放内存
while(!g_thrd_queue.empty())
{
PJOB pjob;
pjob = g_thrd_queue.front();
g_thrd_queue.pop();

free(pjob);
}
cout << "main thread is exit!" << endl;
return iret;
}


注意:sem_trywait和pthread_mutex_trylock用法类似。

3、  条件变量

用的api函数有:pthread_cond_init、pthread_cond_signal、pthread_cond_wait、pthread_cond_broadcast、pthread_cond_destroy

示例:
//thread_con.cc

#include<iostream>
#include<queue>
#include<unistd.h>
#include<stdlib.h>
#include<pthread.h>
#include<signal.h>
#include<semaphore.h>

using namespace std;

// Short integer aliases used by the job record and its helpers.
// NOTE(review): with `using namespace std;` in effect, an unqualified `byte`
// may become ambiguous with std::byte when built as C++17 or later - confirm
// against the language standard this example is compiled with.
typedef unsigned int u32;
typedef unsigned char byte;

// "Queue is ready" predicate of the condition variable: the producer sets it
// under the mutex once all jobs are queued, and the consumer waits for it.
// The SIGUSR1 handler also sets it, which is why it is a
// volatile sig_atomic_t - the object type a signal handler may portably write.
volatile sig_atomic_t flag_exit = 0;

// Payload copied into every job. A string literal is immutable, so the
// pointer is const-qualified (binding a literal to `char *` is ill-formed
// since C++11).
const char *data = "test pthread mutex!";

// Variable-length job record exchanged between the two worker threads.
// packjob() allocates sizeof(JOB) + payload length bytes, so `data` is only
// the first byte of a longer trailing buffer.
struct job
{
    u32  id;       // sequence number supplied by the producer
    u32  len;      // filled in by packjob() with the total allocation size
    byte data[1];  // start of the payload bytes
};

typedef struct job  JOB;
typedef struct job *PJOB;

// Signature of a signal handler. Not referenced again in this listing.
typedef void (*sighandler_t)(int);

// Signalled by the producer after it has set flag_exit. Initialised with
// pthread_cond_init and released with pthread_cond_destroy in main().
pthread_cond_t g_thread_flag_cv;

// Guards g_thrd_queue and flag_exit, and is the mutex paired with
// g_thread_flag_cv. Statically initialised, so no pthread_mutex_init call is
// required.
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

// FIFO of heap-allocated jobs handed from the producer to the consumer.
queue<PJOB> g_thrd_queue;

// SIGUSR1 handler: announces itself and sets flag_exit.
//
// Only async-signal-safe operations are performed here. The original used
// iostream output, which may allocate or take internal locks and is therefore
// not safe to call from a signal handler. write(2) is used instead.
//
// signal: number of the delivered signal (unused).
void handlesignal(int signal)
{
    (void)signal;

    static const char banner[] = "+++handlesignal+++\n";
    const ssize_t written = write(STDOUT_FILENO, banner, sizeof(banner) - 1);
    (void)written;  // nothing useful can be done about a failed write here

    flag_exit = true;
}

// Allocates a job record and copies `len` payload bytes into it.
//
// id    : identifier stored in the record.
// pdata : source bytes; may be NULL only when len is 0.
// len   : number of payload bytes to copy.
//
// Returns a malloc()-ed record the caller must free(), or NULL when the
// arguments are invalid, the size does not fit, or allocation fails.
//
// The whole allocation is zeroed first, so the payload is always followed by
// at least one zero byte and can be printed as a C string.
//
// NOTE(review): `len` in the record receives the total allocation size, not
// the payload length. That is kept as in the original - confirm it is intended.
// NOTE(review): memset/memcpy need <string.h>, which this listing does not
// include directly.
PJOB packjob(u32 id,byte *pdata,u32 len )
{
    if (pdata == NULL && len != 0)
    {
        return NULL;
    }

    // Compute the size in size_t: the original `int` could truncate
    // sizeof(JOB) + len and under-allocate before copying `len` bytes.
    const size_t total = sizeof(JOB) + static_cast<size_t>(len);
    if (total < len || total > static_cast<u32>(-1))
    {
        return NULL;  // wrapped around, or would not fit in the u32 len field
    }

    PJOB pret = static_cast<PJOB>(malloc(total));
    if (pret == NULL)
    {
        return NULL;
    }

    memset(pret, 0, total);
    pret->id = id;
    pret->len = static_cast<u32>(total);
    if (len != 0)
    {
        memcpy(pret->data, pdata, len);
    }
    return pret;
}

// Pointer to a function with the signature pthread_create expects for a
// thread start routine: takes a void* argument and returns a void* result.
typedef void *(*pthrd_fun)(void *);

// Producer thread: while holding the mutex, queues up to ten jobs built from
// the global payload, sets flag_exit and signals the condition variable.
//
// parg : unused.
// Returns NULL. The original passed the address of a stack local to
// pthread_exit, which dangles once the thread has terminated.
void * thrd_function_1(void *parg)
{
    (void)parg;

    cout << "thrd id is:  " << pthread_self() << " run..." << endl;

    // The queue and the predicate are both changed under the mutex so that
    // the waiting thread sees them consistently.
    pthread_mutex_lock(&mutex);

    for (int count = 0; count < 10; ++count)
    {
        PJOB pjob = packjob(count,
                            reinterpret_cast<byte *>(const_cast<char *>(data)),
                            static_cast<u32>(strlen(data)));

        // Never queue a failed allocation: the consumer dereferences
        // whatever it pops.
        if (pjob != NULL)
        {
            g_thrd_queue.push(pjob);
        }
    }

    // Publish the predicate first, then wake the waiter.
    flag_exit = true;
    pthread_cond_signal(&g_thread_flag_cv);

    pthread_mutex_unlock(&mutex);

    cout << "thrd id is:  " << pthread_self() << " exit!" << endl;
    return NULL;
}

// Consumer thread: waits on the condition variable until flag_exit is set,
// then prints and frees every job in g_thrd_queue while holding the mutex.
//
// parg : unused.
// Returns NULL. The original passed the address of a stack local to
// pthread_exit, which dangles once the thread has terminated.
void * thrd_function_2(void *parg)
{
    (void)parg;

    cout << "thrd id is:  " << pthread_self() << " run..." << endl;

    pthread_mutex_lock(&mutex);

    // Re-test the predicate after every wake-up. pthread_cond_wait releases
    // the mutex while blocked and re-acquires it before returning.
    while (!flag_exit)
    {
        pthread_cond_wait(&g_thread_flag_cv, &mutex);
    }

    for (; !g_thrd_queue.empty(); g_thrd_queue.pop())
    {
        PJOB pjob = g_thrd_queue.front();
        if (pjob == NULL)
        {
            continue;  // defensive: nothing to print or free
        }

        cout << "id :" << pjob->id << endl;
        cout << "len :" << pjob->len << endl;
        cout << "data :" << reinterpret_cast<char *>(pjob->data) << endl;

        // The consumer owns the job once it has been taken from the queue.
        free(pjob);
    }

    pthread_mutex_unlock(&mutex);

    cout << "thrd id is:  " << pthread_self() << " exit!" << endl;
    return NULL;
}

int main(int argc,char *argv[])
{
pthread_t thread_id1;
pthread_t thread_id2;
int iret;

cout << "pid id is:  " << getpid() << endl;

//安置信号处理函数
signal(SIGUSR1,handlesignal);

//初始化条件量
pthread_cond_init(&g_thread_flag_cv,NULL);

iret = pthread_create(&thread_id1,NULL,thrd_function_1,NULL);

if(iret != 0)
{
iret = 1;
}

iret = pthread_create(&thread_id2,NULL,thrd_function_2,NULL);
if(iret != 0)
{
iret = 1;
}

//等待子线程的退出
pthread_join(thread_id1,NULL);
pthread_join(thread_id2,NULL);

//释放内存
while(!g_thrd_queue.empty())
{
PJOB pjob;
pjob = g_thrd_queue.front();
g_thrd_queue.pop();

free(pjob);
}

pthread_cond_destroy(&g_thread_flag_cv);

cout << "main thread is exit!" << endl;
return iret;
}


注意:pthread_cond_broadcast 函数会唤醒所有等待该条件变量的线程,而不是仅仅唤醒其中一个线程。

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: