同步、异步、阻塞、非阻塞 I/O 及 异步I/O实现
2012-04-18 16:09
169 查看
同步、异步、阻塞、非阻塞I/O
第一部分来自:Stevens在文章中一共比较了五种IOModel:
blockingIO
nonblockingIO
IOmultiplexing
signaldrivenIO
asynchronousIO
由于signaldrivenIO在实际中并不常用,所以我这只提及剩下的四种IOModel。
再说一下IO发生时涉及的对象和步骤。
对于一个networkIO(这里我们以read举例),它会涉及到两个系统对象,一个是调用这个IO的process(orthread),另一个就是系统内核(kernel)。当一个read操作发生时,它会经历两个阶段:
1等待数据准备(Waitingforthedatatobeready)
2将数据从内核拷贝到进程中(Copyingthedatafromthekerneltotheprocess)
记住这两点很重要,因为这些IOModel的区别就是在两个阶段上各有不同的情况。
blockingIO
在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:
当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于networkio来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),这个时候kernel就要等待足够的数据到来。而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。
所以,blockingIO的特点就是在IO执行的两个阶段都被block了。
non-blockingIO
linux下,可以通过设置socket使其变为non-blocking。当对一个non-blockingsocket执行读操作时,流程是这个样子:
从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的systemcall,那么它马上就将数据拷贝到了用户内存,然后返回。
所以,用户进程其实是需要不断的主动询问kernel数据好了没有。
IOmultiplexing
IOmultiplexing这个词可能有点陌生,但是如果我说select,epoll,大概就都能明白了。有些地方也称这种IO方式为eventdrivenIO。我们都知道,select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。它的流程如图:
当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
这个图和blockingIO的图其实并没有太大的不同,事实上,还更差一些。因为这里需要使用两个systemcall(select和recvfrom),而blockingIO只调用了一个systemcall(recvfrom)。但是,用select的优势在于它可以同时处理多个connection。(多说一句。所以,如果处理的连接数不是很高的话,使用select/epoll的webserver不一定比使用multi-threading+blockingIO的webserver性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。)
在IOmultiplexingModel中,实际中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socketIO给block。
AsynchronousI/O
linux下的asynchronousIO其实用得很少。先看一下它的流程:
用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronousread之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。
到目前为止,已经将四个IOModel都介绍完了。现在回过头来回答最初的那几个问题:blocking和non-blocking的区别在哪,synchronousIO和asynchronousIO的区别在哪。
先回答最简单的这个:blockingvsnon-blocking。前面的介绍中其实已经很明确的说明了这两者的区别。调用blockingIO会一直block住对应的进程直到操作完成,而non-blockingIO在kernel还准备数据的情况下会立刻返回。
在说明synchronousIO和asynchronousIO的区别之前,需要先给出两者的定义。Stevens给出的定义(其实是POSIX的定义)是这样子的:
AsynchronousI/OoperationcausestherequestingprocesstobeblockeduntilthatI/Ooperationcompletes;
AnasynchronousI/Ooperationdoesnotcausetherequestingprocesstobeblocked;
两者的区别就在于synchronousIO做”IOoperation”的时候会将process阻塞。按照这个定义,之前所述的blockingIO,non-blockingIO,IOmultiplexing都属于synchronousIO。有人可能会说,non-blockingIO并没有被block啊。这里有个非常“狡猾”的地方,定义中所指的”IOoperation”是指真实的IO操作,就是例子中的recvfrom这个systemcall。non-blockingIO在执行recvfrom这个system
call的时候,如果kernel的数据没有准备好,这时候不会block进程。但是,当kernel中数据准备好的时候,recvfrom会将数据从kernel拷贝到用户内存中,这个时候进程是被block了,在这段时间内,进程是被block的。而asynchronousIO则不一样,当进程发起IO操作之后,就直接返回再也不理睬了,直到kernel发送一个信号,告诉进程说IO完成。在这整个过程中,进程完全没有被block。
各个IOModel的比较如图所示:
经过上面的介绍,会发现non-blockingIO和asynchronousIO的区别还是很明显的。在non-blockingIO中,虽然进程大部分时间都不会被block,但是它仍然要求进程去主动的check,并且当数据准备完成以后,也需要进程主动的再次调用recvfrom来将数据拷贝到用户内存。而asynchronousIO则完全不同。它就像是用户进程将整个IO操作交给了他人(kernel)完成,然后他人做完后发信号通知。在此期间,用户进程不需要去检查IO操作的状态,也不需要主动的去拷贝数据。
最后,再举几个不是很恰当的例子来说明这四个IOModel:
有A,B,C,D四个人在钓鱼:
A用的是最老式的鱼竿,所以呢,得一直守着,等到鱼上钩了再拉杆;
B的鱼竿有个功能,能够显示是否有鱼上钩,所以呢,B就和旁边的MM聊天,隔会再看看有没有鱼上钩,有的话就迅速拉杆;
C用的鱼竿和B差不多,但他想了一个好办法,就是同时放好几根鱼竿,然后守在旁边,一旦有显示说鱼上钩了,它就将对应的鱼竿拉起来;
D是个有钱人,干脆雇了一个人帮他钓鱼,一旦那个人把鱼钓上来了,就给D发个短信。
异步I/O实现
第二部分来自:其中包含用aio.h的API实现。
下面是用libaio.h的API实现。
初始化异步IO上下文:
intio_queue_init(intmaxevents,io_context_t*ctx);
maxevents:允许提交的最大异步IO请求数目;
ctx:异步IO上下文
返回值:成功返回0,失败返回-errno
初始化IOCB块
异步IO块的数据结构。
structiocb{
void*data;//用于存储回调函数或者字符串
unsignedkey;
shortaio_lio_opcode;//操作码,是读还是写,或者其他
shortaio_reqprio;
intaio_fildes;//
union{
structio_iocb_commonc;
structio_iocb_vectorv;
structio_iocb_pollpoll;
structio_iocb_sockaddrsaddr;
}u;
};
初始化一个异步IO读请求:
inlinevoidio_prep_pread(structiocb*iocb,intfd,void*buf,size_tcount,longlongoffset);
初始化一个异步IO写请求:
inlinevoidio_prep_pwrite(structiocb*iocb,intfd,void*buf,size_tcount,longlongoffset);
对buf进行对齐:
intposix_memalign(void**memptr,size_talignment,size_tsize);
提交异步IO请求:
intio_submit(aio_context_tctx_id,long
nr,structiocb**iocbpp);
ctx_id:异步IO上下文
nr:要提交的异步IO请求的数目
iocbpp:指向异步IO块的指针的指针
获取已完成的异步IO事件:
intio_getevents(aio_context_tctx_id,longmin_nr,longnr,structio_event*events,structtimespec*timeout);
min_nr:等待的最小的事件数
nr:等待的最大事件数
timeout:超时
io_event结构:
#definePADDEDptr(x,y)x;unsignedy
#definePADDEDul(x,y)unsignedlongx;unsignedy
structio_event{
PADDEDptr(void*data,__pad1);//data用于存放回调函数,如果无回调函数,该data也可以存放入字符串这样的信息
PADDEDptr(structiocb*obj,__pad2);//存放iocb对象
PADDEDul(res,__pad3);//实际读到的字符串长度,当所传递的fd是以O_DIRECT打开,但是,buf长度不是AIO_BLKSIZE的时候,该参数被设置为-22,errno为22时,其意义为“Invalidargument”
PADDEDul(res2,__pad4);//出错码,如果出错,则不为0;若未出错,则为0
};
注意的地方:
对于不满足规定的异步IO块大小的文本,使用没有用O_DIRECT打开的文件描述符进行操作
实例:
该程序实现了一个复制文件的功能,运行程序时需要传入两个参数,第一个是原文件名,第二个是目标文件名。
//#define__USE_GNU
#undef_FILE_OFFSET_BITS
#define_FILE_OFFSET_BITS64
#include<stdlib.h>
#include<stdio.h>
#include<unistd.h>
#include<dirent.h>
#include<sys/types.h>
#include<sys/stat.h>
#include<sys/param.h>
#include<fcntl.h>
#include<errno.h>
#include<libaio.h>
#defineAIO_BLKSIZE1024
#defineAIO_MAXIO64
staticintbusy=0;
staticinttocopy=0;
staticintsrcfd=-1;
staticintsrcfd2=-1;
staticintdstfd=-1;
staticintdstfd2=-1;
staticconstchar*dstname=NULL;
staticconstchar*srcname=NULL;
/*fatalerrorhandler*/
staticvoidio_error(constchar*func,intrc)
{
if(rc==-ENOSYS)
fprintf(stdout,"AIOnotinthiskernel/n");
else
fprintf(stdout,"%s:errno%d",func,rc);
if(dstfd>0)
close(dstfd);
if(dstname)
unlink(dstname);
exit(1);
}
/*
*writecompletecallback
*adjustcountsandfreeresources
*/
staticvoidwr_done(io_context_tctx,structiocb*iocb,longres,longres2)
{
if(res2!=0)
{
io_error("aiowrite",res2);
}
if(res!=iocb->u.c.nbytes)
{
fprintf(stdout,"writemissedbytesexpect%dgot%d",iocb->u.c.nbytes,res);
exit(1);
}
//printf("res=%d,res2=%d/n",res,res2);
--tocopy;
--busy;
free(iocb->u.c.buf);
free(iocb);
}
/*
*readcompletecallback
*changereadiocbintoawriteiocbandstartit.
*/
staticvoidrd_done(io_context_tctx,structiocb*iocb,longres,longres2)
{
/*libraryneedsaccessorstolookatiocb*/
intiosize=iocb->u.c.nbytes;
char*buf=(char*)iocb->u.c.buf;
off_toffset=iocb->u.c.offset;
intfd,tmp;
char*wrbuff=NULL;
if(res2!=0)
{
io_error("aioread",res2);
}
if(res!=iosize)
{
fprintf(stdout,"readmissingbytesexpect%dgot%d",iocb->u.c.nbytes,res);
exit(1);
}
if(iocb->aio_fildes==srcfd)
{
fd=dstfd;
}
else
fd=dstfd2;
/*turnreadintowrite*/
tmp=posix_memalign((void**)&wrbuff,getpagesize(),AIO_BLKSIZE);
if(tmp<0)
{
perror("posix_memalign");
exit(1);
}
snprintf(wrbuff,iosize+1,"%s",buf);
//printf("wrbuff-len=%d:%s/n",strlen(wrbuff),wrbuff);
//printf("wrbuff_len=%d/n",strlen(wrbuff));
free(buf);
io_prep_pwrite(iocb,fd,wrbuff,iosize,offset);
io_set_callback(iocb,wr_done);
if(1!=(res=io_submit(ctx,1,&iocb)))
io_error("io_submitwrite",res);
printf("/nsubmit%dwriterequest/n",res);
}
voidm_io_queue_run(io_context_tmyctx)
{
//printf("inm_io_queue_run:/n");
structio_eventevents[AIO_MAXIO];
io_callback_tcb;
intn,i;
n=io_getevents(myctx,1,AIO_MAXIO,events,NULL);
printf("/n%dio_requestcompleted/n/n",n);
for(i=0;i<n;i++)
{
cb=(io_callback_t)events[i].data;
structiocb*io=events[i].obj;
//useepoll_fdtoregisterEPOLLOUT
//heresegmentfault
//printf("events[%d].data=%x,res=%d,res2=%d/n",i,cb,events[i].res,events[i].res2);
cb(myctx,io,events[i].res,events[i].res2);
}
}
intmain(intargc,char*const*argv)
{
structstatst;
//off_tlength=0,offset=0;
longlonglength=0,offset=0;
io_context_tmyctx;
char*buff=NULL;
intleftover;
inttmp;
intlast;
if(argc!=3||argv[1][0]=='-')
{
fprintf(stdout,"Usage:aiocpSOURCEDEST/n");
exit(1);
}
if((srcfd=open(srcname=argv[1],O_RDONLY|O_DIRECT))<0)
{
perror(srcname);
exit(1);
}
if(fstat(srcfd,&st)<0)
{
perror("fstat");
exit(1);
}
length=(longlong)st.st_size;
if((dstfd=open(dstname=argv[2],O_WRONLY|O_CREAT|O_DIRECT,0666))<0)
{
close(srcfd);
perror(dstname);
exit(1);
}
leftover=length%AIO_BLKSIZE;
length-=leftover;
/*initializestatmachine*/
memset(&myctx,0,sizeof(myctx));
io_queue_init(AIO_MAXIO,&myctx);
/*insys/param.h#definehowmany(x,y)(((x)+((y)-1))/(y))*/
tocopy=howmany(length,AIO_BLKSIZE);
while(tocopy>0)
{
inti,rc;
/*submitasmanyreadsasonceaspossibleuptoAIO_MAXIO
*insys/param#defineMIN(a,b)(((a)<(b))?(a):(b))
*/
intn=MIN(MIN(AIO_MAXIO-busy,AIO_MAXIO/2),howmany(length-offset,AIO_BLKSIZE));
if(n>0)
{
structiocb*ioq
;
for(i=0;i<n;i++)
{
structiocb*io=(structiocb*)malloc(sizeof(structiocb));
intiosize=AIO_BLKSIZE;
tmp=posix_memalign((void**)&buff,getpagesize(),AIO_BLKSIZE);
if(tmp<0)
{
perror("posix_memalign");
exit(1);
}
if(NULL==io)
{
fprintf(stdout,"outofmemeory/n");
exit(1);
}
io_prep_pread(io,srcfd,buff,iosize,offset);
io_set_callback(io,rd_done);
ioq[i]=io;
offset+=iosize;
buff=NULL;
}
printf("START.../n/n");
rc=io_submit(myctx,n,ioq);
if(rc<0)
io_error("io_submit",rc);
printf("/nsubmit%dreadrequest/n",rc);
busy+=n;
}
//HandleIO'sthathavecompleted
m_io_queue_run(myctx);
//ifwehavemaximumnumberofi/o'sinflight
//thenwaitforonetocomplete
/*if(busy==AIO_MAXIO)
{
rc=io_queue_wait(myctx,NULL);
if(rc<0)
{
io_error("io_queue_wait",rc);
}
}
*/
}
if(leftover)
{
srcfd2=open(srcname,O_RDONLY);
if(srcfd2<0)
{
perror("srcfd2open");
exit(1);
}
dstfd2=open(dstname,O_WRONLY);
if(dstfd2<0)
{
perror("dstfd2open");
exit(1);
}
structiocb*io=(structiocb*)malloc(sizeof(structiocb));
if(io==NULL)
{
perror("mallocfailed");
exit(1);
}
buff=NULL;
tmp=posix_memalign((void**)&buff,getpagesize(),AIO_BLKSIZE);
if(tmp<0)
{
perror("posix_memalign");
exit(1);
}
io_prep_pread(io,srcfd2,buff,leftover,offset);
io_set_callback(io,rd_done);
if((last=io_submit(myctx,1,&io))<0)
{
perror("io_submit");
exit(1);
}
printf("/nsubmit%diorequest/n",last);
m_io_queue_run(myctx);
}
exit(0);
}
转载自http://blog.csdn.net/linquidx/article/details/5932906
________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________
同步与异步--阻塞与非阻塞型I/O(非常好,转贴)
2010-10-149:53
同步阻塞IO
在这个模型中,应用程序(application)为了执行这个read操作,会调用相应的一个systemcall,将系统控制权交给kernel,然后就进行等待(这其实就是被阻塞了)。kernel开始执行这个systemcall,执行完毕后会向应用程序返回响应,应用程序得到响应后,就不再阻塞,并进行后面的工作。
例如,“在调用read系统调用时,应用程序会阻塞并对内核进行上下文切换。然后会触发读操作,当响应返回时(从我们正在从中读取的设备中返回),数据就被移动到用户空间的缓冲区中。然后应用程序就会解除阻塞(read调用返回)。”
举一个浅显的例子,就好比你去一个银行柜台存钱。首先,你会将存钱的单子填好,然后交给柜员。这里,你就好比是application,单子就是调用的systemcall,柜员就是kernel。提交好单子后,你就坐在柜台前等,相当于开始进行等待。柜员办好以后会给你一个回执,表示办好了,这就是response。然后你就可以拿着回执干其它的事了。注意,这个时候,如果你办完之后马上去查账,存的钱已经打到你的账户上了。后面你会发现,这点很重要。
同步非阻塞IO在linux下,应用程序可以通过设置文件描述符的属性O_NONBLOCK,I/O操作可以立即返回,但是并不保证I/O操作成功。
也就是说,当应用程序设置了O_NONBLOCK之后,执行write操作,调用相应的systemcall,这个systemcall会从内核中立即返回。但是在这个返回的时间点,数据可能还没有被真正的写入到指定的地方。也就是说,kernel只是很快的返回了这个systemcall(这样,应用程序不会被这个IO操作blocking),但是这个systemcall具体要执行的事情(写数据)可能并没有完成。而对于应用程序,虽然这个IO操作很快就返回了,但是它并不知道这个IO操作是否真的成功了,如果想知道,需要应用程序主动地去问kernel。
这次不是去银行存钱,而是去银行汇款。同样的,你也需要填写汇款单然后交给柜员,柜员进行一些简单的手续处理就能够给你回执。但是,你拿到回执并不意味着钱已经打到了对方的账上。事实上,一般汇款的周期大概是24个小时,如果你要以存钱的模式来汇款的话,意味着你需要在银行等24个小时,这显然是不现实的。所以,同步非阻塞IO在实际生活中也是有它的意义的。
异步阻塞IO
和之前一样,应用程序要执行read操作,因此调用一个systemcall,这个systemcall被传递给了kernel。但在应用程序这边,它调用systemcall之后,并不等待kernel返回response,这一点是和前面两种机制不一样的地方。这也是为什么它被称为异步的原因。但是为什么称其为阻塞呢?这是因为虽然应用程序是一个异步的方式,但是select()函数会将应用程序阻塞住,一直等到这个systemcall有结果返回了,再通知应用程序。也就是说,“在这种模型中,配置的是非阻塞I/O,然后使用阻塞
select系统调用来确定一个I/O描述符何时有操作。”
所以,从IO操作的实际效果来看,异步阻塞IO和第一种同步阻塞IO是一样的,应用程序都是一直等到IO操作成功之后(数据已经被写入或者读取),才开始进行下面的工作。异步阻塞IO的好处在于一个select函数可以为多个描述符提供通知,提高了并发性。
关于提高并发性这点,我们还以银行为例说明。比如说一个银行柜台,现在有10个人想存钱。按照现在银行的做法,一个个排队。第一个人先填存款单,然后提交,然后柜员处理,然后给回执,成功后再轮到下一个人。大家应该都在银行排过对,这样的流程是很痛苦的。如果按照异步阻塞的机制,10个人都填好存款单,然后都提交给柜台,提交完之后所有的10个人就在银行大厅等待。这时候会专门有个人,他会了解存款单处理的情况,一旦有存款单处理完毕,他会将回执交给相应的正在大厅等待的人,这个拿到回执的人就可以去干其他的事情了。而前面提到的这个专人,就对应于select函数。
异步非阻塞IO如图所示,应用程序提交read请求的systemcall,然后,kernel开始处理相应的IO操作,而同时,应用程序并不等kernel返回响应,就会开始执行其他的处理操作(应用程序没有被IO操作所阻塞)。当kernel执行完毕,返回read的响应,就会产生一个信号或执行一个基于线程的回调函数来完成这次I/O处理过程。
比如银行存钱。现在某银行新开通了一项存钱业务。用户之需要将存款单交给柜台,然后无需等待就可以离开了。柜台办好以后会给用户发送一条短信,告知交易成功。这样用户不需要在柜台前进行长时间的等待,同时,也能够得到确切的消息知道交易完成。
从前面的介绍中可以看出,所谓的同步和异步,在这里指的是application和kernel之间的交互方式。如果application不需要等待kernel的回应,那么它就是异步的。如果application提交完IO请求后,需要等待“回执”,那么它就是同步的。
而阻塞和非阻塞,指的是application是否等待IO操作的完成。如果application必须等到IO操作实际完成以后再执行下面的操作,那么它是阻塞的。反之,如果不等待IO操作的完成就开始执行其它操作,那么它是非阻塞的。
转载自http://hi.baidu.com/commandow/blog/item/6bbc875635c450010df3e321.html相关文章推荐
- java的nio之:java的bio流下实现的socket服务器同步阻塞模型和socket的伪异步的socket服务器的通信模型
- 同步、异步、阻塞、非阻塞 I/O 及 异步I/O实现
- 同步,异步,阻塞,非阻塞
- 异步,同步,阻塞,非阻塞,并行,并发,
- Windows I/O模型、同步/异步、阻塞/非阻塞
- 阻塞与非阻塞,同步与异步I/O
- IO - 同步,异步,阻塞,非阻塞 (亡羊补牢篇) -- 保存link
- socket阻塞非阻塞,同步异步
- 同步、异步、阻塞和非阻塞的概念
- 同步(synchronous) IO和异步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO
- 直击阻塞,非阻塞,异步,同步四者联系与区别
- 同步异步,阻塞非阻塞 和nginx的IO模型
- 并发 并行 同步 异步 多线程 阻塞 非阻塞的区别
- 关于IO的同步,异步,阻塞,非阻塞
- IO中同步、异步与阻塞、非阻塞的区别
- IO - 同步,异步,阻塞,非阻塞
- 知乎上对理解阻塞非阻塞与同步异步的有趣解释
- 使用C++11实现一个半同步半异步线程池
- 异步、同步与阻塞、非阻塞的区别
- 同步与异步、阻塞与非阻塞