ACE_Reactor(五)ACE_TP_Reactor和ACE_Select_Reactor的区别
2015-10-19 11:25
483 查看
在ACE_Select_Reactor_T的handle_events中,进去就会获取Token,调到ACE_Guard直至ACE_Token的share_acquire函数,会调用一个sleepHook,这个hook的代码是:
由于这个Nofity实际是在handle_events
的最初始的地方,之后才是实际的多路复用函数select,后才到分发处理的dispatch。但是TP_Reactor是没有的,造成这个区别是什么呢?对比如下代码
以及
这两个函数功能基本相同,所以后半部分类似,但是前面这个地方对于获取Token,则有较大区别。
ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1);
上述两份代码中使用的token_的声明是同一段代码即:
不过ACE_Select_Reactor_T中的这个Guard宏展开就是
而在这个ACE_Guard的构造函数中,会预先调用acquire函数,在这个函数里会引发调用token_的acquire()函数继而调用到了ACE_Token的shared_acquire,因为其token类型是ACE_REACTOR_TOKEN_T最终调用了的就是这个类中的sleep_hook,其特点就是会调用ACE_Select_Reactor_Notify的notify接口。
好吧,那么现在是不是有另外一个问题,如果按这么说似乎每次ACE_GUARD_RETURN 都会notify,导致select被解除阻塞,那这无效事件是不是就太多了。实际肯定不是这样的,关键肯定是在ACE_Token的shared_acquire函数中是否有去调含有notify的sleep_hook,其代码如下:
从代码中可以看到第212行会判断这个token的owner是否就是本线程,若是则直接返回ile根部就走不到254行的sleep_hook()函数。所以这里的Token是一个可重入的递归锁,reactor所属线程是可以直接获取到锁执行的,而其他线程则会调用notify后加入等待token的线程队列中,reactor主线程会被打断后分配事件处理,在dispatch前释放token,那个其他线程获取到更改reactor的状态,待其他线程释放token后,reactor主线程获取到token又可以开始下一次的检测和处理。
ACE_TP_Reactor没有像 Select_Reactor 一样使用一般的 ACE_GUARD_RETURN 宏,后者间接定义的是 ACE_Guard 模板对象,在构造器中默认会调用 LOCK 的 acquire 方法;而 ACE_TP_Token_Guard 则不会在构造器中调用任何锁的方法,而是对象构造后,使用 grab_token/acquire_token 来显示获取锁,析构器中调用 release 则是一致的。
这里有两个明显区别,一是调用 acqure_read,ACE_Token 有两个等待队列,读队列与写队列,写队列拥有更高的优先级,其它接口一般是调用 acqure 在写队列上等待,也就是说,如果两个线程,一个想运行事件循环,一个想修改 TP_Reactor 状态,那么后者将先被调度;二是提供锁的 sleep_hook 参数,这样一来,当取得锁失败 (被其它线程占有),它将调用这个传入的参数,而不是锁自己的同名回调函数,而传入的这个函数 no_op_sleep_hook 顾名思义是一个空方法,也就是说,它覆盖了默认向 Reactor 发送通知的实现,当多个线程同时运行事件循环时,保持当前线程处在 select 调用中。否则就如同之前对 Select_Reactor 的分析一样,多个线程会交替通知,导致 Reactor 无法正常工作。
上面的流程基本讲清了关于Token使用的区别,可能又有人问了,既然这里ACE_TP_Reactor的handle_events没有使用ACE_GUARD_RETURN去notify,那么这么说来其他线程更改reactor状态时这里还能及时去调整后select吗?需要说明的是,ACE_TP_Reactor的handle_events不用ACE_GUARD_RETURN去notify,
- 一、多线程的各线程调run_event_loop不会因为发Notify,而导致其他线程上的select被频繁无效打断,引起功能不正常。
- 二、无论是否使用ACE_GUARD_RETURN,不同线程仍是互斥的,而同线程仍然是可重入的。
至于任何线程上调整reactor的状态,比如调用register_handler,register_handler的实现中会使用ACE_GUARD_RETURN,发送notify消息,让拿到Token的reactor线程及时打断select去更新状态。
如果此时客户端连接服务器并发送数据,线程 M 从 select 中唤醒,处理 IO 事件,在处理前,先将此句柄从 Reactor 中挂起 (suspend),接着释放 Token,然后开始分派事件,最后从 Reactor 中恢复该句柄。在 Select_Reactor 中挂起句柄就是从侦听的句柄集合中移到备用集合,恢复操作正好相反,这样的处理主要是考虑到当线程 M 还未分派此句柄上的事件时,线程 N 进入 select 发现该句柄仍处在激活状态,并尝试对它继续分派,造成多次重复分派。开始分派事件前,线程 M 的侦测工作就正式结束了,所以它释放锁,这会唤醒在锁上等待的线程 N,线程 N 获取锁后,进入 select 接着再侦测事件,由此完成了一次线程交接班。线程 M 分派完事件后,又会回到事件循环开始,等待进入锁,如果整个线程池只有 M 一个线程,那就只有等它分派完事件后,才可以再次进行事件检测了,这就退化到了和 Select_Reactor 一致的情形。使用 TP_Reactor 后,分派事件的线程确实不在被限制为主线程了。
总之, 站在 Select_Reactor 的肩膀上,TP_Reactor 只做了少量修改,就实现了上述线程模型,具体罗列如下:
1. 固定打开 Reactor Notify 机制;
2. 固定压制分派通知事件的 renew 调用 (通过 supress_notify_renew 接口);
3. 侦测到事件后,一个线程只分派一个事件就返回到事件循环开始处;
4. 在分派事件前,先释放 Token;
5. 对于 IO 事件,分派前先 suspend 之;分派后,再 resume 之;
6. 事件循环中使用 ACE_TP_Token_Guard 取代一般的 ACE_GUARD 宏来获取 Token。
函数中,有如下代码:
最终调用到ACE_Select_Reactor_Notify的handle
_input,在这里
如果是普通的ACE_Select_Reactor的reactor,则renew是可以用的。可以在打断select后,执行事件处理后,将需要检测的事件handles刷新,重新检测。
网络例子代码:
/article/9196399.html
参考文档:
http://ace.acejoy.com/thread-5804-1-1.html
00058 // Used to wakeup the reactor. 00059 00060 template <class ACE_TOKEN_TYPE> void 00061 ACE_Reactor_Token_T<ACE_TOKEN_TYPE>::sleep_hook (void) 00062 { 00063 ACE_TRACE ("ACE_Reactor_Token_T::sleep_hook"); 00064 if (this->reactor_->notify () == -1) 00065 ACE_ERROR ((LM_ERROR, 00066 ACE_LIB_TEXT ("%p\n"), 00067 ACE_LIB_TEXT ("sleep_hook failed"))); 00068 } 00069
由于这个Nofity实际是在handle_events
的最初始的地方,之后才是实际的多路复用函数select,后才到分发处理的dispatch。但是TP_Reactor是没有的,造成这个区别是什么呢?对比如下代码
01385 template <class ACE_SELECT_REACTOR_TOKEN> int 01386 ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handle_events 01387 (ACE_Time_Value *max_wait_time) 01388 { 01389 ACE_TRACE ("ACE_Select_Reactor_T::handle_events"); 01390 01391 #if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0) 01392 01393 // Stash the current time -- the destructor of this object will 01394 // automatically compute how much time elapsed since this method was 01395 // called. 01396 ACE_Countdown_Time countdown (max_wait_time); 01397 01398 ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1); 。。。。。。。
以及
00143 ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time) 00144 { 00145 ACE_TRACE ("ACE_TP_Reactor::handle_events"); 00146 00147 // Stash the current time -- the destructor of this object will 00148 // automatically compute how much time elapsed since this method was 00149 // called. 00150 ACE_Countdown_Time countdown (max_wait_time); 00151 00152 // 00153 // The order of these events is very subtle, modify with care. 00154 // 00155 00156 // Instantiate the token guard which will try grabbing the token for 00157 // this thread. 00158 ACE_TP_Token_Guard guard (this->token_); 00159 00160 int const result = guard.acquire_read_token (max_wait_time); 00161
这两个函数功能基本相同,所以后半部分类似,但是前面这个地方对于获取Token,则有较大区别。
ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN, ace_mon, this->token_, -1);
上述两份代码中使用的token_的声明是同一段代码即:
00681 /// Synchronization token for the MT_SAFE ACE_Select_Reactor. 00682 ACE_SELECT_REACTOR_TOKEN token_;
不过ACE_Select_Reactor_T中的这个Guard宏展开就是
ACE_Guard< ACE_SELECT_REACTOR_TOKEN> ace_mon(this->token_); \ if (ace_mon.locked () != 0) { ;; } \ else { return -1; }
而在这个ACE_Guard的构造函数中,会预先调用acquire函数,在这个函数里会引发调用token_的acquire()函数继而调用到了ACE_Token的shared_acquire,因为其token类型是ACE_REACTOR_TOKEN_T最终调用了的就是这个类中的sleep_hook,其特点就是会调用ACE_Select_Reactor_Notify的notify接口。
好吧,那么现在是不是有另外一个问题,如果按这么说似乎每次ACE_GUARD_RETURN 都会notify,导致select被解除阻塞,那这无效事件是不是就太多了。实际肯定不是这样的,关键肯定是在ACE_Token的shared_acquire函数中是否有去调含有notify的sleep_hook,其代码如下:
00183 int 00184 ACE_Token::shared_acquire (void (*sleep_hook_func)(void *), 00185 void *arg, 00186 ACE_Time_Value *timeout, 00187 ACE_Token_Op_Type op_type) 00188 { 00189 ACE_TRACE ("ACE_Token::shared_acquire"); 00190 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1); 00191 00192 #if defined (DEBUGGING) 00193 this->dump (); 00194 #endif /* DEBUGGING */ 00195 00196 ACE_thread_t thr_id = ACE_Thread::self (); 00197 00198 // Nobody holds the token. 00199 if (!this->in_use_) 00200 { 00201 // Its mine! 00202 this->in_use_ = op_type; 00203 this->owner_ = thr_id; 00204 return 0; 00205 } 00206 00207 // 00208 // Someone already holds the token. 00209 // 00210 00211 // Check if it is us. 00212 if (ACE_OS::thr_equal (thr_id, this->owner_)) 00213 { 00214 ++this->nesting_level_; 00215 return 0; 00216 } 00217 00218 // Do a quick check for "polling" behavior. 00219 if (timeout != 0 && timeout->sec () == 0 && timeout->usec () == 0) 00220 { 00221 errno = ETIME; 00222 return -1; 00223 } 00224 00225 // 00226 // We've got to sleep until we get the token. 00227 // 00228 00229 // Which queue we should end up in... 00230 ACE_Token_Queue *queue = (op_type == ACE_Token::READ_TOKEN 00231 ? &this->readers_ 00232 : &this->writers_); 00233 00234 // Allocate queue entry on stack. This works since we don't exit 00235 // this method's activation record until we've got the token. 00236 ACE_Token::ACE_Token_Queue_Entry my_entry (this->lock_, 00237 thr_id, 00238 this->attributes_); 00239 queue->insert_entry (my_entry, this->queueing_strategy_); 00240 ++this->waiters_; 00241 00242 // Execute appropriate <sleep_hook> callback. (@@ should these 00243 // methods return a success/failure status, and if so, what should 00244 // we do with it?) 00245 int ret = 0; 00246 if (sleep_hook_func) 00247 { 00248 (*sleep_hook_func) (arg); 00249 ++ret; 00250 } 00251 else 00252 { 00253 // Execute virtual method. 00254 this->sleep_hook (); 00255 ++ret; 00256 } 。。。。。。。。
从代码中可以看到第212行会判断这个token的owner是否就是本线程,若是则直接返回ile根部就走不到254行的sleep_hook()函数。所以这里的Token是一个可重入的递归锁,reactor所属线程是可以直接获取到锁执行的,而其他线程则会调用notify后加入等待token的线程队列中,reactor主线程会被打断后分配事件处理,在dispatch前释放token,那个其他线程获取到更改reactor的状态,待其他线程释放token后,reactor主线程获取到token又可以开始下一次的检测和处理。
ACE_TP_Reactor没有像 Select_Reactor 一样使用一般的 ACE_GUARD_RETURN 宏,后者间接定义的是 ACE_Guard 模板对象,在构造器中默认会调用 LOCK 的 acquire 方法;而 ACE_TP_Token_Guard 则不会在构造器中调用任何锁的方法,而是对象构造后,使用 grab_token/acquire_token 来显示获取锁,析构器中调用 release 则是一致的。
这里有两个明显区别,一是调用 acqure_read,ACE_Token 有两个等待队列,读队列与写队列,写队列拥有更高的优先级,其它接口一般是调用 acqure 在写队列上等待,也就是说,如果两个线程,一个想运行事件循环,一个想修改 TP_Reactor 状态,那么后者将先被调度;二是提供锁的 sleep_hook 参数,这样一来,当取得锁失败 (被其它线程占有),它将调用这个传入的参数,而不是锁自己的同名回调函数,而传入的这个函数 no_op_sleep_hook 顾名思义是一个空方法,也就是说,它覆盖了默认向 Reactor 发送通知的实现,当多个线程同时运行事件循环时,保持当前线程处在 select 调用中。否则就如同之前对 Select_Reactor 的分析一样,多个线程会交替通知,导致 Reactor 无法正常工作。
上面的流程基本讲清了关于Token使用的区别,可能又有人问了,既然这里ACE_TP_Reactor的handle_events没有使用ACE_GUARD_RETURN去notify,那么这么说来其他线程更改reactor状态时这里还能及时去调整后select吗?需要说明的是,ACE_TP_Reactor的handle_events不用ACE_GUARD_RETURN去notify,
- 一、多线程的各线程调run_event_loop不会因为发Notify,而导致其他线程上的select被频繁无效打断,引起功能不正常。
- 二、无论是否使用ACE_GUARD_RETURN,不同线程仍是互斥的,而同线程仍然是可重入的。
至于任何线程上调整reactor的状态,比如调用register_handler,register_handler的实现中会使用ACE_GUARD_RETURN,发送notify消息,让拿到Token的reactor线程及时打断select去更新状态。
如果此时客户端连接服务器并发送数据,线程 M 从 select 中唤醒,处理 IO 事件,在处理前,先将此句柄从 Reactor 中挂起 (suspend),接着释放 Token,然后开始分派事件,最后从 Reactor 中恢复该句柄。在 Select_Reactor 中挂起句柄就是从侦听的句柄集合中移到备用集合,恢复操作正好相反,这样的处理主要是考虑到当线程 M 还未分派此句柄上的事件时,线程 N 进入 select 发现该句柄仍处在激活状态,并尝试对它继续分派,造成多次重复分派。开始分派事件前,线程 M 的侦测工作就正式结束了,所以它释放锁,这会唤醒在锁上等待的线程 N,线程 N 获取锁后,进入 select 接着再侦测事件,由此完成了一次线程交接班。线程 M 分派完事件后,又会回到事件循环开始,等待进入锁,如果整个线程池只有 M 一个线程,那就只有等它分派完事件后,才可以再次进行事件检测了,这就退化到了和 Select_Reactor 一致的情形。使用 TP_Reactor 后,分派事件的线程确实不在被限制为主线程了。
总之, 站在 Select_Reactor 的肩膀上,TP_Reactor 只做了少量修改,就实现了上述线程模型,具体罗列如下:
1. 固定打开 Reactor Notify 机制;
2. 固定压制分派通知事件的 renew 调用 (通过 supress_notify_renew 接口);
3. 侦测到事件后,一个线程只分派一个事件就返回到事件循环开始处;
4. 在分派事件前,先释放 Token;
5. 对于 IO 事件,分派前先 suspend 之;分派后,再 resume 之;
6. 事件循环中使用 ACE_TP_Token_Guard 取代一般的 ACE_GUARD 宏来获取 Token。
01271 template <class ACE_SELECT_REACTOR_TOKEN> int 01272 ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::dispatch 01273 (int active_handle_count, 01274 ACE_Select_Reactor_Handle_Set &dispatch_set)
函数中,有如下代码:
01340 // Next dispatch the notification handlers (if there are any to 01341 // dispatch). These are required to handle multi-threads that 01342 // are trying to update the <Reactor>. 01343 01344 else if (this->dispatch_notification_handlers 01345 (dispatch_set, 01346 active_handle_count, 01347 other_handlers_dispatched) == -1) 01348 // State has changed or a serious failure has occured, so exit 01349 // loop. 01350 break;
最终调用到ACE_Select_Reactor_Notify的handle
_input,在这里
01051 int 01052 ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle) 01053 { 01054 ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input"); 01055 // Precondition: this->select_reactor_.token_.current_owner () == 01056 // ACE_Thread::self (); 01057 01058 int number_dispatched = 0; 01059 int result = 0; 01060 ACE_Notification_Buffer buffer; 01061 01062 while ((result = this->read_notify_pipe (handle, buffer)) > 0) 01063 { 01064 // Dispatch the buffer 01065 // NOTE: We count only if we made any dispatches ie. upcalls. 01066 if (this->dispatch_notify (buffer) > 0) 01067 ++number_dispatched; 01068 01069 // Bail out if we've reached the <notify_threshold_>. Note that 01070 // by default <notify_threshold_> is -1, so we'll loop until all 01071 // the notifications in the pipe have been dispatched. 01072 if (number_dispatched == this->max_notify_iterations_) 01073 break; 01074 } 01075 01076 // Reassign number_dispatched to -1 if things have gone seriously 01077 // wrong. 01078 if (result < 0) 01079 number_dispatched = -1; 01080 01081 // Enqueue ourselves into the list of waiting threads. When we 01082 // reacquire the token we'll be off and running again with ownership 01083 // of the token. The postcondition of this call is that 01084 // <select_reactor_.token_.current_owner> == <ACE_Thread::self>. 01085 this->select_reactor_->renew (); 01086 return number_dispatched; 01087 }
如果是普通的ACE_Select_Reactor的reactor,则renew是可以用的。可以在打断select后,执行事件处理后,将需要检测的事件handles刷新,重新检测。
网络例子代码:
/article/9196399.html
参考文档:
http://ace.acejoy.com/thread-5804-1-1.html
相关文章推荐
- ACE_Reactor(四):ACE_WFMO_Reactor
- ACE_Reactor(三)ACE_Select_Reactor_T
- ACE_Reactor(二)ACE_Dev_Poll_Reactor
- ACE_Reactor(一)整体理解ACE_Reactor
- React.js model
- React Native For Android 架构初探
- An iOS Developer on React Native一个资深iOS开发者对于React Native具体使用体验
- React Native for Android 实践 — 实现知乎日报客户端
- React Native For Android 架构初探
- React环境搭建
- react native andrroid 学习相关的资料
- React Native 文件模板配置
- ReactiveCocoa基础篇
- Spring reactor框架简介
- ReactiveCocoa 谈谈RACMulticastConnection
- Java Non-Blocking and Asynchronous IO with NIO & NIO.2 (JSR203) - Reactor/Proactor Implementations
- react-native初探
- react-natvie学习
- ReactJs 样式类的使用
- ReactiveCocoa 谈谈concat