您的位置:首页 > 其它

ACE_Future实现了单写多读模式,可以用来异步方法调用的结果返回

2004-12-16 18:22 811 查看
ACE_Future
实现了单写多读模式,可以用来异步方法调用的结果返回

激活对象调度者,继承的线程基类ACE_Task_Base
1) 方法请求出列svr()
2) 方法请求入列work()
3) 方法请求处理work_i()



ACE_Method_call的call函数函数建立了Prime_Scheduler类方法请求处理函数work_i关联.具体是ACE_Method_call的函数Call调用转调Prime_Scheduler函数work_i(),work_i()返回结果值返回到set函数中,然后用了ACE_Future的set函数再到ACE_Future_Rep的set函数来产生期望结果值value_并保存再ACE_Futrue_Rep中.然后ACE_Futrue的函数ready检查是否有结果值,它也是通过ACE_Future_Rep的ready函数检测set函数产生的值指针是否未空来实现,ACE_Furture函数get去获取结果值.

ACE_Method_Request类中包含一个call纯虚函数
class ACE_Export ACE_Method_Request
{
public:
ACE_Method_Request (unsigned long priority = 0);
virtual ~ACE_Method_Request (void);
unsigned long priority (void) const;
void priority (unsigned long prio);
virtual int call (void) = 0;
protected:
unsigned long priority_;
}

class Prime_Scheduler : public ACE_Task_Base
{
public:
// 初始化
virtual int open (void *args = 0);

//终止
virtual int close (u_long flags = 0);

//激活对象的代理接口的部分方法
ACE_Future<u_long> work (u_long param, int count = 1);
ACE_Future<const ACE_TCHAR*> name (void);
void end (void);

protected:
//运行事件循环
virtual int svc (void);

//上面两个方法work,name的实现
u_long work_i (u_long, int);
const ACE_TCHAR *name_i (void);

private:
//激活对象队列
ACE_Activation_Queue activation_queue_;

//激活对象指针
Prime_Scheduler *scheduler_;
}

1)请求出列
//开始处理激活对象队列中的方法请求调用
int Prime_Scheduler::svc (void)
{
for (;;)
{
// 出列下一个方法调用请求 (我们使用智能指针保证异常mo的清除)
auto_ptr<ACE_Method_Request> mo (this->activation_queue_.dequeue ());

ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%t) calling method request/n")));
// 调用它
if (mo->call () == -1)
break;
}
}

2)请求入列
调度者的需要请求工作入列函数work,主要是将方法请求入列
ACE_Future<u_long>
Prime_Scheduler::work (u_long newparam,
int newcount)
{
// 判断调度者的内部的调度者指针
if (this->scheduler_) {
return this->scheduler_->work (newparam, newcount);
}

// 没有的话直接使用激活对象队列
else {
ACE_Future<u_long> new_future;

//方法请求入列.
this->activation_queue_.enqueue
(new Method_Request_work (this,
newparam,
newcount,
new_future));
return new_future;
}
}

3) 请求处理
// 这里是Work 发生的地方. 该方法在Method_Request_work类中call函数中调用
u_long
Prime_Scheduler::work_i (u_long param,
int count)
{
ACE_UNUSED_ARG (count);
return ACE::is_prime (param, 2, param / 2);
}

方法请求工作类Method_Request_work的实现,构造函数传入了激活对象调度者
class Method_Request_work : public ACE_Method_Request
{
Method_Request_work (Prime_Scheduler *,
u_long,
int,
ACE_Future<u_long> &);

virtual int call (void)
{
// 派发servant的操作和保存结果到Futrue中,我们注意到set的参数1是
// 调度者的work_i函数
return this->future_result_.set (this->scheduler_->work_i
(this->param_,
this->count_));
}
pivate:
Prime_Scheduler *scheduler_;
u_long param_;
int count_;
ACE_Future<u_long> future_result_;
}

int main()
{
Prime_Scheduler *andres, *peter, *helmut, *matias;
创建四个激活对象.

for (int i = 0; i < n_loops; i++)
{
{
ACE_Future<u_long> fresulta;
ACE_Future<u_long> fresultb;
ACE_Future<u_long> fresultc;
ACE_Future<u_long> fresultd;
ACE_Future<u_long> fresulte;
ACE_Future<const ACE_TCHAR *> fname;

//方法请求入列,等待对象激活调用,他们是在调度线程svr中异步执行
fresulta = andres->work (9013);
fresultb = peter->work (9013);
fresultc = helmut->work (9013);
fresultd = matias->work (9013);

//看是否有结果
if (fresulta.ready ())

if (i % 3 == 0)
{
// 三次以上取消futures...
fresulta.cancel (10ul);
fresultb.cancel (20ul);
fresultc.cancel (30ul);
fresultd.cancel (40ul);
}

// 获取结果
u_long resulta = 0, resultb = 0, resultc = 0, resultd = 0, resulte = 0;

fresulta.get (resulta);
fresultb.get (resultb);
fresultc.get (resultc);
fresultd.get (resultd);
fresulte.get (resulte);
}
}

// 关闭激活对象
andres->end ();
peter->end ();
helmut->end ();
matias->end ();

delete andres;
delete peter;
delete helmut;
delete matias;

}

//实现了但写多读模式,可以用来异步方法调用的结果返回,ACE_Future重载了=操作符.
template <class T>
class ACE_Future
{
public:
// 多个构造函数
ACE_Future (void);
ACE_Future (const ACE_Future<T> &r);
ACE_Future (const T &r);
~ACE_Future (void);

// 操作符重载
void operator = (const ACE_Future<T> &r);
int operator == (const ACE_Future<T> &r) const;
int operator != (const ACE_Future<T> &r) const;
operator T ();

int cancel (const T &r);
int cancel (void);
int set (const T &r);
int get (T &value, ACE_Time_Value *tv = 0) const;
int ready (void) const;
int attach (ACE_Future_Observer<T> *observer);
int detach (ACE_Future_Observer<T> *observer);
void dump (void) const;

ACE_Future_Rep<T> *get_rep (void);
ACE_ALLOC_HOOK_DECLARE;

private:
/// 不允许new,delete,&操作符
void *operator new (size_t nbytes);
void operator delete (void *);
void operator & ();

/// 保护<Future>.的操作
typedef ACE_Future_Rep<T> FUTURE_REP;
FUTURE_REP *future_rep_;
};

template <class T>
class ACE_Future_Observer
{
public:
virtual ~ACE_Future_Observer (void);
virtual void update (const ACE_Future<T> &future) = 0;
ACE_ALLOC_HOOK_DECLARE;
protected:
ACE_Future_Observer (void);
};

分配this->future_rep_,并将计数++
template <class T> void
ACE_Future<T>::operator = (const ACE_Future<T> &rhs)
{
ACE_Future<T> &r = (ACE_Future<T> &) rhs;
FUTURE_REP::assign (this->future_rep_,
FUTURE_REP::attach (r.future_rep_));
}

attach函数是将引用++
template <class T> ACE_Future_Rep<T> *
ACE_Future_Rep<T>::attach (ACE_Future_Rep<T>*& rep)
{
ACE_ASSERT (rep != 0);
// Use value_ready_mutex_ for both condition and ref count management
ACE_MT (ACE_Guard<ACE_Thread_Mutex> r_mon (rep->value_ready_mutex_));
++rep->ref_count_;
return rep;
}

指派ACE_Furture的成员ACE_Furture_Rep
template <class T> void
ACE_Future_Rep<T>::assign (ACE_Future_Rep<T>*& rep, ACE_Future_Rep<T>* new_rep)
{
ACE_ASSERT (rep != 0);
ACE_ASSERT (new_rep != 0);
// Use value_ready_mutex_ for both condition and ref count management
ACE_MT (ACE_GUARD (ACE_Thread_Mutex, r_mon, rep->value_ready_mutex_));

ACE_Future_Rep<T>* old = rep;
rep = new_rep;

// detached old last for exception safety
if (old->ref_count_-- == 0)
{
ACE_MT (r_mon.release ());
// We do not need the lock when deleting the representation.
// There should be no side effects from deleting rep and we don
// not want to release a deleted mutex.
delete old;
}
}

ACE_Future函数set间接调用成员的ACE_Future_Rep的set函数
template <class T> int
ACE_Future<T>::set (const T &r)
{
// 给结果指针到ACE_Future_Rep.
return this->future_rep_->set (r, *this);
}

template <class T> int
ACE_Future_Rep<T>::set (const T &r,
ACE_Future<T> &caller)
{
// 如果值已经产生,忽略它.
if (this->value_ == 0)
{
ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex,
ace_mon,
this->value_ready_mutex_,
-1));

// 不是的话则new一个结果值,注意这里使用双检查锁模式避免多次分配
if (this->value_ == 0) //还没有值,产生
{
ACE_NEW_RETURN (this->value_,
T (r),
-1);

// Remove and notify all subscribed observers.
ACE_TYPENAME OBSERVER_COLLECTION::iterator iterator =
this->observer_collection_.begin ();

ACE_TYPENAME OBSERVER_COLLECTION::iterator end =
this->observer_collection_.end ();

while (iterator != end)
{
OBSERVER *observer = *iterator++;
observer->update (caller);
}
// 对所以等待的线程广播信号
return this->value_ready_.broadcast ();
}
}
return 0;
}

检查结果值
template <class T> int
ACE_Future<T>::ready (void) const
{
// We're ready if the ACE_Future_rep is ready...
return this->future_rep_->ready ();
}

简单判断value指针是否未空返回
template <class T> int
ACE_Future_Rep<T>::ready (void) const
{
return this->value_ != 0;
}

获取结果值,更新所以的ACE_Future_Observer
template <class T> int
ACE_Future_Rep<T>::get (T &value,
ACE_Time_Value *tv) const
{
// If the value is already produced, return it.
if (this->value_ == 0)
{
ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon,
ACE_const_cast (ACE_Thread_Mutex &, this->value_ready_mutex_),
-1));
// If the value is not yet defined we must block until the
// producer writes to it.

while (this->value_ == 0)
// Perform a timed wait.
if ((ACE_const_cast (ACE_Condition_Thread_Mutex &, this->value_ready_)).wait (tv) == -1)
return -1;

// Destructor releases the lock.
}

value = *this->value_;
return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息