ACE_Reactor(六)ACE_TP_Reactor
2015-10-19 11:25
447 查看
源码可以到http://www.aoc.nrao.edu/php/tjuerges/ALMA/ACE-5.5.2/html/ace/上去找
虽然ACE_Select_Reactor是灵活的,但是由于只有拥有者才能调用它的handle_events方法,所以在多线程应用程序中它还是有一定的局限性的,因而ACE_Select_Reactor得事件多路分离层的序列化处理可能对某些场景来讲约束太多了。一种解决方法是多个线程去运行独立ACE_Select_Reactor的实例,但是这样需要开发人员去实现一个代理在各反应器之间均衡的分配事件处理,以便达到均衡负载。另一种即这里的ACE_TP_Reactor,基于线程池的Reactor。
其实现了Learder/Follower模型,从而提供了一种有效的并发性模型,多个线程可以针对一组I/O句柄轮流调用select去检测并处理事件。
最关键的地方在于:
1.分派事件(其实就是该线程自己执行事件处理函数)前,会释放Token,让其他线程能够去select或者处理还未处理的事件
2.获取到token的线程,会先检测是否有未处理的事件,若有则先处理,若没有则占用token进行select阻塞检测事件
这里的keypoint1,可以从如下代码中看出来:
上述的第一点,也即是这里的:
说明里也讲的很清楚,会将token释放让其他线程去等待。
那么keypoint2,又是如何完成的了呢?看下面的代码:
这里的实现ACE_TP_Reactor是直接使用的ACE_Select_Reactor_T模板类的,但是其处理也是这样的,先检测是否已经有触发的事件any_ready ,若有则不再去执行select。
Perform Double-Checked Locking Optimization.?
虽然ACE_Select_Reactor是灵活的,但是由于只有拥有者才能调用它的handle_events方法,所以在多线程应用程序中它还是有一定的局限性的,因而ACE_Select_Reactor得事件多路分离层的序列化处理可能对某些场景来讲约束太多了。一种解决方法是多个线程去运行独立ACE_Select_Reactor的实例,但是这样需要开发人员去实现一个代理在各反应器之间均衡的分配事件处理,以便达到均衡负载。另一种即这里的ACE_TP_Reactor,基于线程池的Reactor。
其实现了Learder/Follower模型,从而提供了一种有效的并发性模型,多个线程可以针对一组I/O句柄轮流调用select去检测并处理事件。
最关键的地方在于:
1.分派事件(其实就是该线程自己执行事件处理函数)前,会释放Token,让其他线程能够去select或者处理还未处理的事件
2.获取到token的线程,会先检测是否有未处理的事件,若有则先处理,若没有则占用token进行select阻塞检测事件
这里的keypoint1,可以从如下代码中看出来:
int ACE_TP_Reactor::handle_socket_events ( int & event_count, ACE_TP_Token_Guard & g ) 00466 { 00467 00468 // We got the lock, lets handle some I/O events. 00469 ACE_EH_Dispatch_Info dispatch_info; 00470 00471 this->get_socket_event_info (dispatch_info); 00472 00473 // If there is any event handler that is ready to be dispatched, the 00474 // dispatch information is recorded in dispatch_info. 00475 if (!dispatch_info.dispatch ()) 00476 { 00477 return 0; 00478 } 00479 00480 // Suspend the handler so that other threads don't start dispatching 00481 // it, if we can't suspend then return directly 00482 // 00483 // NOTE: This check was performed in older versions of the 00484 // TP_Reactor. Looks like it is a waste.. 00485 if (dispatch_info.event_handler_ != this->notify_handler_) 00486 if (this->suspend_i (dispatch_info.handle_) == -1) 00487 return 0; 00488 00489 // Call add_reference() if needed. 00490 if (dispatch_info.reference_counting_required_) 00491 dispatch_info.event_handler_->add_reference (); 00492 00493 // Release the lock. Others threads can start waiting. 00494 guard.release_token (); 00495 00496 int result = 0; 00497 00498 // If there was an event handler ready, dispatch it. 00499 // Decrement the event left 00500 --event_count; 00501 00502 // Dispatched an event 00503 if (this->dispatch_socket_event (dispatch_info) == 0) 00504 ++result; 00505 00506 return result; 00507 }
上述的第一点,也即是这里的:
00493 // Release the lock. Others threads can start waiting. 00494 guard.release_token ();
说明里也讲的很清楚,会将token释放让其他线程去等待。
那么keypoint2,又是如何完成的了呢?看下面的代码:
int ACE_TP_Reactor::get_event_for_dispatching ( ACE_Time_Value * max_wait_time ) 00511 { 00512 // If the reactor handler state has changed, clear any remembered 00513 // ready bits and re-scan from the master wait_set. ............. 00534 return this->wait_for_multiple_events (this->ready_set_, 00535 max_wait_time); 00536 } //看下wait_for_multiple_events的实现 template<class ACE_SELECT_REACTOR_TOKEN> int ACE_Select_Reactor_T< ACE_SELECT_REACTOR_TOKEN >::wait_for_multiple_events ( ACE_Select_Reactor_Handle_Set & , ACE_Time_Value * ) 01074 { 01075 ACE_TRACE ("ACE_Select_Reactor_T::wait_for_multiple_events"); 01076 u_long width = 0; 01077 ACE_Time_Value timer_buf (0); 01078 ACE_Time_Value *this_timeout; 01079 01080 int number_of_active_handles = this->any_ready (dispatch_set); 01081 01082 // If there are any bits enabled in the <ready_set_> then we'll 01083 // handle those first, otherwise we'll block in <select>. 01084 01085 if (number_of_active_handles == 0) 01086 { 01087 do 01088 { 01089 this_timeout = 01090 this->timer_queue_->calculate_timeout (max_wait_time, 01091 &timer_buf); 01092 width = (u_long) this->handler_rep_.max_handlep1 (); 01093 01094 dispatch_set.rd_mask_ = this->wait_set_.rd_mask_; 01095 dispatch_set.wr_mask_ = this->wait_set_.wr_mask_; 01096 dispatch_set.ex_mask_ = this->wait_set_.ex_mask_; 01097 number_of_active_handles = ACE_OS::select (int (width), 01098 dispatch_set.rd_mask_, 01099 dispatch_set.wr_mask_, 01100 dispatch_set.ex_mask_, 01101 this_timeout); 01102 } ..........
这里的实现ACE_TP_Reactor是直接使用的ACE_Select_Reactor_T模板类的,但是其处理也是这样的,先检测是否已经有触发的事件any_ready ,若有则不再去执行select。
Perform Double-Checked Locking Optimization.?
相关文章推荐
- ACE_Reactor(五)ACE_TP_Reactor和ACE_Select_Reactor的区别
- ACE_Reactor(四):ACE_WFMO_Reactor
- ACE_Reactor(三)ACE_Select_Reactor_T
- ACE_Reactor(二)ACE_Dev_Poll_Reactor
- ACE_Reactor(一)整体理解ACE_Reactor
- React.js model
- React Native For Android 架构初探
- An iOS Developer on React Native一个资深iOS开发者对于React Native具体使用体验
- React Native for Android 实践 — 实现知乎日报客户端
- React Native For Android 架构初探
- React环境搭建
- react native andrroid 学习相关的资料
- React Native 文件模板配置
- ReactiveCocoa基础篇
- Spring reactor框架简介
- ReactiveCocoa 谈谈RACMulticastConnection
- Java Non-Blocking and Asynchronous IO with NIO & NIO.2 (JSR203) - Reactor/Proactor Implementations
- react-native初探
- react-natvie学习
- ReactJs 样式类的使用