您的位置:首页 > 理论基础 > 计算机网络

一个基于libcurl的多线程HTTP 请求服务模板

2013-07-29 20:43 405 查看
最近在项目中,多次要去第三方验证,例如苹果的充值服务器验证等等,所以封装了一个模板。使用的是libcurl.

先看看代码。基础思想是,一个HttpService维护任务队列,完成任务队列,以及一组HttpThread队列, HttpThread负责HTTP请求。

工作线程的定义

// Worker that pairs one libcurl easy handle with one boost::thread.
// The constructor configures the handle for POSTs to `url` and starts a thread
// running Run(); Run() pulls tasks from `father` and hands finished ones back
// through HttpServiceBase<Task>::AddDone().
//
// Task is expected to provide CheckData(char*, size_t), CheckException(),
// GetLen() and GetData() (these are the calls made by this class).
template<class Task>
class HttpThread
{
public:
    // url:    target of every POST issued by this worker.
    // father: task source / sink; must outlive this object.
    HttpThread(const char* url, HttpServiceBase<Task>* father);
    virtual ~HttpThread();

    // Thread entry point: loops until _stop is set.
    void Run();
    // Delivers response bytes to the current task and completes it.
    void ParseData(char* p, size_t len);
    // True when no task is in flight; also asks the worker loop to stop.
    bool IsDone();

    // libcurl write callback; `userdata` is the HttpThread registered via
    // CURLOPT_WRITEDATA.
    static size_t HttpThreadParseData(char* ptr, size_t size, size_t nmemb, void* userdata);

protected:
private:
    // Not copyable (declared, never defined): a copy would share _handle and
    // _thread, and the destructor would then release both twice.
    HttpThread(const HttpThread&);
    HttpThread& operator=(const HttpThread&);

    CURL* _handle;                      // easy handle used for every request
    boost::thread* _thread;             // thread executing Run()
    Task* _cur_task;                    // task currently being processed, or NULL
    HttpServiceBase<Task>* _father;     // owner that supplies and collects tasks
    bool _stop;                         // set to end the Run() loop
};


线程被封装到HttpThread模板类

构造函数里会初始化一些libcurl,并且启用新的工作线程。

// Creates the libcurl easy handle, configures it for POST requests to `url`,
// registers this object as the write-callback target and starts the worker
// thread running Run().
//
// url:    request URL passed to CURLOPT_URL.
// father: service that supplies tasks and collects finished ones.
template<class Task>
HttpThread<Task>::HttpThread(const char* url, HttpServiceBase<Task>* father)
    : _handle(NULL), _thread(NULL), _cur_task(NULL), _father(father), _stop(false)
{
    _handle = curl_easy_init();
    if (_handle)
    {
        // CURLOPT_HTTPHEADER takes a curl_slist*, and a later call replaces the
        // earlier one, so both headers must live in the same list. An empty
        // "Expect:" entry suppresses libcurl's "Expect: 100-continue".
        curl_slist* headers = curl_slist_append(NULL, "Content-Type: application/x-www-form-urlencoded; charset=UTF-8");
        curl_slist* extended = curl_slist_append(headers, "Expect:");
        if (extended)
        {
            headers = extended;
        }

        // Numeric options are read by libcurl as long, hence the L suffixes.
        curl_easy_setopt(_handle, CURLOPT_URL,              url);
        curl_easy_setopt(_handle, CURLOPT_HEADER,           0L);
#ifdef DEBUG
        curl_easy_setopt(_handle, CURLOPT_VERBOSE,          1L);
#else
        curl_easy_setopt(_handle, CURLOPT_VERBOSE,          0L);
#endif // DEBUG

        curl_easy_setopt(_handle, CURLOPT_POST,             1L);
        // 0 is libcurl's "never time out" value; a stalled server blocks this worker.
        curl_easy_setopt(_handle, CURLOPT_TIMEOUT,          0L);
        curl_easy_setopt(_handle, CURLOPT_WRITEFUNCTION,    &HttpThread<Task>::HttpThreadParseData);
        curl_easy_setopt(_handle, CURLOPT_WRITEDATA,        this);
        // NOTE(review): libcurl does not copy the list, so it must stay alive
        // while the handle is used; nothing frees it with curl_slist_free_all yet.
        curl_easy_setopt(_handle, CURLOPT_HTTPHEADER,       headers);
    }

    // Started last so Run() only ever sees a fully configured object.
    _thread = new boost::thread(boost::bind(&HttpThread<Task>::Run, this));
}


设置一些curl 的基础数据,

curl_easy_setopt(_handle, CURLOPT_URL,              url);
这个指定服务器的URL

curl_easy_setopt(_handle, CURLOPT_WRITEFUNCTION,    &HttpThread<Task>::HttpThreadParseData);


设定HTTP SERVER 有数据返回的回调HttpThreadParseData( char *ptr, size_t size, size_t nmemb, void *userdata)函数,

我们通过

curl_easy_setopt(_handle, CURLOPT_WRITEDATA,        this);
这个选项,把工作线程对应的指针绑定进去,在回调的时候会返回这个指针。

然后创建了一个线程。

_thread = new boost::thread(boost::bind(&HttpThread<Task>::Run, this));
}
线程绑定了对象的run 函数。
// Worker loop. Until _stop is set it fetches a task from the owning service
// when idle, POSTs the task's buffer with libcurl and makes sure the task is
// handed back through AddDone() exactly once. Sleeps about 1 ms per iteration.
//
// Completion paths:
//   * response bytes arrive  -> the write callback (ParseData) completes the task;
//   * anything else          -> the task is completed here via CheckException().
// "Anything else" covers a failed transfer, an exception, and a successful
// transfer with an empty body (libcurl never invokes the write callback then;
// previously such a task stayed pending and was re-POSTed forever).
template<class Task>
void HttpThread<Task>::Run()
{
    while (!_stop)
    {
        if (_handle)
        {
            if (NULL == _cur_task)
            {
                _cur_task = _father->GetTask();
            }

            if (_cur_task)
            {
                try
                {
                    // CURLOPT_POSTFIELDSIZE is read as a long; passing a wider
                    // or narrower integer through varargs is undefined.
                    CURLcode code = curl_easy_setopt(_handle, CURLOPT_POSTFIELDSIZE,
                                                     static_cast<long>(_cur_task->GetLen()));
                    assert(code == CURLE_OK);
                    code = curl_easy_setopt(_handle, CURLOPT_POSTFIELDS, _cur_task->GetData());
                    assert(code == CURLE_OK);

                    // The outcome is judged below by whether the write callback
                    // already completed the task, so the return code is not needed.
                    curl_easy_perform(_handle);
                }
                catch (...)
                {
                    // Fall through: the task is still pending and is failed below.
                }

                if (_cur_task)
                {
                    _cur_task->CheckException();
                    _father->AddDone(_cur_task);
                    _cur_task = NULL;
                }
            }
        }

#ifdef _WIN32
        Sleep(1);
#else
        usleep(1000);
#endif
    }
}


if (NULL == _cur_task )
{   _cur_task = _father->GetTask();}


如果当前任务是空, 就去主线程抓取一个任务。

我们通过下面2段来设置发送HTTP 请求的数据

CURLcode code = CURLE_OK;
code = curl_easy_setopt(_handle, CURLOPT_POSTFIELDSIZE, _cur_task->GetLen());
assert (code == CURLE_OK);
code = curl_easy_setopt(_handle, CURLOPT_POSTFIELDS, _cur_task->GetData());


 _cur_task->GetLen() 这个接口抽取需要发送数据的长度, _cur_task->GetData()指定发送数据,

code =  curl_easy_perform(_handle);
用函数真正执行数据发送,如果出现异常

_cur_task->CheckException();
_father->AddDone(_cur_task);
_cur_task = NULL;


如果没有异常,然后等待服务器返回数据 回调我们前面设置的&HttpThread<Task>::HttpThreadParseData函数。

// libcurl write callback. `userdata` is the HttpThread registered through
// CURLOPT_WRITEDATA; the received bytes are forwarded to its ParseData().
// Always reports the whole chunk (size * nmemb) as consumed.
template<class Task>
size_t HttpThread<Task>::HttpThreadParseData(char* ptr, size_t size, size_t nmemb, void* userdata)
{
    const size_t byteCount = size * nmemb;

    HttpThread<Task>* const self = static_cast<HttpThread<Task>*>(userdata);
    if (self != NULL)
    {
        self->ParseData(ptr, byteCount);
    }

    return byteCount;
}


// Passes `len` response bytes at `p` to the current task's CheckData(), hands
// the task to the owning service via AddDone() and clears the current-task
// slot. Does nothing when no task is in flight, so only the first chunk of a
// response reaches the task.
template<class Task>
void HttpThread<Task>::ParseData(char* p, size_t len)
{
    if (NULL == _cur_task)
    {
        return;
    }

    Task* const finished = _cur_task;
    finished->CheckData(p, len);
    _father->AddDone(finished);
    _cur_task = NULL;
}


这里调用到了工作线程的方法,我们调用Task:: CheckData(p, len);来对任务进行验证处理。

_father->AddDone(_cur_task); . 然后把处理好的任务返回主线程的_done_list列表,然后清空当前任务.

上面就是整个工作线程的流程.

看看HttpService 的定义

// Front end of the worker pool. Keeps a queue of pending tasks, a queue of
// finished tasks and a list of HttpThread workers. Workers take tasks with
// GetTask() and return them with AddDone(); Run() then invokes DoneTask() on
// every finished task.
template<class Task>
class HttpService : public RunObject, public HttpServiceBase<Task>
{
public:
    // --- lifecycle -----------------------------------------------------------
    bool Init(u32 count, const char* url);  // start `count` workers posting to `url`
    void Run(u32 diff);                     // drain _done_list, then checkStop()
    void ShutDown();                        // delete workers, global libcurl cleanup
    void checkStop();                       // move _wait_stop_ -> _stop_obj_ once idle

    // --- task flow -----------------------------------------------------------
    void AddTask(Task* t);                  // queue a task (only in _run_obj_ state)
    void AddDone(Task* t);                  // called by workers with a finished task
    void DoneTask( Task* t);                // calls t->DoneCall()
    Task* GetTask();                        // next pending task, or NULL

protected:
    bool AddThread(u32 nCount);

private:
    std::list<HttpThread<Task>*>  _threads;     // worker objects created by AddThread
    std::list<Task*>    _task_list;             // pending tasks, guarded by _mutex_task
    std::list<Task*>    _done_list;             // finished tasks, guarded by _mutex_done
    std::string         _url;                   // URL handed to every worker
    boost::mutex        _mutex_done;
    boost::mutex        _mutex_task;
};
其中 bool Init(u32 count, const char* url) 负责初始化服务,参数是工作线程数和URL,同时完成libcurl的全局初始化。

void Run(u32 diff); 负责从已处理完成的任务队列中取出任务,交给应用再次处理。

void ShutDown();负责关闭服务。释放对应的资源。

void AddTask(Task* t); 添加新的任务到工作队列 _task_list

void AddDone(Task* t);工作线程调用,负责把处理好的任务,丢进这_done_list

void checkStop(); 这个是检测应用在需要关闭的时候,子线程的资源的释放等等

void DoneTask( Task* t); 主线程对_done_list里面处理好的任务的再次处理。这个函数会调用Task的DoneCall方法。

Task* GetTask(); 工作线程获取从_task_list提取一个新的任务。

主线程会在每次循环中检测_done_list,然后对工作线程处理好的每个任务进行再处理。

// Per-tick pump. While the service is running or waiting to stop, takes every
// task out of _done_list and passes it to DoneTask(); afterwards always calls
// checkStop().
//
// diff: not used by this implementation.
template<class Task>
void HttpService<Task>::Run(u32 diff)
{
    (void)diff;

    if (isObjState(_run_obj_) || isObjState(_wait_stop_))
    {
        // Swap under the lock so workers are blocked only for a pointer
        // exchange; the callbacks below run without holding _mutex_done.
        std::list<Task*> finished;
        {
            boost::mutex::scoped_lock lock(_mutex_done);
            finished.swap(_done_list);
        }

        // `typename` is required: the iterator type depends on Task.
        for (typename std::list<Task*>::iterator it = finished.begin(); it != finished.end(); ++it)
        {
            DoneTask(*it);
        }
    }

    checkStop();
}


然后对每个已经完成的任务,调用

// Gives a finished task back to the application by invoking its DoneCall().
// `t` is dereferenced unconditionally.
template<class Task>
void HttpService<Task>::DoneTask( Task* t)
{
    Task& finished = *t;
    finished.DoneCall();
}


方法,对已经通过工作线程验证过的任务,交给应用去处理。

上面我们调用了Task类的以下接口,

//void CheckData(char*p , size_t len);      parse from http server
//void CheckException();                    exception
//void DoneCall();                          task done call
//size_t GetLen();                          buffer len
//char* GetData();                          buffer send to http server


另外RunObject 是一个状态控制类。主要是控制HttpService的工作状态。

// While in the _wait_stop_ state, switches the service to _stop_obj_ once
// there is no pending task, no finished task awaiting Run(), and every worker
// reports IsDone(). Returns without changing state otherwise.
//
// HttpThread::IsDone() also tells an idle worker to leave its loop, so workers
// visited before a busy one have already been asked to stop.
template<class Task>
void HttpService<Task>::checkStop()
{
    if (!isObjState(_wait_stop_))
    {
        return;
    }

    {
        boost::mutex::scoped_lock lock(_mutex_task);
        if (!_task_list.empty())
        {
            return;
        }
    }

    {
        boost::mutex::scoped_lock lock(_mutex_done);
        if (!_done_list.empty())
        {
            return;
        }
    }

    // `typename` is required: the iterator type depends on Task.
    for (typename std::list<HttpThread<Task>*>::iterator it = _threads.begin(); it != _threads.end(); ++it)
    {
        HttpThread<Task>* worker = *it;
        if (worker && !worker->IsDone())
        {
            return;
        }
    }

    setObjState(_stop_obj_);
}
当检测到状态是 _wait_stop_等待停止的时候, 我们会检测当前的_task_list 列表, _done_list列表,以及正在处理的任务,如果所有的都处理完成才会真正的去关闭服务。

否则会一直_wait_stop_状态,等待所有任务完成,不过在_wait_stop_的状态的时候, 是无法添加新的任务进列表。可以看

// Appends `t` to the pending queue under _mutex_task. The task is accepted
// only while the service is in the _run_obj_ state; in any other state the
// call does nothing.
template<class Task>
void HttpService<Task>::AddTask(Task* t)
{
    if (!isObjState(_run_obj_))
    {
        return;
    }

    boost::mutex::scoped_lock guard(_mutex_task);
    _task_list.push_back(t);
}


这样我们保证所有的任务都处理完毕。

附上代码

run_object.h

#ifndef __run_object_h__
#define __run_object_h__

struct RunObject
{
enum
{
_init_obj_,		//准备阶段
_run_obj_,		//运行阶段
_wait_stop_,	//准备停止阶段
_stop_obj_,		//停止
};

RunObject()
{
_obj_state = _init_obj_;
}

void setObjState( u8 state)
{
_obj_state = state;
}
virtual void checkStop() = 0;
bool isObjStop()
{
return _obj_state == _stop_obj_;
}
bool isObjState( u8 state)
{
return _obj_state == state;
}
u8 _obj_state;
};

#endif


http_task.h

#ifndef ___http_task_h__
#define ___http_task_h__
#include "G_Common.h"
#include "boost/bind/bind.hpp"
#include "boost/thread/thread.hpp"
#include "curl/curl.h"

// task api
//void CheckData(char*p , size_t len); parse from http server
//void CheckException(); exception
//void DoneCall(); task done call
//size_t GetLen(); buffer len
//char* GetData(); buffer send to http server

// Interface a worker uses to talk to the service that owns it:
// GetTask() supplies the next task to process (or NULL), AddDone() takes a
// processed task back.
template<class Task>
class HttpServiceBase
{
public:
    // Polymorphic interface: keep destruction through a base pointer well defined.
    virtual ~HttpServiceBase() {}

    virtual void AddDone(Task* t) = 0;
    virtual Task* GetTask() = 0;
};

// Worker that pairs one libcurl easy handle with one boost::thread.
// The constructor configures the handle for POSTs to `url` and starts a thread
// running Run(); Run() pulls tasks from `father` and hands finished ones back
// through HttpServiceBase<Task>::AddDone().
//
// Task is expected to provide CheckData(char*, size_t), CheckException(),
// GetLen() and GetData() (these are the calls made by this class).
template<class Task>
class HttpThread
{
public:
    // url:    target of every POST issued by this worker.
    // father: task source / sink; must outlive this object.
    HttpThread(const char* url, HttpServiceBase<Task>* father);
    virtual ~HttpThread();

    // Thread entry point: loops until _stop is set.
    void Run();
    // Delivers response bytes to the current task and completes it.
    void ParseData(char* p, size_t len);
    // True when no task is in flight; also asks the worker loop to stop.
    bool IsDone();

    // libcurl write callback; `userdata` is the HttpThread registered via
    // CURLOPT_WRITEDATA.
    static size_t HttpThreadParseData(char* ptr, size_t size, size_t nmemb, void* userdata);

protected:
private:
    // Not copyable (declared, never defined): a copy would share _handle and
    // _thread, and the destructor would then release both twice.
    HttpThread(const HttpThread&);
    HttpThread& operator=(const HttpThread&);

    CURL* _handle;                      // easy handle used for every request
    boost::thread* _thread;             // thread executing Run()
    Task* _cur_task;                    // task currently being processed, or NULL
    HttpServiceBase<Task>* _father;     // owner that supplies and collects tasks
    bool _stop;                         // set to end the Run() loop
};

// Front end of the worker pool. Keeps a queue of pending tasks, a queue of
// finished tasks and a list of HttpThread workers. Workers take tasks with
// GetTask() and return them with AddDone(); Run() then invokes DoneTask() on
// every finished task.
template<class Task>
class HttpService : public RunObject, public HttpServiceBase<Task>
{
public:
    // --- lifecycle -----------------------------------------------------------
    bool Init(u32 count, const char* url);  // start `count` workers posting to `url`
    void Run(u32 diff);                     // drain _done_list, then checkStop()
    void ShutDown();                        // delete workers, global libcurl cleanup
    void checkStop();                       // move _wait_stop_ -> _stop_obj_ once idle

    // --- task flow -----------------------------------------------------------
    void AddTask(Task* t);                  // queue a task (only in _run_obj_ state)
    void AddDone(Task* t);                  // called by workers with a finished task
    void DoneTask( Task* t);                // calls t->DoneCall()
    Task* GetTask();                        // next pending task, or NULL

protected:
    bool AddThread(u32 nCount);

private:
    std::list<HttpThread<Task>*> _threads;  // worker objects created by AddThread
    std::list<Task*> _task_list;            // pending tasks, guarded by _mutex_task
    std::list<Task*> _done_list;            // finished tasks, guarded by _mutex_done
    std::string _url;                       // URL handed to every worker
    boost::mutex _mutex_done;
    boost::mutex _mutex_task;
};

//////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////
// libcurl write callback. `userdata` is the HttpThread registered through
// CURLOPT_WRITEDATA; the received bytes are forwarded to its ParseData().
// Always reports the whole chunk (size * nmemb) as consumed.
template<class Task>
size_t HttpThread<Task>::HttpThreadParseData(char* ptr, size_t size, size_t nmemb, void* userdata)
{
    const size_t byteCount = size * nmemb;

    HttpThread<Task>* const self = static_cast<HttpThread<Task>*>(userdata);
    if (self != NULL)
    {
        self->ParseData(ptr, byteCount);
    }

    return byteCount;
}

// Passes `len` response bytes at `p` to the current task's CheckData(), hands
// the task to the owning service via AddDone() and clears the current-task
// slot. Does nothing when no task is in flight, so only the first chunk of a
// response reaches the task.
template<class Task>
void HttpThread<Task>::ParseData(char* p, size_t len)
{
    if (NULL == _cur_task)
    {
        return;
    }

    Task* const finished = _cur_task;
    finished->CheckData(p, len);
    _father->AddDone(finished);
    _cur_task = NULL;
}
// Creates the libcurl easy handle, configures it for POST requests to `url`,
// registers this object as the write-callback target and starts the worker
// thread running Run().
//
// url:    request URL passed to CURLOPT_URL.
// father: service that supplies tasks and collects finished ones.
template<class Task>
HttpThread<Task>::HttpThread(const char* url, HttpServiceBase<Task>* father)
    : _handle(NULL), _thread(NULL), _cur_task(NULL), _father(father), _stop(false)
{
    _handle = curl_easy_init();
    if (_handle)
    {
        // CURLOPT_HTTPHEADER takes a curl_slist*, and a later call replaces the
        // earlier one, so both headers must live in the same list. An empty
        // "Expect:" entry suppresses libcurl's "Expect: 100-continue".
        curl_slist* headers = curl_slist_append(NULL, "Content-Type: application/x-www-form-urlencoded; charset=UTF-8");
        curl_slist* extended = curl_slist_append(headers, "Expect:");
        if (extended)
        {
            headers = extended;
        }

        // Numeric options are read by libcurl as long, hence the L suffixes.
        curl_easy_setopt(_handle, CURLOPT_URL, url);
        curl_easy_setopt(_handle, CURLOPT_HEADER, 0L);
#ifdef DEBUG
        curl_easy_setopt(_handle, CURLOPT_VERBOSE, 1L);
#else
        curl_easy_setopt(_handle, CURLOPT_VERBOSE, 0L);
#endif // DEBUG

        curl_easy_setopt(_handle, CURLOPT_POST, 1L);
        // 0 is libcurl's "never time out" value; a stalled server blocks this worker.
        curl_easy_setopt(_handle, CURLOPT_TIMEOUT, 0L);
        curl_easy_setopt(_handle, CURLOPT_WRITEFUNCTION, &HttpThread<Task>::HttpThreadParseData);
        curl_easy_setopt(_handle, CURLOPT_WRITEDATA, this);
        // NOTE(review): libcurl does not copy the list, so it must stay alive
        // while the handle is used; nothing frees it with curl_slist_free_all yet.
        curl_easy_setopt(_handle, CURLOPT_HTTPHEADER, headers);
    }

    // Started last so Run() only ever sees a fully configured object.
    _thread = new boost::thread(boost::bind(&HttpThread<Task>::Run, this));
}

// Stops the worker loop, waits for the thread to leave Run(), and only then
// releases the thread object and the libcurl handle. Deleting a running
// boost::thread merely detaches it, which would leave Run() using a destroyed
// object and a cleaned-up handle.
//
// NOTE(review): join() blocks until the request in flight (if any) returns;
// _stop is a plain bool, so this relies on the loop re-reading it each pass.
template<class Task>
HttpThread<Task>::~HttpThread()
{
    _stop = true;

    if (_thread)
    {
        _thread->join();
        delete _thread;
        _thread = NULL;
    }

    if (_handle)
    {
        curl_easy_cleanup( _handle );
        _handle = NULL ;
    }
}

// Reports whether this worker is idle. When no task is in flight it also sets
// _stop, which makes Run() exit at its next loop check, and returns true;
// otherwise it returns false and changes nothing.
//
// The previous function-local boost::mutex was created anew on every call, so
// no other thread could ever contend on it; it has been removed rather than
// left to suggest protection that does not exist.
// NOTE(review): _cur_task and _stop are still read/written from two threads
// without synchronization; fixing that needs a shared member mutex or atomics.
template<class Task>
bool HttpThread<Task>::IsDone()
{
    if (NULL != _cur_task)
    {
        return false ;
    }

    _stop = true;
    return true ;
}

// Worker loop. Until _stop is set it fetches a task from the owning service
// when idle, POSTs the task's buffer with libcurl and makes sure the task is
// handed back through AddDone() exactly once. Sleeps about 1 ms per iteration.
//
// Completion paths:
//   * response bytes arrive  -> the write callback (ParseData) completes the task;
//   * anything else          -> the task is completed here via CheckException().
// "Anything else" covers a failed transfer, an exception, and a successful
// transfer with an empty body (libcurl never invokes the write callback then;
// previously such a task stayed pending and was re-POSTed forever).
template<class Task>
void HttpThread<Task>::Run()
{
    while (!_stop)
    {
        if (_handle)
        {
            if (NULL == _cur_task)
            {
                _cur_task = _father->GetTask();
            }

            if (_cur_task)
            {
                try
                {
                    // CURLOPT_POSTFIELDSIZE is read as a long; passing a wider
                    // or narrower integer through varargs is undefined.
                    CURLcode code = curl_easy_setopt(_handle, CURLOPT_POSTFIELDSIZE,
                                                     static_cast<long>(_cur_task->GetLen()));
                    assert(code == CURLE_OK);
                    code = curl_easy_setopt(_handle, CURLOPT_POSTFIELDS, _cur_task->GetData());
                    assert(code == CURLE_OK);

                    // The outcome is judged below by whether the write callback
                    // already completed the task, so the return code is not needed.
                    curl_easy_perform(_handle);
                }
                catch (...)
                {
                    // Fall through: the task is still pending and is failed below.
                }

                if (_cur_task)
                {
                    _cur_task->CheckException();
                    _father->AddDone(_cur_task);
                    _cur_task = NULL;
                }
            }
        }

#ifdef _WIN32
        Sleep(1);
#else
        usleep(1000);
#endif
    }
}
//////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////
// Prepares the service: empties the internal lists, remembers `url`, performs
// libcurl's global initialization, switches to the _run_obj_ state and starts
// `count` workers.
//
// Returns false, leaving the state unchanged, when `url` is NULL or libcurl
// fails to initialize; otherwise returns the result of AddThread(count).
//
// NOTE(review): the lists are cleared without deleting what they point to, so
// calling Init() on a service that already has workers would leak them.
template<class Task>
bool HttpService<Task>::Init(u32 count, const char* url)
{
    // Assigning a null pointer to std::string is undefined behaviour.
    if (NULL == url)
    {
        return false;
    }

    _threads.clear();
    _task_list.clear();
    _done_list.clear();
    _url = url;

    if (curl_global_init(CURL_GLOBAL_ALL) != CURLE_OK)
    {
        return false;
    }

    setObjState(_run_obj_);
    return AddThread(count);
}

// Creates `nCount` workers bound to the stored URL and this service, and
// records them in _threads. Each worker starts its own thread on construction.
// Returns true once all workers have been created.
template<class Task>
bool HttpService<Task>::AddThread(u32 nCount)
{
    // The counter has the same type as the bound: a narrower counter would
    // wrap around and never reach counts above its range.
    for (u32 i = 0 ; i < nCount; ++i)
    {
        // Plain `new` reports failure by throwing, so no null check is needed.
        HttpThread<Task>* worker = new HttpThread<Task>(_url.c_str(), this) ;
        _threads.push_back( worker );
    }
    return true;
}
// Per-tick pump. While the service is running or waiting to stop, takes every
// task out of _done_list and passes it to DoneTask(); afterwards always calls
// checkStop().
//
// diff: not used by this implementation.
template<class Task>
void HttpService<Task>::Run(u32 diff)
{
    (void)diff;

    if (isObjState(_run_obj_) || isObjState(_wait_stop_))
    {
        // Swap under the lock so workers are blocked only for a pointer
        // exchange; the callbacks below run without holding _mutex_done.
        std::list<Task*> finished;
        {
            boost::mutex::scoped_lock lock(_mutex_done);
            finished.swap(_done_list);
        }

        // `typename` is required: the iterator type depends on Task.
        for (typename std::list<Task*>::iterator it = finished.begin(); it != finished.end(); ++it)
        {
            DoneTask(*it);
        }
    }

    checkStop();
}
// Deletes every worker object, empties the worker list and then performs
// libcurl's global cleanup. Pending and finished task lists are not touched.
template<class Task>
void HttpService<Task>::ShutDown()
{
    // `typename` is required: the iterator type depends on Task.
    for (typename std::list<HttpThread<Task>*>::iterator it = _threads.begin(); it != _threads.end(); ++it)
    {
        // Deleting a null pointer is a no-op, so no explicit check is needed.
        delete *it;
    }
    _threads.clear();

    curl_global_cleanup();
}
// Appends `t` to the pending queue under _mutex_task. The task is accepted
// only while the service is in the _run_obj_ state; in any other state the
// call does nothing.
template<class Task>
void HttpService<Task>::AddTask(Task* t)
{
    if (!isObjState(_run_obj_))
    {
        return;
    }

    boost::mutex::scoped_lock guard(_mutex_task);
    _task_list.push_back(t);
}
// Called by worker threads to return a processed task: appends `t` to
// _done_list while holding _mutex_done. Run() later drains that list.
template<class Task>
void HttpService<Task>::AddDone(Task* t)
{
    boost::mutex::scoped_lock guard(_mutex_done);
    _done_list.push_back(t);
}
// Removes and returns the oldest pending task, or NULL when the pending queue
// is empty. The queue is accessed under _mutex_task.
template<class Task>
Task* HttpService<Task>::GetTask()
{
    boost::mutex::scoped_lock guard(_mutex_task);

    if (_task_list.empty())
    {
        return NULL ;
    }

    Task* const next = _task_list.front();
    _task_list.pop_front();
    return next ;
}
// Gives a finished task back to the application by invoking its DoneCall().
// `t` is dereferenced unconditionally.
template<class Task>
void HttpService<Task>::DoneTask( Task* t)
{
    Task& finished = *t;
    finished.DoneCall();
}
// While in the _wait_stop_ state, switches the service to _stop_obj_ once
// there is no pending task, no finished task awaiting Run(), and every worker
// reports IsDone(). Returns without changing state otherwise.
//
// HttpThread::IsDone() also tells an idle worker to leave its loop, so workers
// visited before a busy one have already been asked to stop.
template<class Task>
void HttpService<Task>::checkStop()
{
    if (!isObjState(_wait_stop_))
    {
        return;
    }

    {
        boost::mutex::scoped_lock lock(_mutex_task);
        if (!_task_list.empty())
        {
            return;
        }
    }

    {
        boost::mutex::scoped_lock lock(_mutex_done);
        if (!_done_list.empty())
        {
            return;
        }
    }

    // `typename` is required: the iterator type depends on Task.
    for (typename std::list<HttpThread<Task>*>::iterator it = _threads.begin(); it != _threads.end(); ++it)
    {
        HttpThread<Task>* worker = *it;
        if (worker && !worker->IsDone())
        {
            return;
        }
    }

    setObjState(_stop_obj_);
}
#endif
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: