您的位置:首页 > 其它

同步、异步、阻塞、非阻塞 I/O 及 异步I/O实现

2012-04-18 16:09 169 查看

同步、异步、阻塞、非阻塞I/O

第一部分来自:

http://blog.csdn.net/historyasamirror/archive/2010/07/31/5778378.aspx

Stevens在文章中一共比较了五种IOModel:

blockingIO

nonblockingIO

IOmultiplexing

signaldrivenIO

asynchronousIO

由于signaldrivenIO在实际中并不常用,所以我这只提及剩下的四种IOModel。

再说一下IO发生时涉及的对象和步骤。

对于一个networkIO(这里我们以read举例),它会涉及到两个系统对象,一个是调用这个IO的process(orthread),另一个就是系统内核(kernel)。当一个read操作发生时,它会经历两个阶段:

1等待数据准备(Waitingforthedatatobeready)

2将数据从内核拷贝到进程中(Copyingthedatafromthekerneltotheprocess)

记住这两点很重要,因为这些IOModel的区别就是在两个阶段上各有不同的情况。

blockingIO

在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:



当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于networkio来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),这个时候kernel就要等待足够的数据到来。而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。

所以,blockingIO的特点就是在IO执行的两个阶段都被block了。

non-blockingIO

linux下,可以通过设置socket使其变为non-blocking。当对一个non-blockingsocket执行读操作时,流程是这个样子:



从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的systemcall,那么它马上就将数据拷贝到了用户内存,然后返回。

所以,用户进程其实是需要不断的主动询问kernel数据好了没有。

IOmultiplexing

IOmultiplexing这个词可能有点陌生,但是如果我说select,epoll,大概就都能明白了。有些地方也称这种IO方式为eventdrivenIO。我们都知道,select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。它的流程如图:



当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。

这个图和blockingIO的图其实并没有太大的不同,事实上,还更差一些。因为这里需要使用两个systemcall(select和recvfrom),而blockingIO只调用了一个systemcall(recvfrom)。但是,用select的优势在于它可以同时处理多个connection。(多说一句。所以,如果处理的连接数不是很高的话,使用select/epoll的webserver不一定比使用multi-threading+blockingIO的webserver性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。)

在IOmultiplexingModel中,实际中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socketIO给block。

AsynchronousI/O

linux下的asynchronousIO其实用得很少。先看一下它的流程:



用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronousread之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。

到目前为止,已经将四个IOModel都介绍完了。现在回过头来回答最初的那几个问题:blocking和non-blocking的区别在哪,synchronousIO和asynchronousIO的区别在哪。

先回答最简单的这个:blockingvsnon-blocking。前面的介绍中其实已经很明确的说明了这两者的区别。调用blockingIO会一直block住对应的进程直到操作完成,而non-blockingIO在kernel还准备数据的情况下会立刻返回。

在说明synchronousIO和asynchronousIO的区别之前,需要先给出两者的定义。Stevens给出的定义(其实是POSIX的定义)是这样子的:

AsynchronousI/OoperationcausestherequestingprocesstobeblockeduntilthatI/Ooperationcompletes;

AnasynchronousI/Ooperationdoesnotcausetherequestingprocesstobeblocked;


两者的区别就在于synchronousIO做”IOoperation”的时候会将process阻塞。按照这个定义,之前所述的blockingIO,non-blockingIO,IOmultiplexing都属于synchronousIO。有人可能会说,non-blockingIO并没有被block啊。这里有个非常“狡猾”的地方,定义中所指的”IOoperation”是指真实的IO操作,就是例子中的recvfrom这个systemcall。non-blockingIO在执行recvfrom这个system
call的时候,如果kernel的数据没有准备好,这时候不会block进程。但是,当kernel中数据准备好的时候,recvfrom会将数据从kernel拷贝到用户内存中,这个时候进程是被block了,在这段时间内,进程是被block的。而asynchronousIO则不一样,当进程发起IO操作之后,就直接返回再也不理睬了,直到kernel发送一个信号,告诉进程说IO完成。在这整个过程中,进程完全没有被block。

各个IOModel的比较如图所示:



经过上面的介绍,会发现non-blockingIO和asynchronousIO的区别还是很明显的。在non-blockingIO中,虽然进程大部分时间都不会被block,但是它仍然要求进程去主动的check,并且当数据准备完成以后,也需要进程主动的再次调用recvfrom来将数据拷贝到用户内存。而asynchronousIO则完全不同。它就像是用户进程将整个IO操作交给了他人(kernel)完成,然后他人做完后发信号通知。在此期间,用户进程不需要去检查IO操作的状态,也不需要主动的去拷贝数据。

最后,再举几个不是很恰当的例子来说明这四个IOModel:

有A,B,C,D四个人在钓鱼:

A用的是最老式的鱼竿,所以呢,得一直守着,等到鱼上钩了再拉杆;

B的鱼竿有个功能,能够显示是否有鱼上钩,所以呢,B就和旁边的MM聊天,隔会再看看有没有鱼上钩,有的话就迅速拉杆;

C用的鱼竿和B差不多,但他想了一个好办法,就是同时放好几根鱼竿,然后守在旁边,一旦有显示说鱼上钩了,它就将对应的鱼竿拉起来;

D是个有钱人,干脆雇了一个人帮他钓鱼,一旦那个人把鱼钓上来了,就给D发个短信。

异步I/O实现

第二部分来自:

http://www.ibm.com/developerworks/cn/linux/l-async/

其中包含用aio.h的API实现。

下面是用libaio.h的API实现。

初始化异步IO上下文:
intio_queue_init(intmaxevents,io_context_t*ctx);

maxevents:允许提交的最大异步IO请求数目;

ctx:异步IO上下文

返回值:成功返回0,失败返回-errno

初始化IOCB块

异步IO块的数据结构。

structiocb{

void*data;//用于存储回调函数或者字符串

unsignedkey;

shortaio_lio_opcode;//操作码,是读还是写,或者其他

shortaio_reqprio;

intaio_fildes;//


union{

structio_iocb_commonc;

structio_iocb_vectorv;

structio_iocb_pollpoll;

structio_iocb_sockaddrsaddr;

}u;


};

初始化一个异步IO读请求:

inlinevoidio_prep_pread(structiocb*iocb,intfd,void*buf,size_tcount,longlongoffset);


初始化一个异步IO写请求:

inlinevoidio_prep_pwrite(structiocb*iocb,intfd,void*buf,size_tcount,longlongoffset);


对buf进行对齐:

intposix_memalign(void**memptr,size_talignment,size_tsize);


提交异步IO请求:

intio_submit(aio_context_tctx_id,long
nr,structiocb**iocbpp);
ctx_id:异步IO上下文
nr:要提交的异步IO请求的数目
iocbpp:指向异步IO块的指针的指针

获取已完成的异步IO事件:

intio_getevents(aio_context_tctx_id,longmin_nr,longnr,structio_event*events,structtimespec*timeout);

min_nr:等待的最小的事件数

nr:等待的最大事件数

timeout:超时


io_event结构:

#definePADDEDptr(x,y)x;unsignedy

#definePADDEDul(x,y)unsignedlongx;unsignedy


structio_event{

PADDEDptr(void*data,__pad1);//data用于存放回调函数,如果无回调函数,该data也可以存放入字符串这样的信息

PADDEDptr(structiocb*obj,__pad2);//存放iocb对象

PADDEDul(res,__pad3);//实际读到的字符串长度,当所传递的fd是以O_DIRECT打开,但是,buf长度不是AIO_BLKSIZE的时候,该参数被设置为-22,errno为22时,其意义为“Invalidargument”

PADDEDul(res2,__pad4);//出错码,如果出错,则不为0;若未出错,则为0

};


注意的地方:

对于不满足规定的异步IO块大小的文本,使用没有用O_DIRECT打开的文件描述符进行操作


实例:

该程序实现了一个复制文件的功能,运行程序时需要传入两个参数,第一个是原文件名,第二个是目标文件名。

//#define__USE_GNU

#undef_FILE_OFFSET_BITS

#define_FILE_OFFSET_BITS64

#include<stdlib.h>

#include<stdio.h>

#include<unistd.h>

#include<dirent.h>

#include<sys/types.h>

#include<sys/stat.h>

#include<sys/param.h>

#include<fcntl.h>

#include<errno.h>

#include<libaio.h>


#defineAIO_BLKSIZE1024

#defineAIO_MAXIO64


staticintbusy=0;

staticinttocopy=0;

staticintsrcfd=-1;

staticintsrcfd2=-1;

staticintdstfd=-1;

staticintdstfd2=-1;

staticconstchar*dstname=NULL;

staticconstchar*srcname=NULL;


/*fatalerrorhandler*/

staticvoidio_error(constchar*func,intrc)

{

if(rc==-ENOSYS)

fprintf(stdout,"AIOnotinthiskernel/n");

else

fprintf(stdout,"%s:errno%d",func,rc);


if(dstfd>0)

close(dstfd);

if(dstname)

unlink(dstname);

exit(1);

}


/*

*writecompletecallback

*adjustcountsandfreeresources

*/

staticvoidwr_done(io_context_tctx,structiocb*iocb,longres,longres2)

{

if(res2!=0)

{

io_error("aiowrite",res2);

}

if(res!=iocb->u.c.nbytes)

{

fprintf(stdout,"writemissedbytesexpect%dgot%d",iocb->u.c.nbytes,res);

exit(1);

}

//printf("res=%d,res2=%d/n",res,res2);

--tocopy;

--busy;


free(iocb->u.c.buf);

free(iocb);

}


/*

*readcompletecallback

*changereadiocbintoawriteiocbandstartit.

*/

staticvoidrd_done(io_context_tctx,structiocb*iocb,longres,longres2)

{

/*libraryneedsaccessorstolookatiocb*/

intiosize=iocb->u.c.nbytes;

char*buf=(char*)iocb->u.c.buf;

off_toffset=iocb->u.c.offset;

intfd,tmp;

char*wrbuff=NULL;


if(res2!=0)

{

io_error("aioread",res2);

}

if(res!=iosize)

{

fprintf(stdout,"readmissingbytesexpect%dgot%d",iocb->u.c.nbytes,res);

exit(1);

}

if(iocb->aio_fildes==srcfd)

{

fd=dstfd;

}

else

fd=dstfd2;


/*turnreadintowrite*/

tmp=posix_memalign((void**)&wrbuff,getpagesize(),AIO_BLKSIZE);

if(tmp<0)

{

perror("posix_memalign");

exit(1);

}


snprintf(wrbuff,iosize+1,"%s",buf);


//printf("wrbuff-len=%d:%s/n",strlen(wrbuff),wrbuff);

//printf("wrbuff_len=%d/n",strlen(wrbuff));

free(buf);

io_prep_pwrite(iocb,fd,wrbuff,iosize,offset);

io_set_callback(iocb,wr_done);

if(1!=(res=io_submit(ctx,1,&iocb)))

io_error("io_submitwrite",res);

printf("/nsubmit%dwriterequest/n",res);

}

voidm_io_queue_run(io_context_tmyctx)

{

//printf("inm_io_queue_run:/n");


structio_eventevents[AIO_MAXIO];

io_callback_tcb;

intn,i;

n=io_getevents(myctx,1,AIO_MAXIO,events,NULL);

printf("/n%dio_requestcompleted/n/n",n);

for(i=0;i<n;i++)

{

cb=(io_callback_t)events[i].data;

structiocb*io=events[i].obj;

//useepoll_fdtoregisterEPOLLOUT

//heresegmentfault

//printf("events[%d].data=%x,res=%d,res2=%d/n",i,cb,events[i].res,events[i].res2);

cb(myctx,io,events[i].res,events[i].res2);

}

}

intmain(intargc,char*const*argv)

{

structstatst;

//off_tlength=0,offset=0;

longlonglength=0,offset=0;

io_context_tmyctx;

char*buff=NULL;

intleftover;

inttmp;

intlast;

if(argc!=3||argv[1][0]=='-')

{

fprintf(stdout,"Usage:aiocpSOURCEDEST/n");

exit(1);

}

if((srcfd=open(srcname=argv[1],O_RDONLY|O_DIRECT))<0)

{

perror(srcname);

exit(1);

}

if(fstat(srcfd,&st)<0)

{

perror("fstat");

exit(1);

}

length=(longlong)st.st_size;


if((dstfd=open(dstname=argv[2],O_WRONLY|O_CREAT|O_DIRECT,0666))<0)

{

close(srcfd);

perror(dstname);

exit(1);

}

leftover=length%AIO_BLKSIZE;

length-=leftover;

/*initializestatmachine*/

memset(&myctx,0,sizeof(myctx));

io_queue_init(AIO_MAXIO,&myctx);

/*insys/param.h#definehowmany(x,y)(((x)+((y)-1))/(y))*/

tocopy=howmany(length,AIO_BLKSIZE);


while(tocopy>0)

{

inti,rc;

/*submitasmanyreadsasonceaspossibleuptoAIO_MAXIO

*insys/param#defineMIN(a,b)(((a)<(b))?(a):(b))

*/

intn=MIN(MIN(AIO_MAXIO-busy,AIO_MAXIO/2),howmany(length-offset,AIO_BLKSIZE));

if(n>0)

{

structiocb*ioq
;


for(i=0;i<n;i++)

{

structiocb*io=(structiocb*)malloc(sizeof(structiocb));

intiosize=AIO_BLKSIZE;

tmp=posix_memalign((void**)&buff,getpagesize(),AIO_BLKSIZE);

if(tmp<0)

{

perror("posix_memalign");

exit(1);

}

if(NULL==io)

{

fprintf(stdout,"outofmemeory/n");

exit(1);

}


io_prep_pread(io,srcfd,buff,iosize,offset);

io_set_callback(io,rd_done);

ioq[i]=io;

offset+=iosize;


buff=NULL;

}


printf("START.../n/n");

rc=io_submit(myctx,n,ioq);

if(rc<0)

io_error("io_submit",rc);

printf("/nsubmit%dreadrequest/n",rc);

busy+=n;

}

//HandleIO'sthathavecompleted

m_io_queue_run(myctx);



//ifwehavemaximumnumberofi/o'sinflight

//thenwaitforonetocomplete

/*if(busy==AIO_MAXIO)

{

rc=io_queue_wait(myctx,NULL);

if(rc<0)

{

io_error("io_queue_wait",rc);

}

}

*/

}


if(leftover)

{

srcfd2=open(srcname,O_RDONLY);

if(srcfd2<0)

{

perror("srcfd2open");

exit(1);

}

dstfd2=open(dstname,O_WRONLY);

if(dstfd2<0)

{

perror("dstfd2open");

exit(1);

}


structiocb*io=(structiocb*)malloc(sizeof(structiocb));

if(io==NULL)

{

perror("mallocfailed");

exit(1);

}

buff=NULL;

tmp=posix_memalign((void**)&buff,getpagesize(),AIO_BLKSIZE);

if(tmp<0)

{

perror("posix_memalign");

exit(1);

}

io_prep_pread(io,srcfd2,buff,leftover,offset);

io_set_callback(io,rd_done);

if((last=io_submit(myctx,1,&io))<0)

{

perror("io_submit");

exit(1);

}

printf("/nsubmit%diorequest/n",last);

m_io_queue_run(myctx);

}

exit(0);

}


转载自http://blog.csdn.net/linquidx/article/details/5932906

________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________

同步与异步--阻塞与非阻塞型I/O(非常好,转贴)
2010-10-149:53

同步阻塞IO

在这个模型中,应用程序(application)为了执行这个read操作,会调用相应的一个systemcall,将系统控制权交给kernel,然后就进行等待(这其实就是被阻塞了)。kernel开始执行这个systemcall,执行完毕后会向应用程序返回响应,应用程序得到响应后,就不再阻塞,并进行后面的工作。



例如,“在调用read系统调用时,应用程序会阻塞并对内核进行上下文切换。然后会触发读操作,当响应返回时(从我们正在从中读取的设备中返回),数据就被移动到用户空间的缓冲区中。然后应用程序就会解除阻塞(read调用返回)。”

举一个浅显的例子,就好比你去一个银行柜台存钱。首先,你会将存钱的单子填好,然后交给柜员。这里,你就好比是application,单子就是调用的systemcall,柜员就是kernel。提交好单子后,你就坐在柜台前等,相当于开始进行等待。柜员办好以后会给你一个回执,表示办好了,这就是response。然后你就可以拿着回执干其它的事了。注意,这个时候,如果你办完之后马上去查账,存的钱已经打到你的账户上了。后面你会发现,这点很重要。

同步非阻塞IO



在linux下,应用程序可以通过设置文件描述符的属性O_NONBLOCK,I/O操作可以立即返回,但是并不保证I/O操作成功。

也就是说,当应用程序设置了O_NONBLOCK之后,执行write操作,调用相应的systemcall,这个systemcall会从内核中立即返回。但是在这个返回的时间点,数据可能还没有被真正的写入到指定的地方。也就是说,kernel只是很快的返回了这个systemcall(这样,应用程序不会被这个IO操作blocking),但是这个systemcall具体要执行的事情(写数据)可能并没有完成。而对于应用程序,虽然这个IO操作很快就返回了,但是它并不知道这个IO操作是否真的成功了,如果想知道,需要应用程序主动地去问kernel。

这次不是去银行存钱,而是去银行汇款。同样的,你也需要填写汇款单然后交给柜员,柜员进行一些简单的手续处理就能够给你回执。但是,你拿到回执并不意味着钱已经打到了对方的账上。事实上,一般汇款的周期大概是24个小时,如果你要以存钱的模式来汇款的话,意味着你需要在银行等24个小时,这显然是不现实的。所以,同步非阻塞IO在实际生活中也是有它的意义的。

异步阻塞IO



和之前一样,应用程序要执行read操作,因此调用一个systemcall,这个systemcall被传递给了kernel。但在应用程序这边,它调用systemcall之后,并不等待kernel返回response,这一点是和前面两种机制不一样的地方。这也是为什么它被称为异步的原因。但是为什么称其为阻塞呢?这是因为虽然应用程序是一个异步的方式,但是select()函数会将应用程序阻塞住,一直等到这个systemcall有结果返回了,再通知应用程序。也就是说,“在这种模型中,配置的是非阻塞I/O,然后使用阻塞
select系统调用来确定一个I/O描述符何时有操作。”

所以,从IO操作的实际效果来看,异步阻塞IO和第一种同步阻塞IO是一样的,应用程序都是一直等到IO操作成功之后(数据已经被写入或者读取),才开始进行下面的工作。异步阻塞IO的好处在于一个select函数可以为多个描述符提供通知,提高了并发性。

关于提高并发性这点,我们还以银行为例说明。比如说一个银行柜台,现在有10个人想存钱。按照现在银行的做法,一个个排队。第一个人先填存款单,然后提交,然后柜员处理,然后给回执,成功后再轮到下一个人。大家应该都在银行排过对,这样的流程是很痛苦的。如果按照异步阻塞的机制,10个人都填好存款单,然后都提交给柜台,提交完之后所有的10个人就在银行大厅等待。这时候会专门有个人,他会了解存款单处理的情况,一旦有存款单处理完毕,他会将回执交给相应的正在大厅等待的人,这个拿到回执的人就可以去干其他的事情了。而前面提到的这个专人,就对应于select函数。

异步非阻塞IO



如图所示,应用程序提交read请求的systemcall,然后,kernel开始处理相应的IO操作,而同时,应用程序并不等kernel返回响应,就会开始执行其他的处理操作(应用程序没有被IO操作所阻塞)。当kernel执行完毕,返回read的响应,就会产生一个信号或执行一个基于线程的回调函数来完成这次I/O处理过程。

比如银行存钱。现在某银行新开通了一项存钱业务。用户之需要将存款单交给柜台,然后无需等待就可以离开了。柜台办好以后会给用户发送一条短信,告知交易成功。这样用户不需要在柜台前进行长时间的等待,同时,也能够得到确切的消息知道交易完成。

从前面的介绍中可以看出,所谓的同步和异步,在这里指的是application和kernel之间的交互方式。如果application不需要等待kernel的回应,那么它就是异步的。如果application提交完IO请求后,需要等待“回执”,那么它就是同步的。

而阻塞和非阻塞,指的是application是否等待IO操作的完成。如果application必须等到IO操作实际完成以后再执行下面的操作,那么它是阻塞的。反之,如果不等待IO操作的完成就开始执行其它操作,那么它是非阻塞的。

转载自http://hi.baidu.com/commandow/blog/item/6bbc875635c450010df3e321.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: