您的位置:首页 > 其它

用信号量和Posix线程操作来实现双线程高速下载

2015-08-12 13:56 501 查看
什么是信号量?

信号量是一个受保护的变量。对两个或多个进程共享的资源可以提供限制访问的方法。

信号量有2个基本操作,一个是释放,一个是获取,如果某个进程想要获取该信号量,但是信号量的值小于等于0时,那么想要获取该信号量的进程,将会被阻塞,直到有进程释放了该信号量,让信号量的值大于0,才会唤醒被阻塞的某个进程。

信号量可以实现线程和进程间的通信,通过它我们可以轻易实现对临界资源和共享资源的访问。

Posix线程操作?

关于posix线程操作,可以去看看我之前写的一篇文章
linux pthread 多线程操作

我们是否有过这样的需求,从一个大文件中读取数据进行处理,或者编写一个下载工具。

我们以编写一个下载工具为例。假设现在要求写一个FTP客户端,我们该如何写呢?

可能我们会很轻易的想到利用ftp协议建立连接,在数据连接中调用socket接收数据,比如像下面的伪代码一样:

while(1){

recv(sockfd,buffer,DATA_BLOCK_LEN,0);

write_to_file(buffer);

if(isOver)

break;

}

不停的接收数据,接收到数据后,写入文件中,直到数据接收完毕。我们分析一下这种方法的时间复杂度,假设下载整个文件所需要的时间的M(只是考虑下载,不考虑写入),假设把sizeof(该文件)大小的数据写入到文件中需要的时间是N,那么整个下载任务所需要的时间是O(M+N)。

当然,这里其实很明显的可以看到数据下载,和数据写入其实是可以并行的。怎么个并行法?就是将下载数据和数据写入文件分离,用两个线程来完成这个工作。比如说我有两个buffer,分别是buf1和buf2。 首先数据下载到buf1后,这时候我们开始把buf1数据写入,而在buf1数据写入的同时,我们已经在下载新的数据到buf2了,当buf2数据下载完成,且buf1数据写入完成,这时候我们开始把buf2的数据写入,与此同时,也正在下载新的数据到buf1,之后进入一个下载-写入-下载-写入的循环中(只是这个循环中下载和写入是并行的),直到下载结束。

这里说的可能有点绕,用个图表示一下吧



那么并行之后的时间复杂度是多少呢?O(max(M,N))。

老规矩,先来介绍一下信号量的用法,再以一个双线程下载和写入文件的例子结束,。主要是介绍如下几个函数

#include <sys/sem.h>

int semget(key_t key,int nsems,int semflg)

通常我们使用它来创建或者获取一个信号量,它会返回对应的信号量描述符

key_t key 用来标识一个信号量,key必须是一个非0值,或者是一个特殊符号,也可以用IPC_PRIVATE代替,因为此时没有提供关键字,它会自己随机生成一个,这样导致别的进程就无法获取到该信号量了,如果只是要求进程内部线程之间自己通信,那么用它也挺合适。

int nsems,创建的信号量数量,1是单个的信号量,为n(大于1)时创建的是信号量数组

int semflg,可以给该信号量指定权限,以及指定semget函数的行为,semget行为通常有3种,一是创建新的信号量,如果信号量存在则返回该信号量的描述符。二是创建新的信号量,如果信号量已经存在则返回错误。三是获取该信号量(待会大家看到代码就知道怎么用了)

int semop(int semid,struct sembuf* buf,unsigned int size)

该函数用来获取和释放信号量

int semid 要操作的信号量的描述符

struct sembuf* buf 里面有对信号量进行操作的3个变量,指定了操作的相关细节,待会说

unsigned int size,是指第二个参数buf的大小,因为我们可以传递一个sembuf的数组过来,也可以理解为信号量要被操作的数目。

我们看看sembuf的定义:

struct sembuf{

unsigned short sem_num;

short sem_op;

short sem_flg;

}

sem_num 是指要操作的信号量的下标,如果只有一个信号量那么传入0即可,如果我们获取的是信号量数组,那么传入你想操作的信号量的对应下标即可

sem_op 要进行的操作,一般我们都是设为1或者-1,1为释放,-1为获取

sem_flg 可以指定semop的行为,相关细节这里就不多说了,一般传入0

int semctl(int semid,int semnum,int cmd,....)

int semid 描述符

int semnum 要操作的信号量的下标,单个的话传入0即可

int cmd 要执行的命令

还可能有第四个参数,视cmd决定

这个函数一般用于获取和设置信号量的值,以及清除信号量,它的行为由cmd参数控制,除了说的这几种,当然还有不少用法,这里不细说

简要介绍完了信号量,我们来看实例代码。这里我模仿了下载数据,和数据写入文件的过程。先给信号量0赋值为2(因为此时2块缓冲区都可写),信号量1赋值为0

,通过信号量来控制下载线程和写入线程能够正确的并行运行。

#include <stdio.h>

#include <stdlib.h>

#include <sys/sem.h>

#include <sys/types.h>

#include <pthread.h>

#include <string.h>

//sem_key

#define MY_SEM_ID 1234

// buf and the data wo need to download

char data[100];

char buf1[11];

char buf2[11];

int down_index;

int write_index;

bool isDownloadOver;

//the thread download and write

void* downloadThread(void* arg);

void* writeThread(void* arg);

int main()

{

pthread_t pid_download,pid_write;

//init the data

for(int i=0;i<100;i++)

data[i]=i+'0';

isDownloadOver=false;

write_index=0;

down_index=0;

//Create semphores

int semid=semget(IPC_PRIVATE,2 ,0600|IPC_CREAT); //0600 是权限,表示该用户拥有读写权限

semctl(semid,0,SETVAL,2);

semctl(semid,1,SETVAL,0);

if(-1== pthread_create(&pid_download,NULL,downloadThread,&semid))

printf("Create DownloadThread Fail\n");

if(-1==pthread_create(&pid_write,NULL,writeThread,&semid))

printf("Create WriteThread Fail\n");

//recovery the resource

pthread_join(pid_download,NULL);

pthread_join(pid_write,NULL);

//remove the semphores

semctl(semid,0,IPC_RMID);

return 0;

}

void* downloadThread(void* arg){

int semid=*(int*)arg;

// the byte have downloaded

int numOfByte=0;

sembuf sb;

sb.sem_flg=0;

int i;

while(1){

sb.sem_num=0;

sb.sem_op= -1;

if(semop(semid,&sb,1)==-1)

printf("down_index semop error\n");

if(down_index%2==0){

for(i=0;i<10&&numOfByte<=100;i++)

buf1[i]=data[numOfByte++];

buf1[i]='\0';

printf("download data is %s\n",buf1);

}

else{

for(i=0;i<10&&numOfByte<=100;i++)

buf2[i]=data[numOfByte++];

buf2[i]='\0';

printf("download data is %s\n",buf2);

}

sb.sem_op=1;

sb.sem_num=1;

if(semop(semid,&sb,1)==-1)

printf("release write_index semop error\n");

down_index++;

if(numOfByte==100)

{

isDownloadOver=true;

break;

}

}

printf("Download Success\nthe final down_index is %d\n",down_index);

return 0;

}

void* writeThread(void* arg){

int semid=*(int*)arg;

sembuf sb;

sb.sem_flg=0;

char file[11];

int i;

int len;

while(1){

sb.sem_num=1;

sb.sem_op= -1;

if(semop(semid,&sb,1)==-1)

printf("Write_index semop error\n");

if(write_index%2==0)

{

len=strlen(buf1);

for(i=0;i<len;i++)

file[i]=buf1[i];

file[i]='\0';

printf("write data is %s\n",file);

}

else

{

len=strlen(buf2);

for(i=0;i<len;i++)

file[i]=buf2[i];

file[i]='\0';

printf("write data is %s\n",file);

}

sb.sem_op=1;

sb.sem_num=0;

if(semop(semid,&sb,1)==-1)

printf("release down_index fail\n");

write_index++;

if(isDownloadOver&&(write_index==down_index))

break;

}

printf("Write Success\nthe final write_index is %d\n",write_index);

return 0;

}

最后再提一点,如果你想看到自己创建的信号量有哪些 可以打开终端 输入 ipcs -s

这里我们依然是用到线程间的通信,下次将为大家带来进程间通信的例子,讲解共享内存的用法。

若有错误的话或者不明白的地方欢迎提出。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: