您的位置:首页 > 其它

SDL中的互斥量和条件变量

2015-06-24 17:24 274 查看

一、概述

最简单的音视频播放器中,由于解复用和解码是在不同的线程中,存放包的队列是公共资源,需要互斥。解复用向队列添加包,解码从队列取包,也需要同步。所以队列的入队和出队操作,采用了互斥量和条件变量。

二、主要内容

互斥量

1、创建互斥量

SDL_mutex *mutex;
mutex = SDL_CreateMutex();
创建的互斥量默认是未上锁的。

2、上锁和解锁

if (SDL_LockMutex(mutex) == 0) {
  /* Do stuff while mutex is locked */
  SDL_UnlockMutex(mutex);
} else {
  fprintf(stderr, "Couldn't lock mutex\n");
}
成功返回0,否则返回<0

3、销毁锁

SDL_DestroyMutex(mutex);


条件变量

1、创建条件变量

SDL_cond* SDL_CreateCond(void)


2、等待条件变量

int SDL_CondWait(SDL_cond*  cond,
                 SDL_mutex* mutex)


信号激活后,返回0,否则返回错误代码。

SDL_ConWait必须在互斥量锁住之后才能调用。该函数会解锁锁住的互斥量,并等待拥有该锁的线程激活信号。激活后,会重新上锁。

3、激活信号
int SDL_CondSignal(SDL_cond* cond)
成功返回 0,否则返回错误代码。

SDL_ConSignal会激活等待的一个线程(根据优先级),而不是所有等待的线程。

4、销毁条件变量

void SDL_DestroyCond(SDL_cond* cond)


三、一个简单的例子

生产者向队列不断的放入产品,消费者不停的从队列取出产品进行消费。这里不考虑队列满的情况,假设队列容量是无限大。当没有产品时,消费者等待。

<pre name="code" class="cpp">extern"C"{
#include <libavcodec/avcodec.h>
#include <libavformat/avformat.h>
#include <libswscale/swscale.h>
#include "include/sdl/SDL.h"
#include "include/sdl/SDL_thread.h"
#include "include/libavutil/time.h"
#include "include/libavutil/avstring.h"
#include "libswresample/swresample.h"
}

#pragma comment(lib, "lib/avformat.lib")
#pragma comment(lib, "lib/avcodec.lib")
#pragma comment(lib, "lib/avutil.lib")
#pragma comment(lib, "lib/swscale.lib")
#pragma comment(lib, "lib/swresample.lib")
#pragma comment(lib, "lib/SDL.lib")
#pragma comment(lib, "lib/SDLmain.lib")

#include <stdio.h>
#include<stdlib.h>
#include<string.h>
#include <assert.h>

/*包队列*/
typedef struct PacketQueue {
	***PacketList *first_pkt, *last_pkt;
	int nb_packets;//包个数
	int size;//包大小
	SDL_mutex *mutex;//互斥量
	SDL_cond *cond;//条件量
} PacketQueue;
/*
初始化队列
*/
void packet_queue_init(PacketQueue *q) {
	memset(q, 0, sizeof(PacketQueue));
	q->mutex = SDL_CreateMutex();
	q->cond = SDL_CreateCond();
}
/*
入队操作
*/
int packet_queue_put(PacketQueue *q, ***Packet *pkt) {

	***PacketList *pkt1;
	if (av_dup_packet(pkt) < 0) {
		return -1;
	}
	pkt1 = (***PacketList*)av_malloc(sizeof(***PacketList));
	if (!pkt1)
		return -1;
	pkt1->pkt = *pkt;
	pkt1->next = NULL;

	SDL_LockMutex(q->mutex);
	if (!q->last_pkt)
		q->first_pkt = pkt1;
	else
		q->last_pkt->next = pkt1;
	q->last_pkt = pkt1;
	q->nb_packets++;
	q->size += pkt1->pkt.size;
	SDL_CondSignal(q->cond);
	SDL_UnlockMutex(q->mutex);
	return 0;
}
/*
出队操作
*/
int packet_queue_get(PacketQueue *q, ***Packet *pkt, int block)
{
	***PacketList *pkt1;
	int ret;

	SDL_LockMutex(q->mutex);

	for (;;) {
		pkt1 = q->first_pkt;
		if (pkt1) {
			q->first_pkt = pkt1->next;
			if (!q->first_pkt)
				q->last_pkt = NULL;
			q->nb_packets--;
			q->size -= pkt1->pkt.size;
			*pkt = pkt1->pkt;
			av_free(pkt1);
			ret = 1;
			break;
		}
		else if (!block) {
			ret = 0;
			break;
		}
		else {
			SDL_CondWait(q->cond, q->mutex);
		}
	}
	SDL_UnlockMutex(q->mutex);
	return ret;
}
//生产线程
int produce(void * qq)
{
	PacketQueue *q = (PacketQueue*)qq;
	while (1)
	{
		***Packet* pkt = (***Packet*)malloc(sizeof(***Packet));
		if (packet_queue_put(q, pkt)==0)
		{
			printf("produce %d product!\n",q->nb_packets);
		} 
		SDL_Delay(2);//等待2ms
	}
	return 0;
}
//消费线程
int consume(void * qq)
{
	PacketQueue *q = (PacketQueue*)qq;
	while (1)
	{
	int ret = packet_queue_get(q, pkt, 0);
<span style="white-space:pre">	</span>if (ret ==1)
<span style="white-space:pre">	</span>{
<span style="white-space:pre">		</span>printf("consume a product\n");
<span style="white-space:pre">	</span>}
<span style="white-space:pre">	</span>else if (ret == 0)
<span style="white-space:pre">	</span>{
<span style="white-space:pre">		</span>printf("no product ,wait!\n");
<span style="white-space:pre">	</span>}
<span style="white-space:pre">	</span>
<span style="white-space:pre">		</span>SDL_Delay(1);
	}
	return 0;
}

