您的位置:首页 > Web前端 > React

ACE_Reactor(四):ACE_WFMO_Reactor

2015-10-19 11:24 363 查看
实现接口不同于类Unix平台上,select在windows上仅仅支持socket句柄的多路分离。而且在Unix平台上,select也不支持同步对象、线程或者SystemV消息队列的多路分离。

所以windows上增加了以WaitForMultipleObjets系统函数替代select的ACE_WFMO_Reactor类。

其新增特性有:

1.因为WaitForMultipleObjets支持多线程同时并发调用,所以不再需要实现owner()函数来进行序列化来按照Leader/Followers的方式轮流进行分发事件。

2.新增了和select相似的对应的Handler_Repository和Reactor_Notify的类,用于处理IO及定时器事件。

3.每一个对handle_events的调用都会等待一个句柄变成活动句柄。但是在获取event时,会遍历获取所有的活动句柄避免其他活动句柄被饿死。

4.ACE_WFMO_Reactor的子类,应用程序能处理所有挂载上的事件以及窗口消息。

ACE_WFMO_Reactor是ACE_Reactor在windows上的默认实现。

00848 ACE_INLINE int
00849 ACE_WFMO_Reactor::handle_events (ACE_Time_Value &how_long)
00850 {
00851   return this->event_handling (&how_long, FALSE);
00852 }


event_handling()函数中存在的dowhile循环中会调用wait_for_multiple_events,这个函数是只要有任意句柄触发就会返回,然后会去调用safe_dispatch函数去分发,这个函数中直接调用了dispatch。而dispatch中会调用dispatch_handles函数,这个函数在说明就直说了“

Dispatches any active handles from handles_[] to handles_[active_handles_] using to poll through our handle set looking for active handles.”。即这个函数中不仅会分发要取的这一个活动句柄,还会把所有的句柄集中的所有的活动句柄都获取到。

5.事件分发时会出现,指向相同处理器handler的多线程化分派。

可能会出现线程间的竞争状态。比如线程一上socket正在处理IO事件,另一个IO事件又被其他线程分发处理了。所以需要显式的对竞争状态进行保护。

01954 int
01955 ACE_WFMO_Reactor::dispatch_handler (DWORD slot,
01956                                     DWORD max_handlep1)
01957 {
01958   // Check if there are window messages that need to be dispatched
01959   if (slot == max_handlep1)
01960     return this->dispatch_window_messages ();
01961
01962   // Dispatch the handler if it has not been scheduled for deletion.
01963   // Note that this is a very week test if there are multiple threads
01964   // dispatching this slot as no locks are held here. Generally, you
01965   // do not want to do something like deleting the this pointer in
01966   // handle_close() if you have registered multiple times and there is
01967   // more than one thread in WFMO_Reactor->handle_events().
01968   else if (!this->handler_rep_.scheduled_for_deletion (slot))
01969     {
01970       ACE_HANDLE event_handle = *(this->handler_rep_.handles () + slot);
01971
01972       if (this->handler_rep_.current_info ()[slot].io_entry_)
01973         return this->complex_dispatch_handler (slot,
01974                                                event_handle);
01975       else
01976         return this->simple_dispatch_handler (slot,
01977                                               event_handle);
01978     }
01979   else
01980     // The handle was scheduled for deletion, so we will skip it.
01981     return 0;
01982 }
01983


上述代码是实际分派时,而看到的对于竞争状态的避免,则在如下代码中:

02020 int
02021 ACE_WFMO_Reactor::complex_dispatch_handler (DWORD slot,
02022                                             ACE_HANDLE event_handle)
02023 {
02024   // This dispatch is used for I/O entires.
02025
02026   ACE_WFMO_Reactor_Handler_Repository::Current_Info ¤t_info =
02027     this->handler_rep_.current_info ()[slot];
02028
02029   WSANETWORKEVENTS events;
02030   ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
02031   if (::WSAEnumNetworkEvents ((SOCKET) current_info.io_handle_,
02032                               event_handle,
02033                               &events) == SOCKET_ERROR)
02034     problems = ACE_Event_Handler::ALL_EVENTS_MASK;
02035   else
02036     {
02037       // Prepare for upcalls. Clear the bits from <events> representing
02038       // events the handler is not interested in. If there are any left,
02039       // do the upcall(s). upcall will replace events.lNetworkEvents
02040       // with bits representing any functions that requested a repeat
02041       // callback before checking handles again. In this case, continue
02042       // to call back unless the handler is unregistered as a result of
02043       // one of the upcalls. The way this is written, the upcalls will
02044       // keep being done even if one or more upcalls reported problems.
02045       // In practice this may turn out not so good, but let's see. If any
02046       // problems, please notify Steve Huston <shuston@riverace.com>
02047       // before or after you change this code.
02048       events.lNetworkEvents &= current_info.network_events_;
02049       while (events.lNetworkEvents != 0)
02050         {
02051           ACE_Event_Handler *event_handler =
02052             current_info.event_handler_;
02053
02054           int reference_counting_required =
02055             event_handler->reference_counting_policy ().value () ==
02056             ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
02057
02058           // Call add_reference() if needed.
02059           if (reference_counting_required)
02060             {
02061               event_handler->add_reference ();
02062             }
02063
02064           // Upcall
02065           problems |= this->upcall (current_info.event_handler_,
02066                                     current_info.io_handle_,
02067                                     events);
02068
02069           // Call remove_reference() if needed.
02070           if (reference_counting_required)
02071             {
02072               event_handler->remove_reference ();
02073             }
02074
02075           if (this->handler_rep_.scheduled_for_deletion (slot))
02076             break;
02077         }
02078     }
02079
02080   if (problems != ACE_Event_Handler::NULL_MASK
02081       && !this->handler_rep_.scheduled_for_deletion (slot)  )
02082     this->handler_rep_.unbind (event_handle, problems);
02083
02084   return 0;
02085 }


须知这里的 events.lNetworkEvents是直接从WSAEnumNetworkEvents中取出来的。用MSDN的原话”Pointe t a WSANETWORKEVENTS structure that i s filled with a recod of network events that occurred and an associated error codes.”在具体的upcall执行时比TP_Select或者Select_T多了很多关于mask的判断。

02087 ACE_Reactor_Mask
02088 ACE_WFMO_Reactor::upcall (ACE_Event_Handler *event_handler,
02089                           ACE_HANDLE io_handle,
02090                           WSANETWORKEVENTS &events)
02091 {
02092   // This method figures out what exactly has happened to the socket
02093   // and then calls appropriate methods.
02094   ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
02095
02096   // Go through the events and do the indicated upcalls. If the handler
02097   // doesn't want to be called back, clear the bit for that event.
02098   // At the end, set the bits back to <events> to request a repeat call.
02099
02100   long actual_events = events.lNetworkEvents;
02101   int action;
02102
02103   if (ACE_BIT_ENABLED (actual_events, FD_WRITE))
02104     {
02105       action = event_handler->handle_output (io_handle);
02106       if (action <= 0)
02107         {
02108           ACE_CLR_BITS (actual_events, FD_WRITE);
02109           if (action == -1)
02110             ACE_SET_BITS (problems, ACE_Event_Handler::WRITE_MASK);
02111         }
02112     }


需要注意的地方是:

1.waitForMultipleObjects的WRITE_MASK的语义和select不同!如果接受不了,还是切回到ACE_Select_Reactor吧。select可以持续的检测一个socket的可写状态,只要可以就可以一直被检测到。而waitForMultipleObjects则只在第一次被连接时设定WRITE事件,当检测到写事件后就只能一直不停的写或者send直至失败或者返回EWOULDBLOCK.

2.ACE_WFMO_Reactor不再需要在处理事件前,将句柄挂起,然后处理完再恢复。因为当ACE_WFMO_Reactor线程处理事件需要从WSAEnumNetworkEvents()获取该IO事件的掩码时,os会自动清除这个额句柄的内部掩码,最终结果是即使多个线程同时多路分离socket句柄,也只能有一个线程获取IO事件掩码并将分配处理器。

而select没有这样的自动关OS序列化的能力,所以才有挂起和恢复操作。

注:

WSAEventSelect是用来将其他事件对象等和socket进行绑定的函数,然后绑定的socket被用于检测,若socket检测到事件则说明绑定的对象上有事件了。

WSAEnumNetworkEvents用来获取激活的事件IO事件的掩码用于处理。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: