ACE_Reactor(四):ACE_WFMO_Reactor
2015-10-19 11:24
363 查看
实现接口不同于类Unix平台上,select在windows上仅仅支持socket句柄的多路分离。而且在Unix平台上,select也不支持同步对象、线程或者SystemV消息队列的多路分离。
所以windows上增加了以WaitForMultipleObjets系统函数替代select的ACE_WFMO_Reactor类。
其新增特性有:
1.因为WaitForMultipleObjets支持多线程同时并发调用,所以不再需要实现owner()函数来进行序列化来按照Leader/Followers的方式轮流进行分发事件。
2.新增了和select相似的对应的Handler_Repository和Reactor_Notify的类,用于处理IO及定时器事件。
3.每一个对handle_events的调用都会等待一个句柄变成活动句柄。但是在获取event时,会遍历获取所有的活动句柄避免其他活动句柄被饿死。
4.ACE_WFMO_Reactor的子类,应用程序能处理所有挂载上的事件以及窗口消息。
ACE_WFMO_Reactor是ACE_Reactor在windows上的默认实现。
event_handling()函数中存在的dowhile循环中会调用wait_for_multiple_events,这个函数是只要有任意句柄触发就会返回,然后会去调用safe_dispatch函数去分发,这个函数中直接调用了dispatch。而dispatch中会调用dispatch_handles函数,这个函数在说明就直说了“
Dispatches any active handles from handles_[] to handles_[active_handles_] using to poll through our handle set looking for active handles.”。即这个函数中不仅会分发要取的这一个活动句柄,还会把所有的句柄集中的所有的活动句柄都获取到。
5.事件分发时会出现,指向相同处理器handler的多线程化分派。
可能会出现线程间的竞争状态。比如线程一上socket正在处理IO事件,另一个IO事件又被其他线程分发处理了。所以需要显式的对竞争状态进行保护。
上述代码是实际分派时,而看到的对于竞争状态的避免,则在如下代码中:
须知这里的 events.lNetworkEvents是直接从WSAEnumNetworkEvents中取出来的。用MSDN的原话”Pointe t a WSANETWORKEVENTS structure that i s filled with a recod of network events that occurred and an associated error codes.”在具体的upcall执行时比TP_Select或者Select_T多了很多关于mask的判断。
需要注意的地方是:
1.waitForMultipleObjects的WRITE_MASK的语义和select不同!如果接受不了,还是切回到ACE_Select_Reactor吧。select可以持续的检测一个socket的可写状态,只要可以就可以一直被检测到。而waitForMultipleObjects则只在第一次被连接时设定WRITE事件,当检测到写事件后就只能一直不停的写或者send直至失败或者返回EWOULDBLOCK.
2.ACE_WFMO_Reactor不再需要在处理事件前,将句柄挂起,然后处理完再恢复。因为当ACE_WFMO_Reactor线程处理事件需要从WSAEnumNetworkEvents()获取该IO事件的掩码时,os会自动清除这个额句柄的内部掩码,最终结果是即使多个线程同时多路分离socket句柄,也只能有一个线程获取IO事件掩码并将分配处理器。
而select没有这样的自动关OS序列化的能力,所以才有挂起和恢复操作。
注:
WSAEventSelect是用来将其他事件对象等和socket进行绑定的函数,然后绑定的socket被用于检测,若socket检测到事件则说明绑定的对象上有事件了。
WSAEnumNetworkEvents用来获取激活的事件IO事件的掩码用于处理。
所以windows上增加了以WaitForMultipleObjets系统函数替代select的ACE_WFMO_Reactor类。
其新增特性有:
1.因为WaitForMultipleObjets支持多线程同时并发调用,所以不再需要实现owner()函数来进行序列化来按照Leader/Followers的方式轮流进行分发事件。
2.新增了和select相似的对应的Handler_Repository和Reactor_Notify的类,用于处理IO及定时器事件。
3.每一个对handle_events的调用都会等待一个句柄变成活动句柄。但是在获取event时,会遍历获取所有的活动句柄避免其他活动句柄被饿死。
4.ACE_WFMO_Reactor的子类,应用程序能处理所有挂载上的事件以及窗口消息。
ACE_WFMO_Reactor是ACE_Reactor在windows上的默认实现。
00848 ACE_INLINE int 00849 ACE_WFMO_Reactor::handle_events (ACE_Time_Value &how_long) 00850 { 00851 return this->event_handling (&how_long, FALSE); 00852 }
event_handling()函数中存在的dowhile循环中会调用wait_for_multiple_events,这个函数是只要有任意句柄触发就会返回,然后会去调用safe_dispatch函数去分发,这个函数中直接调用了dispatch。而dispatch中会调用dispatch_handles函数,这个函数在说明就直说了“
Dispatches any active handles from handles_[] to handles_[active_handles_] using to poll through our handle set looking for active handles.”。即这个函数中不仅会分发要取的这一个活动句柄,还会把所有的句柄集中的所有的活动句柄都获取到。
5.事件分发时会出现,指向相同处理器handler的多线程化分派。
可能会出现线程间的竞争状态。比如线程一上socket正在处理IO事件,另一个IO事件又被其他线程分发处理了。所以需要显式的对竞争状态进行保护。
01954 int 01955 ACE_WFMO_Reactor::dispatch_handler (DWORD slot, 01956 DWORD max_handlep1) 01957 { 01958 // Check if there are window messages that need to be dispatched 01959 if (slot == max_handlep1) 01960 return this->dispatch_window_messages (); 01961 01962 // Dispatch the handler if it has not been scheduled for deletion. 01963 // Note that this is a very week test if there are multiple threads 01964 // dispatching this slot as no locks are held here. Generally, you 01965 // do not want to do something like deleting the this pointer in 01966 // handle_close() if you have registered multiple times and there is 01967 // more than one thread in WFMO_Reactor->handle_events(). 01968 else if (!this->handler_rep_.scheduled_for_deletion (slot)) 01969 { 01970 ACE_HANDLE event_handle = *(this->handler_rep_.handles () + slot); 01971 01972 if (this->handler_rep_.current_info ()[slot].io_entry_) 01973 return this->complex_dispatch_handler (slot, 01974 event_handle); 01975 else 01976 return this->simple_dispatch_handler (slot, 01977 event_handle); 01978 } 01979 else 01980 // The handle was scheduled for deletion, so we will skip it. 01981 return 0; 01982 } 01983
上述代码是实际分派时,而看到的对于竞争状态的避免,则在如下代码中:
02020 int 02021 ACE_WFMO_Reactor::complex_dispatch_handler (DWORD slot, 02022 ACE_HANDLE event_handle) 02023 { 02024 // This dispatch is used for I/O entires. 02025 02026 ACE_WFMO_Reactor_Handler_Repository::Current_Info ¤t_info = 02027 this->handler_rep_.current_info ()[slot]; 02028 02029 WSANETWORKEVENTS events; 02030 ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK; 02031 if (::WSAEnumNetworkEvents ((SOCKET) current_info.io_handle_, 02032 event_handle, 02033 &events) == SOCKET_ERROR) 02034 problems = ACE_Event_Handler::ALL_EVENTS_MASK; 02035 else 02036 { 02037 // Prepare for upcalls. Clear the bits from <events> representing 02038 // events the handler is not interested in. If there are any left, 02039 // do the upcall(s). upcall will replace events.lNetworkEvents 02040 // with bits representing any functions that requested a repeat 02041 // callback before checking handles again. In this case, continue 02042 // to call back unless the handler is unregistered as a result of 02043 // one of the upcalls. The way this is written, the upcalls will 02044 // keep being done even if one or more upcalls reported problems. 02045 // In practice this may turn out not so good, but let's see. If any 02046 // problems, please notify Steve Huston <shuston@riverace.com> 02047 // before or after you change this code. 02048 events.lNetworkEvents &= current_info.network_events_; 02049 while (events.lNetworkEvents != 0) 02050 { 02051 ACE_Event_Handler *event_handler = 02052 current_info.event_handler_; 02053 02054 int reference_counting_required = 02055 event_handler->reference_counting_policy ().value () == 02056 ACE_Event_Handler::Reference_Counting_Policy::ENABLED; 02057 02058 // Call add_reference() if needed. 02059 if (reference_counting_required) 02060 { 02061 event_handler->add_reference (); 02062 } 02063 02064 // Upcall 02065 problems |= this->upcall (current_info.event_handler_, 02066 current_info.io_handle_, 02067 events); 02068 02069 // Call remove_reference() if needed. 02070 if (reference_counting_required) 02071 { 02072 event_handler->remove_reference (); 02073 } 02074 02075 if (this->handler_rep_.scheduled_for_deletion (slot)) 02076 break; 02077 } 02078 } 02079 02080 if (problems != ACE_Event_Handler::NULL_MASK 02081 && !this->handler_rep_.scheduled_for_deletion (slot) ) 02082 this->handler_rep_.unbind (event_handle, problems); 02083 02084 return 0; 02085 }
须知这里的 events.lNetworkEvents是直接从WSAEnumNetworkEvents中取出来的。用MSDN的原话”Pointe t a WSANETWORKEVENTS structure that i s filled with a recod of network events that occurred and an associated error codes.”在具体的upcall执行时比TP_Select或者Select_T多了很多关于mask的判断。
02087 ACE_Reactor_Mask 02088 ACE_WFMO_Reactor::upcall (ACE_Event_Handler *event_handler, 02089 ACE_HANDLE io_handle, 02090 WSANETWORKEVENTS &events) 02091 { 02092 // This method figures out what exactly has happened to the socket 02093 // and then calls appropriate methods. 02094 ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK; 02095 02096 // Go through the events and do the indicated upcalls. If the handler 02097 // doesn't want to be called back, clear the bit for that event. 02098 // At the end, set the bits back to <events> to request a repeat call. 02099 02100 long actual_events = events.lNetworkEvents; 02101 int action; 02102 02103 if (ACE_BIT_ENABLED (actual_events, FD_WRITE)) 02104 { 02105 action = event_handler->handle_output (io_handle); 02106 if (action <= 0) 02107 { 02108 ACE_CLR_BITS (actual_events, FD_WRITE); 02109 if (action == -1) 02110 ACE_SET_BITS (problems, ACE_Event_Handler::WRITE_MASK); 02111 } 02112 }
需要注意的地方是:
1.waitForMultipleObjects的WRITE_MASK的语义和select不同!如果接受不了,还是切回到ACE_Select_Reactor吧。select可以持续的检测一个socket的可写状态,只要可以就可以一直被检测到。而waitForMultipleObjects则只在第一次被连接时设定WRITE事件,当检测到写事件后就只能一直不停的写或者send直至失败或者返回EWOULDBLOCK.
2.ACE_WFMO_Reactor不再需要在处理事件前,将句柄挂起,然后处理完再恢复。因为当ACE_WFMO_Reactor线程处理事件需要从WSAEnumNetworkEvents()获取该IO事件的掩码时,os会自动清除这个额句柄的内部掩码,最终结果是即使多个线程同时多路分离socket句柄,也只能有一个线程获取IO事件掩码并将分配处理器。
而select没有这样的自动关OS序列化的能力,所以才有挂起和恢复操作。
注:
WSAEventSelect是用来将其他事件对象等和socket进行绑定的函数,然后绑定的socket被用于检测,若socket检测到事件则说明绑定的对象上有事件了。
WSAEnumNetworkEvents用来获取激活的事件IO事件的掩码用于处理。
相关文章推荐
- ACE_Reactor(三)ACE_Select_Reactor_T
- ACE_Reactor(二)ACE_Dev_Poll_Reactor
- ACE_Reactor(一)整体理解ACE_Reactor
- React.js model
- React Native For Android 架构初探
- An iOS Developer on React Native一个资深iOS开发者对于React Native具体使用体验
- React Native for Android 实践 — 实现知乎日报客户端
- React Native For Android 架构初探
- React环境搭建
- react native andrroid 学习相关的资料
- React Native 文件模板配置
- ReactiveCocoa基础篇
- Spring reactor框架简介
- ReactiveCocoa 谈谈RACMulticastConnection
- Java Non-Blocking and Asynchronous IO with NIO & NIO.2 (JSR203) - Reactor/Proactor Implementations
- react-native初探
- react-natvie学习
- ReactJs 样式类的使用
- ReactiveCocoa 谈谈concat
- React-Native学习指南