int main(int argc, char *argv[]) {

	SDL_Thread * threadA;
	SDL_Thread * threadB;
	PacketQueue * q = (PacketQueue*)malloc(sizeof(PacketQueue));
        packet_queue_init(q);
	threadA = SDL_CreateThread(produce, q);
	threadB = SDL_CreateThread(consume, q);

	SDL_Delay(500);
	SDL_KillThread(threadA);
	SDL_KillThread(threadB);
	return 0;
}



运行结果:



从图中可以看到,当生产者向队列放入1产品后,消费者消费了一个产品。当消费者再次取产品的时候,发现没有产品,所以等待。

在程序中,我让生产者线程生产的速度低于消费的速度(一个delay了2毫秒,一个delay了1毫秒),所以才会出现消费者等待的情形。

现在假设队列的容量为3,并当队列满的时候,不允许再向队列放入产品,等待消费者消费了之后再放入产品。实现起来也不难,只需要在入队时做个判断就可以了。源代码如下:

extern"C"{
#include <libavcodec/avcodec.h>
#include <libavformat/avformat.h>
#include <libswscale/swscale.h>
#include "include/sdl/SDL.h"
#include "include/sdl/SDL_thread.h"
#include "include/libavutil/time.h"
#include "include/libavutil/avstring.h"
#include "libswresample/swresample.h"
}

#pragma comment(lib, "lib/avformat.lib")
#pragma comment(lib, "lib/avcodec.lib")
#pragma comment(lib, "lib/avutil.lib")
#pragma comment(lib, "lib/swscale.lib")
#pragma comment(lib, "lib/swresample.lib")
#pragma comment(lib, "lib/SDL.lib")
#pragma comment(lib, "lib/SDLmain.lib")

#include <stdio.h>
#include<stdlib.h>
#include<string.h>
#include <assert.h>

/*包队列*/
typedef struct PacketQueue {
	***PacketList *first_pkt, *last_pkt;
	int nb_packets;//包个数
	int size;//包大小
	SDL_mutex *mutex;//互斥量
	SDL_cond *cond;//条件量
	int max_packets;
} PacketQueue;

void packet_queue_init(PacketQueue *q) {
	memset(q, 0, sizeof(PacketQueue));
	q->max_packets = 3;//设置队列容量
	q->mutex = SDL_CreateMutex();
	q->cond = SDL_CreateCond();
}
int packet_queue_put(PacketQueue *q, ***Packet *pkt) {

	if (q->nb_packets>=q->max_packets)//队列满则返回
	{
		return 1;
	}
	***PacketList *pkt1;
	if (av_dup_packet(pkt) < 0) {
		return -1;
	}
	pkt1 = (***PacketList*)av_malloc(sizeof(***PacketList));
	if (!pkt1)
		return -1;
	pkt1->pkt = *pkt;
	pkt1->next = NULL;

	SDL_LockMutex(q->mutex);
	if (!q->last_pkt)
		q->first_pkt = pkt1;
	else
		q->last_pkt->next = pkt1;
	q->last_pkt = pkt1;
	q->nb_packets++;
	q->size += pkt1->pkt.size;
	SDL_CondSignal(q->cond);
	SDL_UnlockMutex(q->mutex);
	return 0;
}
int packet_queue_get(PacketQueue *q, ***Packet *pkt, int block)
{
	***PacketList *pkt1;
	int ret;

	SDL_LockMutex(q->mutex);

	for (;;) {
		pkt1 = q->first_pkt;
		if (pkt1) {
			q->first_pkt = pkt1->next;
			if (!q->first_pkt)
				q->last_pkt = NULL;
			q->nb_packets--;
			q->size -= pkt1->pkt.size;
			*pkt = pkt1->pkt;
			av_free(pkt1);
			ret = 1;
			break;
		}
		else if (!block) {
			ret = 0;
			break;
		}
		else {
			SDL_CondWait(q->cond, q->mutex);
		}
	}
	SDL_UnlockMutex(q->mutex);
	return ret;
}

int produce(void * qq)
{
	PacketQueue *q = (PacketQueue*)qq;
	while (1)
	{
		***Packet* pkt = (***Packet*)malloc(sizeof(***Packet));
		int ret = packet_queue_put(q, pkt);
		if (ret==0)
		{
			printf("produce %dth product!\n",q->nb_packets);
		} 
		else if (ret==1)
		{
			printf("Queue is full,wait!\n");
			SDL_Delay(1);
		}	
	}
	return 0;
}

int consume(void * qq)
{
	PacketQueue *q = (PacketQueue*)qq;
	while (1)
	{
		***Packet* pkt = (***Packet*)malloc(sizeof(***Packet));
		int ret = packet_queue_get(q, pkt, 0);
	if (ret ==1)
	{
		printf("consume a product\n");
	}
	else if (ret == 0)
	{
		printf("no product ,wait!\n");
		SDL_Delay(1);
	}
	}
	return 0;
}

int main(int argc, char *argv[]) {

	SDL_Thread * threadA;
	SDL_Thread * threadB;
	PacketQueue * q = (PacketQueue*)malloc(sizeof(PacketQueue));
    packet_queue_init(q);
	threadA = SDL_CreateThread(produce, q);
	threadB = SDL_CreateThread(consume, q);

	SDL_Delay(500);
	SDL_KillThread(threadA);
	SDL_KillThread(threadB);
	return 0;
}


运行结果:

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: