python 3下基于select模型的事件驱动机制程序
2017-10-01 13:17
627 查看
它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。它的流程如图:
当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。这个图和blockingIO的图其实并没有太大的不同,事实上,还更差一些。因为这里需要使用两个systemcall(select和recvfrom),而blockingIO只调用了一个systemcall(recvfrom)。但是,用select的优势在于它可以同时处理多个connection。(多说一句。所以,如果处理的连接数不是很高的话,使用select/epoll的webserver不一定比使用multi-threading+blockingIO的webserver性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。)
在IOmultiplexingModel中,实际中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socketIO给block。
所以,IOmultiplexingModel的特点就是两个阶段都阻塞,但是等待数据阻塞在select上,拷贝数据阻塞在recfrom上。
在多路复用模型中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socketIO给block。因此select()与非阻塞IO类似。
大部分Unix/Linux都支持select函数,该函数用于探测多个文件句柄的状态变化。下面给出select接口的原型:
FD_ZERO(intfd,fd_set*fds)
FD_SET(intfd,fd_set*fds)
FD_ISSET(intfd,fd_set*fds)
FD_CLR(intfd,fd_set*fds)
intselect(intnfds,fd_set*readfds,fd_set*writefds,fd_set*exceptfds,
structtimeval*timeout)
这里,fd_set类型可以简单的理解为按bit位标记句柄的队列,例如要在某fd_set中标记一个值为16的句柄,则该fd_set的第16个bit位被标记为1。具体的置位、验证可使用FD_SET、FD_ISSET等宏实现。在select()函数中,readfds、writefds和exceptfds同时作为输入参数和输出参数。如果输入的readfds标记了16号句柄,则select()将检测16号句柄是否可读。在select()返回后,可以通过检查readfds有否标记16号句柄,来判断该“可读”事件是否发生。另外,用户可以设置timeout时间。
下面将重新模拟上例中从多个客户端接收数据的模型。
图7使用select()的接收数据模型
述模型只是描述了使用select()接口同时从多个客户端接收数据的过程;由于select()接口可以同时对多个句柄进行读状态、写状态和错误状态的探测,所以可以很容易构建为多个客户端提供独立问答服务的服务器系统。如下图。
图8使用select()接口的基于事件驱动的服务器模型
这里需要指出的是,客户端的一个connect()操作,将在服务器端激发一个“可读事件”,所以select()也能探测来自客户端的connect()行为。
上述模型中,最关键的地方是如何动态维护select()的三个参数readfds、writefds和exceptfds。作为输入参数,readfds应该标记所有的需要探测的“可读事件”的句柄,其中永远包括那个探测connect()的那个“母”句柄;同时,writefds和exceptfds应该标记所有需要探测的“可写事件”和“错误事件”的句柄(使用FD_SET()标记)。
作为输出参数,readfds、writefds和exceptfds中的保存了select()捕捉到的所有事件的句柄值。程序员需要检查的所有的标记位(使用FD_ISSET()检查),以确定到底哪些句柄发生了事件。
上述模型主要模拟的是“一问一答”的服务流程,所以如果select()发现某句柄捕捉到了“可读事件”,服务器程序应及时做recv()操作,并根据接收到的数据准备好待发送数据,并将对应的句柄值加入writefds,准备下一次的“可写事件”的select()探测。同样,如果select()发现某句柄捕捉到“可写事件”,则程序应及时做send()操作,并准备好下一次的“可读事件”探测准备。下图描述的是上述模型中的一个执行周期。
图9多路复用模型的一个执行周期
这种模型的特征在于每一个执行周期都会探测一次或一组事件,一个特定的事件会触发某个特定的响应。我们可以将这种模型归类为“事件驱动模型”。
相比其他模型,使用select()的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多CPU,同时能够为多客户端提供服务。如果试图建立一个简单的事件驱动的服务器程序,这个模型有一定的参考价值。
但这个模型依旧有着很多问题。首先select()接口并不是实现“事件驱动”的最好选择。因为当需要探测的句柄值较大时,select()接口本身需要消耗大量时间去轮询各个句柄。很多操作系统提供了更为高效的接口,如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll,…。如果需要实现更高效的服务器程序,类似epoll这样的接口更被推荐。遗憾的是不同的操作系统特供的epoll接口有很大差异,所以使用类似于epoll的接口实现具有较好跨平台能力的服务器会比较困难。
其次,该模型将事件探测和事件响应夹杂在一起,一旦事件响应的执行体庞大,则对整个模型是灾难性的。如下例,庞大的执行体1的将直接导致响应事件2的执行体迟迟得不到执行,并在很大程度上降低了事件探测的及时性。
图10庞大的执行体对使用select()的事件驱动模型的影响
幸运的是,有很多高效的事件驱动库可以屏蔽上述的困难,常见的事件驱动库有libevent库,还有作为libevent替代者的libev库。这些库会根据操作系统的特点选择最合适的事件探测接口,并且加入了信号(signal)等技术以支持异步响应,这使得这些库成为构建事件驱动模型的不二选择。下章将介绍如何使用libev库替换select或epoll接口,实现高效稳定的服务器模型。
实际上,Linux内核从2.6开始,也引入了支持异步响应的IO操作,如aio_read,aio_write,这就是异步IO。
python下则是将其封装了,对返回值做了修改,相比较原来在C下的返回值(一个整型,判断是否调用成功),python下的调用返回值则是直接返回的可读,可写,异常状态序列。C中的可读,可写,异常状态的序列,则是直接将其写入了参数里面,也就是说输入输出参数都是一样的,python这样的封装设计还是很不错的。我设计了一个粗陋的基于事件机制的select调用:
服务器端:
客户端:
从结果可以看到,客户端会发送空字符过来,查了下资料,这是客户端在确定是否断开,服务器端加个判断就行。
运行截图:
参考:
http://www.cnblogs.com/diegodu/p/3977739.htmlUnix网络编程I/O模型第六章
http://www.cnblogs.com/lancidie/archive/2011/12/13/2286408.html【解决】Select网络模型问题——奇怪的发送接收问题
当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。这个图和blockingIO的图其实并没有太大的不同,事实上,还更差一些。因为这里需要使用两个systemcall(select和recvfrom),而blockingIO只调用了一个systemcall(recvfrom)。但是,用select的优势在于它可以同时处理多个connection。(多说一句。所以,如果处理的连接数不是很高的话,使用select/epoll的webserver不一定比使用multi-threading+blockingIO的webserver性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。)
在IOmultiplexingModel中,实际中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socketIO给block。
所以,IOmultiplexingModel的特点就是两个阶段都阻塞,但是等待数据阻塞在select上,拷贝数据阻塞在recfrom上。
在多路复用模型中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socketIO给block。因此select()与非阻塞IO类似。
大部分Unix/Linux都支持select函数,该函数用于探测多个文件句柄的状态变化。下面给出select接口的原型:
FD_ZERO(intfd,fd_set*fds)
FD_SET(intfd,fd_set*fds)
FD_ISSET(intfd,fd_set*fds)
FD_CLR(intfd,fd_set*fds)
intselect(intnfds,fd_set*readfds,fd_set*writefds,fd_set*exceptfds,
structtimeval*timeout)
这里,fd_set类型可以简单的理解为按bit位标记句柄的队列,例如要在某fd_set中标记一个值为16的句柄,则该fd_set的第16个bit位被标记为1。具体的置位、验证可使用FD_SET、FD_ISSET等宏实现。在select()函数中,readfds、writefds和exceptfds同时作为输入参数和输出参数。如果输入的readfds标记了16号句柄,则select()将检测16号句柄是否可读。在select()返回后,可以通过检查readfds有否标记16号句柄,来判断该“可读”事件是否发生。另外,用户可以设置timeout时间。
下面将重新模拟上例中从多个客户端接收数据的模型。
图7使用select()的接收数据模型
述模型只是描述了使用select()接口同时从多个客户端接收数据的过程;由于select()接口可以同时对多个句柄进行读状态、写状态和错误状态的探测,所以可以很容易构建为多个客户端提供独立问答服务的服务器系统。如下图。
图8使用select()接口的基于事件驱动的服务器模型
这里需要指出的是,客户端的一个connect()操作,将在服务器端激发一个“可读事件”,所以select()也能探测来自客户端的connect()行为。
上述模型中,最关键的地方是如何动态维护select()的三个参数readfds、writefds和exceptfds。作为输入参数,readfds应该标记所有的需要探测的“可读事件”的句柄,其中永远包括那个探测connect()的那个“母”句柄;同时,writefds和exceptfds应该标记所有需要探测的“可写事件”和“错误事件”的句柄(使用FD_SET()标记)。
作为输出参数,readfds、writefds和exceptfds中的保存了select()捕捉到的所有事件的句柄值。程序员需要检查的所有的标记位(使用FD_ISSET()检查),以确定到底哪些句柄发生了事件。
上述模型主要模拟的是“一问一答”的服务流程,所以如果select()发现某句柄捕捉到了“可读事件”,服务器程序应及时做recv()操作,并根据接收到的数据准备好待发送数据,并将对应的句柄值加入writefds,准备下一次的“可写事件”的select()探测。同样,如果select()发现某句柄捕捉到“可写事件”,则程序应及时做send()操作,并准备好下一次的“可读事件”探测准备。下图描述的是上述模型中的一个执行周期。
图9多路复用模型的一个执行周期
这种模型的特征在于每一个执行周期都会探测一次或一组事件,一个特定的事件会触发某个特定的响应。我们可以将这种模型归类为“事件驱动模型”。
相比其他模型,使用select()的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多CPU,同时能够为多客户端提供服务。如果试图建立一个简单的事件驱动的服务器程序,这个模型有一定的参考价值。
但这个模型依旧有着很多问题。首先select()接口并不是实现“事件驱动”的最好选择。因为当需要探测的句柄值较大时,select()接口本身需要消耗大量时间去轮询各个句柄。很多操作系统提供了更为高效的接口,如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll,…。如果需要实现更高效的服务器程序,类似epoll这样的接口更被推荐。遗憾的是不同的操作系统特供的epoll接口有很大差异,所以使用类似于epoll的接口实现具有较好跨平台能力的服务器会比较困难。
其次,该模型将事件探测和事件响应夹杂在一起,一旦事件响应的执行体庞大,则对整个模型是灾难性的。如下例,庞大的执行体1的将直接导致响应事件2的执行体迟迟得不到执行,并在很大程度上降低了事件探测的及时性。
图10庞大的执行体对使用select()的事件驱动模型的影响
幸运的是,有很多高效的事件驱动库可以屏蔽上述的困难,常见的事件驱动库有libevent库,还有作为libevent替代者的libev库。这些库会根据操作系统的特点选择最合适的事件探测接口,并且加入了信号(signal)等技术以支持异步响应,这使得这些库成为构建事件驱动模型的不二选择。下章将介绍如何使用libev库替换select或epoll接口,实现高效稳定的服务器模型。
实际上,Linux内核从2.6开始,也引入了支持异步响应的IO操作,如aio_read,aio_write,这就是异步IO。
python下则是将其封装了,对返回值做了修改,相比较原来在C下的返回值(一个整型,判断是否调用成功),python下的调用返回值则是直接返回的可读,可写,异常状态序列。C中的可读,可写,异常状态的序列,则是直接将其写入了参数里面,也就是说输入输出参数都是一样的,python这样的封装设计还是很不错的。我设计了一个粗陋的基于事件机制的select调用:
服务器端:
importselect importsocket importqueue fromtimeimportsleep classTCPServer: def__init__(self,server,server_address,inputs,outputs,message_queues): #CreateaTCP/IP self.server=server self.server.setblocking(False) #Bindthesockettotheport self.server_address=server_address print('startingupon%sport%s'%self.server_address) self.server.bind(self.server_address) #Listenforincomingconnections self.server.listen(5) #Socketsfromwhichweexpecttoread self.inputs=inputs #Socketstowhichweexpecttowrite #处理要发送的消息 self.outputs=outputs #Outgoingmessagequeues(socket:Queue) self.message_queues=message_queues defhandler_recever(self,readable): #Handleinputs #循环判断是否有客户端连接进来,当有客户端连接进来时select将触发 forsinreadable: #判断当前触发的是不是服务端对象,当触发的对象是服务端对象时,说明有新客户端连接进来了 #表示有新用户来连接 ifsisself.server: #A"readable"socketisreadytoacceptaconnection connection,client_address=s.accept() self.client_address=client_address print('connectionfrom',client_address) #thisisconnectionnotserver connection.setblocking(0) #将客户端对象也加入到监听的列表中,当客户端发送消息时select将触发 self.inputs.append(connection) #Givetheconnectionaqueuefordatawewanttosend #为连接的客户端单独创建一个消息队列,用来保存客户端发送的消息 self.message_queues[connection]=queue.Queue() else: #有老用户发消息,处理接受 #由于客户端连接进来时服务端接收客户端连接请求,将客户端加入到了监听列表中(input_list),客户端发送消息将触发 #所以判断是否是客户端对象触发 data=s.recv(1024) #客户端未断开 ifdata!=b'': #Areadableclientsockethasdata print('received"%s"from%s'%(data,s.getpeername())) #将收到的消息放入到相对应的socket客户端的消息队列中 self.message_queues[s].put(data) #Addoutputchannelforresponse #将需要进行回复操作socket放到output列表中,让select监听 ifsnotinself.outputs: self.outputs.append(s) else: #客户端断开了连接,将客户端的监听从input列表中移除 #Interpretemptyresultasclosedconnection print('closing',s.getpeername())#获取客户端的socket信息 #Stoplisteningforinputontheconnection ifsinself.outputs: self.outputs.remove(s) self.inputs.remove(s) s.close() #Removemessagequeue #移除对应socket客户端对象的消息队列 delself.message_queues[s] return"gotit" defhandler_send(self,writable): #Handleoutputs #如果现在没有客户端请求,也没有客户端发送消息时,开始对发送消息列表进行处理,是否需要发送消息 #存储哪个客户端发送过消息 forsinwritable: try: #如果消息队列中有消息,从消息队列中获取要发送的消息 message_queue=self.message_queues.get(s) send_data='' ifmessage_queueisnotNone: send_data=message_queue.get_nowait() exceptqueue.Empty: #客户端连接断开了 self.outputs.remove(s) else: #print"sending%sto%s"%(send_data,s.getpeername) #print"sendsomething" ifmessage_queueisnotNone: s.send(send_data) else: print("clienthasclosed") #delmessage_queues[s] #writable.remove(s) #print"Client%sdisconnected"%(client_address) return"gotit" defhandler_exception(self,exceptional): ##Handle"exceptionalconditions" #处理异常的情况 forsinexceptional: print('exceptionconditionon',s.getpeername()) #Stoplisteningforinputontheconnection self.inputs.remove(s) ifsinself.outputs: self.outputs.remove(s) s.close() #Removemessagequeue delself.message_queues[s] return"gotit" defevent_loop(tcpserver,inputs,outputs): whileinputs: #Waitforatleastoneofthesocketstobereadyforprocessing print('waitingforthenextevent') #开始select监听,对input_list中的服务器端server进行监听 #当socket调用send,recv等函数时,就会再次调用此函数,这时返回的第二个参数就会有值 readable,writable,exceptional=select.select(inputs,outputs,inputs) ifreadableisnotNone: tcp_recever=tcpserver.handler_recever(readable) iftcp_recever=='gotit': print("serverhavereceived") ifwritableisnotNone: tcp_send=tcpserver.handler_send(writable) iftcp_send=='gotit': print("serverhavesend") ifexceptionalisnotNone: tcp_exception=tcpserver.handler_exception(exceptional) iftcp_exception=='gotit': print("serverhaveexception") sleep(0.8) if__name__=='__main__': server_address=('localhost',8090) server=socket.socket(socket.AF_INET,socket.SOCK_STREAM) inputs=[server] outputs=[] message_queues={} tcpserver=TCPServer(server,server_address,inputs,outputs,message_queues) #开启事件循环 event_loop(tcpserver,inputs,outputs)
客户端:
importsocket
messages=['Thisisthemessage','Itwillbesent','inparts',]
server_address=('localhost',8090)
#CreateaTCP/IPsocket
socks=[socket.socket(socket.AF_INET,socket.SOCK_STREAM),socket.socket(socket.AF_INET,socket.SOCK_STREAM),]
#Connectthesockettotheportwheretheserverislistening
print('connectingto%sport%s'%server_address)
#连接到服务器
forsinsocks:
s.connect(server_address)
forindex,messageinenumerate(messages):
#Sendmessagesonbothsockets
forsinsocks:
print('%s:sending"%s"'%(s.getsockname(),message+str(index)))
send_data=message+str(index)
s.sendall(bytes(send_data,encoding='utf-8'))
#Readresponsesonbothsockets
forsinsocks:
data=s.recv(1024)
print('%s:received"%s"'%(s.getsockname(),data))
ifdata!="":
print('closingsocket',s.getsockname())
s.close()
从结果可以看到,客户端会发送空字符过来,查了下资料,这是客户端在确定是否断开,服务器端加个判断就行。
运行截图:
参考:
相关文章推荐
- 一种基于任务(事件驱动)的程序运行模型研究
- Python自动化之select、greenlet和gevent和事件驱动模型初探
- 详解Linux2.6内核中基于platform机制的驱动模型
- 基于python yield机制的异步操作同步化编程模型
- WinSock IO模型三: WSAEventSelect 事件机制
- 基于管道化和事件驱动模型的Web请求处理(二)
- Win32基于事件驱动的消息机制(ZZ)
- 详解Linux2.6内核中基于platform机制的驱动模型 (经典)
- 基于管道化和事件驱动模型的Web请求处理(二)
- [转]Linux2.6内核中基于platform机制的驱动模型
- 基于事件的select:WSAEventSelect模型
- 详解Linux2.6内核中基于platform机制的驱动模型<强烈推荐,这是一篇学习驱动模型千年一遇的好文章>
- 详解Linux2.6内核中基于platform机制的驱动模型
- Nginx 之三:nginx服务器模块、web请求处理机制及事件驱动模型、进程功能和进程间通信
- 基于管道化和事件驱动模型的Web请求处理(二)
- 详解Linux2.6内核中基于platform机制的驱动模型 (经典)
- Python:使用基于事件驱动的SAX解析XML
- 【嵌入式Linux学习七步曲之第四篇 Linux内核移植】详解Linux2.6内核中基于platform机制的驱动模型
- 详解Linux2.6内核中基于platform机制的驱动模型-
- 详解Linux2.6内核中基于platform机制的驱动模型